Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ bs58 = { version = "0.5.1", features = ["std", "check"] }
jsonrpsee = { version = "0.24.0", features = ["full"] }
# rocksdb = { version = "0.21.0", features = ["serde", "multi-threaded-cf"] }
home = { version = "0.5.9" }
indicatif = "0.17.8"

indicatif = {version = "0.17.8", features = ["rayon"]}
jemallocator = "0.5"
[patch.'https://github.com/0xPolygonZero/plonky2.git']
plonky2 = { git = "https://github.com/QEDProtocol/plonky2-hwa", rev = "6a8ca008da97890b67a84f64784cfbc488b5238d" }

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ shutdown:
@sudo rm -fr redis-data || true
@sudo rm -fr db || true
@sudo rm -fr /tmp/plonky2_proof || true
@sudo rm -fr /tmp/city_rollup.lock || true
# @sudo rm -fr ~/.dogecoin || true
# @sudo rm -fr ~/.city-rollup/keystore || true

Expand Down
1 change: 1 addition & 0 deletions city_common_circuit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ city_common = { path = "../city_common" }
city_rollup_common = { path = "../city_rollup_common" }
async-trait = { workspace = true }
tracing = { workspace = true }
bincode = "1.3.3"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
15 changes: 15 additions & 0 deletions city_common_circuit/src/circuits/zk_signature/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use self::{
};

use super::traits::qstandard::{provable::QStandardCircuitProvable, QStandardCircuit};
use crate::circuits::{
l1_secp256k1_signature::L1Secp256K1SignatureCircuit,
};
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(bound = "")]
pub struct ZKSignatureCircuitInput<F: RichField> {
Expand Down Expand Up @@ -141,3 +144,15 @@ where

Ok(())
}

pub fn verify_secp256k1_signature_proof<C: GenericConfig<D> + 'static, const D: usize>(
_public_key: QHashOut<C::F>,
signature_proof: Vec<u8>,
) -> anyhow::Result<()>
where
C::Hasher: AlgebraicHasher<C::F>,
{
let circuit = L1Secp256K1SignatureCircuit::<C, D>::new();
let proof = bincode::deserialize::<ProofWithPublicInputs<C::F, C, D>>(&signature_proof)?;
circuit.minifier_chain.verify(proof)
}
26 changes: 26 additions & 0 deletions city_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,29 @@ macro_rules! city_rpc_call_sync {
}
}};
}

#[macro_export]
macro_rules! async_rpc_call_with_response_handling {
($instance:ident, $params:expr, $retype:ty) => {{
let response = $instance
.client
.post($instance.url)
.json(&RpcRequest {
jsonrpc: Version::V2,
request: $params,
id: Id::Number(1),
})
.send()
.await?
.json::<RpcResponse<$retype>>()
.await?;

if let ResponseResult::Success(s) = response.result {
Ok(s)
} else {
Err(anyhow::format_err!("rpc call failed"))
}
}};

}

2 changes: 2 additions & 0 deletions city_redis_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ serde_json = {workspace = true}
r2d2 = { workspace = true }
r2d2_redis = { workspace = true }
bitcoin = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }

[dev-dependencies]
criterion = "0.5.1"
Expand Down
36 changes: 36 additions & 0 deletions city_redis_store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
use std::sync::{Arc, RwLock};
use city_rollup_common::api::data::store::CityUserState;
use city_rollup_common::qworker::job_id::QProvingJobDataID;
use city_rollup_common::qworker::proof_store::QProofStoreReaderSync;
use city_rollup_common::qworker::proof_store::QProofStoreWriterSync;
use plonky2::plonk::config::GenericConfig;
use lazy_static::lazy_static;
use plonky2::plonk::proof::ProofWithPublicInputs;
use r2d2_redis::RedisConnectionManager;
use redis::Commands;
use log::info;

// Table
pub const USER_STATE: &'static str = "user_state";

pub const PROOFS: &'static str = "proofs";
pub const PROOF_COUNTERS: &'static str = "proof_counters";

lazy_static! {
pub static ref REDIS_CACHE: Arc<RwLock<Option<RedisStore>>> = Arc::new(RwLock::new(None));
}

#[derive(Clone)]
pub struct RedisStore {
pool: r2d2::Pool<RedisConnectionManager>,
Expand Down Expand Up @@ -48,6 +55,7 @@ impl RedisStore {
)?;
Ok(())
}

}

impl QProofStoreReaderSync for RedisStore {
Expand Down Expand Up @@ -110,3 +118,31 @@ impl QProofStoreWriterSync for RedisStore {
self.write_multidimensional_jobs_core(jobs_levels, next_jobs)
}
}
pub fn initialize_redis_cache(redis_uri: &str) -> anyhow::Result<()> {
let store = RedisStore::new(redis_uri)?;
let mut redis_cache = REDIS_CACHE.write()
.expect("Failed to acquire write lock on REDIS_CACHE");
*redis_cache = Some(store);
Ok(())
}

pub fn update_cache_user_state(user_state: &CityUserState) -> anyhow::Result<()> {
info!(
"Updating user state in cache: {}",
serde_json::to_string(user_state).unwrap()
);
// Save user state to Redis cache
if let Ok(redis_cache) = REDIS_CACHE.read() {
redis_cache.clone().unwrap().set_user_state(user_state)?;
}
Ok(())
}

pub fn get_cached_user_state(user_id: u64) -> anyhow::Result<Option<CityUserState>> {
info!("Getting user state from cache: user_id({})", user_id);
if let Ok(redis_cache) = REDIS_CACHE.read() {
Ok(redis_cache.clone().unwrap().get_user_state(user_id).ok())
} else {
Ok(None)
}
}
1 change: 1 addition & 0 deletions city_rollup_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async-trait = { workspace = true }
serde_repr = { workspace = true }
bincode = { workspace = true }
tracing = { workspace = true }
rayon = { workspace = true }

hex-literal = "0.4.1"
[dev-dependencies]
Expand Down
25 changes: 21 additions & 4 deletions city_rollup_common/src/actors/rpc_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use crate::{
api::data::block::{
requested_actions::{
Expand Down Expand Up @@ -63,6 +64,18 @@ impl<F: RichField> OrchestratorEventReceiverSync<F> for CityScenarioRequestedAct
fn wait_for_produce_block(&mut self) -> anyhow::Result<bool> {
Ok(false)
}
fn flush_all(&self) -> anyhow::Result<CityScenarioRequestedActionsFromRPC<F>> {
let claim_l1_deposits = self.clone().flush_claim_deposits()?;
let register_users = self.clone().flush_register_users()?;
let add_withdrawals = self.clone().flush_add_withdrawals()?;
let token_transfers = self.clone().flush_token_transfers()?;
Ok(CityScenarioRequestedActionsFromRPC {
token_transfers,
register_users,
claim_l1_deposits,
add_withdrawals,
})
}
}
impl<F: RichField> OrchestratorEventSenderSync<F> for CityScenarioRequestedActionsFromRPC<F> {
fn notify_claim_deposit(&mut self, event: &CityClaimDepositRequest) -> anyhow::Result<()> {
Expand Down Expand Up @@ -216,10 +229,14 @@ impl<F: RichField> QRPCProcessor<F> {
rpc_node_id: u32,
reqs: &[CityRegisterUserRPCRequest<F>],
) -> anyhow::Result<()> {
for req in reqs {
let register = self.injest_rpc_register_user(rpc_node_id, req)?;
self.output.register_users.push(register);
}
let mut registers = reqs
.par_iter()
.map(|req| {
self.injest_rpc_register_user(rpc_node_id, req)
.expect("injest_rpc_register_user failed")
})
.collect::<Vec<_>>();
self.output.register_users.append(&mut registers);
Ok(())
}
}
2 changes: 2 additions & 0 deletions city_rollup_common/src/actors/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use plonky2::hash::hash_types::RichField;

use crate::{
actors::rpc_processor::CityScenarioRequestedActionsFromRPC,
api::data::{
block::{
requested_actions::{
Expand Down Expand Up @@ -103,6 +104,7 @@ pub trait OrchestratorEventReceiverSync<F: RichField> {
fn flush_token_transfers(&mut self) -> anyhow::Result<Vec<CityTokenTransferRequest>>;

fn wait_for_produce_block(&mut self) -> anyhow::Result<bool>;
fn flush_all(&self) -> anyhow::Result<CityScenarioRequestedActionsFromRPC<F>>;
}
pub trait WorkerEventReceiverSync {
fn wait_for_next_job(&mut self) -> anyhow::Result<QProvingJobDataID>;
Expand Down
76 changes: 49 additions & 27 deletions city_rollup_common/src/link/link_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use city_crypto::hash::base_types::hash256::Hash256;
use reqwest::blocking::ClientBuilder;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use std::thread::sleep;
use std::time::Duration;
use crate::{
errors::data_resolver::BTCDataResolverError, introspection::transaction::BTCTransaction,
};

use tracing::{debug};
use super::{
data::{BTCAddress160, BTCFeeRateEstimate, BTCTransactionWithVout, BTCUTXO},
traits::{QBitcoinAPIFunderSync, QBitcoinAPISync},
Expand Down Expand Up @@ -186,32 +188,42 @@ impl BTCLinkAPI {
&self,
path: String,
) -> Result<R, BTCDataResolverError> {
let client = if self.no_proxy {
ClientBuilder::new()
.no_proxy()
.build()
.expect("Client::new()")
} else {
ClientBuilder::new().build().expect("Client::new()")
};
let resp = client
.get(format!("{}/{}", self.electrs_url, path))
.send()
.map_err(|err| BTCDataResolverError {
message: err.to_string(),
})?
.error_for_status()
.map_err(|err| BTCDataResolverError {
message: err.to_string(),
})?;
let text = resp.text().map_err(|err| BTCDataResolverError {
message: err.to_string(),
})?;
Ok(
serde_json::from_str::<R>(&text).map_err(|err| BTCDataResolverError {
message: err.to_string(),
})?,
)
const RETRY_INTERVAL: Duration = Duration::from_millis(200); // in milliseconds
const MAX_RETRIES: usize = 300; // 300 * 200ms = 60s, may need to adjust

let client = self.create_http_client();
let uri = format!("{}/{}", self.electrs_url, path);

for attempt in 1..=MAX_RETRIES {
debug!("Attempt {} to fetch UTXO from Electrum", attempt);
let response = client.get(&uri).send().map_err(|e| BTCDataResolverError { message: e.to_string() })?;
let text = response.text().map_err(|e| BTCDataResolverError { message: e.to_string() })?;

// Check if the response is an array that only contains "[]"
if text != "[]" {
debug!("Response from Electrum: {}", text);
return match serde_json::from_str::<R>(&text) {
Ok(data) => Ok(data),
Err(e) => Err(BTCDataResolverError { message: e.to_string() }),
}
} else {
debug!("Received empty response, retrying...");
}

// Check if we have reached the maximum number of retries
if attempt == MAX_RETRIES {
return Err(BTCDataResolverError {
message: "Maximum retries reached with empty response".to_string(),
});
}

// sleep for a short interval before retrying
sleep(RETRY_INTERVAL);
}

Err(BTCDataResolverError {
message: "Failed to retrieve data after maximum retries".to_string(),
})
}
pub fn is_doge(&self) -> bool {
self.rpc_config.is_doge
Expand Down Expand Up @@ -295,6 +307,16 @@ impl BTCLinkAPI {
) -> Result<BTCFeeRateEstimate, BTCDataResolverError> {
self.send_command("estimatesmartfee", "1.0", (n_blocks,))
}
pub fn create_http_client(&self) -> reqwest::blocking::Client {
if self.no_proxy {
ClientBuilder::new()
.no_proxy()
.build()
.expect("Failed to create HTTP client with no proxy")
} else {
ClientBuilder::new().build() .expect("Failed to create HTTP client")
}
}
}

impl QBitcoinAPISync for BTCLinkAPI {
Expand Down
1 change: 1 addition & 0 deletions city_rollup_core_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ hyper = { workspace = true }
hyper-util = { workspace = true }
jsonrpsee = { workspace = true }
tracing = { workspace = true }
rayon = { workspace = true }

tower = { workspace = true }
tower-http = { workspace = true }
Expand Down
Loading