diff --git a/Cargo.toml b/Cargo.toml index a8e85e6a..a40b9f86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,8 +124,8 @@ bs58 = { version = "0.5.1", features = ["std", "check"] } jsonrpsee = { version = "0.24.0", features = ["full"] } # rocksdb = { version = "0.21.0", features = ["serde", "multi-threaded-cf"] } home = { version = "0.5.9" } -indicatif = "0.17.8" - +indicatif = {version = "0.17.8", features = ["rayon"]} +jemallocator = "0.5" [patch.'https://github.com/0xPolygonZero/plonky2.git'] plonky2 = { git = "https://github.com/QEDProtocol/plonky2-hwa", rev = "6a8ca008da97890b67a84f64784cfbc488b5238d" } diff --git a/Makefile b/Makefile index b5835217..2640590b 100644 --- a/Makefile +++ b/Makefile @@ -126,6 +126,7 @@ shutdown: @sudo rm -fr redis-data || true @sudo rm -fr db || true @sudo rm -fr /tmp/plonky2_proof || true + @sudo rm -fr /tmp/city_rollup.lock || true # @sudo rm -fr ~/.dogecoin || true # @sudo rm -fr ~/.city-rollup/keystore || true diff --git a/city_common_circuit/Cargo.toml b/city_common_circuit/Cargo.toml index ebbad406..b8145bf5 100644 --- a/city_common_circuit/Cargo.toml +++ b/city_common_circuit/Cargo.toml @@ -36,6 +36,7 @@ city_common = { path = "../city_common" } city_rollup_common = { path = "../city_rollup_common" } async-trait = { workspace = true } tracing = { workspace = true } +bincode = "1.3.3" [dev-dependencies] criterion = "0.5.1" diff --git a/city_common_circuit/src/circuits/zk_signature/mod.rs b/city_common_circuit/src/circuits/zk_signature/mod.rs index 65f2d5a3..96711059 100644 --- a/city_common_circuit/src/circuits/zk_signature/mod.rs +++ b/city_common_circuit/src/circuits/zk_signature/mod.rs @@ -17,6 +17,9 @@ use self::{ }; use super::traits::qstandard::{provable::QStandardCircuitProvable, QStandardCircuit}; +use crate::circuits::{ + l1_secp256k1_signature::L1Secp256K1SignatureCircuit, +}; #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(bound = "")] pub struct ZKSignatureCircuitInput { @@ -141,3 +144,15 @@ where Ok(()) } + +pub fn verify_secp256k1_signature_proof + 'static, const D: usize>( + _public_key: QHashOut, + signature_proof: Vec, +) -> anyhow::Result<()> +where + C::Hasher: AlgebraicHasher, +{ + let circuit = L1Secp256K1SignatureCircuit::::new(); + let proof = bincode::deserialize::>(&signature_proof)?; + circuit.minifier_chain.verify(proof) +} diff --git a/city_macros/src/lib.rs b/city_macros/src/lib.rs index 4a2730d4..87287c75 100644 --- a/city_macros/src/lib.rs +++ b/city_macros/src/lib.rs @@ -408,3 +408,29 @@ macro_rules! city_rpc_call_sync { } }}; } + +#[macro_export] +macro_rules! async_rpc_call_with_response_handling { + ($instance:ident, $params:expr, $retype:ty) => {{ + let response = $instance + .client + .post($instance.url) + .json(&RpcRequest { + jsonrpc: Version::V2, + request: $params, + id: Id::Number(1), + }) + .send() + .await? + .json::>() + .await?; + + if let ResponseResult::Success(s) = response.result { + Ok(s) + } else { + Err(anyhow::format_err!("rpc call failed")) + } + }}; + +} + diff --git a/city_redis_store/Cargo.toml b/city_redis_store/Cargo.toml index 5042617c..162e6e67 100644 --- a/city_redis_store/Cargo.toml +++ b/city_redis_store/Cargo.toml @@ -19,6 +19,8 @@ serde_json = {workspace = true} r2d2 = { workspace = true } r2d2_redis = { workspace = true } bitcoin = { workspace = true } +lazy_static = { workspace = true } +log = { workspace = true } [dev-dependencies] criterion = "0.5.1" diff --git a/city_redis_store/src/lib.rs b/city_redis_store/src/lib.rs index 89e9721d..117e752b 100644 --- a/city_redis_store/src/lib.rs +++ b/city_redis_store/src/lib.rs @@ -1,11 +1,14 @@ +use std::sync::{Arc, RwLock}; use city_rollup_common::api::data::store::CityUserState; use city_rollup_common::qworker::job_id::QProvingJobDataID; use city_rollup_common::qworker::proof_store::QProofStoreReaderSync; use city_rollup_common::qworker::proof_store::QProofStoreWriterSync; use plonky2::plonk::config::GenericConfig; +use lazy_static::lazy_static; use plonky2::plonk::proof::ProofWithPublicInputs; use r2d2_redis::RedisConnectionManager; use redis::Commands; +use log::info; // Table pub const USER_STATE: &'static str = "user_state"; @@ -13,6 +16,10 @@ pub const USER_STATE: &'static str = "user_state"; pub const PROOFS: &'static str = "proofs"; pub const PROOF_COUNTERS: &'static str = "proof_counters"; +lazy_static! { + pub static ref REDIS_CACHE: Arc>> = Arc::new(RwLock::new(None)); +} + #[derive(Clone)] pub struct RedisStore { pool: r2d2::Pool, @@ -48,6 +55,7 @@ impl RedisStore { )?; Ok(()) } + } impl QProofStoreReaderSync for RedisStore { @@ -110,3 +118,31 @@ impl QProofStoreWriterSync for RedisStore { self.write_multidimensional_jobs_core(jobs_levels, next_jobs) } } +pub fn initialize_redis_cache(redis_uri: &str) -> anyhow::Result<()> { + let store = RedisStore::new(redis_uri)?; + let mut redis_cache = REDIS_CACHE.write() + .expect("Failed to acquire write lock on REDIS_CACHE"); + *redis_cache = Some(store); + Ok(()) +} + +pub fn update_cache_user_state(user_state: &CityUserState) -> anyhow::Result<()> { + info!( + "Updating user state in cache: {}", + serde_json::to_string(user_state).unwrap() + ); + // Save user state to Redis cache + if let Ok(redis_cache) = REDIS_CACHE.read() { + redis_cache.clone().unwrap().set_user_state(user_state)?; + } + Ok(()) +} + +pub fn get_cached_user_state(user_id: u64) -> anyhow::Result> { + info!("Getting user state from cache: user_id({})", user_id); + if let Ok(redis_cache) = REDIS_CACHE.read() { + Ok(redis_cache.clone().unwrap().get_user_state(user_id).ok()) + } else { + Ok(None) + } +} diff --git a/city_rollup_common/Cargo.toml b/city_rollup_common/Cargo.toml index 499e5754..df68749f 100644 --- a/city_rollup_common/Cargo.toml +++ b/city_rollup_common/Cargo.toml @@ -26,6 +26,7 @@ async-trait = { workspace = true } serde_repr = { workspace = true } bincode = { workspace = true } tracing = { workspace = true } +rayon = { workspace = true } hex-literal = "0.4.1" [dev-dependencies] diff --git a/city_rollup_common/src/actors/rpc_processor.rs b/city_rollup_common/src/actors/rpc_processor.rs index 78a7ed59..b045e590 100644 --- a/city_rollup_common/src/actors/rpc_processor.rs +++ b/city_rollup_common/src/actors/rpc_processor.rs @@ -1,3 +1,4 @@ +use rayon::prelude::{IntoParallelRefIterator, ParallelIterator}; use crate::{ api::data::block::{ requested_actions::{ @@ -63,6 +64,18 @@ impl OrchestratorEventReceiverSync for CityScenarioRequestedAct fn wait_for_produce_block(&mut self) -> anyhow::Result { Ok(false) } + fn flush_all(&self) -> anyhow::Result> { + let claim_l1_deposits = self.clone().flush_claim_deposits()?; + let register_users = self.clone().flush_register_users()?; + let add_withdrawals = self.clone().flush_add_withdrawals()?; + let token_transfers = self.clone().flush_token_transfers()?; + Ok(CityScenarioRequestedActionsFromRPC { + token_transfers, + register_users, + claim_l1_deposits, + add_withdrawals, + }) + } } impl OrchestratorEventSenderSync for CityScenarioRequestedActionsFromRPC { fn notify_claim_deposit(&mut self, event: &CityClaimDepositRequest) -> anyhow::Result<()> { @@ -216,10 +229,14 @@ impl QRPCProcessor { rpc_node_id: u32, reqs: &[CityRegisterUserRPCRequest], ) -> anyhow::Result<()> { - for req in reqs { - let register = self.injest_rpc_register_user(rpc_node_id, req)?; - self.output.register_users.push(register); - } + let mut registers = reqs + .par_iter() + .map(|req| { + self.injest_rpc_register_user(rpc_node_id, req) + .expect("injest_rpc_register_user failed") + }) + .collect::>(); + self.output.register_users.append(&mut registers); Ok(()) } } diff --git a/city_rollup_common/src/actors/traits.rs b/city_rollup_common/src/actors/traits.rs index 97135061..0fc85f32 100644 --- a/city_rollup_common/src/actors/traits.rs +++ b/city_rollup_common/src/actors/traits.rs @@ -1,6 +1,7 @@ use plonky2::hash::hash_types::RichField; use crate::{ + actors::rpc_processor::CityScenarioRequestedActionsFromRPC, api::data::{ block::{ requested_actions::{ @@ -103,6 +104,7 @@ pub trait OrchestratorEventReceiverSync { fn flush_token_transfers(&mut self) -> anyhow::Result>; fn wait_for_produce_block(&mut self) -> anyhow::Result; + fn flush_all(&self) -> anyhow::Result>; } pub trait WorkerEventReceiverSync { fn wait_for_next_job(&mut self) -> anyhow::Result; diff --git a/city_rollup_common/src/link/link_api.rs b/city_rollup_common/src/link/link_api.rs index 5ca1bd8d..e4ec7187 100644 --- a/city_rollup_common/src/link/link_api.rs +++ b/city_rollup_common/src/link/link_api.rs @@ -5,10 +5,12 @@ use city_crypto::hash::base_types::hash256::Hash256; use reqwest::blocking::ClientBuilder; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::thread::sleep; +use std::time::Duration; use crate::{ errors::data_resolver::BTCDataResolverError, introspection::transaction::BTCTransaction, }; - +use tracing::{debug}; use super::{ data::{BTCAddress160, BTCFeeRateEstimate, BTCTransactionWithVout, BTCUTXO}, traits::{QBitcoinAPIFunderSync, QBitcoinAPISync}, @@ -186,32 +188,42 @@ impl BTCLinkAPI { &self, path: String, ) -> Result { - let client = if self.no_proxy { - ClientBuilder::new() - .no_proxy() - .build() - .expect("Client::new()") - } else { - ClientBuilder::new().build().expect("Client::new()") - }; - let resp = client - .get(format!("{}/{}", self.electrs_url, path)) - .send() - .map_err(|err| BTCDataResolverError { - message: err.to_string(), - })? - .error_for_status() - .map_err(|err| BTCDataResolverError { - message: err.to_string(), - })?; - let text = resp.text().map_err(|err| BTCDataResolverError { - message: err.to_string(), - })?; - Ok( - serde_json::from_str::(&text).map_err(|err| BTCDataResolverError { - message: err.to_string(), - })?, - ) + const RETRY_INTERVAL: Duration = Duration::from_millis(200); // in milliseconds + const MAX_RETRIES: usize = 300; // 300 * 200ms = 60s, may need to adjust + + let client = self.create_http_client(); + let uri = format!("{}/{}", self.electrs_url, path); + + for attempt in 1..=MAX_RETRIES { + debug!("Attempt {} to fetch UTXO from Electrum", attempt); + let response = client.get(&uri).send().map_err(|e| BTCDataResolverError { message: e.to_string() })?; + let text = response.text().map_err(|e| BTCDataResolverError { message: e.to_string() })?; + + // Check if the response is an array that only contains "[]" + if text != "[]" { + debug!("Response from Electrum: {}", text); + return match serde_json::from_str::(&text) { + Ok(data) => Ok(data), + Err(e) => Err(BTCDataResolverError { message: e.to_string() }), + } + } else { + debug!("Received empty response, retrying..."); + } + + // Check if we have reached the maximum number of retries + if attempt == MAX_RETRIES { + return Err(BTCDataResolverError { + message: "Maximum retries reached with empty response".to_string(), + }); + } + + // sleep for a short interval before retrying + sleep(RETRY_INTERVAL); + } + + Err(BTCDataResolverError { + message: "Failed to retrieve data after maximum retries".to_string(), + }) } pub fn is_doge(&self) -> bool { self.rpc_config.is_doge @@ -295,6 +307,16 @@ impl BTCLinkAPI { ) -> Result { self.send_command("estimatesmartfee", "1.0", (n_blocks,)) } + pub fn create_http_client(&self) -> reqwest::blocking::Client { + if self.no_proxy { + ClientBuilder::new() + .no_proxy() + .build() + .expect("Failed to create HTTP client with no proxy") + } else { + ClientBuilder::new().build() .expect("Failed to create HTTP client") + } + } } impl QBitcoinAPISync for BTCLinkAPI { diff --git a/city_rollup_core_node/Cargo.toml b/city_rollup_core_node/Cargo.toml index 8347a8ca..1565d767 100644 --- a/city_rollup_core_node/Cargo.toml +++ b/city_rollup_core_node/Cargo.toml @@ -34,6 +34,7 @@ hyper = { workspace = true } hyper-util = { workspace = true } jsonrpsee = { workspace = true } tracing = { workspace = true } +rayon = { workspace = true } tower = { workspace = true } tower-http = { workspace = true } diff --git a/city_rollup_core_node/src/handler.rs b/city_rollup_core_node/src/handler.rs index a3822e8d..01b6fa2b 100644 --- a/city_rollup_core_node/src/handler.rs +++ b/city_rollup_core_node/src/handler.rs @@ -4,17 +4,32 @@ use std::sync::Arc; use bytes::Bytes; use city_common::cli::args::RPCServerArgs; +use city_common_circuit::circuits::zk_signature::{ + verify_secp256k1_signature_proof,verify_standard_wrapped_zk_signature_proof +}; use city_redis_store::RedisStore; -use city_rollup_common::actors::traits::OrchestratorRPCEventSenderSync; -use city_rollup_common::api::data::block::rpc_request::*; +use city_rollup_common::{ + actors::traits::OrchestratorRPCEventSenderSync, + api::data::block::{ + requested_actions::{ + CityAddWithdrawalRequest, CityClaimDepositRequest, CityRegisterUserRequest, + CityTokenTransferRequest, + }, + rpc_request::*, + }, + qworker::{job_id::QProvingJobDataID, proof_store::QProofStoreWriterSync}, +}; + use city_rollup_worker_dispatch::implementations::redis::QueueCmd; use city_rollup_worker_dispatch::implementations::redis::RedisQueue; -use city_rollup_worker_dispatch::implementations::redis::Q_CMD; -use city_rollup_worker_dispatch::implementations::redis::Q_RPC_ADD_WITHDRAWAL; -use city_rollup_worker_dispatch::implementations::redis::Q_RPC_CLAIM_DEPOSIT; -use city_rollup_worker_dispatch::implementations::redis::Q_RPC_REGISTER_USER; -use city_rollup_worker_dispatch::implementations::redis::Q_RPC_TOKEN_TRANSFER; -use city_rollup_worker_dispatch::traits::proving_dispatcher::ProvingDispatcher; +use city_rollup_worker_dispatch::{ + implementations::redis::{ + Q_CMD, Q_RPC_ADD_WITHDRAWAL, Q_RPC_CLAIM_DEPOSIT, + Q_RPC_REGISTER_USER, Q_RPC_TOKEN_TRANSFER, + }, + traits::{proving_dispatcher::ProvingDispatcher, proving_worker::ProvingWorkerListener}, +}; +use city_store::config::C; use http_body_util::BodyExt; use http_body_util::Full; use hyper::body::Incoming; @@ -30,9 +45,11 @@ use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::HttpClient; use jsonrpsee::http_client::HttpClientBuilder; use plonky2::hash::hash_types::RichField; -use serde_json::json; -use serde_json::Value; -use tokio::net::TcpListener; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use serde::de::DeserializeOwned; +use serde_json::{json, Value}; +use tokio::{net::TcpListener, task::spawn_blocking}; + use crate::rpc::ErrorCode; use crate::rpc::ExternalRequestParams; @@ -105,6 +122,22 @@ impl CityRollupRPCServerHandler { request: ProduceBlock, .. }) => self.produce_block().map(|r| json!(r)), + Ok(RpcRequest { + request: GatherRegisterUser, + .. + }) => self.gather_register_user().map(|r| json!(r)), + Ok(RpcRequest { + request: GatherClaimDeposit(checkpoint_id), + .. + }) => self.gather_claim_deposit(checkpoint_id).map(|r| json!(r)), + Ok(RpcRequest { + request: GatherAddWithdrawal(checkpoint_id), + .. + }) => self.gather_add_withdrawal(checkpoint_id).map(|r| json!(r)), + Ok(RpcRequest { + request: GatherTokenTransfer(checkpoint_id), + .. + }) => self.gather_token_transfer(checkpoint_id).map(|r| json!(r)), Err(_) => { let request = serde_json::from_slice::>(&whole_body)? @@ -151,7 +184,121 @@ impl CityRollupRPCServerHandler { self.notify_rpc_register_user(&req)?; Ok(()) } + pub fn flush_rpc_requests( + &mut self, + topic: &'static str, + ) -> anyhow::Result> { + Ok(self + .tx_queue + .pop_all(topic)? + .into_iter() + .map(|v| Ok(serde_json::from_slice(&v)?)) + .collect::>>()?) + } + fn gather_register_user(&mut self) -> Result>, anyhow::Error> { + let ret = + match self.flush_rpc_requests::>(Q_RPC_REGISTER_USER) { + Ok(v) => Ok(v + .par_iter() + .map(|req| CityRegisterUserRequest::::new(req.public_key)) + .collect::>()), + Err(e) => Err(e), + }; + ret + } + fn gather_claim_deposit( + &mut self, + checkpoint_id: u64, + ) -> Result, anyhow::Error> { + //todo: have rpc_node id from somewhere + let rpc_node_id = 0; + let reqs = self.flush_rpc_requests::(Q_RPC_CLAIM_DEPOSIT)?; + let deposit = reqs + .par_iter() + .enumerate() + .map(|(i, req)| { + let count = i as u32; + let signature_proof_id = QProvingJobDataID::claim_deposit_l1_signature_proof( + rpc_node_id, + checkpoint_id, + count, //it's index indeed + ); + let mut ps = self.store.clone(); + ps.set_bytes_by_id(signature_proof_id, &req.signature_proof)?; + + Ok(CityClaimDepositRequest::new( + req.user_id, + req.deposit_id, + req.value, + req.txid, + req.public_key, + signature_proof_id, + )) + }) + .collect::>>()?; + Ok(deposit) + } + fn gather_add_withdrawal( + &mut self, + checkpoint_id: u64, + ) -> Result, anyhow::Error> { + let rpc_node_id = 0; + let reqs = self.flush_rpc_requests::(Q_RPC_ADD_WITHDRAWAL)?; + let withdraw = reqs + .par_iter() + .enumerate() + .map(|(i, req)| { + let count = i as u32; + let signature_proof_id = QProvingJobDataID::withdrawal_signature_proof( + rpc_node_id, + checkpoint_id, + count, //it's index indeed + ); + let mut ps = self.store.clone(); + ps.set_bytes_by_id(signature_proof_id, &req.signature_proof)?; + + Ok(CityAddWithdrawalRequest::new( + req.user_id, + req.value, + req.nonce, + req.destination_type, + req.destination, + signature_proof_id, + )) + }) + .collect::>>()?; + Ok(withdraw) + } + fn gather_token_transfer( + &mut self, + checkpoint_id: u64, + ) -> Result, anyhow::Error> { + let rpc_node_id = 0; + let reqs = self.flush_rpc_requests::(Q_RPC_TOKEN_TRANSFER)?; + let transfer = reqs + .par_iter() + .enumerate() + .map(|(i, req)| { + let count = i as u32; + let signature_proof_id = QProvingJobDataID::transfer_signature_proof( + rpc_node_id, + checkpoint_id, + count, //it's index indeed + ); + let mut ps = self.store.clone(); + ps.set_bytes_by_id(signature_proof_id, &req.signature_proof)?; + Ok(CityTokenTransferRequest::new( + req.user_id, + req.to, + req.value, + req.nonce, + signature_proof_id, + )) + }) + .collect::>>()?; + Ok(transfer) + } fn produce_block(&mut self) -> Result<(), anyhow::Error> { Ok(self.notify_rpc_produce_block()?) } @@ -170,7 +317,7 @@ impl CityRollupRPCServerHandler { &mut self, req: CityClaimDepositRPCRequest, ) -> Result<(), anyhow::Error> { - self.verify_signature_proof(req.user_id, req.signature_proof.clone()) + self.verify_signature_proof_secp256k1(req.user_id, req.signature_proof.clone()) .await?; self.notify_rpc_claim_deposit(&req)?; Ok(()) @@ -186,19 +333,36 @@ impl CityRollupRPCServerHandler { Ok(()) } + async fn verify_signature_proof_secp256k1( + &self, + _user_id: u64, + signature_proof: Vec, + ) -> anyhow::Result<()> { + + spawn_blocking(move || { + verify_secp256k1_signature_proof::( + Default::default(), + signature_proof, + )?; + Ok::<_, anyhow::Error>(()) + }) + .await??; + Ok(()) + } async fn verify_signature_proof( &self, _user_id: u64, - _signature_proof: Vec, + signature_proof: Vec, ) -> anyhow::Result<()> { - // let pubkey_bytes = self.store.get_user_state(user_id)?.public_key; - // - // spawn_blocking(move || { - // verify_standard_wrapped_zk_signature_proof::(pubkey_bytes, signature_proof)?; - // Ok::<_, anyhow::Error>(()) - // }) - // .await??; + spawn_blocking(move || { + verify_standard_wrapped_zk_signature_proof::( + Default::default(), + signature_proof, + )?; + Ok::<_, anyhow::Error>(()) + }) + .await??; Ok(()) } } diff --git a/city_rollup_core_node/src/rpc.rs b/city_rollup_core_node/src/rpc.rs index 64d10e76..8dd4c68b 100644 --- a/city_rollup_core_node/src/rpc.rs +++ b/city_rollup_core_node/src/rpc.rs @@ -1,15 +1,12 @@ use std::borrow::Cow; -use city_rollup_common::api::data::block::rpc_request::CityAddWithdrawalRPCRequest; -use city_rollup_common::api::data::block::rpc_request::CityClaimDepositRPCRequest; -use city_rollup_common::api::data::block::rpc_request::CityRegisterUserRPCRequest; -use city_rollup_common::api::data::block::rpc_request::CityTokenTransferRPCRequest; +use city_rollup_common::api::data::block::rpc_request::{ + CityAddWithdrawalRPCRequest, CityClaimDepositRPCRequest, CityRegisterUserRPCRequest, + CityTokenTransferRPCRequest, +}; use jsonrpsee::core::traits::ToRpcParams; use plonky2::hash::hash_types::RichField; -use serde::Deserialize; -use serde::Deserializer; -use serde::Serialize; -use serde::Serializer; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_json::value::RawValue; use serde_with::serde_as; @@ -43,6 +40,14 @@ pub enum RequestParams { RegisterUser(CityRegisterUserRPCRequest), #[serde(rename = "cr_produce_block")] ProduceBlock, + #[serde(rename = "cr_gather_register_user")] + GatherRegisterUser, + #[serde(rename = "cr_gather_claim_deposit")] + GatherClaimDeposit(u64), + #[serde(rename = "cr_gather_add_withdrawal")] + GatherAddWithdrawal(u64), + #[serde(rename = "cr_gather_token_transfer")] + GatherTokenTransfer(u64), } diff --git a/city_rollup_core_orchestrator/Cargo.toml b/city_rollup_core_orchestrator/Cargo.toml index f5f0cbac..f66dc8c5 100644 --- a/city_rollup_core_orchestrator/Cargo.toml +++ b/city_rollup_core_orchestrator/Cargo.toml @@ -26,6 +26,8 @@ city_rollup_common = { path = "../city_rollup_common" } city_rollup_worker_dispatch = { path = "../city_rollup_worker_dispatch" } city_rollup_core_worker = { path = "../city_rollup_core_worker" } city_rollup_core_api = { path = "../city_rollup_core_api" } +city_rollup_rpc_provider = { path = "../city_rollup_rpc_provider" } + tokio = { workspace = true } redis = { workspace = true } city_macros = { path = "../city_macros" } @@ -35,7 +37,8 @@ kvq_store_redb = { path = "../kvq_store_redb" } redb = { workspace = true } hex-literal = { workspace = true } tracing = { workspace = true } - +rayon = { workspace = true } +lazy_static = { workspace = true } [dev-dependencies] criterion = "0.5.1" rand_chacha = "0.3.1" diff --git a/city_rollup_core_orchestrator/src/debug/scenario/actors/simple.rs b/city_rollup_core_orchestrator/src/debug/scenario/actors/simple.rs index de0cf3f4..1f8fe00c 100644 --- a/city_rollup_core_orchestrator/src/debug/scenario/actors/simple.rs +++ b/city_rollup_core_orchestrator/src/debug/scenario/actors/simple.rs @@ -6,7 +6,7 @@ use city_crypto::hash::base_types::{felt252::felt252_hashout_to_hash256_le, hash use city_rollup_common::{ actors::{ requested_actions::CityScenarioRequestedActions, - rpc_processor::CityScenarioRequestedActionsFromRPC, + traits::{OrchestratorEventReceiverSync, WorkerEventTransmitterSync}, }, api::data::store::CityL1Withdrawal, @@ -231,10 +231,13 @@ impl SimpleActorOrchestrator { let checkpoint_id = last_block.checkpoint_id + 1; let mut timer = DebugTimer::new(&format!("produce_block [{}]", checkpoint_id)); - let register_users = event_receiver.flush_register_users()?; - let claim_l1_deposits = event_receiver.flush_claim_deposits()?; - let add_withdrawals = event_receiver.flush_add_withdrawals()?; - let token_transfers = event_receiver.flush_token_transfers()?; + // let register_users = event_receiver.flush_register_users()?; + // let claim_l1_deposits = event_receiver.flush_claim_deposits()?; + // let add_withdrawals = event_receiver.flush_add_withdrawals()?; + // let token_transfers = event_receiver.flush_token_transfers()?;\ + let rpc_all = event_receiver.flush_all()?; + timer.lap(&"end process rpc_all".to_string()); + tracing::info!( "last_block_address: {}", BTCAddress160::new_p2sh(last_block_address,).to_address_string() @@ -283,12 +286,13 @@ impl SimpleActorOrchestrator { all_inputs.append(&mut deposit_utxos); let block_requested = CityScenarioRequestedActions::new_from_requested_rpc( - CityScenarioRequestedActionsFromRPC { - register_users, - claim_l1_deposits, - add_withdrawals, - token_transfers, - }, + // CityScenarioRequestedActionsFromRPC { + // register_users, + // claim_l1_deposits, + // add_withdrawals, + // token_transfers, + // }, + rpc_all, all_inputs.iter().skip(1), &last_block, SIGHASH_CIRCUIT_MAX_WITHDRAWALS, @@ -307,6 +311,12 @@ impl SimpleActorOrchestrator { let next_address = CityStore::get_city_block_deposit_address(store, checkpoint_id + 1)?; let next_script = CityStore::get_city_block_script(store, checkpoint_id + 1)?; + tracing::info!( + "next_address: {}", + BTCAddress160::new_p2sh(next_address).to_address_string() + ); + + /*tracing::info!( "next_address: {}", BTCAddress160::new_p2sh(next_address).to_address_string() diff --git a/city_rollup_core_orchestrator/src/event_receiver.rs b/city_rollup_core_orchestrator/src/event_receiver.rs index 3214c10f..536645d0 100644 --- a/city_rollup_core_orchestrator/src/event_receiver.rs +++ b/city_rollup_core_orchestrator/src/event_receiver.rs @@ -12,6 +12,7 @@ use city_rollup_common::api::data::block::rpc_request::{ CityAddWithdrawalRPCRequest, CityClaimDepositRPCRequest, CityRegisterUserRPCRequest, CityTokenTransferRPCRequest, }; +use city_rollup_rpc_provider::{CityRpcProvider, RpcProvider}; use city_rollup_common::qworker::proof_store::QProofStore; use city_rollup_worker_dispatch::implementations::redis::{ QueueCmd, RedisQueue, Q_CMD, Q_RPC_ADD_WITHDRAWAL, Q_RPC_CLAIM_DEPOSIT, Q_RPC_REGISTER_USER, @@ -21,6 +22,7 @@ use city_rollup_worker_dispatch::traits::proving_dispatcher::ProvingDispatcher; use city_rollup_worker_dispatch::traits::proving_worker::ProvingWorkerListener; use plonky2::hash::hash_types::RichField; use serde::de::DeserializeOwned; +use tokio::join; #[derive(Clone)] pub struct CityEventReceiver { @@ -136,8 +138,38 @@ impl OrchestratorEventReceiverSync for CityEventReceiver { } } } -} + fn flush_all(&self) -> anyhow::Result> { + let check_point_id = self.rpc_processor.checkpoint_id; + let result = tokio::runtime::Runtime::new() + .unwrap() + .block_on(flush_all_async::(check_point_id)); + result + } +} +async fn flush_all_async( + checkpoint_id: u64, +) -> anyhow::Result> { + //get rpc client + let rpc_client = match RpcProvider::get_rpc_provider().clone() { + Some(rpc_client) => rpc_client, + None => return Err(anyhow::format_err!("rpc client not found")), + }; + //post + let (r, c, t, a) = join!( + rpc_client.gather_register_user::(), + rpc_client.gather_claim_deposit::(checkpoint_id), + rpc_client.gather_token_transfer::(checkpoint_id), + rpc_client.gather_add_withdrawal::(checkpoint_id), + ); + + Ok(CityScenarioRequestedActionsFromRPC { + register_users: r?, + claim_l1_deposits: c?, + token_transfers: t?, + add_withdrawals: a?, + }) +} // Dev only impl OrchestratorRPCEventSenderSync for CityEventReceiver { fn notify_rpc_claim_deposit( diff --git a/city_rollup_core_orchestrator/src/lib.rs b/city_rollup_core_orchestrator/src/lib.rs index ebb3b2ab..dd165a01 100644 --- a/city_rollup_core_orchestrator/src/lib.rs +++ b/city_rollup_core_orchestrator/src/lib.rs @@ -1,9 +1,9 @@ -use std::{sync::Arc, time::Duration}; +use std::{process, sync::Arc, time::Duration}; use city_common::{cli::args::OrchestratorArgs, units::UNIT_BTC}; use city_crypto::hash::{base_types::hash256::Hash256, qhashout::QHashOut}; use city_macros::sync_infinite_loop; -use city_redis_store::RedisStore; +use city_redis_store::{RedisStore, initialize_redis_cache}; use city_rollup_circuit::{wallet::memory::CityMemoryWallet, worker::toolbox::circuits::CRWorkerToolboxCoreCircuits}; use city_rollup_common::{ actors::{ @@ -20,10 +20,14 @@ use city_rollup_common::{ use city_rollup_core_api::KV; use city_rollup_core_worker::event_processor::CityEventProcessor; use city_rollup_worker_dispatch::implementations::redis::RedisQueue; -use city_store::store::{city::base::CityStore, sighash::SigHashMerkleTree}; +use city_store::{ + file_lock::{try_lock, FileLockStatus}, + store::{city::base::CityStore, sighash::SigHashMerkleTree}, +}; use kvq_store_redb::KVQReDBStore; use plonky2::{field::goldilocks_field::GoldilocksField, plonk::config::PoseidonGoldilocksConfig}; use redb::Database; +use tracing::{debug, log::info}; use crate::{ debug::scenario::actors::simple::SimpleActorOrchestrator, event_receiver::CityEventReceiver, @@ -37,7 +41,17 @@ type C = PoseidonGoldilocksConfig; type F = GoldilocksField; pub fn run(args: OrchestratorArgs) -> anyhow::Result<()> { + let file_lock = match try_lock(){ + Ok(file_lock) => file_lock, + Err(e) => { + eprintln!("{}", e); + process::exit(1); + } + }; + debug!("file lock status: {:?}", file_lock.status); + let mut proof_store = RedisStore::new(&args.redis_uri)?; + initialize_redis_cache(&args.redis_uri)?; let queue = RedisQueue::new(&args.redis_uri)?; let mut event_processor = CityEventProcessor::new(queue.clone()); let mut api = BTCLinkAPI::new_str(&args.bitcoin_rpc, &args.electrs_api); @@ -47,26 +61,7 @@ pub fn run(args: OrchestratorArgs) -> anyhow::Result<()> { let mut rpc_queue = CityEventReceiver::::new(queue.clone(), QRPCProcessor::new(0), proof_store.clone()); - let mut wallet = CityMemoryWallet::::new_fast_setup(); - let genesis_funder_public_key = wallet.add_secp256k1_private_key(Hash256( - hex_literal::hex!("133700f4676a0d0e16aaced646ed693626fcf1329db55be8eee13ad8df001337"), - ))?; - let genesis_funder_address = BTCAddress160::from_p2pkh_key(genesis_funder_public_key); - let deposit_0_public_key = wallet.add_secp256k1_private_key(Hash256(hex_literal::hex!( - "e6baf19a8b0b9b8537b9354e178a0a42d0887371341d4b2303537c5d18d7bb87" - )))?; - let _deposit_0_address = BTCAddress160::from_p2pkh_key(deposit_0_public_key); - let deposit_1_public_key = wallet.add_secp256k1_private_key(Hash256(hex_literal::hex!( - "51dfec6b389f5f033bbe815d5df995a20851227fd845a3be389ca9ad2b6924f0" - )))?; - let _deposit_1_address = BTCAddress160::from_p2pkh_key(deposit_1_public_key); - let sighash_whitelist_tree = SigHashMerkleTree::new(); - let block0 = CityL2BlockState::default(); - let block1 = CityL2BlockState { - checkpoint_id: 1, - ..Default::default() - }; let db = Arc::new(Database::create(&args.db_path)?); let wxn = db.begin_write()?; { @@ -92,8 +87,37 @@ pub fn run(args: OrchestratorArgs) -> anyhow::Result<()> { Ok::<_, anyhow::Error>(()) }); }); - CityStore::set_block_state(&mut store, &block0)?; - CityStore::set_block_state(&mut store, &block1)?; + + //if lock file is created and locked, then set block[0] and block[1] to the + // store + if file_lock.status == FileLockStatus::FileCreatedAndLocked { + debug!("file_lock.status: {:?}", file_lock.status); + + let block0 = CityL2BlockState::default(); + let block1 = CityL2BlockState { + checkpoint_id: 1, + ..Default::default() + }; + info!("set block0 and block1 to the store"); + CityStore::set_block_state(&mut store, &block0)?; + CityStore::set_block_state(&mut store, &block1)?; + + let mut wallet = CityMemoryWallet::::new_fast_setup(); + let genesis_funder_public_key = + wallet.add_secp256k1_private_key(Hash256(hex_literal::hex!( + "133700f4676a0d0e16aaced646ed693626fcf1329db55be8eee13ad8df001337" + )))?; + let genesis_funder_address = BTCAddress160::from_p2pkh_key(genesis_funder_public_key); + let deposit_0_public_key = + wallet.add_secp256k1_private_key(Hash256(hex_literal::hex!( + "e6baf19a8b0b9b8537b9354e178a0a42d0887371341d4b2303537c5d18d7bb87" + )))?; + let _deposit_0_address = BTCAddress160::from_p2pkh_key(deposit_0_public_key); + let deposit_1_public_key = + wallet.add_secp256k1_private_key(Hash256(hex_literal::hex!( + "51dfec6b389f5f033bbe815d5df995a20851227fd845a3be389ca9ad2b6924f0" + )))?; + let _deposit_1_address = BTCAddress160::from_p2pkh_key(deposit_1_public_key); let genesis_state_hash = CityStore::get_city_root(&store, 0)?; let setup_fee = 100000 * 500; @@ -131,6 +155,7 @@ pub fn run(args: OrchestratorArgs) -> anyhow::Result<()> { .map(|x| rpc_queue.notify_rpc_register_user(&x)) .collect::>>()?; } +} wxn.commit()?; /* @@ -162,6 +187,7 @@ pub fn run(args: OrchestratorArgs) -> anyhow::Result<()> { .map(|x| rpc_queue.notify_rpc_register_user(&x)) .collect::>>()?; */ + let sighash_whitelist_tree = SigHashMerkleTree::new(); sync_infinite_loop!(1000, { let wxn = db.begin_write()?; diff --git a/city_rollup_dev_cli/Cargo.toml b/city_rollup_dev_cli/Cargo.toml index a97c2018..6828d361 100644 --- a/city_rollup_dev_cli/Cargo.toml +++ b/city_rollup_dev_cli/Cargo.toml @@ -31,6 +31,7 @@ city_rollup_core_node = { path = "../city_rollup_core_node" } city_rollup_core_worker = { path = "../city_rollup_core_worker" } city_rollup_worker_dispatch = { path = "../city_rollup_worker_dispatch" } city_rollup_core_orchestrator = { path = "../city_rollup_core_orchestrator" } +city_rollup_rpc_provider = {path = "../city_rollup_rpc_provider"} bitcoincore-rpc = { workspace = true } clap = { workspace = true } dotenv = { workspace = true } @@ -42,5 +43,21 @@ tokio = { workspace = true } tracing = { workspace = true } redb = { workspace = true } +reqwest = { version = "0.12", features = ["json"] } +async-trait = "0.1" +bytes = "1" +http-body-util = "0.1" +hyper = "1.4" +hyper-tls = "0.6" +hyper-util = "0.1" +rlt = "0.1" +jemallocator = "0.5" +redis = "0.25.0" +once_cell = "1.19.0" +core_affinity = "0.8" +time = "0.3.36" +log = "0.4" +env_logger = "0.11" + [build-dependencies] shadow-rs = { workspace = true } diff --git a/city_rollup_rpc_provider/Cargo.toml b/city_rollup_rpc_provider/Cargo.toml index e4050387..074f5a6e 100644 --- a/city_rollup_rpc_provider/Cargo.toml +++ b/city_rollup_rpc_provider/Cargo.toml @@ -16,3 +16,4 @@ city_store = { path = "../city_store" } plonky2 = { workspace = true } serde_json = { workspace = true } city_macros = { path = "../city_macros" } +lazy_static = { workspace = true } diff --git a/city_rollup_rpc_provider/src/lib.rs b/city_rollup_rpc_provider/src/lib.rs index 671a1d0f..180a47ec 100644 --- a/city_rollup_rpc_provider/src/lib.rs +++ b/city_rollup_rpc_provider/src/lib.rs @@ -1,11 +1,18 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock, RwLockReadGuard}; +use anyhow::Context; use city_common::data::{kv::SimpleKVPair, u8bytes::U8Bytes}; use city_crypto::hash::base_types::hash160::Hash160; use city_crypto::hash::base_types::hash256::Hash256; -use city_macros::{city_external_rpc_call, city_external_rpc_call_sync, city_rpc_call, city_rpc_call_sync}; +use city_macros::{city_external_rpc_call, city_external_rpc_call_sync, city_rpc_call, city_rpc_call_sync, async_rpc_call_with_response_handling}; use city_rollup_common::{api::data::{ - block::rpc_request::*, + block::{ + requested_actions::{ + CityAddWithdrawalRequest, CityClaimDepositRequest, CityRegisterUserRequest, + CityTokenTransferRequest, + }, + rpc_request::*, + }, store::{CityL1DepositJSON, CityL1Withdrawal, CityL2BlockState, CityUserState}, }, qworker::job_id::QProvingJobDataIDSerializedWrapped}; use city_rollup_core_node::rpc::{ @@ -13,10 +20,15 @@ use city_rollup_core_node::rpc::{ Version, }; use city_store::config::{CityHash, CityMerkleProof}; +use lazy_static::lazy_static; use plonky2::hash::hash_types::RichField; use reqwest::Client; use serde_json::json; +lazy_static! { + pub static ref RPC_PROVIDER: RwLock> = RwLock::new(None); +} + #[derive(Clone, Debug)] pub struct RpcProvider { client: Arc, @@ -30,6 +42,17 @@ impl RpcProvider { url: Box::leak(url.to_string().into_boxed_str()), } } + pub fn get_rpc_provider() -> RwLockReadGuard<'static, Option> { + RPC_PROVIDER + .read() + .expect("cannot get RPC_PROVIDER read lock") + } + pub fn initialize_rpc_provider(address: &str) { + let mut rpc_provider = RPC_PROVIDER + .write() + .expect("cannot get RPC_PROVIDER write lock"); + *rpc_provider = Some(RpcProvider::new(address)); + } } #[derive(Clone, Debug)] @@ -178,6 +201,21 @@ pub trait CityRpcProvider { &self, req: CityTokenTransferRPCRequest, ) -> anyhow::Result<()>; + async fn gather_register_user( + &self, + ) -> anyhow::Result>>; + async fn gather_claim_deposit( + &self, + checkpoint_id: u64, + ) -> anyhow::Result>; + async fn gather_add_withdrawal( + &self, + checkpoint_id: u64, + ) -> anyhow::Result>; + async fn gather_token_transfer( + &self, + checkpoint_id: u64, + ) -> anyhow::Result>; } pub trait CityRpcProviderSync { @@ -615,9 +653,67 @@ impl CityRpcProvider for RpcProvider { ) -> anyhow::Result<()> { city_rpc_call!(self, RequestParams::::TokenTransfer(req)) } -} + async fn gather_register_user( + &self, + ) -> anyhow::Result>> { + let ret = { + let response = self + .client + .post(self.url) + .json(&RpcRequest { + jsonrpc: Version::V2, + request: RequestParams::::GatherRegisterUser, + id: Id::Number(1), + }) + .send() + .await? + .json::>>>() + .await?; + + if let ResponseResult::Success(s) = response.result { + Ok(s) + } else { + Err(anyhow::format_err!("rpc call failed")) + } + }; + ret.context("gather_register_user failed") + } + async fn gather_claim_deposit( + &self, + checkpoint_id: u64, + ) -> anyhow::Result> { + let ret = async_rpc_call_with_response_handling!( + self, + RequestParams::::GatherClaimDeposit(checkpoint_id), + Vec + ); + ret.context("gather_claim_deposit failed") + } + async fn gather_add_withdrawal( + &self, + checkpoint_id: u64, + ) -> anyhow::Result> { + let ret = async_rpc_call_with_response_handling!( + self, + RequestParams::::GatherAddWithdrawal(checkpoint_id), + Vec + ); + ret.context("gather_add_withdrawal failed") + } + async fn gather_token_transfer( + &self, + checkpoint_id: u64, + ) -> anyhow::Result> { + let ret = async_rpc_call_with_response_handling!( + self, + RequestParams::::GatherTokenTransfer(checkpoint_id), + Vec + ); + ret.context("gather_token_transfer failed") + } +} impl CityRpcProviderSync for RpcProviderSync { fn get_user_tree_root_sync(&self, checkpoint_id: u64) -> anyhow::Result { diff --git a/city_rollup_user_cli/Cargo.toml b/city_rollup_user_cli/Cargo.toml index d972184f..15b54a18 100644 --- a/city_rollup_user_cli/Cargo.toml +++ b/city_rollup_user_cli/Cargo.toml @@ -37,6 +37,7 @@ reqwest = { workspace = true } kvq = { path = "../kvq" } city_store = { path = "../city_store" } city_rollup_rpc_provider = { path = "../city_rollup_rpc_provider" } +jemallocator = { workspace = true } repl-rs = "0.2.8" [build-dependencies] diff --git a/city_rollup_user_cli/src/main.rs b/city_rollup_user_cli/src/main.rs index 8e861aec..7a863d4e 100644 --- a/city_rollup_user_cli/src/main.rs +++ b/city_rollup_user_cli/src/main.rs @@ -5,6 +5,9 @@ use shadow_rs::shadow; shadow!(build); +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + use clap::Parser; use error::Result; diff --git a/city_store/Cargo.toml b/city_store/Cargo.toml index 7b11811c..b2106f45 100644 --- a/city_store/Cargo.toml +++ b/city_store/Cargo.toml @@ -27,8 +27,11 @@ city_common = { path = "../city_common" } city_rollup_common = { path = "../city_rollup_common" } kvq_store_redb = { path = "../kvq_store_redb" } city_crypto = { path = "../city_crypto" } +city_redis_store = { path = "../city_redis_store" } tracing = { workspace = true } - +fs2 = "0.4" +strum = "0.26.0" +strum_macros = "0.26.0" [dev-dependencies] criterion = "0.5.1" rand_chacha = "0.3.1" diff --git a/city_store/src/config/mod.rs b/city_store/src/config/mod.rs index 74b33d4d..3106ee65 100644 --- a/city_store/src/config/mod.rs +++ b/city_store/src/config/mod.rs @@ -82,3 +82,5 @@ pub type L2UserIdsStore = L2UserIdsModel< S, KVQStandardAdapter, u64>, >; + +pub const LOCK_FILE_PATH: &str = "/tmp/city_rollup.lock"; diff --git a/city_store/src/file_lock.rs b/city_store/src/file_lock.rs new file mode 100644 index 00000000..600f98b8 --- /dev/null +++ b/city_store/src/file_lock.rs @@ -0,0 +1,56 @@ +use std::{ + fs::OpenOptions, +}; +use fs2::FileExt; +use strum_macros::{AsRefStr, Display, EnumString}; +use crate::config::LOCK_FILE_PATH; + +#[allow(dead_code)] +pub struct FileLock { + file: std::fs::File, //the lock's lifetime is tied to the file. + pub status: FileLockStatus, +} +#[derive(Debug, Clone, Eq, PartialEq, EnumString, Display, AsRefStr)] +pub enum FileLockStatus { + #[strum(serialize = "FileAlreadyExistedAndLocked")] + FileAlreadyExistedAndLocked, + #[strum(serialize = "FileCreatedAndLocked")] + FileCreatedAndLocked, +} + +pub fn try_lock() -> Result { + match check_and_lock_file(LOCK_FILE_PATH) { + Some(file_lock) => Ok(file_lock), + None => Err("Another instance of the program is already running.".to_string()) + } +} +fn check_and_lock_file(path: &str) -> Option { + let file_exists = std::fs::metadata(path).is_ok(); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path); + + match file { + Ok(f) => { + if f.try_lock_exclusive().is_ok() { + let status = if file_exists{ + // The file already existed and was locked successfully. + FileLockStatus::FileAlreadyExistedAndLocked + }else { + // The file was just created and locked. + FileLockStatus::FileCreatedAndLocked + }; + Some(FileLock { file: f, status }) + }else { + None + } + + } + Err(e) => { + eprintln!("Failed to open or create the lock file: {}", e); + None + } + } +} diff --git a/city_store/src/lib.rs b/city_store/src/lib.rs index 442299db..b1a0d13a 100644 --- a/city_store/src/lib.rs +++ b/city_store/src/lib.rs @@ -1,3 +1,4 @@ pub mod config; +pub mod file_lock; pub mod models; pub mod store; diff --git a/city_store/src/store/city/user.rs b/city_store/src/store/city/user.rs index 40f9fb93..14204071 100644 --- a/city_store/src/store/city/user.rs +++ b/city_store/src/store/city/user.rs @@ -1,4 +1,5 @@ use city_crypto::hash::qhashout::QHashOut; +use city_redis_store::{get_cached_user_state, update_cache_user_state}; use city_rollup_common::api::data::store::CityUserState; use kvq::traits::{KVQBinaryStore, KVQBinaryStoreReader}; use plonky2::{ @@ -32,10 +33,18 @@ impl CityStore { checkpoint_id: u64, user_id: u64, ) -> anyhow::Result { + //try to get user state from redis cache + if let Some(user_state) = get_cached_user_state(user_id)? { + return Ok(user_state); + } + let leaf_id = user_id * 2; let left = GlobalUserTreeStore::::get_leaf_value_fc(store, checkpoint_id, leaf_id)?; let right = GlobalUserTreeStore::::get_leaf_value_fc(store, checkpoint_id, leaf_id + 1)?; - Ok(CityUserState::from_hash(user_id, left, right)) + let user_state = CityUserState::from_hash(user_id, left, right); + update_cache_user_state(&user_state)?; + + Ok(user_state) } pub fn get_user_merkle_proof_by_id( store: &S, @@ -92,6 +101,8 @@ impl CityStore { leaf_id + if left_before_right { 1 } else { 0 }, second_leaf, )?; + update_cache_user_state(user)?; + Ok((first_proof, second_proof)) } pub fn register_user( @@ -100,6 +111,9 @@ impl CityStore { user_id: u64, public_key: CityHash, ) -> anyhow::Result { + let user_state = CityUserState::new_user_with_public_key(user_id, public_key); + update_cache_user_state(&user_state)?; + let leaf_id = user_id * 2; L2UserIdsStore::set_user_id_public_key_pair(store, user_id, public_key)?; GlobalUserTreeStore::set_leaf_fc(store, checkpoint_id, leaf_id + 1, public_key) @@ -136,7 +150,9 @@ impl CityStore { current_leaf.0.elements[3], ], }); - + let right = GlobalUserTreeStore::get_leaf_value_fc(store, checkpoint_id, leaf_id + 1)?; + let user_state = CityUserState::from_hash(user_id, new_user_leaf, right); + update_cache_user_state(&user_state)?; GlobalUserTreeStore::set_leaf_fc(store, checkpoint_id, leaf_id, new_user_leaf) } @@ -175,7 +191,9 @@ impl CityStore { current_leaf.0.elements[3], ], }); - + let right = GlobalUserTreeStore::get_leaf_value_fc(store, checkpoint_id, leaf_id + 1)?; + let user_state = CityUserState::from_hash(user_id, new_user_leaf, right); + update_cache_user_state(&user_state)?; GlobalUserTreeStore::set_leaf_fc(store, checkpoint_id, leaf_id, new_user_leaf) } }