From a73a71429b0638da9fb3fce6978a55309bbafa9e Mon Sep 17 00:00:00 2001 From: shamardy Date: Mon, 6 Oct 2025 10:36:16 +0300 Subject: [PATCH 01/16] Revert "Revert "fix(ordermatch): ignore loop-back; clear on null root; reject stale keep-alives (#2580)"" This reverts commit fa3fc5dae0af15b9ea7ba5bfaf66cacdb26596da. --- mm2src/mm2_main/src/lp_ordermatch.rs | 114 ++++++++++++++---------- mm2src/mm2_main/src/ordermatch_tests.rs | 69 +++++++++++++- 2 files changed, 134 insertions(+), 49 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index c441c7baee..df9ab7ba54 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -600,6 +600,15 @@ pub async fn handle_orderbook_msg( pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: bool) -> OrderbookP2PHandlerResult { match decode_signed::(msg) { Ok((message, _sig, pubkey)) => { + { + let my_persistent = mm2_internal_pubkey_hex(&ctx, String::from).ok().flatten(); + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); + let my_p2p = &ordermatch_ctx.orderbook.lock().my_p2p_pubkeys; + if is_my_order(&pubkey.to_hex(), &my_persistent, my_p2p) { + return Ok(()); + } + } + if is_pubkey_banned(&ctx, &pubkey.unprefixed().into()) { return MmError::err(OrderbookP2PHandlerError::PubkeyNotAllowed(pubkey.to_hex())); } @@ -614,14 +623,14 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: }, new_protocol::OrdermatchMessage::TakerRequest(taker_request) => { let msg = TakerRequest::from_new_proto_and_pubkey(taker_request, pubkey.unprefixed().into()); - process_taker_request(ctx, pubkey.unprefixed().into(), msg).await; + process_taker_request(ctx, msg).await; Ok(()) }, new_protocol::OrdermatchMessage::MakerReserved(maker_reserved) => { let msg = MakerReserved::from_new_proto_and_pubkey(maker_reserved, pubkey.unprefixed().into()); // spawn because process_maker_reserved may 
take significant time to run let spawner = ctx.spawner(); - spawner.spawn(process_maker_reserved(ctx, pubkey.unprefixed().into(), msg)); + spawner.spawn(process_maker_reserved(ctx, msg)); Ok(()) }, new_protocol::OrdermatchMessage::TakerConnect(taker_connect) => { @@ -2573,8 +2582,12 @@ impl TrieDiffHistory { type TrieOrderHistory = TrieDiffHistory; struct OrderbookPubkeyState { - /// Timestamp of the latest keep alive message received + /// Local receive time (seconds) when we last accepted a keep-alive from this pubkey. + /// Used by inactivity GC to purge stale pubkeys and their orders. last_keep_alive: u64, + /// Monotonic maker-published timestamp of the last processed PubkeyKeepAlive. + /// Used to ignore out-of-order or replayed keep-alive messages from this pubkey. + latest_maker_timestamp: u64, /// The map storing historical data about specific pair subtrie changes /// Used to get diffs of orders of pair between specific root hashes order_pairs_trie_state_history: TimedMap, @@ -2587,7 +2600,10 @@ struct OrderbookPubkeyState { impl OrderbookPubkeyState { pub fn new() -> OrderbookPubkeyState { OrderbookPubkeyState { + // Keep `last_keep_alive` based on local receive time. This is used for cleaning up orders of an inactive pubkey. last_keep_alive: now_sec(), + // Start at 0 so the first message from this pubkey always passes the monotonic check. 
+ latest_maker_timestamp: 0, order_pairs_trie_state_history: TimedMap::new_with_map_kind(MapKind::FxHashMap), orders_uuids: HashSet::default(), trie_roots: HashMap::default(), @@ -2907,9 +2923,23 @@ impl Orderbook { message: new_protocol::PubkeyKeepAlive, i_am_relay: bool, ) -> Option { - let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); - pubkey_state.last_keep_alive = now_sec(); + { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + if message.timestamp <= pubkey_state.latest_maker_timestamp { + log::debug!( + "Ignoring PubkeyKeepAlive from {}: message.timestamp={} <= last_processed_timestamp={} (stale/replayed)", + from_pubkey, + message.timestamp, + pubkey_state.latest_maker_timestamp + ); + return None; + } + pubkey_state.latest_maker_timestamp = message.timestamp; + pubkey_state.last_keep_alive = now_sec(); + } + let mut trie_roots_to_request = HashMap::new(); + for (alb_pair, trie_root) in message.trie_roots { let subscribed = self .topics_subscribed_to @@ -2918,6 +2948,8 @@ impl Orderbook { continue; } + // Empty/null root: clear local orders for (pubkey, pair), + // remember the null root, and don't request a sync. if trie_root == H64::default() || trie_root == hashed_null_node::() { log::debug!( "Received zero or hashed_null_node pair {} trie root from pub {}", @@ -2925,10 +2957,24 @@ impl Orderbook { from_pubkey ); + // Clear local orders for this pubkey/pair. + remove_pubkey_pair_orders(self, from_pubkey, &alb_pair); + + // Remember that the latest known root for this pair is null/empty. + { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + pubkey_state.trie_roots.insert(alb_pair.clone(), trie_root); + } + continue; } - let actual_trie_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_pair); - if *actual_trie_root != trie_root { + + // For non-null roots, compare with our current view and request sync if it differs. 
+ let current_root = { + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + *order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_pair) + }; + if current_root != trie_root { trie_roots_to_request.insert(alb_pair, trie_root); } } @@ -3962,7 +4008,7 @@ async fn handle_timed_out_maker_matches(ctx: MmArc, ordermatch_ctx: &OrdermatchC } } -async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: MakerReserved) { +async fn process_maker_reserved(ctx: MmArc, reserved_msg: MakerReserved) { log::debug!("Processing MakerReserved {:?}", reserved_msg); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); { @@ -3972,15 +4018,6 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: } } - // Taker order existence is checked previously - it can't be created if CryptoCtx is not initialized - let our_public_id = CryptoCtx::from_ctx(&ctx) - .expect("'CryptoCtx' must be initialized already") - .mm2_internal_public_id(); - if our_public_id.bytes == from_pubkey.0 { - log::warn!("Skip maker reserved from our pubkey"); - return; - } - let uuid = reserved_msg.taker_order_uuid; { let mut pending_map = ordermatch_ctx.pending_maker_reserved.lock().await; @@ -4041,6 +4078,9 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: false, ) { + let our_public_id = CryptoCtx::from_ctx(&ctx) + .expect("'CryptoCtx' must be initialized already") + .mm2_internal_public_id(); let connect = TakerConnect { sender_pubkey: H256Json::from(our_public_id.bytes), dest_pub_key: reserved_msg.sender_pubkey, @@ -4079,17 +4119,6 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: log::debug!("Processing MakerConnected {:?}", connected); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let our_public_id = match CryptoCtx::from_ctx(&ctx) { - Ok(ctx) => ctx.mm2_internal_public_id(), - Err(_) => return, - }; - - let unprefixed_from = 
from_pubkey.unprefixed(); - if our_public_id.bytes == unprefixed_from { - log::warn!("Skip maker connected from our pubkey"); - return; - } - let mut my_taker_orders = ordermatch_ctx.my_taker_orders.lock().await; let my_order_entry = match my_taker_orders.entry(connected.taker_order_uuid) { Entry::Occupied(e) => e, @@ -4106,7 +4135,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: }, }; - if order_match.reserved.sender_pubkey != unprefixed_from.into() { + if order_match.reserved.sender_pubkey != from_pubkey.unprefixed().into() { error!("Connected message sender pubkey != reserved message sender pubkey"); return; } @@ -4132,18 +4161,14 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: .ok(); } -async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request: TakerRequest) { +async fn process_taker_request(ctx: MmArc, taker_request: TakerRequest) { + log::debug!("Processing request {:?}", taker_request); + let our_public_id: H256Json = match CryptoCtx::from_ctx(&ctx) { Ok(ctx) => ctx.mm2_internal_public_id().bytes.into(), Err(_) => return, }; - if our_public_id == from_pubkey { - log::warn!("Skip the request originating from our pubkey"); - return; - } - log::debug!("Processing request {:?}", taker_request); - if !taker_request.can_match_with_maker_pubkey(&our_public_id) { return; } @@ -4248,17 +4273,6 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg log::debug!("Processing TakerConnect {:?}", connect_msg); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let our_public_id = match CryptoCtx::from_ctx(&ctx) { - Ok(ctx) => ctx.mm2_internal_public_id(), - Err(_) => return, - }; - - let sender_unprefixed = sender_pubkey.unprefixed(); - if our_public_id.bytes == sender_unprefixed { - log::warn!("Skip taker connect from our pubkey"); - return; - } - let order_mutex = { match ordermatch_ctx .maker_orders_ctx @@ -4281,12 +4295,16 @@ async 
fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg return; }, }; - if order_match.request.sender_pubkey != sender_unprefixed.into() { + if order_match.request.sender_pubkey != sender_pubkey.unprefixed().into() { log::warn!("Connect message sender pubkey != request message sender pubkey"); return; } if order_match.connected.is_none() && order_match.connect.is_none() { + // Taker order existence is checked previously - it can't be created if CryptoCtx is not initialized + let our_public_id = CryptoCtx::from_ctx(&ctx) + .expect("'CryptoCtx' must be initialized already") + .mm2_internal_public_id(); let connected = MakerConnected { sender_pubkey: our_public_id.bytes.into(), dest_pub_key: connect_msg.sender_pubkey, diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 56915a4e25..4e7c664072 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -1444,7 +1444,7 @@ fn should_process_request_only_once() { let request: TakerRequest = json::from_str( r#"{"base":"ETH","rel":"JST","base_amount":"0.1","base_amount_rat":[[1,[1]],[1,[10]]],"rel_amount":"0.2","rel_amount_rat":[[1,[1]],[1,[5]]],"action":"Buy","uuid":"2f9afe84-7a89-4194-8947-45fba563118f","method":"request","sender_pubkey":"031d4256c4bc9f99ac88bf3dba21773132281f65f9bf23a59928bce08961e2f3","dest_pub_key":"0000000000000000000000000000000000000000000000000000000000000000","match_by":{"type":"Any"}}"#, ).unwrap(); - block_on(process_taker_request(ctx, Default::default(), request)); + block_on(process_taker_request(ctx, request)); let maker_orders = &ordermatch_ctx.maker_orders_ctx.lock().orders; let order = block_on(maker_orders.get(&uuid).unwrap().lock()); // when new request is processed match is replaced with new instance resetting @@ -3510,3 +3510,70 @@ fn test_maker_order_balance_loops() { assert!(!maker_orders_ctx.balance_loop_exists(morty_ticker)); 
assert_eq!(*maker_orders_ctx.count_by_tickers.get(morty_ticker).unwrap(), 0); } + +#[test] +fn process_orders_keep_alive_with_null_root_clears_pair_and_does_not_request() { + let (ctx, _our_pubkey, _our_secret) = make_ctx_for_tests(); + let (maker_pubkey, maker_secret) = pubkey_and_secret_for_test("maker-passphrase"); + + // Build one remote order from maker_pubkey for RICK/MORTY and insert it locally. + let mut orders = make_random_orders(maker_pubkey.clone(), &maker_secret, "RICK".into(), "MORTY".into(), 1); + let order = orders.pop().expect("one order generated"); + let inserted_uuid = order.uuid; + + { + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + // Mark the pair as subscribed so keep-alive handling considers it. + orderbook.topics_subscribed_to.insert( + orderbook_topic_from_base_rel("RICK", "MORTY"), + OrderbookRequestingState::Requested, + ); + + // Insert as if it came from the network; this updates trie state as well. + orderbook.insert_or_update_order_update_trie(order); + + // Sanity: the order is present. + assert!( + orderbook.order_set.contains_key(&inserted_uuid), + "precondition: order must be present" + ); + } + + // Craft a keep-alive that advertises a null/empty root for (maker_pubkey, alb_pair). + let alb_pair = alb_ordered_pair("RICK", "MORTY"); + let keep_alive = PubkeyKeepAlive { + trie_roots: HashMap::from_iter(std::iter::once((alb_pair.clone(), [0u8; 8]))), + timestamp: now_sec(), + }; + + // For a null root, we must clear the pair locally and NOT request a sync. + let res = block_on(process_orders_keep_alive( + ctx.clone(), + "dummy_peer".to_string(), + maker_pubkey.clone(), + keep_alive, + false, + )); + assert!(res.is_ok(), "process_orders_keep_alive returned error"); + + // The order must be gone now. 
+ let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + let orderbook = ordermatch_ctx.orderbook.lock(); + assert!( + !orderbook.order_set.contains_key(&inserted_uuid), + "orders for (pubkey, pair) must be cleared by null-root keep-alive" + ); + + // And the remembered trie root for that pair should be the null root. + let state = orderbook + .pubkeys_state + .get(&maker_pubkey) + .expect("pubkey state missing"); + assert_eq!( + state.trie_roots.get(&alb_pair).copied(), + Some([0u8; 8]), + "null root should be remembered in pubkey state" + ); +} From e3ad3ca6be972e1fbd40ab8e83fb08b6e9b95cb5 Mon Sep 17 00:00:00 2001 From: shamardy Date: Mon, 6 Oct 2025 10:36:25 +0300 Subject: [PATCH 02/16] Revert "Revert "fix(orderbook): validate roots before commit (#2605)"" This reverts commit 9a76d11c2f505d3a325f16aa77fcfbec2d78ec40. --- .github/workflows/test.yml | 6 +- mm2src/mm2_main/Cargo.toml | 19 +- mm2src/mm2_main/src/lp_ordermatch.rs | 490 ++++++++++++++---- mm2src/mm2_main/src/ordermatch_tests.rs | 34 +- .../mm2_main/tests/feature_gate_for_tests.rs | 8 + .../tests/mm2_tests/mm2_tests_inner.rs | 20 +- mm2src/mm2_main/tests/mm2_tests_main.rs | 2 +- 7 files changed, 429 insertions(+), 150 deletions(-) create mode 100644 mm2src/mm2_main/tests/feature_gate_for_tests.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index af01024734..ffe68bad58 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -123,7 +123,7 @@ jobs: - name: Test run: | wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.sh | bash - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast mac-x86-64-kdf-integration: timeout-minutes: 90 @@ -155,7 +155,7 @@ jobs: - name: Test run: | wget -O - 
https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.sh | bash - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast win-x86-64-kdf-integration: timeout-minutes: 90 @@ -191,7 +191,7 @@ jobs: - name: Test run: | Invoke-WebRequest -Uri https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.bat -OutFile \cmd.bat && \cmd.bat - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast docker-tests: timeout-minutes: 90 diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index 0c360d60da..0a4994305d 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -17,7 +17,7 @@ custom-swap-locktime = [] # only for testing purposes, should never be activated native = [] # Deprecated track-ctx-pointer = ["common/track-ctx-pointer"] zhtlc-native-tests = ["coins/zhtlc-native-tests"] -run-docker-tests = ["coins/run-docker-tests"] +run-docker-tests = ["for-tests", "coins/run-docker-tests"] default = [] trezor-udp = ["crypto/trezor-udp"] # use for tests to connect to trezor emulator over udp run-device-tests = [] @@ -29,7 +29,12 @@ new-db-arch = ["mm2_core/new-db-arch"] # A temporary feature to integrate the ne # Temporary feature for implementing IBC wrap/unwrap mechanism and will be removed # once we consider it as stable. 
ibc-routing-for-swaps = [] -for-tests = [] +for-tests = [ + "coins/for-tests", + "coins_activation/for-tests", + "common/for-tests", + "trading_api/for-tests" +] enable-solana = ["coins/enable-solana", "coins_activation/enable-solana"] [dependencies] @@ -157,3 +162,13 @@ gstuff.workspace = true prost-build = { version = "0.12", default-features = false } regex.workspace = true +[[test]] +name = "docker_tests_main" +path = "tests/docker_tests_main.rs" +required-features = ["run-docker-tests"] + +[[test]] +name = "mm2_tests_main" +path = "tests/mm2_tests_main.rs" +required-features = ["for-tests"] + diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index df9ab7ba54..9b063f16ad 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -134,9 +134,9 @@ pub mod ordermatch_tests; mod ordermatch_wasm_db; pub const ORDERBOOK_PREFIX: TopicPrefix = "orbk"; -#[cfg(not(test))] +#[cfg(not(any(test, feature = "for-tests")))] pub const MIN_ORDER_KEEP_ALIVE_INTERVAL: u64 = 30; -#[cfg(test)] +#[cfg(any(test, feature = "for-tests"))] pub const MIN_ORDER_KEEP_ALIVE_INTERVAL: u64 = 5; const BALANCE_REQUEST_INTERVAL: f64 = 30.; const MAKER_ORDER_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 3; @@ -158,7 +158,7 @@ const SWAP_VERSION_DEFAULT: u8 = 2; pub type OrderbookP2PHandlerResult = Result<(), MmError>; -#[derive(Display)] +#[derive(Debug, Display)] pub enum OrderbookP2PHandlerError { #[display(fmt = "'{_0}' is an invalid topic for the orderbook handler.")] InvalidTopic(String), @@ -176,11 +176,32 @@ pub enum OrderbookP2PHandlerError { OrderNotFound(Uuid), Internal(String), + + #[display( + fmt = "Received stale keep alive from pubkey '{from_pubkey}' propagated from '{propagated_from}', will ignore it" + )] + StaleKeepAlive { + from_pubkey: String, + propagated_from: String, + }, + + #[display( + fmt = "Sync failure for pubkey '{from_pubkey}' via '{propagated_from}'; unresolved pairs: {unresolved_pairs:?}" 
+ )] + SyncFailure { + from_pubkey: String, + propagated_from: String, + unresolved_pairs: Vec, + }, } impl OrderbookP2PHandlerError { pub(crate) fn is_warning(&self) -> bool { - matches!(self, OrderbookP2PHandlerError::OrderNotFound(_)) + // treat SyncFailure as a warning for now due to outdated nodes + matches!( + self, + OrderbookP2PHandlerError::OrderNotFound(_) | OrderbookP2PHandlerError::SyncFailure { .. } + ) } } @@ -310,50 +331,162 @@ fn process_trie_delta( new_root } +fn build_pubkey_state_sync_request( + pubkey: &str, + pending_pairs: &HashSet, + expected_pair_roots: &HashMap, +) -> OrdermatchRequest { + let trie_roots = pending_pairs + .iter() + .filter_map(|p| expected_pair_roots.get(p).map(|&root| (p.clone(), root))) + .collect(); + OrdermatchRequest::SyncPubkeyOrderbookState { + pubkey: pubkey.to_owned(), + trie_roots, + } +} + +fn apply_pair_orders_diff( + orderbook: &mut Orderbook, + from_pubkey: &str, + pair: &AlbOrderedOrderbookPair, + diff: DeltaOrFullTrie, + protocol_infos: &HashMap, + conf_infos: &HashMap, +) -> H64 { + let params = ProcessTrieParams { + pubkey: from_pubkey, + alb_pair: pair, + protocol_infos, + conf_infos, + }; + match diff { + DeltaOrFullTrie::Delta(delta) => process_trie_delta(orderbook, delta, params), + DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(orderbook, values, params), + } +} + +fn apply_and_validate_pubkey_state_sync_response( + orderbook_mutex: &PaMutex, + from_pubkey: &str, + peer: &str, + response: SyncPubkeyOrderbookStateRes, + expected_pair_roots: &HashMap, + pending_pairs: &mut HashSet, + keep_alive_timestamp: u64, +) { + let mut orderbook = orderbook_mutex.lock(); + for (pair, diff) in response.pair_orders_diff { + // Ignore unsolicited pairs we didn't request to prevent state poisoning. 
+ if !pending_pairs.contains(&pair) { + continue; + } + + let new_root = apply_pair_orders_diff( + &mut orderbook, + from_pubkey, + &pair, + diff, + &response.protocol_infos, + &response.conf_infos, + ); + + if let Some(expected) = expected_pair_roots.get(&pair) { + if &new_root == expected { + pending_pairs.remove(&pair); + // Mark per-pair maker-published timestamp once accepted + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(pair.clone(), keep_alive_timestamp); + state.pair_last_seen_local.insert(pair.clone(), now_sec()); + } else { + warn!( + "Sync validation failed for pubkey {} pair {} from {}: expected {:?}, got {:?}. Reverting pair.", + from_pubkey, pair, peer, expected, new_root + ); + remove_pubkey_pair_orders(&mut orderbook, from_pubkey, &pair); + } + } + } +} + +async fn request_and_apply_pubkey_state_sync_from_peer( + ctx: &MmArc, + orderbook: &PaMutex, + from_pubkey: &str, + peer: &str, + expected_pair_roots: &HashMap, + pending_pairs: &mut HashSet, + keep_alive_timestamp: u64, +) -> OrderbookP2PHandlerResult { + let current_req = build_pubkey_state_sync_request(from_pubkey, pending_pairs, expected_pair_roots); + + if let Some(resp) = request_one_peer::( + ctx.clone(), + P2PRequest::Ordermatch(current_req), + peer.to_string(), + ) + .await + .map_mm_err()? 
+ { + apply_and_validate_pubkey_state_sync_response( + orderbook, + from_pubkey, + peer, + resp, + expected_pair_roots, + pending_pairs, + keep_alive_timestamp, + ); + } + + Ok(()) +} + async fn process_orders_keep_alive( ctx: MmArc, - propagated_from_peer: String, + propagated_from: String, from_pubkey: String, keep_alive: new_protocol::PubkeyKeepAlive, i_am_relay: bool, ) -> OrderbookP2PHandlerResult { + let keep_alive_timestamp = keep_alive.timestamp; let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); - let to_request = ordermatch_ctx - .orderbook - .lock() - .process_keep_alive(&from_pubkey, keep_alive, i_am_relay); - - let req = match to_request { - Some(req) => req, - // The message was processed, simply forward it - None => return Ok(()), - }; - let response = request_one_peer::( - ctx.clone(), - P2PRequest::Ordermatch(req), - propagated_from_peer.clone(), + // Update local state and decide whether to sync. + let trie_roots_to_request = + ordermatch_ctx + .orderbook + .lock() + .process_keep_alive(&from_pubkey, keep_alive, i_am_relay, &propagated_from)?; + + if trie_roots_to_request.is_empty() { + // The message was processed, return Ok to forward it + return Ok(()); + } + + // Query ONLY the keepalive propagator. + let mut remaining_pairs: HashSet = trie_roots_to_request.keys().cloned().collect(); + let _ = request_and_apply_pubkey_state_sync_from_peer( + &ctx, + &ordermatch_ctx.orderbook, + &from_pubkey, + &propagated_from, + &trie_roots_to_request, + &mut remaining_pairs, + keep_alive_timestamp, ) - .await - .map_mm_err()? - .ok_or_else(|| { - MmError::new(OrderbookP2PHandlerError::P2PRequestError(format!( - "No response was received from peer {propagated_from_peer} for SyncPubkeyOrderbookState request!" 
- ))) - })?; - - let mut orderbook = ordermatch_ctx.orderbook.lock(); - for (pair, diff) in response.pair_orders_diff { - let params = ProcessTrieParams { - pubkey: &from_pubkey, - alb_pair: &pair, - protocol_infos: &response.protocol_infos, - conf_infos: &response.conf_infos, - }; - let _new_root = match diff { - DeltaOrFullTrie::Delta(delta) => process_trie_delta(&mut orderbook, delta, params), - DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(&mut orderbook, values, params), - }; + .await; + + // Phase 4: Finalize; if unresolved, mark unsynced and DO NOT FORWARD. + if !remaining_pairs.is_empty() { + let unresolved_pairs = remaining_pairs.into_iter().collect::>(); + return MmError::err(OrderbookP2PHandlerError::SyncFailure { + from_pubkey, + propagated_from, + unresolved_pairs, + }); } Ok(()) @@ -395,28 +528,38 @@ fn process_maker_order_cancelled(ctx: &MmArc, from_pubkey: String, cancelled_msg orderbook .recently_cancelled .insert_expirable(uuid, from_pubkey.clone(), RECENTLY_CANCELLED_TIMEOUT); - if let Some(order) = orderbook.order_set.get(&uuid) { - if order.pubkey == from_pubkey { - orderbook.remove_order_trie_update(uuid); + + let maybe_pair = orderbook + .order_set + .get(&uuid) + .filter(|o| o.pubkey == from_pubkey) + .map(|o| alb_ordered_pair(&o.base, &o.rel)); + + if let Some(alb_pair) = maybe_pair { + orderbook.remove_order_trie_update(uuid); + + // Advance the per-pair maker timestamp floor to the cancel timestamp + // so any earlier keep-alive for this pair will be rejected as stale. + // + // Note: we do NOT refresh liveness (pair_last_seen_local) here. + // Liveness is refreshed only when we accept state-carrying messages + // (e.g., a keep-alive that matches the expected root, a validated sync, + // a MakerOrderCreated/Updated, or a full-trie fill), not on cancel. 
+ let state = pubkey_state_mut(&mut orderbook.pubkeys_state, &from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), cancelled_msg.timestamp); + + // If this cancel emptied the pair (root is zero/hashed-null), drop liveness for this pair. + // This mirrors the zero-root keep-alive path and allows faster GC of empty pairs. + if let Some(&pair_root) = state.trie_roots.get(&alb_pair) { + if pair_root == H64::default() || pair_root == hashed_null_node::() { + state.pair_last_seen_local.remove(&alb_pair); + } } } } -// fn verify_pubkey_orderbook(orderbook: &GetOrderbookPubkeyItem) -> Result<(), String> { -// let keys: Vec<(_, _)> = orderbook -// .orders -// .iter() -// .map(|(uuid, order)| { -// let order_bytes = rmp_serde::to_vec(&order).expect("Serialization should never fail"); -// (uuid.as_bytes(), Some(order_bytes)) -// }) -// .collect(); -// let (orders_root, proof) = &orderbook.pair_orders_trie_root; -// verify_trie_proof::(orders_root, proof, &keys) -// .map_err(|e| ERRL!("Error on pair_orders_trie_root verification: {}", e))?; -// Ok(()) -// } - // Some coins, for example ZHTLC, have privacy features like random keypair to sign P2P messages per every order. // So, each order of such coin has unique «pubkey» field that doesn’t match node persistent pubkey derived from passphrase. // We can compare pubkeys from maker_orders and from asks or bids, to find our order. 
@@ -496,6 +639,9 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul conf_infos: &conf_infos, }; let _new_root = process_pubkey_full_trie(&mut orderbook, orders, params); + + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, &pubkey); + state.pair_last_seen_local.insert(alb_pair.clone(), now_sec()); } let topic = orderbook_topic_from_base_rel(base, rel); @@ -808,8 +954,10 @@ fn process_get_orderbook_request(ctx: MmArc, base: String, rel: String) -> Resul pubkey ))?; + let pubkey_last_seen = pubkey_state.pair_last_seen_local.values().max().copied().unwrap_or(0); + let item = GetOrderbookPubkeyItem { - last_keep_alive: pubkey_state.last_keep_alive, + last_keep_alive: pubkey_last_seen, orders, // TODO save last signed payload to pubkey state last_signed_pubkey_payload: vec![], @@ -921,10 +1069,9 @@ impl DeltaOrFull return Ok(DeltaOrFullTrie::Delta(total_delta)); } - log::warn!( + warn!( "History started from {:?} ends with not up-to-date trie root {:?}", - from_hash, - actual_trie_root + from_hash, actual_trie_root ); } @@ -1162,7 +1309,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { None => return, }; if coin.wallet_only(&ctx) { - log::warn!( + warn!( "coin: {} is wallet only, skip BalanceTradeFeeUpdatedHandler", coin.ticker() ); @@ -1174,7 +1321,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { Ok(vol_info) => vol_info.volume, Err(e) if e.get_inner().not_sufficient_balance() => MmNumber::from(0), Err(e) => { - log::warn!("Couldn't handle the 'balance_updated' event: {}", e); + warn!("Couldn't handle the 'balance_updated' event: {}", e); return; }, }; @@ -2444,10 +2591,6 @@ fn broadcast_keep_alive_for_pub(ctx: &MmArc, pubkey: &str, orderbook: &Orderbook }; for (alb_pair, root) in state.trie_roots.iter() { - if *root == H64::default() && *root == hashed_null_node::() { - continue; - } - let message = new_protocol::PubkeyKeepAlive { trie_roots: 
HashMap::from([(alb_pair.clone(), *root)]), timestamp: now_sec(), @@ -2582,12 +2725,20 @@ impl TrieDiffHistory { type TrieOrderHistory = TrieDiffHistory; struct OrderbookPubkeyState { - /// Local receive time (seconds) when we last accepted a keep-alive from this pubkey. - /// Used by inactivity GC to purge stale pubkeys and their orders. - last_keep_alive: u64, - /// Monotonic maker-published timestamp of the last processed PubkeyKeepAlive. - /// Used to ignore out-of-order or replayed keep-alive messages from this pubkey. - latest_maker_timestamp: u64, + /// Monotonic maker-published timestamp for the last accepted root per pair. + /// Used to ignore out-of-order or replayed keep-alive roots for specific pairs. + /// + /// Retention policy: + /// We intentionally retain entries even after a pair is pruned by inactivity GC to defend against + /// stale replays resurrecting old state. These entries are dropped only when the entire pubkey + /// state is removed. + latest_root_timestamp_by_pair: HashMap, + /// Local receive time (seconds) of the last accepted keep‑alive per alphabetically ordered pair + /// owned by this pubkey. This is the authoritative liveness signal used by inactivity GC and + /// trie‑state pruning. Pairs not updated within the timeout are purged; if all pairs are stale, + /// the entire pubkey state is removed. + /// Key: `AlbOrderedOrderbookPair` ("BASE:REL"), Value: local Unix time in seconds. + pair_last_seen_local: HashMap, /// The map storing historical data about specific pair subtrie changes /// Used to get diffs of orders of pair between specific root hashes order_pairs_trie_state_history: TimedMap, @@ -2600,10 +2751,8 @@ struct OrderbookPubkeyState { impl OrderbookPubkeyState { pub fn new() -> OrderbookPubkeyState { OrderbookPubkeyState { - // Keep `last_keep_alive` based on local receive time. This is used for cleaning up orders of an inactive pubkey. 
- last_keep_alive: now_sec(), - // Start at 0 so the first message from this pubkey always passes the monotonic check. - latest_maker_timestamp: 0, + latest_root_timestamp_by_pair: HashMap::default(), + pair_last_seen_local: HashMap::default(), order_pairs_trie_state_history: TimedMap::new_with_map_kind(MapKind::FxHashMap), orders_uuids: HashSet::default(), trie_roots: HashMap::default(), @@ -2739,6 +2888,8 @@ impl Orderbook { let pair_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_ordered); let prev_root = *pair_root; + pubkey_state.pair_last_seen_local.insert(alb_ordered.clone(), now_sec()); + pubkey_state.orders_uuids.insert((order.uuid, alb_ordered.clone())); { @@ -2917,25 +3068,84 @@ impl Orderbook { self.topics_subscribed_to.contains_key(topic) } + /// Processes a [`new_protocol::PubkeyKeepAlive`] for `from_pubkey`, updating per‑pair state and + /// deciding whether a state sync is required. + /// + /// This function performs only local state transitions; it does no I/O. + /// It may: + /// - Accept the announcement (per pair) and refresh timestamps if the announced root matches the + /// local root. + /// - Clear a pair if the announced root is null/empty (remove all orders, remember the null root, + /// update the maker timestamp floor, and drop the local “last seen” so GC can prune it). + /// - Detect divergence and return the set of pairs that must be synced to the announced roots. + /// + /// Invariants and validation: + /// - Maker timestamps are checked per pair and must be strictly increasing. If any pair is stale + /// (announced timestamp ≤ the last accepted maker timestamp for that pair), the function returns + /// [`OrderbookP2PHandlerError::StaleKeepAlive`] and no state is advanced for that message. Callers + /// must not propagate such a message. + /// - We update `pair_last_seen_local` only for pairs that are accepted/in‑sync. Divergent pairs do + /// not refresh liveness until validated to the expected root. 
+ /// + /// Subscription policy: + /// - Pairs we are not subscribed to are ignored unless this node acts as a relay (`i_am_relay`). + /// + /// Returns: + /// - A map of `"BASE:REL"` to the expected roots (as announced) for which local state diverges and + /// a sync is required. An empty map means no sync is needed and the message may be propagated. + /// + /// Caller responsibilities: + /// - If the returned map is non‑empty, fetch trie diffs from the peer that propagated the keep‑alive + /// (`propagated_from`), validate them to the expected roots, apply, and only then consider forwarding + /// the keep‑alive. + /// + /// Errors: + /// - [`OrderbookP2PHandlerError::StaleKeepAlive`] if any per‑pair maker timestamp regresses or repeats. + /// - Other variants for malformed or otherwise invalid data. + /// + /// Notes: + /// - Maker timestamps are used for monotonicity/anti‑replay; GC uses local receive times. + /// - The caller must ignore Unsolicited pairs in sync responses when applying the response. fn process_keep_alive( &mut self, from_pubkey: &str, message: new_protocol::PubkeyKeepAlive, i_am_relay: bool, - ) -> Option { + propagated_from: &str, + ) -> Result, MmError> { + // TODO(rate-limit): + // Add a single, shared in‑process rate limiter for OrdermatchMessage + // types (e.g., a module in mm2_p2p or lp_network). + // For keep-alive, enforce a minimum interval per pubkey (>= 20–30s) or X messages/minute. + // When throttled, early‑return without syncing/propagating, before any state mutation. + + // Pre-scan: if any single pair is stale => treat the whole message as stale. + // Note: We currently send keep-alives per pair (trie_roots has a single entry), + // so this is equivalent to checking that one pair. If multi-pair keep-alives return, + // demote only stale pairs instead of rejecting the whole message. 
{ let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); - if message.timestamp <= pubkey_state.latest_maker_timestamp { - log::debug!( - "Ignoring PubkeyKeepAlive from {}: message.timestamp={} <= last_processed_timestamp={} (stale/replayed)", - from_pubkey, - message.timestamp, - pubkey_state.latest_maker_timestamp - ); - return None; + for (alb_pair, _) in message.trie_roots.iter() { + let subscribed = self + .topics_subscribed_to + .contains_key(&orderbook_topic_from_ordered_pair(alb_pair)); + if !subscribed && !i_am_relay { + continue; + } + + let last_pair_timestamp = *pubkey_state.latest_root_timestamp_by_pair.get(alb_pair).unwrap_or(&0); + + if message.timestamp <= last_pair_timestamp { + log::debug!( + "Ignoring a stale PubkeyKeepAlive from {} for pair {}: message.timestamp={} <= last_pair_timestamp={}", + from_pubkey, alb_pair, message.timestamp, last_pair_timestamp + ); + return MmError::err(OrderbookP2PHandlerError::StaleKeepAlive { + from_pubkey: from_pubkey.to_owned(), + propagated_from: propagated_from.to_owned(), + }); + } } - pubkey_state.latest_maker_timestamp = message.timestamp; - pubkey_state.last_keep_alive = now_sec(); } let mut trie_roots_to_request = HashMap::new(); @@ -2964,6 +3174,10 @@ impl Orderbook { { let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); pubkey_state.trie_roots.insert(alb_pair.clone(), trie_root); + pubkey_state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), message.timestamp); + pubkey_state.pair_last_seen_local.remove(&alb_pair); } continue; @@ -2976,17 +3190,16 @@ impl Orderbook { }; if current_root != trie_root { trie_roots_to_request.insert(alb_pair, trie_root); + continue; } + let state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), message.timestamp); + state.pair_last_seen_local.insert(alb_pair, now_sec()); } - if trie_roots_to_request.is_empty() { - return None; - } - - 
Some(OrdermatchRequest::SyncPubkeyOrderbookState { - pubkey: from_pubkey.to_owned(), - trie_roots: trie_roots_to_request, - }) + Ok(trie_roots_to_request) } fn orderbook_item_with_proof(&self, order: OrderbookItem) -> OrderbookItemWithProof { @@ -3714,7 +3927,6 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { .expect("CryptoCtx not available") .mm2_internal_pubkey_hex(); - let maker_order_timeout = ctx.conf["maker_order_timeout"].as_u64().unwrap_or(MAKER_ORDER_TIMEOUT); loop { if ctx.is_stopping() { break; @@ -3739,26 +3951,78 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { } { - // remove "timed out" pubkeys states with their orders from orderbook let mut orderbook = ordermatch_ctx.orderbook.lock(); - let mut uuids_to_remove = vec![]; - let mut pubkeys_to_remove = vec![]; + + let deadline = now_sec().saturating_sub(MAKER_ORDER_TIMEOUT); + + let mut pairs_to_prune = Vec::new(); + let mut pubkeys_to_remove = Vec::new(); + for (pubkey, state) in orderbook.pubkeys_state.iter() { - let is_ours = orderbook.my_p2p_pubkeys.contains(pubkey); - let to_keep = - pubkey == &my_pubsecp || is_ours || state.last_keep_alive + maker_order_timeout > now_sec(); - if !to_keep { - for (uuid, _) in &state.orders_uuids { - uuids_to_remove.push(*uuid); - } + // Skip our own pubkeys; local order lifecycle manages them. + if pubkey == &my_pubsecp || orderbook.my_p2p_pubkeys.contains(pubkey) { + continue; + } + + // Remove the pubkey if no pair has been seen recently. + let any_recent_pair = state + .trie_roots + .keys() + .any(|pair| state.pair_last_seen_local.get(pair).copied().unwrap_or(0) > deadline); + + if !any_recent_pair { pubkeys_to_remove.push(pubkey.clone()); + continue; + } + + // Otherwise keep the pubkey and prune only its stale pairs. 
+ let stale_pairs: Vec<_> = state + .trie_roots + .keys() + .filter_map(|pair| { + let last_seen = state.pair_last_seen_local.get(pair).copied().unwrap_or(0); + (last_seen <= deadline).then(|| pair.clone()) + }) + .collect(); + + if !stale_pairs.is_empty() { + pairs_to_prune.push((pubkey.clone(), stale_pairs)); } } - for uuid in uuids_to_remove { - orderbook.remove_order_trie_update(uuid); + // Prune stale pairs for pubkeys we keep. + for (pubkey, pairs) in pairs_to_prune { + for pair in pairs { + remove_pubkey_pair_orders(&mut orderbook, &pubkey, &pair); + if let Some(state) = orderbook.pubkeys_state.get_mut(&pubkey) { + state.pair_last_seen_local.remove(&pair); + } + } } + + // Remove fully expired pubkeys: delete their remaining orders first, then the state. + // GC: remove orders while the pubkey state still exists to avoid lazy re-creation inside `remove_order_trie_update`. for pubkey in pubkeys_to_remove { + if let Some(orders) = orderbook + .pubkeys_state + .get_mut(&pubkey) + .map(|state| std::mem::take(&mut state.orders_uuids)) + { + for (uuid, _) in orders { + orderbook.remove_order_trie_update(uuid); + } + } + + // TODO(pubkey_replay_guard): + // Block stale replays after full pubkey timeout (incl. restarts). + // - On full pubkey removal: store pubkey -> max maker_ts (write-once HashMap). + // - On keep-alive intake: before creating state, drop if maker_ts <= stored floor. + // - Restarts: persist this map (and ideally per-pair last_maker_ts); until persisted, enforce + // sanity bounds: maker_ts > now + MAX_MAKER_FUTURE_SKEW or now - maker_ts > MAX_MAKER_PAST_AGE => reject. + // - Liveness: keep GC driven by local time; optionally prune when maker_stale || local_stale; + // do not rely solely on maker clock. + // - Cleanup: pre-scan without creating state; centralize timestamp checks/logging; never sync + // origin for messages rejected by guards; add unit tests. 
orderbook.pubkeys_state.remove(&pubkey); } @@ -4127,7 +4391,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: let order_match = match my_order_entry.get().matches.get(&connected.maker_order_uuid) { Some(o) => o, None => { - log::warn!( + warn!( "Our node doesn't have the match with uuid {}", connected.maker_order_uuid ); @@ -4288,7 +4552,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg let order_match = match my_order.matches.get_mut(&connect_msg.taker_order_uuid) { Some(o) => o, None => { - log::warn!( + warn!( "Our node doesn't have the match with uuid {}", connect_msg.taker_order_uuid ); @@ -4296,7 +4560,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg }, }; if order_match.request.sender_pubkey != sender_pubkey.unprefixed().into() { - log::warn!("Connect message sender pubkey != request message sender pubkey"); + warn!("Connect message sender pubkey != request message sender pubkey"); return; } @@ -5633,7 +5897,7 @@ pub async fn orders_history_by_filter(ctx: MmArc, req: Json) -> Result { - assert!(pairs_trie_roots.contains_key("C1:C2")); - assert!(!pairs_trie_roots.contains_key("C2:C3")); - }, - _ => panic!("Invalid request {:?}", request), - } + let pairs_trie_roots = orderbook + .process_keep_alive(pubkey, message, false, propagated_from) + .unwrap(); + assert!(pairs_trie_roots.contains_key("C1:C2")); + assert!(!pairs_trie_roots.contains_key("C2:C3")); } #[test] @@ -2651,6 +2646,7 @@ fn test_orderbook_pubkey_sync_request_relay() { OrderbookRequestingState::Requested, ); let pubkey = "pubkey"; + let propagated_from = "propagated_from"; let mut trie_roots = HashMap::new(); trie_roots.insert("C1:C2".to_owned(), [1; 8]); @@ -2661,17 +2657,11 @@ fn test_orderbook_pubkey_sync_request_relay() { timestamp: now_sec(), }; - let request = orderbook.process_keep_alive(pubkey, message, true).unwrap(); - match request { - OrdermatchRequest::SyncPubkeyOrderbookState { 
- trie_roots: pairs_trie_roots, - .. - } => { - assert!(pairs_trie_roots.contains_key("C1:C2")); - assert!(pairs_trie_roots.contains_key("C2:C3")); - }, - _ => panic!("Invalid request {:?}", request), - } + let pairs_trie_roots = orderbook + .process_keep_alive(pubkey, message, true, propagated_from) + .unwrap(); + assert!(pairs_trie_roots.contains_key("C1:C2")); + assert!(pairs_trie_roots.contains_key("C2:C3")); } #[test] diff --git a/mm2src/mm2_main/tests/feature_gate_for_tests.rs b/mm2src/mm2_main/tests/feature_gate_for_tests.rs new file mode 100644 index 0000000000..620cd7e76a --- /dev/null +++ b/mm2src/mm2_main/tests/feature_gate_for_tests.rs @@ -0,0 +1,8 @@ +#![cfg(not(target_arch = "wasm32"))] + +#[cfg(not(any(feature = "for-tests", feature = "run-docker-tests")))] +compile_error!(concat!( + "Integration tests for `", + env!("CARGO_PKG_NAME"), + "` require either the `for-tests` or the `run-docker-tests` feature." +)); diff --git a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs index 71c55fec0f..9f841e7359 100644 --- a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs +++ b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs @@ -1823,7 +1823,6 @@ fn test_order_should_not_be_displayed_when_node_is_down() { "coins": coins, "seednodes": [mm_bob.ip.to_string()], "rpc_password": "pass", - "maker_order_timeout": 5, }), "pass".into(), None, @@ -1870,7 +1869,7 @@ fn test_order_should_not_be_displayed_when_node_is_down() { assert_eq!(asks.len(), 1, "Alice RICK/MORTY orderbook must have exactly 1 ask"); block_on(mm_bob.stop()).unwrap(); - thread::sleep(Duration::from_secs(6)); + thread::sleep(Duration::from_secs(16)); let rc = block_on(mm_alice.rpc(&json! 
({ "userpass": mm_alice.userpass, @@ -1910,7 +1909,6 @@ fn test_own_orders_should_not_be_removed_from_orderbook() { "coins": coins, "i_am_seed": true, "rpc_password": "pass", - "maker_order_timeout": 5, "is_bootstrap_node": true }), "pass".into(), @@ -1939,7 +1937,7 @@ fn test_own_orders_should_not_be_removed_from_orderbook() { .unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - thread::sleep(Duration::from_secs(6)); + thread::sleep(Duration::from_secs(16)); let rc = block_on(mm_bob.rpc(&json! ({ "userpass": mm_bob.userpass, @@ -2121,7 +2119,7 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { ]); // start bob and immediately place the order - let mm_bob = MarketMakerIt::start( + let mut mm_bob = MarketMakerIt::start( json! ({ "gui": "nogui", "netid": 9998, @@ -2159,7 +2157,7 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { let rc = block_on(mm_bob.rpc(&set_price_json)).unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - let mm_alice = MarketMakerIt::start( + let mut mm_alice = MarketMakerIt::start( json! 
({ "gui": "nogui", "netid": 9998, @@ -2204,9 +2202,13 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { let rc = block_on(mm_bob.rpc(&set_price_json)).unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - let pause = 2; - log!("Waiting ({} seconds) for Bob to broadcast messages…", pause); - thread::sleep(Duration::from_secs(pause)); + block_on(mm_bob.wait_for_log(10., |log| log.contains("maker_order_cancelled_p2p_notify called"))) + .expect("Cancel broadcast not seen in Bob's logs within the expected time"); + + block_on(mm_alice.wait_for_log(10., |log| { + log.contains("received ordermatch message MakerOrderCancelled") + })) + .expect("Alice did not log MakerOrderCancelled in time"); // Bob orderbook must show 1 order log!("Get RICK/MORTY orderbook on Bob side"); diff --git a/mm2src/mm2_main/tests/mm2_tests_main.rs b/mm2src/mm2_main/tests/mm2_tests_main.rs index 83fdf13071..f38a62a8f3 100644 --- a/mm2src/mm2_main/tests/mm2_tests_main.rs +++ b/mm2src/mm2_main/tests/mm2_tests_main.rs @@ -1,4 +1,4 @@ -#![cfg(not(target_arch = "wasm32"))] +#![cfg(all(not(target_arch = "wasm32"), feature = "for-tests"))] mod integration_tests_common; mod mm2_tests; From ec7ebbfb44488c3e0133da065489c8777a9e3547 Mon Sep 17 00:00:00 2001 From: shamardy Date: Fri, 12 Sep 2025 11:02:26 +0300 Subject: [PATCH 03/16] deps: add libp2p-allow-block-list --- Cargo.lock | 1 + Cargo.toml | 1 + mm2src/mm2_p2p/Cargo.toml | 5 +++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c96971897e..ef3a42e26e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4551,6 +4551,7 @@ dependencies = [ "hex", "lazy_static", "libp2p", + "libp2p-allow-block-list", "log", "mm2_core", "mm2_event_stream", diff --git a/Cargo.toml b/Cargo.toml index 635857471e..615f3f8160 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,7 @@ jsonrpc-core = "18.0.0" lazy_static = "1.4" libc = "0.2" libp2p = { git = 
"https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.12", default-features = false } +libp2p-allow-block-list = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.12" } lightning = "0.0.113" lightning-background-processor = "0.0.113" lightning-invoice = { version = "0.21.0", features = ["serde"] } diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index f2cc95e305..c84dab570e 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -39,6 +39,7 @@ void.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] futures-rustls.workspace = true libp2p = { workspace = true, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } +libp2p-allow-block-list.workspace = true timed-map = { workspace = true, features = ["rustc-hash"] } tokio.workspace = true @@ -48,6 +49,6 @@ libp2p = { workspace = true, features = ["identify", "floodsub", "noise", "gossi timed-map = { workspace = true, features = ["rustc-hash", "wasm"] } [dev-dependencies] -async-std.workspace = true +async-std.workspace = true env_logger.workspace = true -common = { path = "../common", features = ["for-tests"] } +common = { path = "../common", features = ["for-tests"] } \ No newline at end of file From 2a33a4f10243d0e3d2a8e4771db81ab01c088262 Mon Sep 17 00:00:00 2001 From: shamardy Date: Fri, 12 Sep 2025 14:02:21 +0300 Subject: [PATCH 04/16] p2p: integrate allow/block list; add temp-ban/unban + expiry --- mm2src/mm2_p2p/Cargo.toml | 2 +- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 56 ++++++++++++++++++---- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index c84dab570e..9e6c219cf7 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -19,6 +19,7 @@ futures.workspace = true futures-ticker.workspace = true hex.workspace = true 
lazy_static.workspace = true +libp2p-allow-block-list.workspace = true log.workspace = true mm2_core = { path = "../mm2_core" } mm2_event_stream = { path = "../mm2_event_stream" } @@ -39,7 +40,6 @@ void.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] futures-rustls.workspace = true libp2p = { workspace = true, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } -libp2p-allow-block-list.workspace = true timed-map = { workspace = true, features = ["rustc-hash"] } tokio.workspace = true diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 62e5c22e90..80466eb711 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -19,6 +19,7 @@ use libp2p::request_response::ResponseChannel; use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, SwarmEvent, ToSwarm}; use libp2p::{identity, noise, PeerId, Swarm}; use libp2p::{Multiaddr, Transport}; +use libp2p_allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers}; use log::{debug, error, info}; use mm2_net::ip_addr::is_global_ipv4; use rand::seq::SliceRandom; @@ -29,6 +30,7 @@ use std::net::IpAddr; use std::sync::{Mutex, MutexGuard}; use std::task::{Context, Poll}; use timed_map::{MapKind, TimedMap}; +use void::Void; use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse}; use super::ping::AdexPing; @@ -188,6 +190,13 @@ pub enum AdexBehaviourCmd { message_id: MessageId, propagation_source: PeerId, }, + TempBanPeer { + peer: PeerId, + duration: Duration, + }, + UnbanPeer { + peer: PeerId, + }, } /// Determines if a dial attempt to the remote should be made. 
@@ -367,6 +376,7 @@ pub struct AtomicDexBehaviour { runtime: SwarmRuntime, cmd_rx: Receiver, netid: u16, + banned_peers: TimedMap, } #[derive(NetworkBehaviour)] @@ -376,6 +386,7 @@ pub struct CoreBehaviour { peers_exchange: PeersExchange, ping: AdexPing, request_response: RequestResponseBehaviour, + banlist: AllowBlockListBehaviour, } #[derive(Debug)] @@ -385,6 +396,7 @@ pub enum AdexBehaviourEvent { PeersExchange(libp2p::request_response::Event), Ping(libp2p::ping::Event), RequestResponse(RequestResponseBehaviourEvent), + Banlist(Void), } impl From for AdexBehaviourEvent { @@ -395,6 +407,7 @@ impl From for AdexBehaviourEvent { CoreBehaviourEvent::PeersExchange(event) => AdexBehaviourEvent::PeersExchange(event), CoreBehaviourEvent::Ping(event) => AdexBehaviourEvent::Ping(event), CoreBehaviourEvent::RequestResponse(event) => AdexBehaviourEvent::RequestResponse(event), + CoreBehaviourEvent::Banlist(event) => AdexBehaviourEvent::Banlist(event), } } } @@ -560,6 +573,15 @@ impl AtomicDexBehaviour { .gossipsub .propagate_message(&message_id, &propagation_source)?; }, + AdexBehaviourCmd::TempBanPeer { peer, duration } => { + self.core.banlist.block_peer(peer); + self.banned_peers.insert_expirable(peer, (), duration); + self.core.gossipsub.remove_explicit_peer(&peer); + }, + AdexBehaviourCmd::UnbanPeer { peer } => { + self.core.banlist.unblock_peer(peer); + self.banned_peers.remove(&peer); + }, } Ok(()) @@ -585,6 +607,11 @@ impl AtomicDexBehaviour { pub fn connected_peers_len(&self) -> usize { self.core.gossipsub.get_num_peers() } + + #[inline] + fn is_peer_temp_banned(&self, peer: &PeerId) -> bool { + self.banned_peers.contains_key(peer) + } } pub enum NodeType { @@ -757,6 +784,7 @@ fn start_gossipsub( peers_exchange, request_response, ping, + banlist: Default::default(), }; let adex_behavior = AtomicDexBehaviour { @@ -765,6 +793,7 @@ fn start_gossipsub( runtime: config.runtime.clone(), cmd_rx, netid: config.netid, + banned_peers: 
TimedMap::new_with_map_kind(MapKind::FxHashMap), }; libp2p::swarm::SwarmBuilder::with_executor(transport, adex_behavior, local_peer_id, config.runtime.clone()) @@ -818,9 +847,9 @@ fn start_gossipsub( drop(recently_dialed_peers); + let mut ban_cleanup_interval = Ticker::new(Duration::from_secs(10)); let mut check_connected_relays_interval = Ticker::new_with_next(CONNECTED_RELAYS_CHECK_INTERVAL, CONNECTED_RELAYS_CHECK_INTERVAL); - let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY); let mut listening = false; @@ -882,9 +911,12 @@ fn start_gossipsub( } } - if swarm.behaviour().core.gossipsub.is_relay() { - while let Poll::Ready(Some(_)) = announce_interval.poll_next_unpin(cx) { - announce_my_addresses(&mut swarm); + while let Poll::Ready(Some(_)) = ban_cleanup_interval.poll_next_unpin(cx) { + let expired = swarm.behaviour_mut().banned_peers.drop_expired_entries(); + for (peer, _) in expired { + { + swarm.behaviour_mut().core.banlist.unblock_peer(peer); + } } } @@ -892,6 +924,12 @@ fn start_gossipsub( maintain_connection_to_relays(&mut swarm, &bootstrap); } + if swarm.behaviour().core.gossipsub.is_relay() { + while let Poll::Ready(Some(_)) = announce_interval.poll_next_unpin(cx) { + announce_my_addresses(&mut swarm); + } + } + if !listening && i_am_relay { for listener in Swarm::listeners(&swarm) { info!("Listening on {}", listener); @@ -916,9 +954,9 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses let mut rng = rand::thread_rng(); if connected_relays.len() < mesh_n_low { - let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); let to_connect_num = mesh_n - connected_relays.len(); - let to_connect = + let mut to_connect = { + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); swarm .behaviour_mut() .core @@ -928,11 +966,14 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses && addresses .iter() .any(|addr| 
check_and_mark_dialed(&mut recently_dialed_peers, addr)) - }); + }) + }; + to_connect.retain(|peer, _| !swarm.behaviour().is_peer_temp_banned(peer)); // choose some random bootstrap addresses to connect if peers exchange returned not enough peers if to_connect.len() < to_connect_num { let connect_bootstrap_num = to_connect_num - to_connect.len(); + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); for addr in bootstrap_addresses .iter() .filter(|addr| { @@ -958,7 +999,6 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses } } } - drop(recently_dialed_peers); } if connected_relays.len() > max_n { From e795c0c12ecc6f515ff9918c03334bcc4c96be98 Mon Sep 17 00:00:00 2001 From: shamardy Date: Fri, 12 Sep 2025 14:45:01 +0300 Subject: [PATCH 05/16] p2p: temp-ban on orderbook SyncFailure (1h) --- mm2src/mm2_main/src/lp_network.rs | 52 +++++++++++++++++++++++++++- mm2src/mm2_main/src/lp_ordermatch.rs | 6 +--- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 7de3d33e38..40af79b200 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -23,7 +23,7 @@ use coins::lp_coinfind; use common::executor::SpawnFuture; use common::{log, Future01CompatExt}; -use compatible_time::Instant; +use compatible_time::{Duration, Instant}; use derive_more::Display; use futures::{channel::oneshot, StreamExt}; use keys::KeyPair; @@ -39,9 +39,12 @@ use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent}; use mm2_metrics::{mm_label, mm_timing}; use serde::de; +use std::str::FromStr; use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap}; +const TEMP_BAN_DURATION_SECS: u64 = 3600; + pub type P2PRequestResult = Result>; pub type P2PProcessResult = Result>; @@ -159,6 +162,29 @@ async fn process_p2p_message( ) .await { + if let 
lp_ordermatch::OrderbookP2PHandlerError::SyncFailure { propagated_from, .. } = e.get_inner() { + let to_ban = match PeerId::from_str(propagated_from) { + Ok(peer) => peer, + Err(parse_err) => { + log::error!( + "SyncFailure: invalid propagated_from '{}' ({}); skipping temp-ban", + propagated_from, + parse_err + ); + return; + }, + }; + + temp_ban_peer(&ctx, to_ban, Duration::from_secs(TEMP_BAN_DURATION_SECS)); + log::warn!( + "Temp-banned peer {} for {}s due to orderbook SyncFailure", + propagated_from, + TEMP_BAN_DURATION_SECS + ); + + return; + } + if e.get_inner().is_warning() { log::warn!("{}", e); } else { @@ -446,6 +472,30 @@ pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAdd }; } +pub fn temp_ban_peer(ctx: &MmArc, peer: PeerId, duration: Duration) { + let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); + let cmd = AdexBehaviourCmd::TempBanPeer { peer, duration }; + let send_res = { + let mut tx = p2p_ctx.cmd_tx.lock(); + tx.try_send(cmd) + }; + if let Err(e) = send_res { + log::error!("temp_ban_peer cmd_tx.send error {:?}", e); + } +} + +pub fn unban_peer(ctx: &MmArc, peer: PeerId) { + let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); + let cmd = AdexBehaviourCmd::UnbanPeer { peer }; + let send_res = { + let mut tx = p2p_ctx.cmd_tx.lock(); + tx.try_send(cmd) + }; + if let Err(e) = send_res { + log::error!("unban_peer cmd_tx.send error {:?}", e); + } +} + #[derive(Clone, Debug, Display, Serialize)] pub enum NetIdError { #[display(fmt = "Netid {netid} is larger than max {max_netid}")] diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 9b063f16ad..b0ea6af9cc 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -197,11 +197,7 @@ pub enum OrderbookP2PHandlerError { impl OrderbookP2PHandlerError { pub(crate) fn is_warning(&self) -> bool { - // treat SyncFailure as a warning for now due to outdated nodes - matches!( - self, - 
OrderbookP2PHandlerError::OrderNotFound(_) | OrderbookP2PHandlerError::SyncFailure { .. } - ) + matches!(self, OrderbookP2PHandlerError::OrderNotFound(_)) } } From 1ea54813cd9491d00ee1e3dc60e2024f4529634d Mon Sep 17 00:00:00 2001 From: shamardy Date: Sat, 13 Sep 2025 03:22:07 +0300 Subject: [PATCH 06/16] =?UTF-8?q?lp=5Fnetwork:=20add=20120s=20per=E2=80=91?= =?UTF-8?q?peer=20grace=20before=20temp=E2=80=91banning=20on=20orderbook?= =?UTF-8?q?=20SyncFailure=20to=20prevent=20bans=20during=20initial=20sync?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mm2src/mm2_main/src/lp_network.rs | 74 +++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 13 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 40af79b200..679bfc08ce 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -27,6 +27,7 @@ use compatible_time::{Duration, Instant}; use derive_more::Display; use futures::{channel::oneshot, StreamExt}; use keys::KeyPair; +use lazy_static::lazy_static; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; use mm2_libp2p::application::request_response::P2PRequest; @@ -38,12 +39,21 @@ use mm2_libp2p::{ use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse}; use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent}; use mm2_metrics::{mm_label, mm_timing}; +use parking_lot::Mutex; use serde::de; use std::str::FromStr; +use timed_map::{MapKind, TimedMap}; use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap}; const TEMP_BAN_DURATION_SECS: u64 = 3600; +const PER_PEER_SYNC_BAN_GRACE: Duration = Duration::from_secs(120); +const SYNC_BAN_GRACE_TTL: Duration = Duration::from_secs(600); + +lazy_static! 
{ + static ref SYNC_BAN_GRACE: Mutex> = + Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); +} pub type P2PRequestResult = Result>; pub type P2PProcessResult = Result>; @@ -162,9 +172,56 @@ async fn process_p2p_message( ) .await { - if let lp_ordermatch::OrderbookP2PHandlerError::SyncFailure { propagated_from, .. } = e.get_inner() { - let to_ban = match PeerId::from_str(propagated_from) { - Ok(peer) => peer, + if let lp_ordermatch::OrderbookP2PHandlerError::SyncFailure { + from_pubkey, + propagated_from, + unresolved_pairs, + } = e.get_inner() + { + match PeerId::from_str(propagated_from) { + Ok(peer) => { + let now = Instant::now(); + let mut map = SYNC_BAN_GRACE.lock(); + map.drop_expired_entries(); + + if let Some(first_seen) = map.get(&peer) { + let elapsed = now.duration_since(*first_seen); + if elapsed >= PER_PEER_SYNC_BAN_GRACE { + // Grace elapsed: ban now and clear the entry. + map.remove(&peer); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}) after {}s grace; banning.", + propagated_from, + from_pubkey, + unresolved_pairs, + PER_PEER_SYNC_BAN_GRACE.as_secs() + ); + temp_ban_peer(&ctx, peer, Duration::from_secs(TEMP_BAN_DURATION_SECS)); + return; + } else { + let remaining = PER_PEER_SYNC_BAN_GRACE.as_secs().saturating_sub(elapsed.as_secs()); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}); {}s grace remaining; not banning.", + propagated_from, + from_pubkey, + unresolved_pairs, + remaining + ); + return; + } + } else { + // First SyncFailure observed: start grace window; this entry will auto-expire after SYNC_BAN_GRACE_TTL + map.insert_expirable(peer, now, SYNC_BAN_GRACE_TTL); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}); starting {}s per-peer grace; not banning yet.", + propagated_from, + from_pubkey, + unresolved_pairs, + PER_PEER_SYNC_BAN_GRACE.as_secs() + ); + return; + } + }, Err(parse_err) => { log::error!( "SyncFailure: invalid propagated_from '{}' ({}); skipping 
temp-ban", @@ -173,16 +230,7 @@ async fn process_p2p_message( ); return; }, - }; - - temp_ban_peer(&ctx, to_ban, Duration::from_secs(TEMP_BAN_DURATION_SECS)); - log::warn!( - "Temp-banned peer {} for {}s due to orderbook SyncFailure", - propagated_from, - TEMP_BAN_DURATION_SECS - ); - - return; + } } if e.get_inner().is_warning() { From 0be27855520664e95089e6984b5cc48bbfccf012 Mon Sep 17 00:00:00 2001 From: shamardy Date: Tue, 16 Sep 2025 01:56:42 +0300 Subject: [PATCH 07/16] fix(ordermatch): safer sync + role/cause-aware bans; tighten keep-alive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Sync: request diffs from our local “from” roots (SyncPlan), reducing FullTrie fallbacks and false mismatches; apply diffs tentatively and revert on mismatch. - Bans: add cause/role-aware temp-ban with per-peer grace (120s, TTL 600s, temp-ban 20m). Relays do not ban Unavailable light nodes; light nodes ban Unavailable relays. Remote role detected via relay mesh query. - Treat Ok(None) on SyncPubkeyOrderbookState as InvalidOrIncomplete (not Unavailable). - Keep-alive: avoid creating pubkey state on stale messages (read-only pre-scan). - Logging: single structured decision log for SyncFailure ban action. Risk/rollout: minor policy tuning; no protocol changes. Verify behavior under lossy links and observe reduced false bans. 
--- mm2src/mm2_main/src/lp_network.rs | 185 +++++++++++++----- mm2src/mm2_main/src/lp_ordermatch.rs | 278 ++++++++++++++++++--------- 2 files changed, 321 insertions(+), 142 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 679bfc08ce..9f2bfb60b6 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -27,34 +27,22 @@ use compatible_time::{Duration, Instant}; use derive_more::Display; use futures::{channel::oneshot, StreamExt}; use keys::KeyPair; -use lazy_static::lazy_static; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; use mm2_libp2p::application::request_response::P2PRequest; use mm2_libp2p::p2p_ctx::P2PContext; use mm2_libp2p::{ - decode_message, encode_message, DecodingError, GossipsubEvent, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic, - MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, + decode_message, encode_message, get_relay_mesh, DecodingError, GossipsubEvent, GossipsubMessage, Libp2pPublic, + Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, }; use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse}; use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent}; use mm2_metrics::{mm_label, mm_timing}; -use parking_lot::Mutex; use serde::de; use std::str::FromStr; -use timed_map::{MapKind, TimedMap}; use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap}; -const TEMP_BAN_DURATION_SECS: u64 = 3600; -const PER_PEER_SYNC_BAN_GRACE: Duration = Duration::from_secs(120); -const SYNC_BAN_GRACE_TTL: Duration = Duration::from_secs(600); - -lazy_static! 
{ - static ref SYNC_BAN_GRACE: Mutex> = - Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); -} - pub type P2PRequestResult = Result>; pub type P2PProcessResult = Result>; @@ -176,51 +164,40 @@ async fn process_p2p_message( from_pubkey, propagated_from, unresolved_pairs, + cause, } = e.get_inner() { + // Determine if the failing peer is a relay (seed) or a light node. + let is_remote_relay = is_peer_in_relay_mesh(&ctx, propagated_from).await; + + // Decide whether we are allowed to ban this peer given node roles and the failure cause. + let allow_ban = sync_ban::decide_allow_ban(is_remote_relay, i_am_relay, cause); + let action = if allow_ban { "grace_or_tempban" } else { "no_ban" }; + log::warn!( + "Orderbook SyncFailure: peer={} pubkey={} pairs={:?} cause={:?} remote_is_relay={} local_is_relay={} action={}", + propagated_from, + from_pubkey, + unresolved_pairs, + cause, + is_remote_relay, + i_am_relay, + action + ); + match PeerId::from_str(propagated_from) { Ok(peer) => { - let now = Instant::now(); - let mut map = SYNC_BAN_GRACE.lock(); - map.drop_expired_entries(); - - if let Some(first_seen) = map.get(&peer) { - let elapsed = now.duration_since(*first_seen); - if elapsed >= PER_PEER_SYNC_BAN_GRACE { - // Grace elapsed: ban now and clear the entry. 
- map.remove(&peer); - log::warn!( - "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}) after {}s grace; banning.", - propagated_from, - from_pubkey, - unresolved_pairs, - PER_PEER_SYNC_BAN_GRACE.as_secs() - ); - temp_ban_peer(&ctx, peer, Duration::from_secs(TEMP_BAN_DURATION_SECS)); - return; - } else { - let remaining = PER_PEER_SYNC_BAN_GRACE.as_secs().saturating_sub(elapsed.as_secs()); - log::warn!( - "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}); {}s grace remaining; not banning.", - propagated_from, - from_pubkey, - unresolved_pairs, - remaining - ); - return; - } - } else { - // First SyncFailure observed: start grace window; this entry will auto-expire after SYNC_BAN_GRACE_TTL - map.insert_expirable(peer, now, SYNC_BAN_GRACE_TTL); - log::warn!( - "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}); starting {}s per-peer grace; not banning yet.", - propagated_from, - from_pubkey, - unresolved_pairs, - PER_PEER_SYNC_BAN_GRACE.as_secs() - ); + if !allow_ban { return; } + sync_ban::handle_sync_ban_grace( + &ctx, + peer, + from_pubkey, + propagated_from, + unresolved_pairs, + cause, + ); + return; }, Err(parse_err) => { log::error!( @@ -544,6 +521,15 @@ pub fn unban_peer(ctx: &MmArc, peer: PeerId) { } } +/// Returns true if the given peer (string PeerId) is present in the current relay mesh. +/// This clones the cmd_tx under the lock and drops the guard before awaiting to keep the future Send. +pub async fn is_peer_in_relay_mesh(ctx: &MmArc, peer: &str) -> bool { + let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); + let cmd_tx = p2p_ctx.cmd_tx.lock().clone(); + let mesh = get_relay_mesh(cmd_tx).await; + mesh.iter().any(|p| p == peer) +} + #[derive(Clone, Debug, Display, Serialize)] pub enum NetIdError { #[display(fmt = "Netid {netid} is larger than max {max_netid}")] @@ -581,3 +567,96 @@ pub fn peer_id_from_secp_public(secp_public: &[u8]) -> Result no-ban by policy. 
+mod sync_ban { + use super::temp_ban_peer; + use crate::lp_ordermatch::SyncFailureCause; + use common::log; + use compatible_time::{Duration, Instant}; + use lazy_static::lazy_static; + use mm2_core::mm_ctx::MmArc; + use mm2_libp2p::PeerId; + use parking_lot::Mutex; + use timed_map::{MapKind, TimedMap}; + + const TEMP_BAN_DURATION_SECS: u64 = 1200; + const PER_PEER_SYNC_BAN_GRACE: Duration = Duration::from_secs(120); + const SYNC_BAN_GRACE_TTL: Duration = Duration::from_secs(600); + + lazy_static! { + static ref SYNC_BAN_GRACE: Mutex> = + Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); + } + + #[inline] + pub(super) fn decide_allow_ban(is_remote_relay: bool, i_am_relay: bool, cause: &SyncFailureCause) -> bool { + if is_remote_relay { + true + } else if i_am_relay { + matches!(cause, SyncFailureCause::InvalidOrIncomplete) + } else { + true + } + } + + pub(super) fn handle_sync_ban_grace( + ctx: &MmArc, + peer: PeerId, + from_pubkey: &str, + propagated_from: &str, + unresolved_pairs: &[String], + cause: &SyncFailureCause, + ) { + let now = Instant::now(); + let mut map = SYNC_BAN_GRACE.lock(); + map.drop_expired_entries(); + + if let Some(first_seen) = map.get(&peer) { + let elapsed = now.duration_since(*first_seen); + if elapsed >= PER_PEER_SYNC_BAN_GRACE { + // Grace elapsed: ban now and clear the entry. 
+ map.remove(&peer); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}, cause {:?}) after {}s grace; banning.", + propagated_from, + from_pubkey, + unresolved_pairs, + cause, + PER_PEER_SYNC_BAN_GRACE.as_secs() + ); + temp_ban_peer(ctx, peer, Duration::from_secs(TEMP_BAN_DURATION_SECS)); + } else { + let remaining = PER_PEER_SYNC_BAN_GRACE.as_secs().saturating_sub(elapsed.as_secs()); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}, cause {:?}); {}s grace remaining; not banning.", + propagated_from, + from_pubkey, + unresolved_pairs, + cause, + remaining + ); + } + } else { + // First SyncFailure observed: start grace window; this entry will auto-expire after SYNC_BAN_GRACE_TTL + map.insert_expirable(peer, now, SYNC_BAN_GRACE_TTL); + log::warn!( + "Orderbook SyncFailure from {} (pubkey {}, pairs {:?}, cause {:?}); starting {}s per-peer grace; not banning yet.", + propagated_from, + from_pubkey, + unresolved_pairs, + cause, + PER_PEER_SYNC_BAN_GRACE.as_secs() + ); + } + } +} diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index b0ea6af9cc..9af87b01fc 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -158,6 +158,14 @@ const SWAP_VERSION_DEFAULT: u8 = 2; pub type OrderbookP2PHandlerResult = Result<(), MmError>; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SyncFailureCause { + /// Peer was unavailable: no response, timeout, or transport failure to SyncPubkeyOrderbookState. + Unavailable, + /// Peer responded but validation failed or did not resolve all requested pairs. 
+ InvalidOrIncomplete, +} + #[derive(Debug, Display)] pub enum OrderbookP2PHandlerError { #[display(fmt = "'{_0}' is an invalid topic for the orderbook handler.")] @@ -186,12 +194,13 @@ pub enum OrderbookP2PHandlerError { }, #[display( - fmt = "Sync failure for pubkey '{from_pubkey}' via '{propagated_from}'; unresolved pairs: {unresolved_pairs:?}" + fmt = "Sync failure for pubkey '{from_pubkey}' via '{propagated_from}'; unresolved pairs: {unresolved_pairs:?}; cause: {cause:?}" )] SyncFailure { from_pubkey: String, propagated_from: String, unresolved_pairs: Vec, + cause: SyncFailureCause, }, } @@ -327,19 +336,10 @@ fn process_trie_delta( new_root } -fn build_pubkey_state_sync_request( - pubkey: &str, - pending_pairs: &HashSet, - expected_pair_roots: &HashMap, -) -> OrdermatchRequest { - let trie_roots = pending_pairs - .iter() - .filter_map(|p| expected_pair_roots.get(p).map(|&root| (p.clone(), root))) - .collect(); - OrdermatchRequest::SyncPubkeyOrderbookState { - pubkey: pubkey.to_owned(), - trie_roots, - } +#[derive(Clone, Copy, Debug)] +struct SyncPlan { + from: H64, + to: H64, } fn apply_pair_orders_diff( @@ -362,82 +362,120 @@ fn apply_pair_orders_diff( } } -fn apply_and_validate_pubkey_state_sync_response( - orderbook_mutex: &PaMutex, +// TODO(sync-v2): Validate protocol_infos/conf_infos after binding them to a signature from the maker. +#[allow(clippy::too_many_arguments)] +fn apply_pair_sync_with_validation( + orderbook: &mut Orderbook, from_pubkey: &str, peer: &str, - response: SyncPubkeyOrderbookStateRes, + pair: AlbOrderedOrderbookPair, + diff: DeltaOrFullTrie, expected_pair_roots: &HashMap, pending_pairs: &mut HashSet, keep_alive_timestamp: u64, + protocol_infos: &HashMap, + conf_infos: &HashMap, ) { - let mut orderbook = orderbook_mutex.lock(); - for (pair, diff) in response.pair_orders_diff { - // Ignore unsolicited pairs we didn't request to prevent state poisoning. 
- if !pending_pairs.contains(&pair) { - continue; - } + // Ignore unsolicited pairs we didn't request to prevent state poisoning. + if !pending_pairs.contains(&pair) { + return; + } - let new_root = apply_pair_orders_diff( - &mut orderbook, - from_pubkey, - &pair, - diff, - &response.protocol_infos, - &response.conf_infos, - ); + // Snapshot the current state for this (pubkey, pair) to allow non-destructive validation. + let snapshot_orders: Vec = orderbook + .order_set + .values() + .filter(|o| o.pubkey == from_pubkey && alb_ordered_pair(&o.base, &o.rel) == pair) + .cloned() + .collect(); - if let Some(expected) = expected_pair_roots.get(&pair) { - if &new_root == expected { - pending_pairs.remove(&pair); - // Mark per-pair maker-published timestamp once accepted - let state = pubkey_state_mut(&mut orderbook.pubkeys_state, from_pubkey); - state - .latest_root_timestamp_by_pair - .insert(pair.clone(), keep_alive_timestamp); - state.pair_last_seen_local.insert(pair.clone(), now_sec()); - } else { - warn!( - "Sync validation failed for pubkey {} pair {} from {}: expected {:?}, got {:?}. Reverting pair.", - from_pubkey, pair, peer, expected, new_root - ); - remove_pubkey_pair_orders(&mut orderbook, from_pubkey, &pair); + let (prev_pair_last_seen, prev_latest_root_ts) = { + let prev_state = orderbook.pubkeys_state.get(from_pubkey); + let prev_seen = prev_state.and_then(|s| s.pair_last_seen_local.get(&pair).copied()); + let prev_ts = prev_state.and_then(|s| s.latest_root_timestamp_by_pair.get(&pair).copied()); + (prev_seen, prev_ts) + }; + + // Apply diff and compute the new root. + let new_root = apply_pair_orders_diff(orderbook, from_pubkey, &pair, diff, protocol_infos, conf_infos); + + if let Some(expected) = expected_pair_roots.get(&pair) { + if &new_root == expected { + // Accept: refresh maker timestamp floor and liveness. 
+ pending_pairs.remove(&pair); + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(pair.clone(), keep_alive_timestamp); + state.pair_last_seen_local.insert(pair.clone(), now_sec()); + } else { + // Validation failed: revert to the snapshot non-destructively. + warn!( + "Sync validation failed for pubkey {} pair {} from {}: expected {:?}, got {:?}. Reverting pair.", + from_pubkey, pair, peer, expected, new_root + ); + + // Clear current pair state. + remove_pubkey_pair_orders(orderbook, from_pubkey, &pair); + + // Restore orders from the snapshot. + for order in snapshot_orders { + orderbook.insert_or_update_order_update_trie(order); + } + + // Restore timestamps as they were prior to applying the diff. + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, from_pubkey); + match prev_pair_last_seen { + Some(ts) => { + state.pair_last_seen_local.insert(pair.clone(), ts); + }, + None => { + state.pair_last_seen_local.remove(&pair); + }, + } + match prev_latest_root_ts { + Some(ts) => { + state.latest_root_timestamp_by_pair.insert(pair.clone(), ts); + }, + None => { + state.latest_root_timestamp_by_pair.remove(&pair); + }, } } } } -async fn request_and_apply_pubkey_state_sync_from_peer( - ctx: &MmArc, - orderbook: &PaMutex, +fn apply_and_validate_pubkey_state_sync_response( + orderbook_mutex: &PaMutex, from_pubkey: &str, peer: &str, + response: SyncPubkeyOrderbookStateRes, expected_pair_roots: &HashMap, pending_pairs: &mut HashSet, keep_alive_timestamp: u64, -) -> OrderbookP2PHandlerResult { - let current_req = build_pubkey_state_sync_request(from_pubkey, pending_pairs, expected_pair_roots); - - if let Some(resp) = request_one_peer::( - ctx.clone(), - P2PRequest::Ordermatch(current_req), - peer.to_string(), - ) - .await - .map_mm_err()? 
- { - apply_and_validate_pubkey_state_sync_response( - orderbook, +) { + let SyncPubkeyOrderbookStateRes { + last_signed_pubkey_payload: _, + pair_orders_diff, + protocol_infos, + conf_infos, + } = response; + let mut orderbook = orderbook_mutex.lock(); + for (pair, diff) in pair_orders_diff { + // delegate per-pair apply/validate to a helper to keep nesting shallow + apply_pair_sync_with_validation( + &mut orderbook, from_pubkey, peer, - resp, + pair, + diff, expected_pair_roots, pending_pairs, keep_alive_timestamp, + &protocol_infos, + &conf_infos, ); } - - Ok(()) } async fn process_orders_keep_alive( @@ -451,37 +489,90 @@ async fn process_orders_keep_alive( let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); // Update local state and decide whether to sync. - let trie_roots_to_request = + let plan_by_pair = ordermatch_ctx .orderbook .lock() .process_keep_alive(&from_pubkey, keep_alive, i_am_relay, &propagated_from)?; - if trie_roots_to_request.is_empty() { + if plan_by_pair.is_empty() { // The message was processed, return Ok to forward it return Ok(()); } - // Query ONLY the keepalive propagator. 
- let mut remaining_pairs: HashSet = trie_roots_to_request.keys().cloned().collect(); - let _ = request_and_apply_pubkey_state_sync_from_peer( - &ctx, - &ordermatch_ctx.orderbook, - &from_pubkey, - &propagated_from, - &trie_roots_to_request, - &mut remaining_pairs, - keep_alive_timestamp, + // Separate what we need for the request (from roots) and for validation (expected to roots) + let mut pending_pairs: HashSet = HashSet::new(); + let mut from_roots_by_pair: HashMap = HashMap::new(); + let mut expected_pair_roots: HashMap = HashMap::new(); + + for (pair, plan) in plan_by_pair { + pending_pairs.insert(pair.clone()); + from_roots_by_pair.insert(pair.clone(), plan.from); + expected_pair_roots.insert(pair, plan.to); + } + + // Build the V1 request using our local "from" roots (no extra lock needed) + // TODO(sync-v2): Extend this to include per-pair from_root and to_root, + // requesting an exact diff targeted at our state, even if the responder + // advanced after propagating the keep-alive. This should reduce false + // InvalidOrIncomplete classifications and unnecessary bans. + let current_req = OrdermatchRequest::SyncPubkeyOrderbookState { + pubkey: from_pubkey.clone(), + trie_roots: from_roots_by_pair, + }; + + let mut had_response = false; + + match request_one_peer::( + ctx.clone(), + P2PRequest::Ordermatch(current_req), + propagated_from.clone(), ) - .await; + .await + { + Ok(Some(resp)) => { + had_response = true; + apply_and_validate_pubkey_state_sync_response( + &ordermatch_ctx.orderbook, + &from_pubkey, + &propagated_from, + resp, + &expected_pair_roots, + &mut pending_pairs, + keep_alive_timestamp, + ); + }, + Ok(None) => { + // Peer responded but returned None for a diff request; classify as InvalidOrIncomplete. + // This is unexpected for SyncPubkeyOrderbookState. 
+ had_response = true; + warn!( + "SyncPubkeyOrderbookState request to {} returned None; treating as InvalidOrIncomplete", + propagated_from + ); + }, + Err(e) => { + // Transport/timeout or other request error; treat as Unavailable. + warn!( + "SyncPubkeyOrderbookState request to {} failed: {}; treating as Unavailable", + propagated_from, e + ); + }, + } - // Phase 4: Finalize; if unresolved, mark unsynced and DO NOT FORWARD. - if !remaining_pairs.is_empty() { - let unresolved_pairs = remaining_pairs.into_iter().collect::>(); + // If anything remains unresolved, treat as sync failure + if !pending_pairs.is_empty() { + let unresolved_pairs = pending_pairs.into_iter().collect::>(); + let cause = if had_response { + SyncFailureCause::InvalidOrIncomplete + } else { + SyncFailureCause::Unavailable + }; return MmError::err(OrderbookP2PHandlerError::SyncFailure { from_pubkey, propagated_from, unresolved_pairs, + cause, }); } @@ -3108,7 +3199,7 @@ impl Orderbook { message: new_protocol::PubkeyKeepAlive, i_am_relay: bool, propagated_from: &str, - ) -> Result, MmError> { + ) -> Result, MmError> { // TODO(rate-limit): // Add a single, shared in‑process rate limiter for OrdermatchMessage // types (e.g., a module in mm2_p2p or lp_network). @@ -3120,7 +3211,8 @@ impl Orderbook { // so this is equivalent to checking that one pair. If multi-pair keep-alives return, // demote only stale pairs instead of rejecting the whole message. { - let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + // Read-only pre-scan: do not create a pubkey state for messages that will be rejected as stale. 
+ let existing_state = self.pubkeys_state.get(from_pubkey); for (alb_pair, _) in message.trie_roots.iter() { let subscribed = self .topics_subscribed_to @@ -3129,12 +3221,14 @@ impl Orderbook { continue; } - let last_pair_timestamp = *pubkey_state.latest_root_timestamp_by_pair.get(alb_pair).unwrap_or(&0); + let last_pair_timestamp = existing_state + .and_then(|s| s.latest_root_timestamp_by_pair.get(alb_pair).copied()) + .unwrap_or(0); if message.timestamp <= last_pair_timestamp { log::debug!( - "Ignoring a stale PubkeyKeepAlive from {} for pair {}: message.timestamp={} <= last_pair_timestamp={}", - from_pubkey, alb_pair, message.timestamp, last_pair_timestamp + "Ignoring a stale PubkeyKeepAlive from {} for pair {}: message.timestamp={} <= last_pair_timestamp={}", + from_pubkey, alb_pair, message.timestamp, last_pair_timestamp ); return MmError::err(OrderbookP2PHandlerError::StaleKeepAlive { from_pubkey: from_pubkey.to_owned(), @@ -3144,7 +3238,7 @@ impl Orderbook { } } - let mut trie_roots_to_request = HashMap::new(); + let mut plan_by_pair = HashMap::new(); for (alb_pair, trie_root) in message.trie_roots { let subscribed = self @@ -3185,7 +3279,13 @@ impl Orderbook { *order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_pair) }; if current_root != trie_root { - trie_roots_to_request.insert(alb_pair, trie_root); + plan_by_pair.insert( + alb_pair.clone(), + SyncPlan { + from: current_root, + to: trie_root, + }, + ); continue; } let state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); @@ -3195,7 +3295,7 @@ impl Orderbook { state.pair_last_seen_local.insert(alb_pair, now_sec()); } - Ok(trie_roots_to_request) + Ok(plan_by_pair) } fn orderbook_item_with_proof(&self, order: OrderbookItem) -> OrderbookItemWithProof { From b21b7784b3dd3f3b98688d18ca4a1ecc774c4f2b Mon Sep 17 00:00:00 2001 From: shamardy Date: Tue, 16 Sep 2025 07:47:09 +0300 Subject: [PATCH 08/16] p2p: track ban reasons; auto-unban connectivity to restore mesh --- 
mm2src/mm2_main/src/lp_network.rs | 18 ++- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 126 +++++++++++++++++++-- mm2src/mm2_p2p/src/behaviours/mod.rs | 1 + mm2src/mm2_p2p/src/behaviours/policy.rs | 34 ++++++ mm2src/mm2_p2p/src/lib.rs | 3 + 5 files changed, 169 insertions(+), 13 deletions(-) create mode 100644 mm2src/mm2_p2p/src/behaviours/policy.rs diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 9f2bfb60b6..479beac5fc 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -32,8 +32,8 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::application::request_response::P2PRequest; use mm2_libp2p::p2p_ctx::P2PContext; use mm2_libp2p::{ - decode_message, encode_message, get_relay_mesh, DecodingError, GossipsubEvent, GossipsubMessage, Libp2pPublic, - Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, + remove_ban_reason, decode_message, encode_message, get_relay_mesh, DecodingError, GossipsubEvent, GossipsubMessage, + Libp2pPublic, Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, }; use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse}; use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent}; @@ -518,6 +518,8 @@ pub fn unban_peer(ctx: &MmArc, peer: PeerId) { }; if let Err(e) = send_res { log::error!("unban_peer cmd_tx.send error {:?}", e); + } else { + remove_ban_reason(&peer); } } @@ -585,7 +587,7 @@ mod sync_ban { use compatible_time::{Duration, Instant}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; - use mm2_libp2p::PeerId; + use mm2_libp2p::{set_ban_reason, BanReason, PeerId}; use parking_lot::Mutex; use timed_map::{MapKind, TimedMap}; @@ -609,6 +611,14 @@ mod sync_ban { } } + #[inline] + fn ban_reason_from_sync_failure(cause: &SyncFailureCause) -> BanReason { + match cause { + SyncFailureCause::Unavailable => BanReason::Connectivity, + SyncFailureCause::InvalidOrIncomplete => BanReason::Misbehavior, + } + } + 
pub(super) fn handle_sync_ban_grace( ctx: &MmArc, peer: PeerId, @@ -634,6 +644,8 @@ mod sync_ban { cause, PER_PEER_SYNC_BAN_GRACE.as_secs() ); + + set_ban_reason(peer, ban_reason_from_sync_failure(cause)); temp_ban_peer(ctx, peer, Duration::from_secs(TEMP_BAN_DURATION_SECS)); } else { let remaining = PER_PEER_SYNC_BAN_GRACE.as_secs().saturating_sub(elapsed.as_secs()); diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 80466eb711..b269230c37 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -24,7 +24,7 @@ use log::{debug, error, info}; use mm2_net::ip_addr::is_global_ipv4; use rand::seq::SliceRandom; use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::sync::{Mutex, MutexGuard}; @@ -41,7 +41,10 @@ use crate::application::request_response::network_info::NetworkInfoRequest; use crate::application::request_response::P2PRequest; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; -use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; +use crate::{ + remove_ban_reason, decode_message, encode_message, ban_reason, BanReason, NetworkInfo, NetworkPorts, + RequestResponseBehaviourEvent, +}; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ @@ -916,6 +919,7 @@ fn start_gossipsub( for (peer, _) in expired { { swarm.behaviour_mut().core.banlist.unblock_peer(peer); + remove_ban_reason(&peer); } } } @@ -944,17 +948,112 @@ fn start_gossipsub( Ok((cmd_tx, event_rx, local_peer_id)) } +/// Select temporary banned peers eligible for connectivity recovery, ordered by the earliest expiry, +/// unban them just-in-time, and return as {peer -> dial addresses}. 
+/// Only peers explicitly marked with BanReason::Connectivity are eligible. +fn top_up_from_banned( + swarm: &mut AtomicDexSwarm, + needed: usize, + connected_len: usize, + target_mesh_n: usize, +) -> HashMap> { + if needed == 0 { + return HashMap::new(); + } + + // Collect connectivity-related bans and order by the earliest expiry. + let mut candidates: Vec<(PeerId, Duration)> = { + let behaviour = swarm.behaviour(); + let banned = &behaviour.banned_peers; + + banned + .iter() + .filter(|(peer, _)| matches!(ban_reason(peer), Some(BanReason::Connectivity))) + .map(|(peer, _)| { + let remaining_duration = banned + .get_remaining_duration(peer) + .unwrap_or_else(|| Duration::from_secs(u64::MAX)); + (*peer, remaining_duration) + }) + .collect() + }; + candidates.sort_unstable_by_key(|&(_, rem)| rem); + + let mut selected: HashMap> = HashMap::new(); + + for (peer, _) in candidates.into_iter() { + if selected.len() >= needed { + break; + } + + // 1) Gather known addresses for the peer + let addrs: Vec = { + let behaviour_mut = swarm.behaviour_mut(); + behaviour_mut + .core + .peers_exchange + .request_response + .addresses_of_peer(&peer) + }; + if addrs.is_empty() { + continue; + } + + // 2) Skip if we're already connected to any known address. + let already_connected = { + let behaviour = swarm.behaviour(); + addrs.iter().any(|a| behaviour.core.gossipsub.is_connected_to_addr(a)) + }; + if already_connected { + continue; + } + + // 3) Deduplicate dials with a short critical section on the "recently dialed" map. + let to_dial: Vec = { + let mut recent = RECENTLY_DIALED_PEERS.lock().unwrap(); + addrs + .into_iter() + .filter(|addr| check_and_mark_dialed(&mut recent, addr)) + .collect() + }; + + if to_dial.is_empty() { + continue; + } + + // 4) Just-in-time unban before dialing, then clear the auxiliary reason tag. 
+ { + let behaviour_mut = swarm.behaviour_mut(); + behaviour_mut.core.banlist.unblock_peer(peer); + behaviour_mut.banned_peers.remove(&peer); + } + remove_ban_reason(&peer); + + info!( + "Auto-unban: peer {} (reason=Connectivity) to restore relay mesh (connected={}, target={})", + peer, connected_len, target_mesh_n + ); + + selected.insert(peer, to_dial.into_iter().collect()); + } + + selected +} + fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses: &[Multiaddr]) { let behaviour = swarm.behaviour(); let connected_relays = behaviour.core.gossipsub.connected_relays(); - let mesh_n_low = behaviour.core.gossipsub.get_config().mesh_n_low(); - let mesh_n = behaviour.core.gossipsub.get_config().mesh_n(); + let connected_len = connected_relays.len(); + + let cfg = behaviour.core.gossipsub.get_config(); + let mesh_n_low = cfg.mesh_n_low(); + let mesh_n = cfg.mesh_n(); // allow 2 * mesh_n_high connections to other nodes - let max_n = behaviour.core.gossipsub.get_config().mesh_n_high() * 2; + let max_n = cfg.mesh_n_high() * 2; let mut rng = rand::thread_rng(); - if connected_relays.len() < mesh_n_low { - let to_connect_num = mesh_n - connected_relays.len(); + if connected_len < mesh_n_low { + let to_connect_num = mesh_n - connected_len; let mut to_connect = { let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); swarm @@ -970,7 +1069,14 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses }; to_connect.retain(|peer, _| !swarm.behaviour().is_peer_temp_banned(peer)); - // choose some random bootstrap addresses to connect if peers exchange returned not enough peers + // If still short, iteratively top up from earliest-expiring banned peers + let deficit = to_connect_num.saturating_sub(to_connect.len()); + if deficit > 0 { + let recovered = top_up_from_banned(swarm, deficit, connected_len, mesh_n); + to_connect.extend(recovered); + } + + // If still short, use bootstrap addresses to fill the remaining 
deficit if to_connect.len() < to_connect_num { let connect_bootstrap_num = to_connect_num - to_connect.len(); let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); @@ -1001,8 +1107,8 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses } } - if connected_relays.len() > max_n { - let to_disconnect_num = connected_relays.len() - max_n; + if connected_len > max_n { + let to_disconnect_num = connected_len - max_n; let relays_mesh = swarm.behaviour().core.gossipsub.get_relay_mesh(); let not_in_mesh: Vec<_> = connected_relays .iter() diff --git a/mm2src/mm2_p2p/src/behaviours/mod.rs b/mm2src/mm2_p2p/src/behaviours/mod.rs index 481c15d6ef..1f385bd20e 100644 --- a/mm2src/mm2_p2p/src/behaviours/mod.rs +++ b/mm2src/mm2_p2p/src/behaviours/mod.rs @@ -3,6 +3,7 @@ pub mod atomicdex; mod ping; // mod peer_store; pub(crate) mod peers_exchange; +pub mod policy; pub(crate) mod request_response; #[cfg(all(test, not(target_arch = "wasm32")))] diff --git a/mm2src/mm2_p2p/src/behaviours/policy.rs b/mm2src/mm2_p2p/src/behaviours/policy.rs new file mode 100644 index 0000000000..1df9dabcdc --- /dev/null +++ b/mm2src/mm2_p2p/src/behaviours/policy.rs @@ -0,0 +1,34 @@ +use lazy_static::lazy_static; +use libp2p::PeerId; +use parking_lot::Mutex; +use std::collections::HashMap; + +/// Minimal explicit ban reasons used by the app-level policy. +/// - Connectivity: temporary connectivity-related bans that may be auto-unbanned to restore mesh. +/// - Misbehavior: validation or protocol issues; must NOT be auto-unbanned. +/// - Unknown: default/unspecified classification. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum BanReason { + Connectivity, + Misbehavior, + Unknown, +} + +lazy_static! { + static ref BAN_REASONS: Mutex> = Mutex::new(HashMap::new()); +} + +/// Record/update a reason. 
+pub fn set_ban_reason(peer: PeerId, reason: BanReason) { + BAN_REASONS.lock().insert(peer, reason); +} + +/// Return the recorded ban reason for a peer (if any). +pub fn ban_reason(peer: &PeerId) -> Option { + BAN_REASONS.lock().get(peer).copied() +} + +/// Remove any recorded ban reason for a peer. (call on unban). +pub fn remove_ban_reason(peer: &PeerId) { + BAN_REASONS.lock().remove(peer); +} diff --git a/mm2src/mm2_p2p/src/lib.rs b/mm2src/mm2_p2p/src/lib.rs index 8438354b9e..859749c89f 100644 --- a/mm2src/mm2_p2p/src/lib.rs +++ b/mm2src/mm2_p2p/src/lib.rs @@ -29,6 +29,9 @@ pub use behaviours::atomicdex::{ // peers-exchange re-exports pub use behaviours::peers_exchange::PeerAddresses; +// policy related re-exports +pub use behaviours::policy::{remove_ban_reason, ban_reason, set_ban_reason, BanReason}; + // request-response related re-exports pub use behaviours::request_response::RequestResponseBehaviourEvent; From 08345d498b34ca7a0e0a834b101d263c4759c180 Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 17 Sep 2025 07:41:22 +0300 Subject: [PATCH 09/16] =?UTF-8?q?fix(p2p):=20use=20libp2p=20validation?= =?UTF-8?q?=E2=80=91based=20forwarding=20-=20Route=20message=20forwarding?= =?UTF-8?q?=20through=20libp2p=E2=80=99s=20validation=20path=20instead=20o?= =?UTF-8?q?f=20manual=20propagation=20-=20This=20achieves=20faster=20gossi?= =?UTF-8?q?p=20cache=20cleanup=20while=20avoiding=20forwarding=20malformed?= =?UTF-8?q?/invalid=20messages=20as=20before?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mm2src/mm2_main/src/lp_network.rs | 155 ++++++++++----------- mm2src/mm2_main/src/lp_ordermatch.rs | 6 - mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 18 ++- mm2src/mm2_p2p/src/lib.rs | 4 +- 4 files changed, 86 insertions(+), 97 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 479beac5fc..3b3777389c 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ 
b/mm2src/mm2_main/src/lp_network.rs @@ -32,8 +32,8 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::application::request_response::P2PRequest; use mm2_libp2p::p2p_ctx::P2PContext; use mm2_libp2p::{ - remove_ban_reason, decode_message, encode_message, get_relay_mesh, DecodingError, GossipsubEvent, GossipsubMessage, - Libp2pPublic, Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, + decode_message, encode_message, get_relay_mesh, remove_ban_reason, DecodingError, GossipsubEvent, GossipsubMessage, + Libp2pPublic, Libp2pSecpPublic, MessageAcceptance, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR, }; use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse}; use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent}; @@ -74,8 +74,10 @@ pub enum P2PRequestError { #[allow(clippy::enum_variant_names)] pub enum P2PProcessError { /// The message could not be decoded. + #[display(fmt = "Message decode error: {_0}")] DecodeError(String), /// Message signature is invalid. + #[display(fmt = "Invalid message signature: {_0}")] InvalidSignature(String), /// Unexpected message sender. #[display(fmt = "Unexpected message sender {_0}")] @@ -112,13 +114,21 @@ pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay message, } => { let spawner = ctx.spawner(); - spawner.spawn(process_p2p_message( - ctx, - propagation_source, - message_id, - message, - i_am_relay, - )); + let fut = async move { + // TODO(peer-scoring): handle some errors as `MessageAcceptance::Reject` to apply P4 penalty + let acceptance = + match process_p2p_message(ctx.clone(), propagation_source, message, i_am_relay).await { + Ok(()) => MessageAcceptance::Accept, + Err(e) => { + log::warn!("Error on process P2P message: {}", e); + MessageAcceptance::Ignore + }, + }; + // With gossipsub `validate_messages` config flag enabled, gossipsub waits for this callback before forwarding. 
+ // Accept leads to forwarding only on relay nodes per mesh policy; light nodes do not forward. + report_message_validation(&ctx, propagation_source, message_id, acceptance); + }; + spawner.spawn(fut); }, GossipsubEvent::GossipsubNotSupported { peer_id } => { log::error!("Received unsupported event from Peer: {peer_id}"); @@ -139,15 +149,18 @@ pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay } } +/// Gossipsub validation policy (app-level) +/// +/// - Known topic families: Accept on successful processing; on any error => Ignore. +/// - Unknown or missing topic prefix: return P2PProcessError::ValidationFailed(...), which maps to Ignore. +/// - Forwarding is handled by libp2p: when we report Accept, relays may forward per mesh policy; light nodes do not forward. +/// - Future: clearly malformed inputs (e.g., decode/signature errors) may be mapped to Reject to enable peer-scoring penalties. async fn process_p2p_message( ctx: MmArc, peer_id: PeerId, - message_id: MessageId, message: GossipsubMessage, i_am_relay: bool, -) { - let mut to_propagate = false; - +) -> P2PProcessResult<()> { let mut split = message.topic.as_str().split(TOPIC_SEPARATOR); match split.next() { Some(lp_ordermatch::ORDERBOOK_PREFIX) => { @@ -186,74 +199,56 @@ async fn process_p2p_message( match PeerId::from_str(propagated_from) { Ok(peer) => { - if !allow_ban { - return; + if allow_ban { + sync_ban::handle_sync_ban_grace( + &ctx, + peer, + from_pubkey, + propagated_from, + unresolved_pairs, + cause, + ); } - sync_ban::handle_sync_ban_grace( - &ctx, - peer, - from_pubkey, - propagated_from, - unresolved_pairs, - cause, - ); - return; }, Err(parse_err) => { - log::error!( + return MmError::err(P2PProcessError::ValidationFailed(format!( "SyncFailure: invalid propagated_from '{}' ({}); skipping temp-ban", - propagated_from, - parse_err - ); - return; + propagated_from, parse_err + ))); }, } } - if e.get_inner().is_warning() { - log::warn!("{}", e); - } else { - 
log::error!("{}", e); - } - return; + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } - - to_propagate = true; }, Some(lp_swap::SWAP_PREFIX) => { if let Err(e) = lp_swap::process_swap_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await { - log::error!("{}", e); - return; + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } - - to_propagate = true; }, Some(lp_swap::SWAP_V2_PREFIX) => { if let Err(e) = lp_swap::process_swap_v2_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data) { - log::error!("{}", e); - return; + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } - - to_propagate = true; }, Some(lp_swap::WATCHER_PREFIX) => { if ctx.is_watcher() { if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) { - log::error!("{}", e); - return; + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } } - - to_propagate = true; }, Some(lp_swap::TX_HELPER_PREFIX) => { if let Some(ticker) = split.next() { if let Ok(Some(coin)) = lp_coinfind(&ctx, ticker).await { if let Err(e) = coin.tx_enum_from_bytes(&message.data) { - log::error!("Message cannot continue the process due to: {:?}", e); - return; + return MmError::err(P2PProcessError::ValidationFailed(format!( + "Message cannot continue the process due to: {:?}", + e + ))); }; if coin.is_utxo_in_native_mode() { @@ -269,24 +264,22 @@ async fn process_p2p_message( }) } } - - to_propagate = true; } }, Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => { if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await { - log::error!("{}", e); - return; + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } - - to_propagate = true; }, - None | Some(_) => (), + None | Some(_) => { + return MmError::err(P2PProcessError::ValidationFailed(format!( + "Unknown or missing topic prefix in '{}'", + message.topic + ))); + }, } - if to_propagate && i_am_relay { - 
propagate_message(&ctx, message_id, peer_id); - } + Ok(()) } fn process_p2p_request( @@ -476,51 +469,49 @@ fn parse_peers_responses( .collect() } -pub fn propagate_message(ctx: &MmArc, message_id: MessageId, propagation_source: PeerId) { +pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAddresses) { let ctx = ctx.clone(); let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd = AdexBehaviourCmd::PropagateMessage { - message_id, - propagation_source, - }; + let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses }; if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { - log::error!("propagate_message cmd_tx.send error {:?}", e); + log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e); }; } -pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAddresses) { +fn report_message_validation( + ctx: &MmArc, + propagation_source: PeerId, + message_id: MessageId, + acceptance: MessageAcceptance, +) { let ctx = ctx.clone(); let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses }; + let cmd = AdexBehaviourCmd::ReportMessageValidation { + message_id, + propagation_source, + acceptance, + }; if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { - log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e); + log::error!("report_validation cmd_tx.send error {:?}", e); }; } pub fn temp_ban_peer(ctx: &MmArc, peer: PeerId, duration: Duration) { let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); let cmd = AdexBehaviourCmd::TempBanPeer { peer, duration }; - let send_res = { - let mut tx = p2p_ctx.cmd_tx.lock(); - tx.try_send(cmd) - }; - if let Err(e) = send_res { + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { log::error!("temp_ban_peer cmd_tx.send error {:?}", e); - } + }; } pub fn unban_peer(ctx: &MmArc, peer: PeerId) { let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); let cmd = AdexBehaviourCmd::UnbanPeer { peer }; - let 
send_res = { - let mut tx = p2p_ctx.cmd_tx.lock(); - tx.try_send(cmd) - }; - if let Err(e) = send_res { + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { log::error!("unban_peer cmd_tx.send error {:?}", e); } else { remove_ban_reason(&peer); - } + }; } /// Returns true if the given peer (string PeerId) is present in the current relay mesh. diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 9af87b01fc..ecc7c3c641 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -204,12 +204,6 @@ pub enum OrderbookP2PHandlerError { }, } -impl OrderbookP2PHandlerError { - pub(crate) fn is_warning(&self) -> bool { - matches!(self, OrderbookP2PHandlerError::OrderNotFound(_)) - } -} - impl From for OrderbookP2PHandlerError { fn from(e: P2PRequestError) -> Self { OrderbookP2PHandlerError::P2PRequestError(e.to_string()) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index b269230c37..2b469b484c 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -42,13 +42,13 @@ use crate::application::request_response::P2PRequest; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; use crate::{ - remove_ban_reason, decode_message, encode_message, ban_reason, BanReason, NetworkInfo, NetworkPorts, + ban_reason, decode_message, encode_message, remove_ban_reason, BanReason, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent, }; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ - ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, Message as GossipsubMessage, + ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, Message as GossipsubMessage, MessageAcceptance, }; pub type AdexCmdTx = Sender; @@ -189,9 +189,11 @@ pub enum 
AdexBehaviourCmd { peer: PeerId, addresses: PeerAddresses, }, - PropagateMessage { - message_id: MessageId, + /// Report gossipsub message validation result (Accept/Reject/Ignore). + ReportMessageValidation { propagation_source: PeerId, + message_id: MessageId, + acceptance: MessageAcceptance, }, TempBanPeer { peer: PeerId, @@ -568,13 +570,14 @@ impl AtomicDexBehaviour { .peers_exchange .add_peer_addresses_to_reserved_peers(&peer, addresses); }, - AdexBehaviourCmd::PropagateMessage { - message_id, + AdexBehaviourCmd::ReportMessageValidation { propagation_source, + message_id, + acceptance, } => { self.core .gossipsub - .propagate_message(&message_id, &propagation_source)?; + .report_message_validation_result(&message_id, &propagation_source, acceptance)?; }, AdexBehaviourCmd::TempBanPeer { peer, duration } => { self.core.banlist.block_peer(peer); @@ -762,6 +765,7 @@ fn start_gossipsub( .mesh_n(mesh_n) .mesh_n_high(mesh_n_high) .validate_messages() + // TODO(signing): switch to Strict after revising how signing is done .validation_mode(ValidationMode::Permissive) .max_transmit_size(MAX_BUFFER_SIZE) .build() diff --git a/mm2src/mm2_p2p/src/lib.rs b/mm2src/mm2_p2p/src/lib.rs index 859749c89f..7328e805dd 100644 --- a/mm2src/mm2_p2p/src/lib.rs +++ b/mm2src/mm2_p2p/src/lib.rs @@ -23,14 +23,14 @@ pub use crate::swarm_runtime::SwarmRuntime; pub use behaviours::atomicdex::{ get_directly_connected_peers, get_gossip_mesh, get_gossip_peer_topics, get_gossip_topic_peers, get_relay_mesh, spawn_gossipsub, AdexBehaviourCmd, AdexBehaviourError, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse, - AdexResponseChannel, GossipsubEvent, GossipsubMessage, MessageId, NodeType, TopicHash, WssCerts, + AdexResponseChannel, GossipsubEvent, GossipsubMessage, MessageAcceptance, MessageId, NodeType, TopicHash, WssCerts, }; // peers-exchange re-exports pub use behaviours::peers_exchange::PeerAddresses; // policy related re-exports -pub use behaviours::policy::{remove_ban_reason, 
ban_reason, set_ban_reason, BanReason}; +pub use behaviours::policy::{ban_reason, remove_ban_reason, set_ban_reason, BanReason}; // request-response related re-exports pub use behaviours::request_response::RequestResponseBehaviourEvent; From a69f66d7dae9eb6d0ea85011ceea063e28416046 Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 17 Sep 2025 07:48:41 +0300 Subject: [PATCH 10/16] add a todo note about banning for other reasons like invalid topics among other things --- mm2src/mm2_main/src/lp_network.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 3b3777389c..3f4476fcd9 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -200,6 +200,10 @@ async fn process_p2p_message( match PeerId::from_str(propagated_from) { Ok(peer) => { if allow_ban { + // TODO: + // Banning can be moved to `p2p_event_process_loop` for any `ValidationFailed` errors + // as a node shouldn't forward an invalid message to other peers. No grace period should be + // allowed then as it shouldn't be needed once all nodes update. 
sync_ban::handle_sync_ban_grace( &ctx, peer, From 6b9db96e74d9ea6acad786a607fcb74a8ad16b2b Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 17 Sep 2025 08:01:17 +0300 Subject: [PATCH 11/16] add StaleKeepAlive.delay field to debug the logs for how late are the keepalive messages --- mm2src/mm2_main/src/lp_ordermatch.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index ecc7c3c641..c2e2dc1dcb 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -186,11 +186,12 @@ pub enum OrderbookP2PHandlerError { Internal(String), #[display( - fmt = "Received stale keep alive from pubkey '{from_pubkey}' propagated from '{propagated_from}', will ignore it" + fmt = "Received stale keep alive from pubkey '{from_pubkey}' propagated from '{propagated_from}' (delay: {delay}s), will ignore it" )] StaleKeepAlive { from_pubkey: String, propagated_from: String, + delay: u64, }, #[display( @@ -3224,9 +3225,11 @@ impl Orderbook { "Ignoring a stale PubkeyKeepAlive from {} for pair {}: message.timestamp={} <= last_pair_timestamp={}", from_pubkey, alb_pair, message.timestamp, last_pair_timestamp ); + let delay = last_pair_timestamp.saturating_sub(message.timestamp); return MmError::err(OrderbookP2PHandlerError::StaleKeepAlive { from_pubkey: from_pubkey.to_owned(), propagated_from: propagated_from.to_owned(), + delay, }); } } From dac2e3eabe5cccc7be9e8a5e79b948eb1348e7b5 Mon Sep 17 00:00:00 2001 From: shamardy Date: Sat, 20 Sep 2025 04:37:20 +0300 Subject: [PATCH 12/16] docs(ordermatch,gossipsub): KA duplicates investigation; TODOs only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Observed repeated PubkeyKeepAlive messages; root cause not confirmed yet. Hypotheses include seq no. based MessageId dedup gaps and equal-timestamp behavior. 
- Added investigation notes and TODOs: considered content‑derived MessageId. - Deferred code changes until we gather more evidence to avoid risky churn. No functional changes; comments and guidance only. --- mm2src/mm2_main/src/lp_network.rs | 9 ++++++++ mm2src/mm2_main/src/lp_ordermatch.rs | 9 ++++++++ mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 27 ++++++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 3f4476fcd9..ac2563f970 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -223,6 +223,15 @@ async fn process_p2p_message( } } + // TODO(stale-keep-alive): + // - Currently we do not ban on StaleKeepAlive as the reason for some identical payloads is not known; + // we only don't process / ignore these payloads based on maker timestamp checks. + // Once all nodes update to this version, we still can't ban unless we are sure that dedup cache is working well + // and the stale messages that are delivered to the app layer are truly a sign of misbehavior. + // - App-layer timestamp checks are sufficient for now since banning risks penalizing good peers. + // - Once we use a stable, content-derived MessageId and tune the dedup cache, we can implement banning. + // This will result in better memory and resource usage, nothing more. + return MmError::err(P2PProcessError::ValidationFailed(e.to_string())); } }, diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index c2e2dc1dcb..0263e57778 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -3188,6 +3188,14 @@ impl Orderbook { /// Notes: /// - Maker timestamps are used for monotonicity/anti‑replay; GC uses local receive times. /// - The caller must ignore Unsolicited pairs in sync responses when applying the response. 
+ // TODO(message-id/dedup): + // - We occasionally see “stale” keep-alives reappear, one reason may be peers re-broadcasting identical + // payloads with different gossipsub sequence numbers. We currently reject these at the + // application layer based on maker timestamps. + // - Potential improvement: use a stable, content-derived MessageId to reduce duplicates. We can then ban + // peers that send the same message twice within the dedup cache ttl. Maker timestamps would still be checked + // per pair to prevent any type of replays, so this is purely an optimization to reduce redundant processing. + // - Note: app-layer signatures still prevent hijacking; this is purely for dedup/content-addressing. fn process_keep_alive( &mut self, from_pubkey: &str, @@ -3200,6 +3208,7 @@ impl Orderbook { // types (e.g., a module in mm2_p2p or lp_network). // For keep-alive, enforce a minimum interval per pubkey (>= 20–30s) or X messages/minute. // When throttled, early‑return without syncing/propagating, before any state mutation. + // Temp ban peers that exceed their limits. // Pre-scan: if any single pair is stale => treat the whole message as stale. // Note: We currently send keep-alives per pair (trie_roots has a single entry), diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 2b469b484c..c90fa0bfff 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -749,6 +749,33 @@ fn start_gossipsub( // to set default parameters for gossipsub use: // let gossipsub_config = gossipsub::GossipsubConfig::default(); + // TODO(message-id/dedup): + // - Switch to a stable, content-derived MessageId. + // Hash choice: BLAKE3 (very fast) or SHA-256 (ubiquitous); both are collision-resistant. + // - Why: `DefaultHasher` varies across Rust versions/targets and is randomized per process, + // making it unsuitable for cross-run/impl dedup. 
A content-derived id also improves IHAVE/IWANT + // behavior, reduces replays, and makes ops/memory usage more predictable. + // - Core rules: + // * While we rely on app-layer signatures embedded in the payload, hash the canonical application + // payload bytes only (exclude `sequence_number`, author/pubkey, per-peer metadata, or other + // ephemeral fields). + // * Emit a fixed-size ID (32 bytes); use the raw digest bytes as `MessageId`. + // * Apply domain separation to avoid cross-topic collisions + // (e.g., hash(topic_hash.as_str() || 0x00 || payload_bytes_pre_sign)). + // * After migrating to libp2p Strict (author+seqno signatures), include the author pubkey + // in the digest (and optionally the seqno) to preserve per-publisher identity and avoid + // cross-publisher dedup (e.g., hash(topic || 0x00 || author_pubkey || 0x01 || payload_pre_sign + // [|| 0x02 || seqno])). + // - Integration: + // * Keep `ValidationMode::Permissive` during rollout. Plan migration to libp2p-level signing: + // move signing/verification into libp2p’s signature path (author pubkey + per-sender seqno). + // Once all publishers attach libp2p signatures and peers verify them, enable `Strict` and + // update the MessageId input as above; deprecate the app-layer signature wrapper (or keep as + // defense-in-depth). + // * Duplicate-cache TTL: libp2p defaults to ~60s; consider pinning and logging it at startup. + // - Security: prefer cryptographic hashes (BLAKE3 or SHA-256); avoid non-crypto hashes in adversarial settings. + // - MessageId is for dedup only; authenticity is enforced by signatures (libp2p `Strict` once migrated). + // To content-address a message, we can take the hash of the message and use it as an ID. 
let message_id_fn = |message: &GossipsubMessage| { let mut s = DefaultHasher::new(); From 0be254b24430aa2e69e830dee7e7039beb8dca97 Mon Sep 17 00:00:00 2001 From: shamardy Date: Thu, 25 Sep 2025 02:48:17 +0300 Subject: [PATCH 13/16] =?UTF-8?q?feat(ordermatch,p2p):=20add=20optional=20?= =?UTF-8?q?`expected=5Froots`=20to=20`SyncPubkeyOrderbookState`=20-=20Prep?= =?UTF-8?q?ares=20for=20exact=20from=E2=86=92to=20trie=20diffs,=20enabling?= =?UTF-8?q?=20strict=20landing=20=20=20on=20expected=20roots=20(security/c?= =?UTF-8?q?onsistency=20hardening)=20while=20maintaining=20=20=20backward?= =?UTF-8?q?=20compatibility.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Behavioral impact: - No functional change in this commit, purely protocol scaffolding. --- mm2src/mm2_main/src/lp_ordermatch.rs | 10 ++++++++-- mm2src/mm2_main/src/ordermatch_tests.rs | 6 +++--- .../src/application/request_response/ordermatch.rs | 12 ++++++++++-- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 0263e57778..76aa6370d1 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -514,6 +514,7 @@ async fn process_orders_keep_alive( let current_req = OrdermatchRequest::SyncPubkeyOrderbookState { pubkey: from_pubkey.clone(), trie_roots: from_roots_by_pair, + expected_roots: None, }; let mut had_response = false; @@ -925,8 +926,12 @@ impl TryFromBytes for Uuid { pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result>, String> { match request { OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel), - OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => { - let response = process_sync_pubkey_orderbook_state(ctx, pubkey, trie_roots); + OrdermatchRequest::SyncPubkeyOrderbookState { + pubkey, + trie_roots, + expected_roots, + } => { + let response = 
process_sync_pubkey_orderbook_state(ctx, pubkey, trie_roots, expected_roots); response.map(|res| res.map(|r| encode_message(&r).expect("Serialization failed"))) }, OrdermatchRequest::BestOrders { coin, action, volume } => { @@ -1177,6 +1182,7 @@ fn process_sync_pubkey_orderbook_state( ctx: MmArc, pubkey: String, trie_roots: HashMap, + _expected_roots: Option>, ) -> Result, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); let orderbook = ordermatch_ctx.orderbook.lock(); diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 235d53ab8d..cb7d628389 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -2479,7 +2479,7 @@ fn test_process_sync_pubkey_orderbook_state_after_new_orders_added() { insert_or_update_order(&ctx, order.clone()); } - let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) + let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state, None) .unwrap() .unwrap(); @@ -2556,7 +2556,7 @@ fn test_process_sync_pubkey_orderbook_state_after_orders_removed() { remove_order(&ctx, order.uuid); } - let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) + let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state, None) .unwrap() .unwrap(); @@ -2763,7 +2763,7 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { let SyncPubkeyOrderbookStateRes { mut pair_orders_diff, .. 
- } = process_sync_pubkey_orderbook_state(ctx, pubkey, roots) + } = process_sync_pubkey_orderbook_state(ctx, pubkey, roots, None) .expect("!process_sync_pubkey_orderbook_state") .expect("Expected MORTY:RICK delta, returned None"); diff --git a/mm2src/mm2_p2p/src/application/request_response/ordermatch.rs b/mm2src/mm2_p2p/src/application/request_response/ordermatch.rs index 250758f594..3d8f966b32 100644 --- a/mm2src/mm2_p2p/src/application/request_response/ordermatch.rs +++ b/mm2src/mm2_p2p/src/application/request_response/ordermatch.rs @@ -21,11 +21,19 @@ pub enum BestOrdersAction { pub enum OrdermatchRequest { /// Get an orderbook for the given pair. GetOrderbook { base: String, rel: String }, - /// Sync specific pubkey orderbook state if our known Patricia trie state doesn't match the latest keep alive message + /// Sync specific pubkey orderbook state if our known Patricia trie state doesn't match the latest keep alive message. + /// + /// New nodes treat `trie_roots` as from_roots when `expected_roots` is present and require an + /// exact from→to diff per pair (to the corresponding `expected_roots` entry). + /// Legacy nodes ignore `expected_roots` (it is optional with serde default). SyncPubkeyOrderbookState { pubkey: String, - /// Request using this condition + /// Request using this condition (interpreted as from_roots when `expected_roots` is provided) trie_roots: HashMap, + /// Optional expected roots to land on exactly (mirrors trie_roots keys). + /// Backward compatible via serde default so legacy nodes ignore it. + #[serde(default)] + expected_roots: Option>, }, /// Request best orders for a specific coin and action. 
BestOrders { From 51308fddb1625bbcaa35cdf5169512d0fdb7f65b Mon Sep 17 00:00:00 2001 From: shamardy Date: Thu, 25 Sep 2025 05:20:47 +0300 Subject: [PATCH 14/16] feat(ordermatch,p2p): send `expected_roots` in `SyncPubkeyOrderbookState` - Plumb `expected_roots` into the sync handler rejects unsound requests --- mm2src/mm2_main/src/lp_ordermatch.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 76aa6370d1..3e7be0f8a5 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -506,15 +506,15 @@ async fn process_orders_keep_alive( expected_pair_roots.insert(pair, plan.to); } - // Build the V1 request using our local "from" roots (no extra lock needed) - // TODO(sync-v2): Extend this to include per-pair from_root and to_root, - // requesting an exact diff targeted at our state, even if the responder - // advanced after propagating the keep-alive. This should reduce false - // InvalidOrIncomplete classifications and unnecessary bans. + // Build the V2 request using our local "from" roots and the keep-alive "to_root" + // values we intend to land on. This requests an exact diff targeted at our state, + // even if the responder advanced after propagating the keep-alive. This should reduce + // false InvalidOrIncomplete classifications and unnecessary bans. + // Backward-compatibility: legacy peers ignore `expected_roots` optional field. 
let current_req = OrdermatchRequest::SyncPubkeyOrderbookState { pubkey: from_pubkey.clone(), trie_roots: from_roots_by_pair, - expected_roots: None, + expected_roots: Some(expected_pair_roots.clone()), }; let mut had_response = false; @@ -1182,8 +1182,20 @@ fn process_sync_pubkey_orderbook_state( ctx: MmArc, pubkey: String, trie_roots: HashMap, - _expected_roots: Option>, + expected_roots: Option>, ) -> Result, String> { + if let Some(exp) = expected_roots.as_ref() { + if exp.len() != trie_roots.len() || trie_roots.keys().any(|pair| !exp.contains_key(pair)) { + // TODO(rate-limit/ban): accept at most one SyncPubkeyOrderbookState per peer we sent a KeepAlive to. + return ERR!( + "Rejecting SyncPubkeyOrderbookState for pubkey {}: expected_roots keys mismatch vs trie_roots (expected_roots: {}, trie_roots: {})", + pubkey, + exp.len(), + trie_roots.len() + ); + } + } + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); let orderbook = ordermatch_ctx.orderbook.lock(); let pubkey_state = some_or_return_ok_none!(orderbook.pubkeys_state.get(&pubkey)); From bb2d6f7fbbe5fa0a94f9d7e1b7a42a51f62e9cf5 Mon Sep 17 00:00:00 2001 From: shamardy Date: Thu, 25 Sep 2025 10:04:01 +0300 Subject: [PATCH 15/16] feat(ordermatch): target exact to-root via from_history with temp FullTrie fallback --- mm2src/mm2_main/src/lp_ordermatch.rs | 39 ++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 3e7be0f8a5..ec3bceb08a 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -1203,19 +1203,42 @@ fn process_sync_pubkey_orderbook_state( let order_getter = |uuid: &Uuid| orderbook.order_set.get(uuid).cloned(); let pair_orders_diff: Result, _> = trie_roots .into_iter() - .map(|(pair, root)| { - let actual_pair_root = pubkey_state + .map(|(pair, from_root)| { + let latest_known_root = pubkey_state .trie_roots .get(&pair) 
.ok_or(ERRL!("No pair trie root for {}", pair))?; + // Determine the target (to) root: + // - exact v2 sync: use the requester’s expected root + // - legacy sync: use our latest known root + let maybe_expected = expected_roots.as_ref().and_then(|roots| roots.get(&pair)); + let target_root = maybe_expected.unwrap_or(latest_known_root); + + // NOTE(exact-sync): We allow FullTrie fallback even when `expected_roots` is present. + // Rationale: `from_root` may originate from GetOrderbook or other flows and is not + // verifiable today if the sender was malicious; an exact delta chain from `from_root` + // to `target_root` might be unavailable. Falling back to a FullTrie at `target_root` + // still lets the requester land exactly on the expected root and pass the apply gate. + // + // TODO(exact-sync, removal conditions): + // - Persist the last KeepAlive root per pubkey and include it with GetOrderbook so + // the requester’s `from_root` can be anchored/validated against a recent KA. + // - Once `from_root` is verifiable, change exact mode to error (no fallback) when + // history cannot reconstruct the from→to chain. + // - Consider TRIE_STATE_HISTORY_TIMEOUT tuning: larger window ⇒ fewer missing chains; + // smaller window ⇒ more missing chains but lower memory. Coordinate with gossipsub + // dedup (~60s) so alternate-source retries are effective within the history window. 
+ let delta_result = match pubkey_state.order_pairs_trie_state_history.get(&pair) { - Some(history) => { - DeltaOrFullTrie::from_history(history, root, *actual_pair_root, &orderbook.memory_db, order_getter) - }, - None => { - get_full_trie(actual_pair_root, &orderbook.memory_db, order_getter).map(DeltaOrFullTrie::FullTrie) - }, + Some(history) => DeltaOrFullTrie::from_history( + history, + from_root, + *target_root, + &orderbook.memory_db, + order_getter, + ), + None => get_full_trie(target_root, &orderbook.memory_db, order_getter).map(DeltaOrFullTrie::FullTrie), }; let delta = try_s!(delta_result); From f7baf4f07e2d33045f930136ec3fdec50367a08f Mon Sep 17 00:00:00 2001 From: shamardy Date: Thu, 25 Sep 2025 10:34:10 +0300 Subject: [PATCH 16/16] docs(ordermatch): add TODOs v2 sync tests --- mm2src/mm2_main/src/lp_ordermatch.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index ec3bceb08a..a74ba0d841 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -511,6 +511,10 @@ async fn process_orders_keep_alive( // even if the responder advanced after propagating the keep-alive. This should reduce // false InvalidOrIncomplete classifications and unnecessary bans. // Backward-compatibility: legacy peers ignore `expected_roots` optional field. + // TODO(tests/serde-interop): + // - Old -> new: new node tolerates missing `expected_roots` (#[serde(default)]). + // - New -> old: legacy peer ignores `expected_roots` without failing. + // - New -> new: exact path works when history is present (serve/apply Delta landing on the expected to-root). 
let current_req = OrdermatchRequest::SyncPubkeyOrderbookState { pubkey: from_pubkey.clone(), trie_roots: from_roots_by_pair, @@ -1187,6 +1191,7 @@ fn process_sync_pubkey_orderbook_state( if let Some(exp) = expected_roots.as_ref() { if exp.len() != trie_roots.len() || trie_roots.keys().any(|pair| !exp.contains_key(pair)) { // TODO(rate-limit/ban): accept at most one SyncPubkeyOrderbookState per peer we sent a KeepAlive to. + // Note: this is considered as an unavailable error on the requester side, not InvalidOrIncomplete. But this is fine for now. return ERR!( "Rejecting SyncPubkeyOrderbookState for pubkey {}: expected_roots keys mismatch vs trie_roots (expected_roots: {}, trie_roots: {})", pubkey, @@ -1231,13 +1236,9 @@ fn process_sync_pubkey_orderbook_state( // dedup (~60s) so alternate-source retries are effective within the history window. let delta_result = match pubkey_state.order_pairs_trie_state_history.get(&pair) { - Some(history) => DeltaOrFullTrie::from_history( - history, - from_root, - *target_root, - &orderbook.memory_db, - order_getter, - ), + Some(history) => { + DeltaOrFullTrie::from_history(history, from_root, *target_root, &orderbook.memory_db, order_getter) + }, None => get_full_trie(target_root, &orderbook.memory_db, order_getter).map(DeltaOrFullTrie::FullTrie), };