diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 70e407c232..4e8fa04f95 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -34,9 +34,11 @@ use common::executor::{ use common::log::{error, warn, LogOnError}; use common::{bits256, log, new_uuid, now_ms, now_sec}; use crypto::privkey::SerializableSecp256k1Keypair; +use crypto::secret_hash_algo::SecretHashAlgo; use crypto::{CryptoCtx, CryptoCtxError}; use derive_more::Display; -use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, TryFutureExt}; +use futures::channel::mpsc::{unbounded, UnboundedSender}; +use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, StreamExt, TryFutureExt}; use hash256_std_hasher::Hash256StdHasher; use hash_db::Hasher; use http::Response; @@ -56,8 +58,6 @@ use mm2_rpc::data::legacy::{ TakerAction, TakerRequestForRpc, }; use mm2_state_machine::prelude::*; -#[cfg(test)] -use mocktopus::macros::*; use my_orders_storage::{ delete_my_maker_order, delete_my_taker_order, save_maker_order_on_update, save_my_new_maker_order, save_my_new_taker_order, MyActiveOrders, MyOrdersFilteringHistory, MyOrdersHistory, MyOrdersStorage, @@ -65,7 +65,8 @@ use my_orders_storage::{ use num_traits::identities::Zero; use order_events::{OrderStatusEvent, OrderStatusStreamer}; use orderbook_events::{OrderbookItemChangeEvent, OrderbookStreamer}; -use parking_lot::Mutex as PaMutex; +use parking_lot::{Mutex as PaMutex, RwLock as PaRwLock}; +use primitives::hash::{H256, H264}; use rpc::v1::types::H256 as H256Json; use secp256k1::PublicKey as Secp256k1Pubkey; use serde_json::{self as json, Value as Json}; @@ -101,7 +102,6 @@ use crate::lp_swap::taker_swap::FailAt; use coins::rpc_command::tendermint::ibc::ChannelId; pub use best_orders::{best_orders_rpc, best_orders_rpc_v2}; -use crypto::secret_hash_algo::SecretHashAlgo; pub use orderbook_depth::orderbook_depth_rpc; pub use orderbook_rpc::{orderbook_rpc, orderbook_rpc_v2}; @@ -112,12 +112,17 @@ cfg_wasm32! { pub type OrdermatchDbLocked<'a> = DbLocked<'a, OrdermatchDb>; } +// test-only imports +#[cfg(test)] +use futures::channel::oneshot; +#[cfg(test)] +use mocktopus::macros::*; + mod best_orders; mod lp_bot; pub use lp_bot::{ start_simple_market_maker_bot, stop_simple_market_maker_bot, StartSimpleMakerBotRequest, TradingBotEvent, }; -use primitives::hash::{H256, H264}; mod my_orders_storage; mod new_protocol; @@ -262,52 +267,56 @@ fn process_pubkey_full_trie( orderbook: &mut Orderbook, new_trie_orders: PubkeyOrders, params: ProcessTrieParams, -) -> H64 { - remove_pubkey_pair_orders(orderbook, params.pubkey, params.alb_pair); - +) -> Vec { + // 1) Index-only removal of existing orders for (pubkey, pair), + // emit RemovedItem events, do NOT generate per-UUID trie ops. + orderbook.index_remove_pubkey_pair_orders(params.pubkey, params.alb_pair); + + // 2) Start with a single ClearPair op to reset trie/history/root for (pubkey, pair) + let mut ops = vec![TrieOp::ClearPair { + pubkey: params.pubkey.to_owned(), + alb_pair: params.alb_pair.to_owned(), + }]; + + // 3) Re-insert all incoming orders (index + trie ops) for (uuid, order) in new_trie_orders { - orderbook.insert_or_update_order_update_trie(OrderbookItem::from_p2p_and_info( + let item = OrderbookItem::from_p2p_and_info( order, params.protocol_infos.get(&uuid).cloned().unwrap_or_default(), params.conf_infos.get(&uuid).cloned(), - )); + ); + let mut insert_ops = orderbook.index_insert_or_update(item); + ops.append(&mut insert_ops); } - let new_root = pubkey_state_mut(&mut orderbook.pubkeys_state, params.pubkey) - .trie_roots - .get(params.alb_pair) - .copied() - .unwrap_or_default(); - new_root + ops } fn process_trie_delta( orderbook: &mut Orderbook, delta_orders: HashMap>, params: ProcessTrieParams, -) -> H64 { - for (uuid, order) in delta_orders { - match order { - Some(order) => orderbook.insert_or_update_order_update_trie(OrderbookItem::from_p2p_and_info( - order, - params.protocol_infos.get(&uuid).cloned().unwrap_or_default(), - params.conf_infos.get(&uuid).cloned(), - )), +) -> Vec { + let mut ops = Vec::with_capacity(delta_orders.len()); + for (uuid, maybe_order) in delta_orders { + match maybe_order { + Some(order) => { + let item = OrderbookItem::from_p2p_and_info( + order, + params.protocol_infos.get(&uuid).cloned().unwrap_or_default(), + params.conf_infos.get(&uuid).cloned(), + ); + let mut insert_ops = orderbook.index_insert_or_update(item); + ops.append(&mut insert_ops); + }, None => { - orderbook.remove_order_trie_update(uuid); + if let Some((_removed, op)) = orderbook.index_remove(uuid) { + ops.push(op); + } }, } } - - let new_root = match orderbook.pubkeys_state.get(params.pubkey) { - Some(pubkey_state) => pubkey_state - .trie_roots - .get(params.alb_pair) - .copied() - .unwrap_or_default(), - None => H64::default(), - }; - new_root + ops } async fn process_orders_keep_alive( @@ -318,10 +327,17 @@ async fn process_orders_keep_alive( i_am_relay: bool, ) -> OrderbookP2PHandlerResult { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); - let to_request = ordermatch_ctx - .orderbook - .lock() - .process_keep_alive(&from_pubkey, keep_alive, i_am_relay); + let to_request = { + let subscribed_topics: HashSet = { + let subs = ordermatch_ctx.orderbook_subscriptions.read(); + subs.keys().cloned().collect() + }; + + let mut trie_store = ordermatch_ctx.trie_store.lock(); + trie_store.prepare_sync_request_for_keep_alive(&from_pubkey, keep_alive, i_am_relay, |topic: &str| { + subscribed_topics.contains(topic) + }) + }; let req = match to_request { Some(req) => req, @@ -342,18 +358,29 @@ async fn process_orders_keep_alive( ))) })?; - let mut orderbook = ordermatch_ctx.orderbook.lock(); - for (pair, diff) in response.pair_orders_diff { - let params = ProcessTrieParams { - pubkey: &from_pubkey, - alb_pair: &pair, - protocol_infos: &response.protocol_infos, - conf_infos: &response.conf_infos, - }; - let _new_root = match diff { - DeltaOrFullTrie::Delta(delta) => process_trie_delta(&mut orderbook, delta, params), - DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(&mut orderbook, values, params), - }; + // Phase 1: derive all index mutations and collect trie ops under the Orderbook lock + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + let mut ops = Vec::new(); + for (pair, diff) in response.pair_orders_diff { + let params = ProcessTrieParams { + pubkey: &from_pubkey, + alb_pair: &pair, + protocol_infos: &response.protocol_infos, + conf_infos: &response.conf_infos, + }; + let mut pair_ops = match diff { + DeltaOrFullTrie::Delta(delta) => process_trie_delta(&mut orderbook, delta, params), + DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(&mut orderbook, values, params), + }; + ops.append(&mut pair_ops); + } + ops + }; + + // Phase 2: enqueue trie ops for background application + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); } Ok(()) @@ -366,20 +393,29 @@ fn process_maker_order_created(ctx: &MmArc, from_pubkey: String, created_msg: ne } fn process_maker_order_updated( - ctx: MmArc, + ctx: &MmArc, from_pubkey: String, updated_msg: new_protocol::MakerOrderUpdated, ) -> OrderbookP2PHandlerResult { - let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); + let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); let uuid = updated_msg.uuid(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - let mut order = orderbook - .find_order_by_uuid_and_pubkey(&uuid, &from_pubkey) - .ok_or_else(|| MmError::new(OrderbookP2PHandlerError::OrderNotFound(uuid)))?; - order.apply_updated(&updated_msg); - drop_mutability!(order); - orderbook.insert_or_update_order_update_trie(order); + // Phase 1: mutate in-memory order and build trie ops + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let mut order = orderbook + .find_order_by_uuid_and_pubkey(&uuid, &from_pubkey) + .ok_or_else(|| MmError::new(OrderbookP2PHandlerError::OrderNotFound(uuid)))?; + order.apply_updated(&updated_msg); + + orderbook.index_insert_or_update(order) + }; + + // Phase 2: enqueue trie ops + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); + } Ok(()) } @@ -387,18 +423,33 @@ fn process_maker_order_updated( fn process_maker_order_cancelled(ctx: &MmArc, from_pubkey: String, cancelled_msg: new_protocol::MakerOrderCancelled) { let uuid = Uuid::from(cancelled_msg.uuid); let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - // Add the order to the recently cancelled list to ignore it if a new order with the same uuid - // is received within the `RECENTLY_CANCELLED_TIMEOUT` timeframe. - // We do this even if the order is in the order_set, because it could have been added through - // means other than the order creation message. - orderbook - .recently_cancelled - .insert_expirable(uuid, from_pubkey.clone(), RECENTLY_CANCELLED_TIMEOUT); - if let Some(order) = orderbook.order_set.get(&uuid) { - if order.pubkey == from_pubkey { - orderbook.remove_order_trie_update(uuid); + + // Phase 1: update index and collect trie op + let maybe_op = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + // Add the order to the recently cancelled list to ignore it if a new order with the same uuid + // is received within the `RECENTLY_CANCELLED_TIMEOUT` timeframe. + // We do this even if the order is in the order_set, because it could have been added through + // means other than the order creation message. + orderbook + .recently_cancelled + .insert_expirable(uuid, from_pubkey.clone(), RECENTLY_CANCELLED_TIMEOUT); + + if let Some(order) = orderbook.order_set.get(&uuid) { + if order.pubkey == from_pubkey { + orderbook.index_remove(uuid).map(|(_removed, op)| op) + } else { + None + } + } else { + None } + }; + + // Phase 2: enqueue trie op + if let Some(op) = maybe_op { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(vec![op]); } } @@ -459,49 +510,65 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul }; let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); let my_pubsecp = mm2_internal_pubkey_hex(ctx, String::from).map_err(MmError::into_inner)?; - let alb_pair = alb_ordered_pair(base, rel); - for (pubkey, GetOrderbookPubkeyItem { orders, .. }) in pubkey_orders { - let pubkey_bytes = match hex::decode(&pubkey) { - Ok(b) => b, - Err(e) => { - warn!("Error {} decoding pubkey {}", e, pubkey); + // Phase 1: build all index mutations and collect trie ops under the Orderbook lock + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let alb_pair = alb_ordered_pair(base, rel); + let mut all_ops = Vec::new(); + + for (pubkey, GetOrderbookPubkeyItem { orders, .. }) in pubkey_orders { + let pubkey_bytes = match hex::decode(&pubkey) { + Ok(b) => b, + Err(e) => { + warn!("Error {} decoding pubkey {}", e, pubkey); + continue; + }, + }; + + let pubkey_without_prefix: [u8; 32] = match pubkey_bytes.get(1..).map(|slice| slice.try_into()) { + Some(Ok(arr)) => arr, + _ => { + warn!("Invalid pubkey length (not 32 bytes) for {}", pubkey); + continue; + }, + }; + + if is_my_order(&pubkey, &my_pubsecp, &orderbook.my_p2p_pubkeys) { continue; - }, - }; + } - let pubkey_without_prefix: [u8; 32] = match pubkey_bytes.get(1..).map(|slice| slice.try_into()) { - Some(Ok(arr)) => arr, - _ => { - warn!("Invalid pubkey length (not 32 bytes) for {}", pubkey); + if is_pubkey_banned(ctx, &pubkey_without_prefix.into()) { + warn!("Pubkey {} is banned", pubkey); continue; - }, - }; + } - if is_my_order(&pubkey, &my_pubsecp, &orderbook.my_p2p_pubkeys) { - continue; + let params = ProcessTrieParams { + pubkey: &pubkey, + alb_pair: &alb_pair, + protocol_infos: &protocol_infos, + conf_infos: &conf_infos, + }; + let mut pair_ops = process_pubkey_full_trie(&mut orderbook, orders, params); + all_ops.append(&mut pair_ops); } - if is_pubkey_banned(ctx, &pubkey_without_prefix.into()) { - warn!("Pubkey {} is banned", pubkey); - continue; - } - let params = ProcessTrieParams { - pubkey: &pubkey, - alb_pair: &alb_pair, - protocol_infos: &protocol_infos, - conf_infos: &conf_infos, - }; - let _new_root = process_pubkey_full_trie(&mut orderbook, orders, params); + all_ops + }; + + // Phase 2: enqueue trie ops under the background worker + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); } let topic = orderbook_topic_from_base_rel(base, rel); - orderbook - .topics_subscribed_to - .insert(topic, OrderbookRequestingState::Requested); + { + let mut subs = ordermatch_ctx.orderbook_subscriptions.write(); + subs.insert(topic, OrderbookRequestingState::Requested); + } Ok(()) } @@ -510,26 +577,55 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul /// Note this function locks the [`OrdermatchContext::orderbook`] async mutex. fn insert_or_update_order(ctx: &MmArc, item: OrderbookItem) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.insert_or_update_order_update_trie(item) + + // Phase 1: index under Orderbook lock + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + orderbook.index_insert_or_update(item) + }; + + // Phase 2: enqueue trie ops under the background worker + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); + } } // use this function when notify maker order created fn insert_or_update_my_order(ctx: &MmArc, item: OrderbookItem, my_order: &MakerOrder) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.insert_or_update_order_update_trie(item); - if let Some(key) = my_order.p2p_privkey { - orderbook.my_p2p_pubkeys.insert(hex::encode(key.public_slice())); + + // Phase 1: index + my_p2p_pubkeys under Orderbook lock + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + let ops = orderbook.index_insert_or_update(item); + if let Some(key) = my_order.p2p_privkey { + orderbook.my_p2p_pubkeys.insert(hex::encode(key.public_slice())); + } + ops + }; + + // Phase 2: enqueue trie ops under the background worker + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); } } fn delete_my_order(ctx: &MmArc, uuid: Uuid, p2p_privkey: Option) { let ordermatch_ctx: Arc = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.remove_order_trie_update(uuid); - if let Some(key) = p2p_privkey { - orderbook.my_p2p_pubkeys.remove(&hex::encode(key.public_slice())); + + // Phase 1: index remove + pubkey cleanup + let op = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + let op = orderbook.index_remove(uuid).map(|(_removed, op)| op); + if let Some(key) = p2p_privkey { + orderbook.my_p2p_pubkeys.remove(&hex::encode(key.public_slice())); + } + op + }; + + // Phase 2: enqueue trie op + if let Some(op) = op { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(vec![op]); } } @@ -545,40 +641,6 @@ where } } -fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair: &str) { - let pubkey_state = match orderbook.pubkeys_state.get_mut(pubkey) { - Some(state) => state, - None => return, - }; - - if !pubkey_state.trie_roots.contains_key(alb_pair) { - return; - } - - pubkey_state.order_pairs_trie_state_history.remove(&alb_pair.to_owned()); - - let mut orders_to_remove = Vec::with_capacity(pubkey_state.orders_uuids.len()); - pubkey_state.orders_uuids.retain(|(uuid, alb)| { - if alb == alb_pair { - orders_to_remove.push(*uuid); - false - } else { - true - } - }); - - for order in orders_to_remove { - orderbook.remove_order_trie_update(order); - } - - let pubkey_state = match orderbook.pubkeys_state.get_mut(pubkey) { - Some(state) => state, - None => return, - }; - - pubkey_state.trie_roots.remove(alb_pair); -} - pub async fn handle_orderbook_msg( ctx: MmArc, topic: &TopicHash, @@ -637,7 +699,7 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: Ok(()) }, new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => { - process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg) + process_maker_order_updated(&ctx, pubkey.to_hex(), updated_msg) }, } }, @@ -790,12 +852,14 @@ fn process_get_orderbook_request(ctx: MmArc, base: String, rel: String) -> Resul return ERR!("Orderbook too large"); } + let trie_store = ordermatch_ctx.trie_store.lock(); + let orders_to_send = pubkeys_orders .uuids_by_pubkey .into_iter() .map(|(pubkey, orders)| { - let pubkey_state = orderbook.pubkeys_state.get(&pubkey).ok_or(ERRL!( - "Orderbook::pubkeys_state is expected to contain the {:?} pubkey", + let pubkey_state = trie_store.pubkeys_state.get(&pubkey).ok_or(ERRL!( + "TrieStore::pubkeys_state is expected to contain the {:?} pubkey", pubkey ))?; @@ -942,7 +1006,8 @@ fn process_sync_pubkey_orderbook_state( ) -> Result, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); let orderbook = ordermatch_ctx.orderbook.lock(); - let pubkey_state = some_or_return_ok_none!(orderbook.pubkeys_state.get(&pubkey)); + let trie_store = ordermatch_ctx.trie_store.lock(); + let pubkey_state = some_or_return_ok_none!(trie_store.pubkeys_state.get(&pubkey)); let order_getter = |uuid: &Uuid| orderbook.order_set.get(uuid).cloned(); let pair_orders_diff: Result, _> = trie_roots @@ -955,10 +1020,10 @@ fn process_sync_pubkey_orderbook_state( let delta_result = match pubkey_state.order_pairs_trie_state_history.get(&pair) { Some(history) => { - DeltaOrFullTrie::from_history(history, root, *actual_pair_root, &orderbook.memory_db, order_getter) + DeltaOrFullTrie::from_history(history, root, *actual_pair_root, &trie_store.memory_db, order_getter) }, None => { - get_full_trie(actual_pair_root, &orderbook.memory_db, order_getter).map(DeltaOrFullTrie::FullTrie) + get_full_trie(actual_pair_root, &trie_store.memory_db, order_getter).map(DeltaOrFullTrie::FullTrie) }, }; @@ -1096,12 +1161,23 @@ fn maker_order_created_p2p_notify( fn process_my_maker_order_updated(ctx: &MmArc, message: &new_protocol::MakerOrderUpdated) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - let uuid = message.uuid(); - if let Some(mut order) = orderbook.find_order_by_uuid(&uuid) { - order.apply_updated(message); - orderbook.insert_or_update_order_update_trie(order); + // Phase 1: update index and build trie ops + let ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let uuid = message.uuid(); + if let Some(mut order) = orderbook.find_order_by_uuid(&uuid) { + order.apply_updated(message); + orderbook.index_insert_or_update(order) + } else { + Vec::new() + } + }; + + // Phase 2: enqueue trie ops + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); } } @@ -2428,14 +2504,14 @@ impl From for new_protocol::OrdermatchMessage { } } -fn broadcast_keep_alive_for_pub(ctx: &MmArc, pubkey: &str, orderbook: &Orderbook, p2p_privkey: Option<&KeyPair>) { - let state = match orderbook.pubkeys_state.get(pubkey) { +fn broadcast_keep_alive_for_pub(ctx: &MmArc, pubkey: &str, trie_store: &TrieStore, p2p_privkey: Option<&KeyPair>) { + let state = match trie_store.pubkeys_state.get(pubkey) { Some(s) => s, None => return, }; for (alb_pair, root) in state.trie_roots.iter() { - if *root == H64::default() && *root == hashed_null_node::() { + if *root == H64::default() || *root == hashed_null_node::() { continue; } @@ -2472,13 +2548,13 @@ pub async fn broadcast_maker_orders_keep_alive_loop(ctx: MmArc) { // but it seems to keep holding the guard drop(order); let pubsecp = hex::encode(p2p_privkey.public_slice()); - let orderbook = ordermatch_ctx.orderbook.lock(); - broadcast_keep_alive_for_pub(&ctx, &pubsecp, &orderbook, Some(p2p_privkey.key_pair())); + let trie_store = ordermatch_ctx.trie_store.lock(); + broadcast_keep_alive_for_pub(&ctx, &pubsecp, &trie_store, Some(p2p_privkey.key_pair())); } } - let orderbook = ordermatch_ctx.orderbook.lock(); - broadcast_keep_alive_for_pub(&ctx, &persistent_pubsecp, &orderbook, None); + let trie_store = ordermatch_ctx.trie_store.lock(); + broadcast_keep_alive_for_pub(&ctx, &persistent_pubsecp, &trie_store, None); } } @@ -2517,6 +2593,37 @@ enum OrderbookRequestingState { type H64 = [u8; 8]; +/// A narrow contract for trie mutations. The Orderbook builds these ops, +/// TrieStore applies them (and only TrieStore mutates MemoryDB/history). +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum TrieOp { + /// Reset an entire (pubkey, pair) subtrie. + /// + /// - Drops the subtrie's root and delta history for this (pubkey, pair). + /// - Intended to precede a full rebuild via subsequent Insert ops. + /// - After a clear, any remote delta requests that reference old roots will fall back to a FullTrie response. + ClearPair { pubkey: String, alb_pair: String }, + Insert { + pubkey: String, + alb_pair: String, + uuid: Uuid, + /// Full OrderbookItem is needed to maintain delta history + /// (and to regenerate deltas for Sync responses). + order: OrderbookItem, + }, + Remove { + pubkey: String, + alb_pair: String, + uuid: Uuid, + }, + /// Remove all trie state for a pubkey after prior per-UUID removals have been applied. + RemovePubkey { pubkey: String }, + #[cfg(test)] + /// Barrier op: notify when all previous ops (across prior messages) have been applied. + Flush(oneshot::Sender<()>), +} + #[derive(Debug, Clone, Eq, PartialEq)] struct TrieDiff { delta: Vec<(Key, Option)>, @@ -2624,6 +2731,268 @@ fn collect_orderbook_metrics(ctx: &MmArc, orderbook: &Orderbook) { mm_gauge!(ctx.metrics, "orderbook.len", orderbook.order_set.len() as f64); } +/// Trie-related state extracted from `Orderbook` to reduce contention on the main +/// order index. All trie operations go through this store. +#[derive(Default)] +pub struct TrieStore { + /// a map of orderbook states of known maker pubkeys + pubkeys_state: HashMap, + /// MemoryDB instance to store Patricia Tries data + memory_db: MemoryDB, +} + +impl TrieStore { + /// Apply a sequence of trie operations produced by the Orderbook. + /// This is the only place mutating MemoryDB and trie histories. + fn apply_ops(&mut self, ops: I) + where + I: IntoIterator, + { + #[derive(Default)] + struct Group { + clear: bool, + inserts: Vec<(Uuid, OrderbookItem)>, + removes: Vec, + } + + // 1) Group ops by (pubkey, alb_pair) to minimize repeated trie/historical touches. + let mut groups: HashMap<(String, String), Group> = HashMap::new(); + // Track pubkeys to remove after all per-pair ops are applied. + let mut pubkeys_to_remove: HashSet = HashSet::new(); + #[cfg(test)] + // Pending flush acknowledgements for this batch. + let mut flush_senders: Vec> = Vec::new(); + + for op in ops { + match op { + TrieOp::ClearPair { pubkey, alb_pair } => { + groups.entry((pubkey, alb_pair)).or_default().clear = true; + }, + TrieOp::Insert { + pubkey, + alb_pair, + uuid, + order, + } => { + let g = groups.entry((pubkey, alb_pair)).or_default(); + g.inserts.push((uuid, order)); + }, + TrieOp::Remove { pubkey, alb_pair, uuid } => { + groups.entry((pubkey, alb_pair)).or_default().removes.push(uuid); + }, + TrieOp::RemovePubkey { pubkey } => { + pubkeys_to_remove.insert(pubkey); + }, + #[cfg(test)] + TrieOp::Flush(done) => { + flush_senders.push(done); + }, + } + } + + // 2) Apply per group: ClearPair (if any) -> all Inserts -> all Removes + for ((pubkey, alb_pair), g) in groups { + if g.clear { + self.apply_clear_pair(&pubkey, &alb_pair); + } + // TODO(perf): reuse a single TrieDBMut for all inserts in this (pubkey, pair) group. + for (uuid, order) in g.inserts { + self.apply_insert(&pubkey, &alb_pair, uuid, order); + } + for uuid in g.removes { + self.apply_remove(&pubkey, &alb_pair, uuid); + } + } + + // 3) Remove entire pubkey states after their per-UUID removals have been processed. + for pubkey in pubkeys_to_remove { + self.pubkeys_state.remove(&pubkey); + } + + #[cfg(test)] + // 4) Notify flush waiters for this batch. + for tx in flush_senders { + let _ = tx.send(()); + } + } + + fn apply_insert(&mut self, pubkey: &str, alb_pair: &str, uuid: Uuid, order: OrderbookItem) { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, pubkey); + + let pair_root = order_pair_root_mut(&mut pubkey_state.trie_roots, alb_pair); + let prev_root = *pair_root; + + pubkey_state.orders_uuids.insert((uuid, alb_pair.to_owned())); + + { + let mut pair_trie = match get_trie_mut(&mut self.memory_db, pair_root) { + Ok(trie) => trie, + Err(e) => { + error!("Error {} getting trie with root {:?}", e, prev_root); + return; + }, + }; + let order_bytes = order.trie_state_bytes(); + if let Err(e) = pair_trie.insert(uuid.as_bytes(), &order_bytes) { + error!( + "Error {:?} on insertion to trie. Key {}, value {:?}", + e, uuid, order_bytes + ); + return; + }; + } + + if prev_root != H64::default() { + let alb_pair_owned = alb_pair.to_owned(); + + let _ = pubkey_state + .order_pairs_trie_state_history + .update_expiration_status(alb_pair_owned.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); + + let history = match pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_pair_owned) + { + Some(t) => t, + None => { + pubkey_state.order_pairs_trie_state_history.insert_expirable( + alb_pair_owned.clone(), + TrieOrderHistory { + inner: TimedMap::new_with_map_kind(MapKind::FxHashMap), + }, + Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT), + ); + + pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_pair_owned) + .expect("must exist") + }, + }; + + history.insert_new_diff( + prev_root, + TrieDiff { + delta: vec![(uuid, Some(order))], + next_root: *pair_root, + }, + ); + } + } + + fn apply_remove(&mut self, pubkey: &str, alb_pair: &str, uuid: Uuid) { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, pubkey); + let pair_state = order_pair_root_mut(&mut pubkey_state.trie_roots, alb_pair); + let old_state = *pair_state; + + let to_remove = &(uuid, alb_pair.to_owned()); + pubkey_state.orders_uuids.remove(to_remove); + + if old_state == H64::default() || old_state == hashed_null_node::() { + return; + } + + *pair_state = match delta_trie_root::( + &mut self.memory_db, + *pair_state, + vec![(*uuid.as_bytes(), None::>)], + ) { + Ok(root) => root, + Err(_) => { + error!("Failed to get existing trie with root {:?}", pair_state); + return; + }, + }; + + let alb_pair_owned = alb_pair.to_owned(); + + let _ = pubkey_state + .order_pairs_trie_state_history + .update_expiration_status(alb_pair_owned.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); + + if let Some(history) = pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_pair_owned) + { + history.insert_new_diff( + old_state, + TrieDiff { + delta: vec![(uuid, None)], + next_root: *pair_state, + }, + ); + } + } + + fn apply_clear_pair(&mut self, pubkey: &str, alb_pair: &str) { + if let Some(pubkey_state) = self.pubkeys_state.get_mut(pubkey) { + pubkey_state.order_pairs_trie_state_history.remove(&alb_pair.to_owned()); + + pubkey_state.orders_uuids.retain(|(_uuid, pair)| pair != alb_pair); + + pubkey_state.trie_roots.remove(alb_pair); + } + } + + /// Build a SyncPubkeyOrderbookState request if keep-alive indicates our local trie roots are stale. + /// This avoids touching Orderbook (and its lock). Topic subscription is provided via `is_subscribed`. + fn prepare_sync_request_for_keep_alive( + &mut self, + from_pubkey: &str, + message: new_protocol::PubkeyKeepAlive, + i_am_relay: bool, + is_subscribed: impl Fn(&str) -> bool, + ) -> Option { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + pubkey_state.last_keep_alive = now_sec(); + + let mut trie_roots_to_request = HashMap::new(); + for (alb_pair, trie_root) in message.trie_roots { + let topic = orderbook_topic_from_ordered_pair(&alb_pair); + let subscribed = is_subscribed(&topic); + if !subscribed && !i_am_relay { + continue; + } + + if trie_root == H64::default() || trie_root == hashed_null_node::() { + log::debug!( + "Received zero or hashed_null_node pair {} trie root from pub {}", + alb_pair, + from_pubkey + ); + continue; + } + + let actual_trie_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_pair); + if *actual_trie_root != trie_root { + trie_roots_to_request.insert(alb_pair, trie_root); + } + } + + if trie_roots_to_request.is_empty() { + return None; + } + + Some(OrdermatchRequest::SyncPubkeyOrderbookState { + pubkey: from_pubkey.to_owned(), + trie_roots: trie_roots_to_request, + }) + } +} + +fn spawn_trie_store_worker(ctx: &MmArc, trie_store: Arc>) -> UnboundedSender> { + // TODO(rate-limiting): when implementing rate-limiting for orderbook messages we can have a bounded channel here if needed + let (tx, mut rx) = unbounded::>(); + let spawner = ctx.spawner(); + spawner.spawn(async move { + while let Some(ops) = rx.next().await { + let mut store = trie_store.lock(); + store.apply_ops(ops); + } + }); + tx +} + struct Orderbook { /// A map from (base, rel). ordered: HashMap<(String, String), BTreeSet>, @@ -2634,16 +3003,11 @@ struct Orderbook { /// A map from (base, rel). unordered: HashMap<(String, String), HashSet>, order_set: HashMap, - /// a map of orderbook states of known maker pubkeys - pubkeys_state: HashMap, /// `TimedMap` of recently canceled orders, mapping `Uuid` to the maker pubkey as `String`, /// used to avoid order recreation in case of out-of-order p2p messages, /// e.g., when receiving the order cancellation message before the order is created. /// Entries are kept for `RECENTLY_CANCELLED_TIMEOUT` seconds. recently_cancelled: TimedMap, - topics_subscribed_to: HashMap, - /// MemoryDB instance to store Patricia Tries data - memory_db: MemoryDB, my_p2p_pubkeys: HashSet, /// A copy of the streaming manager to stream orderbook events out. streaming_manager: StreamingManager, @@ -2657,10 +3021,7 @@ impl Default for Orderbook { pairs_existing_for_rel: HashMap::default(), unordered: HashMap::default(), order_set: HashMap::default(), - pubkeys_state: HashMap::default(), recently_cancelled: TimedMap::new_with_map_kind(MapKind::FxHashMap), - topics_subscribed_to: HashMap::default(), - memory_db: MemoryDB::default(), my_p2p_pubkeys: HashSet::default(), streaming_manager: Default::default(), } @@ -2693,90 +3054,41 @@ impl Orderbook { self.order_set.get(uuid).cloned() } - fn insert_or_update_order_update_trie(&mut self, order: OrderbookItem) { + /// Index-only method: updates in-memory indices and returns the trie mutations + /// that must be applied by TrieStore. No trie/memory_db mutation happens here. + fn index_insert_or_update(&mut self, order: OrderbookItem) -> Vec { // Ignore the order if it was recently cancelled if self.recently_cancelled.get(&order.uuid) == Some(&order.pubkey) { warn!("Maker order {} was recently cancelled, ignoring", order.uuid); - return; + return Vec::new(); } + let mut trie_ops = vec![]; let zero = BigRational::from_integer(0.into()); - if order.max_volume <= zero || order.price <= zero || order.min_volume < zero { - self.remove_order_trie_update(order.uuid); - return; - } // else insert the order - self.insert_or_update_order(order.clone()); - - let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, &order.pubkey); - - let alb_ordered = alb_ordered_pair(&order.base, &order.rel); - let pair_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_ordered); - let prev_root = *pair_root; - - pubkey_state.orders_uuids.insert((order.uuid, alb_ordered.clone())); - - { - let mut pair_trie = match get_trie_mut(&mut self.memory_db, pair_root) { - Ok(trie) => trie, - Err(e) => { - error!("Error getting {} trie with root {:?}", e, prev_root); - return; - }, - }; - let order_bytes = order.trie_state_bytes(); - if let Err(e) = pair_trie.insert(order.uuid.as_bytes(), &order_bytes) { - error!( - "Error {:?} on insertion to trie. Key {}, value {:?}", - e, order.uuid, order_bytes - ); - return; - }; + if order.max_volume <= zero || order.price <= zero || order.min_volume < zero { + if let Some((_removed, op)) = self.index_remove(order.uuid) { + trie_ops.push(op); + } + return trie_ops; } - if prev_root != H64::default() { - let _ = pubkey_state - .order_pairs_trie_state_history - .update_expiration_status(alb_ordered.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); - - let history = match pubkey_state - .order_pairs_trie_state_history - .get_mut_unchecked(&alb_ordered) - { - Some(t) => t, - None => { - pubkey_state.order_pairs_trie_state_history.insert_expirable( - alb_ordered.clone(), - TrieOrderHistory { - inner: TimedMap::new_with_map_kind(MapKind::FxHashMap), - }, - Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT), - ); - - pubkey_state - .order_pairs_trie_state_history - .get_mut_unchecked(&alb_ordered) - .expect("must exist") - }, - }; - - history.insert_new_diff( - prev_root, - TrieDiff { - delta: vec![(order.uuid, Some(order.clone()))], - next_root: *pair_root, - }, - ); - } + let alb_pair = alb_ordered_pair(&order.base, &order.rel); + let op = TrieOp::Insert { + pubkey: order.pubkey.clone(), + alb_pair, + uuid: order.uuid, + order: order.clone(), + }; + self.index_insert_or_update_inner(order); + trie_ops.push(op); + trie_ops } - fn insert_or_update_order(&mut self, order: OrderbookItem) { + /// Pure index update (no trie changes): replaces/creates an order in memory structures + /// and emits the "NewOrUpdatedItem" event. + fn index_insert_or_update_inner(&mut self, order: OrderbookItem) { log::debug!("Inserting order {:?}", order); - let zero = BigRational::from_integer(0.into()); - if order.max_volume <= zero || order.price <= zero || order.min_volume < zero { - self.remove_order_trie_update(order.uuid); - return; - } // else insert the order let base_rel = (order.base.clone(), order.rel.clone()); @@ -2817,7 +3129,9 @@ impl Orderbook { self.order_set.insert(order.uuid, order); } - fn remove_order_trie_update(&mut self, uuid: Uuid) -> Option { + /// Pure index removal (no trie changes): removes from in-memory indices, + /// emits the "RemovedItem" event and returns the removed order and TrieOp. + fn index_remove(&mut self, uuid: Uuid) -> Option<(OrderbookItem, TrieOp)> { let order = self.order_set.remove(&uuid)?; let base_rel = (order.base.clone(), order.rel.clone()); @@ -2842,96 +3156,44 @@ impl Orderbook { } } - let alb_ordered = alb_ordered_pair(&order.base, &order.rel); - let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, &order.pubkey); - let pair_state = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_ordered); - let old_state = *pair_state; - - let to_remove = &(uuid, alb_ordered.clone()); - pubkey_state.orders_uuids.remove(to_remove); - - *pair_state = match delta_trie_root::( - &mut self.memory_db, - *pair_state, - vec![(*order.uuid.as_bytes(), None::>)], - ) { - Ok(root) => root, - Err(_) => { - error!("Failed to get existing trie with root {:?}", pair_state); - return Some(order); - }, - }; - - let _ = pubkey_state - .order_pairs_trie_state_history - .update_expiration_status(alb_ordered.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); - - if let Some(history) = pubkey_state - .order_pairs_trie_state_history - .get_mut_unchecked(&alb_ordered) - { - history.insert_new_diff( - old_state, - TrieDiff { - delta: vec![(uuid, None)], - next_root: *pair_state, - }, - ); - } - self.streaming_manager .send_fn( &OrderbookStreamer::derive_streamer_id((&order.base, &order.rel)), || OrderbookItemChangeEvent::RemovedItem(order.uuid), ) .ok(); - Some(order) - } - fn is_subscribed_to(&self, topic: &str) -> bool { - self.topics_subscribed_to.contains_key(topic) + // Stage a trie removal op for TrieStore + let alb_pair = alb_ordered_pair(&order.base, &order.rel); + let op = TrieOp::Remove { + pubkey: order.pubkey.clone(), + alb_pair, + uuid, + }; + + Some((order, op)) } - fn process_keep_alive( - &mut self, - from_pubkey: &str, - message: new_protocol::PubkeyKeepAlive, - i_am_relay: bool, - ) -> Option { - let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); - pubkey_state.last_keep_alive = now_sec(); - let mut trie_roots_to_request = HashMap::new(); - for (alb_pair, trie_root) in message.trie_roots { - let subscribed = self - .topics_subscribed_to - .contains_key(&orderbook_topic_from_ordered_pair(&alb_pair)); - if !subscribed && !i_am_relay { - continue; - } + fn index_remove_pubkey_pair_orders(&mut self, pubkey: &str, alb_pair: &str) { + let (base, rel) = match alb_pair.split_once(':') { + Some((a, b)) => (a, b), + None => return, + }; - if trie_root == H64::default() || trie_root == hashed_null_node::() { - log::debug!( - "Received zero or hashed_null_node pair {} trie root from pub {}", - alb_pair, - from_pubkey - ); + let pairs = [(base.to_owned(), rel.to_owned()), (rel.to_owned(), base.to_owned())]; - continue; - } - let actual_trie_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_pair); - if *actual_trie_root != trie_root { - trie_roots_to_request.insert(alb_pair, trie_root); + for pair in pairs { + if let Some(uuids) = self.unordered.get(&pair).cloned() { + for uuid in uuids { + if let Some(order) = self.order_set.get(&uuid) { + if order.pubkey == pubkey { + // ignore the trie op here, we’re going to ClearPair at the trie layer + let _ = self.index_remove(uuid); + } + } + } } } - - if trie_roots_to_request.is_empty() { - return None; - } - - Some(OrdermatchRequest::SyncPubkeyOrderbookState { - pubkey: from_pubkey.to_owned(), - trie_roots: trie_roots_to_request, - }) } fn orderbook_item_with_proof(&self, order: OrderbookItem) -> OrderbookItemWithProof { @@ -2947,6 +3209,12 @@ struct OrdermatchContext { pub maker_orders_ctx: PaMutex, pub my_taker_orders: AsyncMutex>, pub orderbook: PaMutex, + /// Trie data store is extracted from `Orderbook` to reduce contention. + pub trie_store: Arc>, + /// Sender to enqueue trie mutations for background application. + pub trie_ops_tx: UnboundedSender>, + /// Tracks which orderbook topics we are subscribed to, separate from the order index. + pub orderbook_subscriptions: PaRwLock>, /// The map from coin original ticker to the orderbook ticker /// It is used to share the same orderbooks for concurrently activated coins with different protocols /// E.g. BTC and BTC-Segwit @@ -2985,10 +3253,16 @@ pub fn init_ordermatch_context(ctx: &MmArc) -> OrdermatchInitResult<()> { } } + let trie_store = Arc::new(PaMutex::new(TrieStore::default())); + let trie_ops_tx = spawn_trie_store_worker(ctx, trie_store.clone()); + let ordermatch_context = OrdermatchContext { maker_orders_ctx: PaMutex::new(MakerOrdersContext::new(ctx)?), my_taker_orders: Default::default(), orderbook: PaMutex::new(Orderbook::new(ctx.event_stream_manager.clone())), + trie_store, + trie_ops_tx, + orderbook_subscriptions: PaRwLock::new(HashMap::default()), pending_maker_reserved: Default::default(), orderbook_tickers, original_tickers, @@ -3015,10 +3289,15 @@ impl OrdermatchContext { #[cfg(test)] fn from_ctx(ctx: &MmArc) -> Result, String> { Ok(try_s!(from_ctx(&ctx.ordermatch_ctx, move || { + let trie_store = Arc::new(PaMutex::new(TrieStore::default())); + let trie_ops_tx = spawn_trie_store_worker(ctx, trie_store.clone()); Ok(OrdermatchContext { maker_orders_ctx: PaMutex::new(try_s!(MakerOrdersContext::new(ctx))), my_taker_orders: Default::default(), orderbook: PaMutex::new(Orderbook::new(ctx.event_stream_manager.clone())), + trie_store, + trie_ops_tx, + orderbook_subscriptions: PaRwLock::new(HashMap::default()), pending_maker_reserved: Default::default(), orderbook_tickers: Default::default(), original_tickers: Default::default(), @@ -3047,6 +3326,16 @@ impl OrdermatchContext { pub async fn ordermatch_db(&self) -> InitDbResult> { self.ordermatch_db.get_or_initialize().await } + + /// Block until the background trie worker has applied all previously enqueued ops. + /// This still goes through the unbounded_send path and only waits for a Flush ack. + #[cfg(test)] + pub fn wait_trie_ops_flushed(&self) { + let (tx, rx) = oneshot::channel::<()>(); + let _ = self.trie_ops_tx.unbounded_send(vec![TrieOp::Flush(tx)]); + // Wait for acknowledgement from the worker + let _ = futures::executor::block_on(rx); + } } pub struct MakerOrdersContext { @@ -3684,30 +3973,49 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { } { - // remove "timed out" pubkeys states with their orders from orderbook - let mut orderbook = ordermatch_ctx.orderbook.lock(); - let mut uuids_to_remove = vec![]; - let mut pubkeys_to_remove = vec![]; - for (pubkey, state) in orderbook.pubkeys_state.iter() { - let is_ours = orderbook.my_p2p_pubkeys.contains(pubkey); - let to_keep = - pubkey == &my_pubsecp || is_ours || state.last_keep_alive + maker_order_timeout > now_sec(); - if !to_keep { - for (uuid, _) in &state.orders_uuids { - uuids_to_remove.push(*uuid); + // remove "timed out" pubkeys states with their orders from trie store + index + // Step 1: snapshot which uuids and pubkeys to remove + let (uuids_to_remove, pubkeys_to_remove) = { + let orderbook = ordermatch_ctx.orderbook.lock(); + let trie_store = ordermatch_ctx.trie_store.lock(); + let mut uuids = Vec::new(); + let mut pubs = Vec::new(); + for (pubkey, state) in trie_store.pubkeys_state.iter() { + let is_ours = orderbook.my_p2p_pubkeys.contains(pubkey); + let to_keep = + pubkey == &my_pubsecp || is_ours || state.last_keep_alive + maker_order_timeout > now_sec(); + if !to_keep { + for (uuid, _) in &state.orders_uuids { + uuids.push(*uuid); + } + pubs.push(pubkey.clone()); } - pubkeys_to_remove.push(pubkey.clone()); } - } + (uuids, pubs) + }; - for uuid in uuids_to_remove { - orderbook.remove_order_trie_update(uuid); - } + // Step 2: drop from index and build trie ops + let mut ops = { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + let mut ops = Vec::new(); + for uuid in uuids_to_remove { + if let Some((_removed, op)) = orderbook.index_remove(uuid) { + ops.push(op); + } + } + collect_orderbook_metrics(&ctx, &orderbook); + ops + }; + + // Step 3: enqueue pubkey removals to be applied AFTER per-UUID trie removals for pubkey in pubkeys_to_remove { - orderbook.pubkeys_state.remove(&pubkey); + ops.push(TrieOp::RemovePubkey { pubkey }); } - collect_orderbook_metrics(&ctx, &orderbook); + // Step 4: enqueue trie ops + if !ops.is_empty() { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(ops); + } } { @@ -3831,8 +4139,8 @@ pub async fn clean_memory_loop(ctx_weak: MmWeak) { } let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.memory_db.purge(); + let mut trie_store = ordermatch_ctx.trie_store.lock(); + trie_store.memory_db.purge(); } Timer::sleep(600.).await; } @@ -6156,9 +6464,9 @@ async fn subscribe_to_orderbook_topic( let topic = orderbook_topic_from_base_rel(base, rel); let is_orderbook_filled = { let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); - let mut orderbook = ordermatch_ctx.orderbook.lock(); + let mut subs = ordermatch_ctx.orderbook_subscriptions.write(); - match orderbook.topics_subscribed_to.entry(topic.clone()) { + match subs.entry(topic.clone()) { Entry::Vacant(e) => { // we weren't subscribed to the topic yet e.insert(OrderbookRequestingState::NotRequested { diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs index 1772acbe61..a420d98408 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs @@ -50,12 +50,13 @@ pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result = { let orderbook = ordermatch_ctx.orderbook.lock(); + let subs = ordermatch_ctx.orderbook_subscriptions.read(); req.pairs .into_iter() .filter_map(|original_pair| { let orderbook_pair = ordermatch_ctx.orderbook_pair_bypass(&original_pair); let topic = orderbook_topic_from_base_rel(&orderbook_pair.0, &orderbook_pair.1); - if orderbook.is_subscribed_to(&topic) { + if subs.contains_key(&topic) { let asks = orderbook .unordered .get(&orderbook_pair) diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 56915a4e25..d2dabaac8d 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -1910,15 +1910,10 @@ fn test_process_get_orderbook_request() { orders_by_pubkeys.insert(pubkey2, pubkey2_orders); orders_by_pubkeys.insert(pubkey3, pubkey3_orders); - let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - for order in orders_by_pubkeys.values().flatten() { - orderbook.insert_or_update_order_update_trie(order.clone()); + insert_or_update_order(&ctx, order.clone()); } - - // avoid dead lock on orderbook as process_get_orderbook_request also acquires it - drop(orderbook); + flush_trie(&ctx); let encoded = process_get_orderbook_request(ctx, "RICK".into(), "MORTY".into()) .unwrap() @@ -1950,8 +1945,6 @@ fn test_process_get_orderbook_request() { #[test] fn test_process_get_orderbook_request_limit() { let (ctx, pubkey, secret) = make_ctx_for_tests(); - let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); let orders = make_random_orders( pubkey, @@ -1962,11 +1955,9 @@ fn test_process_get_orderbook_request_limit() { ); for order in orders { - orderbook.insert_or_update_order_update_trie(order); + insert_or_update_order(&ctx, order); } - - // avoid dead lock on orderbook as process_get_orderbook_request also acquires it - drop(orderbook); + flush_trie(&ctx); let err = process_get_orderbook_request(ctx, "RICK".into(), "MORTY".into()).expect_err("Expected an error"); @@ -2007,6 +1998,7 @@ fn test_request_and_fill_orderbook() { insert_or_update_order(&ctx, extra_order); } } + flush_trie(&ctx); let expected_request = P2PRequest::Ordermatch(OrdermatchRequest::GetOrderbook { base: "RICK".into(), @@ -2059,6 +2051,7 @@ fn test_request_and_fill_orderbook() { }); block_on(request_and_fill_orderbook(&ctx, "RICK", "MORTY")).unwrap(); + flush_trie(&ctx); // check if the best asks and bids are in the orderbook let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); @@ -2090,7 +2083,8 @@ fn test_request_and_fill_orderbook() { let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); for (pubkey, orders) in expected_orders { - let pubkey_state = orderbook + let trie_store = ordermatch_ctx.trie_store.lock(); + let pubkey_state = trie_store .pubkeys_state .get(&pubkey) .unwrap_or_else(|| panic!("!pubkey_state.get() {} pubkey", pubkey)); @@ -2107,7 +2101,7 @@ fn test_request_and_fill_orderbook() { .unwrap_or_else(|| panic!("!pubkey_state.trie_roots.get() {}", rick_morty_pair)); // check if the root contains only expected orders - let trie = TrieDB::::new(&orderbook.memory_db, root).expect("!TrieDB::new()"); + let trie = TrieDB::::new(&trie_store.memory_db, root).expect("!TrieDB::new()"); let mut in_trie: Vec<(Uuid, OrderbookItem)> = trie .iter() .expect("!TrieDB::iter()") @@ -2428,16 +2422,16 @@ fn test_taker_request_can_match_with_uuid() { #[test] fn test_orderbook_insert_or_update_order() { - let (_, pubkey, secret) = make_ctx_for_tests(); - let mut orderbook = Orderbook::default(); + let (ctx, pubkey, secret) = make_ctx_for_tests(); let order = make_random_orders(pubkey, &secret, "C1".into(), "C2".into(), 1).remove(0); - orderbook.insert_or_update_order_update_trie(order); + insert_or_update_order(&ctx, order); + flush_trie(&ctx); } fn pair_trie_root_by_pub(ctx: &MmArc, pubkey: &str, pair: &str) -> H64 { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock(); - *orderbook + let trie_store = ordermatch_ctx.trie_store.lock(); + *trie_store .pubkeys_state .get(pubkey) .unwrap() @@ -2448,14 +2442,23 @@ fn pair_trie_root_by_pub(ctx: &MmArc, pubkey: &str, pair: &str) -> H64 { fn clone_orderbook_memory_db(ctx: &MmArc) -> MemoryDB { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.memory_db.clone() + let trie_store = ordermatch_ctx.trie_store.lock(); + trie_store.memory_db.clone() } fn remove_order(ctx: &MmArc, uuid: Uuid) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - orderbook.remove_order_trie_update(uuid); + if let Some((_removed, op)) = ordermatch_ctx.orderbook.lock().index_remove(uuid) { + let _ = ordermatch_ctx.trie_ops_tx.unbounded_send(vec![op]); + // Ensure the removal is applied by the background worker before proceeding + ordermatch_ctx.wait_trie_ops_flushed(); + }; +} + +/// Wait until the background trie worker has applied all pending ops. +fn flush_trie(ctx: &MmArc) { + let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); + ordermatch_ctx.wait_trie_ops_flushed(); } #[test] @@ -2466,6 +2469,7 @@ fn test_process_sync_pubkey_orderbook_state_after_new_orders_added() { for order in orders { insert_or_update_order(&ctx, order); } + flush_trie(&ctx); let alb_ordered_pair = alb_ordered_pair("C1", "C2"); let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); @@ -2478,6 +2482,7 @@ fn test_process_sync_pubkey_orderbook_state_after_new_orders_added() { for order in new_orders { insert_or_update_order(&ctx, order.clone()); } + flush_trie(&ctx); let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) .unwrap() @@ -2516,16 +2521,18 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_insert() { for order in orders.clone() { insert_or_update_order(&ctx, order); } + flush_trie(&ctx); let alb_ordered_pair = alb_ordered_pair("C1", "C2"); let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); for order in orders { insert_or_update_order(&ctx, order); } + flush_trie(&ctx); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock(); - let pubkey_state = orderbook.pubkeys_state.get(&pubkey).unwrap(); + let trie_store = ordermatch_ctx.trie_store.lock(); + let pubkey_state = trie_store.pubkeys_state.get(&pubkey).unwrap(); assert!(!pubkey_state .order_pairs_trie_state_history .get(&alb_ordered_pair) @@ -2541,6 +2548,7 @@ fn test_process_sync_pubkey_orderbook_state_after_orders_removed() { for order in orders.clone() { insert_or_update_order(&ctx, order); } + flush_trie(&ctx); let alb_ordered_pair = alb_ordered_pair("C1", "C2"); let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); @@ -2555,6 +2563,7 @@ fn test_process_sync_pubkey_orderbook_state_after_orders_removed() { for order in to_remove { remove_order(&ctx, order.uuid); } + flush_trie(&ctx); let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) .unwrap() @@ -2598,13 +2607,14 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() { for uuid in &to_remove { remove_order(&ctx, *uuid); } + flush_trie(&ctx); let alb_ordered_pair = alb_ordered_pair("C1", "C2"); let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock(); - let pubkey_state = orderbook.pubkeys_state.get(&pubkey).unwrap(); + let trie_store = ordermatch_ctx.trie_store.lock(); + let pubkey_state = trie_store.pubkeys_state.get(&pubkey).unwrap(); assert!(!pubkey_state .order_pairs_trie_state_history .get(&alb_ordered_pair) @@ -2614,11 +2624,15 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() { #[test] fn test_orderbook_pubkey_sync_request() { - let mut orderbook = Orderbook::default(); - orderbook.topics_subscribed_to.insert( - orderbook_topic_from_base_rel("C1", "C2"), - OrderbookRequestingState::Requested, - ); + let ctx = mm_ctx_with_iguana(None); + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + { + let mut subs = ordermatch_ctx.orderbook_subscriptions.write(); + subs.insert( + orderbook_topic_from_base_rel("C1", "C2"), + OrderbookRequestingState::Requested, + ); + } let pubkey = "pubkey"; let mut trie_roots = HashMap::new(); @@ -2630,7 +2644,15 @@ fn test_orderbook_pubkey_sync_request() { timestamp: now_sec(), }; - let request = orderbook.process_keep_alive(pubkey, message, false).unwrap(); + let subscribed_topics: HashSet = { + let subs = ordermatch_ctx.orderbook_subscriptions.read(); + subs.keys().cloned().collect() + }; + let mut trie_store = ordermatch_ctx.trie_store.lock(); + let request = trie_store + .prepare_sync_request_for_keep_alive(pubkey, message, false, |topic| subscribed_topics.contains(topic)) + .unwrap(); + match request { OrdermatchRequest::SyncPubkeyOrderbookState { trie_roots: pairs_trie_roots, @@ -2645,11 +2667,15 @@ fn test_orderbook_pubkey_sync_request() { #[test] fn test_orderbook_pubkey_sync_request_relay() { - let mut orderbook = Orderbook::default(); - orderbook.topics_subscribed_to.insert( - orderbook_topic_from_base_rel("C1", "C2"), - OrderbookRequestingState::Requested, - ); + let ctx = mm_ctx_with_iguana(None); + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + { + let mut subs = ordermatch_ctx.orderbook_subscriptions.write(); + subs.insert( + orderbook_topic_from_base_rel("C1", "C2"), + OrderbookRequestingState::Requested, + ); + } let pubkey = "pubkey"; let mut trie_roots = HashMap::new(); @@ -2661,7 +2687,15 @@ fn test_orderbook_pubkey_sync_request_relay() { timestamp: now_sec(), }; - let request = orderbook.process_keep_alive(pubkey, message, true).unwrap(); + let subscribed_topics: HashSet = { + let subs = ordermatch_ctx.orderbook_subscriptions.read(); + subs.keys().cloned().collect() + }; + let mut trie_store = ordermatch_ctx.trie_store.lock(); + let request = trie_store + .prepare_sync_request_for_keep_alive(pubkey, message, true, |topic| subscribed_topics.contains(topic)) + .unwrap(); + match request { OrdermatchRequest::SyncPubkeyOrderbookState { trie_roots: pairs_trie_roots, @@ -2732,16 +2766,17 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { for order in orders.iter() { insert_or_update_order(&ctx, order.clone()); } + flush_trie(&ctx); let alb_pair = alb_ordered_pair("RICK", "MORTY"); // update trie root by adding a new order and do not update history let (old_root, _new_root) = { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + let mut trie_store = ordermatch_ctx.trie_store.lock(); let mut orderbook = ordermatch_ctx.orderbook.lock(); - log!("{:?}, found {:?}", pubkey, orderbook.pubkeys_state.keys()); - let old_root = *orderbook + let old_root = *trie_store .pubkeys_state .get_mut(&pubkey) .expect("!pubkeys_state") @@ -2751,13 +2786,13 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { let order_bytes = new_order.trie_state_bytes(); let mut new_root = old_root; - let mut trie = get_trie_mut(&mut orderbook.memory_db, &mut new_root).expect("!get_trie_mut"); + let mut trie = get_trie_mut(&mut trie_store.memory_db, &mut new_root).expect("!get_trie_mut"); trie.insert(new_order.uuid.as_bytes(), &order_bytes) .expect("Error on order insertion"); drop(trie); - // update root in orderbook trie_roots - orderbook + // update root in trie_store trie_roots + trie_store .pubkeys_state .get_mut(&pubkey) .expect("!pubkeys_state") @@ -2791,8 +2826,11 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { assert_eq!(full_trie, expected); } -fn check_if_orderbook_contains_only(orderbook: &Orderbook, pubkey: &str, orders: &[OrderbookItem]) { - let pubkey_state = orderbook.pubkeys_state.get(pubkey).expect("!pubkeys_state"); +fn check_if_orderbook_contains_only(ctx: &MmArc, pubkey: &str, orders: &[OrderbookItem]) { + let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); + let orderbook = ordermatch_ctx.orderbook.lock(); + let trie_store = ordermatch_ctx.trie_store.lock(); + let pubkey_state = trie_store.pubkeys_state.get(pubkey).expect("!pubkeys_state"); // order_set let expected_set: HashMap<_, _> = orders.iter().map(|order| (order.uuid, order.clone())).collect(); @@ -2822,7 +2860,7 @@ fn check_if_orderbook_contains_only(orderbook: &Orderbook, pubkey: &str, orders: } assert_eq!(orderbook.unordered, expected_unordered); - // history + // history keys let actual_keys: HashSet<_> = pubkey_state .order_pairs_trie_state_history .keys() @@ -2842,12 +2880,12 @@ fn check_if_orderbook_contains_only(orderbook: &Orderbook, pubkey: &str, orders: .collect(); assert_eq!(pubkey_state.orders_uuids, expected_uuids); - // trie_roots + // trie_roots contents let actual_trie_orders: HashMap<_, _> = pubkey_state .trie_roots .iter() .map(|(alb_pair, trie_root)| { - let trie = TrieDB::::new(&orderbook.memory_db, trie_root).expect("!TrieDB::new"); + let trie = TrieDB::::new(&trie_store.memory_db, trie_root).expect("!TrieDB::new"); let mut trie: Vec<(Uuid, OrderbookItem)> = trie .iter() .expect("!TrieDB::iter") @@ -2862,6 +2900,7 @@ fn check_if_orderbook_contains_only(orderbook: &Orderbook, pubkey: &str, orders: (alb_pair.clone(), trie) }) .collect(); + let mut expected_trie_orders = HashMap::new(); for order in orders.iter() { let trie = expected_trie_orders @@ -2884,14 +2923,23 @@ fn test_remove_and_purge_pubkey_pair_orders() { for order in rick_morty_orders.iter().chain(rick_kmd_orders.iter()) { insert_or_update_order(&ctx, order.clone()); } + flush_trie(&ctx); let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - - remove_pubkey_pair_orders(&mut orderbook, &pubkey, &rick_morty_pair); - check_if_orderbook_contains_only(&orderbook, &pubkey, &rick_kmd_orders); + { + let mut orderbook = ordermatch_ctx.orderbook.lock(); + orderbook.index_remove_pubkey_pair_orders(&pubkey, &rick_morty_pair); + } + { + let mut trie_store = ordermatch_ctx.trie_store.lock(); + trie_store.apply_ops(vec![TrieOp::ClearPair { + pubkey: pubkey.clone(), + alb_pair: rick_morty_pair.clone(), + }]); + } + check_if_orderbook_contains_only(&ctx, &pubkey, &rick_kmd_orders); } #[test] @@ -2904,40 +2952,46 @@ fn test_orderbook_sync_trie_diff_time_cache() { for order in &rick_morty_orders[..5] { insert_or_update_order(&ctx_bob, order.clone()); } + flush_trie(&ctx_bob); std::thread::sleep(Duration::from_secs(3)); for order in &rick_morty_orders[5..10] { insert_or_update_order(&ctx_bob, order.clone()); } + flush_trie(&ctx_bob); let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); - let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let trie_store_bob = ordermatch_ctx_bob.trie_store.lock(); + let bob_state = trie_store_bob.pubkeys_state.get(&pubkey_bob).unwrap(); let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); assert_eq!(rick_morty_history_bob.len(), 5); // alice has an outdated state, for which bob doesn't have history anymore as it's expired - let (ctx_alice, ..) = make_ctx_for_tests(); + let (ctx_alice, pubkey_alice, ..) = make_ctx_for_tests(); for order in &rick_morty_orders[..3] { insert_or_update_order(&ctx_alice, order.clone()); } + flush_trie(&ctx_alice); let ordermatch_ctx_alice = OrdermatchContext::from_ctx(&ctx_alice).unwrap(); - let mut orderbook_alice = ordermatch_ctx_alice.orderbook.lock(); - let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); + let mut trie_store_alice = ordermatch_ctx_alice.trie_store.lock(); + let bob_state_on_alice_side = trie_store_alice.pubkeys_state.get(&pubkey_bob).unwrap(); let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); let bob_root = bob_state.trie_roots.get(&rick_morty_pair).unwrap(); - let bob_history_on_sync = DeltaOrFullTrie::from_history( - rick_morty_history_bob, - *alice_root, - *bob_root, - &orderbook_bob.memory_db, - |uuid: &Uuid| orderbook_bob.order_set.get(uuid).cloned(), - ) + let bob_history_on_sync = { + let orderbook_bob_guard = ordermatch_ctx_bob.orderbook.lock(); + DeltaOrFullTrie::from_history( + rick_morty_history_bob, + *alice_root, + *bob_root, + &trie_store_bob.memory_db, + |uuid: &Uuid| orderbook_bob_guard.order_set.get(uuid).cloned(), + ) + } .unwrap(); let full_trie = match bob_history_on_sync { @@ -2951,45 +3005,70 @@ fn test_orderbook_sync_trie_diff_time_cache() { protocol_infos: &HashMap::new(), conf_infos: &HashMap::new(), }; - let new_alice_root = process_pubkey_full_trie( - &mut orderbook_alice, - full_trie - .into_iter() - .map(|(uuid, order)| (uuid, order.into())) - .collect(), - params, - ); + { + // Phase 1: build ops (index only) + let ops = { + let mut orderbook_alice = ordermatch_ctx_alice.orderbook.lock(); + process_pubkey_full_trie( + &mut orderbook_alice, + full_trie + .into_iter() + .map(|(uuid, order)| (uuid, order.into())) + .collect(), + params, + ) + }; + + // Phase 2: apply trie ops to update roots + if !ops.is_empty() { + trie_store_alice.apply_ops(ops); + } + } + + // Compare the updated root on Alice with Bob + let new_alice_root = { + *trie_store_alice + .pubkeys_state + .get(&pubkey_alice) + .unwrap() + .trie_roots + .get(&rick_morty_pair) + .unwrap() + }; assert_eq!(new_alice_root, *bob_root); - drop(orderbook_bob); - drop(orderbook_alice); + drop(trie_store_bob); + drop(trie_store_alice); for order in &rick_morty_orders[10..] { insert_or_update_order(&ctx_bob, order.clone()); } + flush_trie(&ctx_bob); - let mut orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); - - orderbook_bob.remove_order_trie_update(rick_morty_orders[12].uuid); + remove_order(&ctx_bob, rick_morty_orders[12].uuid); - let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let trie_store_bob = ordermatch_ctx_bob.trie_store.lock(); + let bob_state = trie_store_bob.pubkeys_state.get(&pubkey_bob).unwrap(); let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); - let mut orderbook_alice = ordermatch_ctx_alice.orderbook.lock(); - let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); + let mut trie_store_alice = ordermatch_ctx_alice.trie_store.lock(); + let bob_state_on_alice_side = trie_store_alice.pubkeys_state.get(&pubkey_bob).unwrap(); let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); let bob_root = bob_state.trie_roots.get(&rick_morty_pair).unwrap(); - let bob_history_on_sync = DeltaOrFullTrie::from_history( - rick_morty_history_bob, - *alice_root, - *bob_root, - &orderbook_bob.memory_db, - |uuid: &Uuid| orderbook_bob.order_set.get(uuid).cloned(), - ) - .unwrap(); + let bob_history_on_sync = { + let orderbook_bob_guard = ordermatch_ctx_bob.orderbook.lock(); + DeltaOrFullTrie::from_history( + rick_morty_history_bob, + *alice_root, + *bob_root, + &trie_store_bob.memory_db, + |uuid: &Uuid| orderbook_bob_guard.order_set.get(uuid).cloned(), + ) + .unwrap() + }; // Check that alice gets orders from history this time let trie_delta = match bob_history_on_sync { @@ -3003,14 +3082,35 @@ fn test_orderbook_sync_trie_diff_time_cache() { protocol_infos: &HashMap::new(), conf_infos: &HashMap::new(), }; - let new_alice_root = process_trie_delta( - &mut orderbook_alice, - trie_delta - .into_iter() - .map(|(uuid, order)| (uuid, order.map(From::from))) - .collect(), - params, - ); + + // Phase 1: build ops under Orderbook + let ops = { + let mut orderbook_alice_guard = ordermatch_ctx_alice.orderbook.lock(); + process_trie_delta( + &mut orderbook_alice_guard, + trie_delta + .into_iter() + .map(|(uuid, order)| (uuid, order.map(From::from))) + .collect(), + params, + ) + }; + + // Phase 2: apply trie ops + if !ops.is_empty() { + trie_store_alice.apply_ops(ops); + } + + // Compare updated root with Bob + let new_alice_root = { + *trie_store_alice + .pubkeys_state + .get(&pubkey_bob) + .unwrap() + .trie_roots + .get(&rick_morty_pair) + .unwrap() + }; assert_eq!(new_alice_root, *bob_root); } @@ -3032,10 +3132,11 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() for order in &rick_morty_orders[5..10] { insert_or_update_order(&ctx_bob, order.clone()); } + flush_trie(&ctx_bob); let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); - let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let trie_store_bob = ordermatch_ctx_bob.trie_store.lock(); + let bob_state = trie_store_bob.pubkeys_state.get(&pubkey_bob).unwrap(); // Only the last inserted 5 orders are found assert_eq!( @@ -3047,7 +3148,7 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() 5 ); - drop(orderbook_bob); + drop(trie_store_bob); std::thread::sleep(Duration::from_secs(2)); @@ -3055,10 +3156,11 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() for order in &rick_morty_orders[10..] { insert_or_update_order(&ctx_bob, order.clone()); } + flush_trie(&ctx_bob); let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); - let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let trie_store_bob = ordermatch_ctx_bob.trie_store.lock(); + let bob_state = trie_store_bob.pubkeys_state.get(&pubkey_bob).unwrap(); assert_eq!( bob_state @@ -3069,13 +3171,13 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() 10 ); - drop(orderbook_bob); + drop(trie_store_bob); std::thread::sleep(Duration::from_secs(1)); let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); - let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let trie_store_bob = ordermatch_ctx_bob.trie_store.lock(); + let bob_state = trie_store_bob.pubkeys_state.get(&pubkey_bob).unwrap(); // After 3 seconds from inserting orders number 6-10 these orders have not expired due to updated expiration on inserting orders 11-15 assert_eq!(