From 0890de8a68ffde0828b2fd8bd5e1c4ca24d84a6c Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Mon, 16 Dec 2024 20:27:48 +0100 Subject: [PATCH 1/6] fix running_swap memory leak this field was convereted to a hashmap instead of a vector for easy access to the swap also we now manually delete the swap from running swaps when the swap is finished (memory leak fix). as a consequence to manuallly deleting the swap from running_swaps, we can now store them as arcs instead of weakrefs, which simplifies a lot of .upgrade calls. --- mm2src/mm2_main/src/lp_swap.rs | 68 ++++++++++------------- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 10 +++- mm2src/mm2_main/src/lp_swap/taker_swap.rs | 25 +++++---- 3 files changed, 53 insertions(+), 50 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 507b9a6f51..f64b64e13c 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -88,7 +88,7 @@ use std::convert::TryFrom; use std::num::NonZeroUsize; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use uuid::Uuid; @@ -518,7 +518,7 @@ struct LockedAmountInfo { } struct SwapsContext { - running_swaps: Mutex>>, + running_swaps: Mutex>>, active_swaps_v2_infos: Mutex>, banned_pubkeys: Mutex>, swap_msgs: Mutex>, @@ -534,7 +534,7 @@ impl SwapsContext { fn from_ctx(ctx: &MmArc) -> Result, String> { Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || { Ok(SwapsContext { - running_swaps: Mutex::new(vec![]), + running_swaps: Mutex::new(HashMap::new()), active_swaps_v2_infos: Mutex::new(HashMap::new()), banned_pubkeys: Mutex::new(HashMap::new()), swap_msgs: Mutex::new(HashMap::new()), @@ -619,21 +619,21 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); let swap_lock = swap_ctx.running_swaps.lock().unwrap(); - let mut locked = swap_lock - .iter() - .filter_map(|swap| swap.upgrade()) - .flat_map(|swap| swap.locked_amount()) - .fold(MmNumber::from(0), |mut total_amount, locked| { - if locked.coin == coin { - total_amount += locked.amount; - } - if let Some(trade_fee) = locked.trade_fee { - if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol { - total_amount += trade_fee.amount; + let mut locked = + swap_lock + .values() + .flat_map(|swap| swap.locked_amount()) + .fold(MmNumber::from(0), |mut total_amount, locked| { + if locked.coin == coin { + total_amount += locked.amount; } - } - total_amount - }); + if let Some(trade_fee) = locked.trade_fee { + if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol { + total_amount += trade_fee.amount; + } + } + total_amount + }); drop(swap_lock); let locked_amounts = swap_ctx.locked_amounts.lock().unwrap(); @@ -657,11 +657,8 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { /// Get number of currently running swaps pub fn running_swaps_num(ctx: &MmArc) -> u64 { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); - let swaps = swap_ctx.running_swaps.lock().unwrap(); - swaps.iter().fold(0, |total, swap| match swap.upgrade() { - Some(_) => total + 1, - None => total, - }) + let count = swap_ctx.running_swaps.lock().unwrap().len(); + count as u64 } /// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid @@ -670,8 +667,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str) let swap_lock = swap_ctx.running_swaps.lock().unwrap(); swap_lock - .iter() - .filter_map(|swap| swap.upgrade()) + .values() .filter(|swap| swap.uuid() != except_uuid) .flat_map(|swap| swap.locked_amount()) .fold(MmNumber::from(0), |mut total_amount, locked| { @@ -691,11 +687,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet) -> Result< let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); let swaps = try_s!(swap_ctx.running_swaps.lock()); let mut uuids = vec![]; - for swap in swaps.iter() { - if let Some(swap) = swap.upgrade() { - if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) { - uuids.push(*swap.uuid()) - } + for swap in swaps.values() { + if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) { + uuids.push(*swap.uuid()) } } drop(swaps); @@ -711,15 +705,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet) -> Result< pub fn active_swaps(ctx: &MmArc) -> Result, String> { let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); - let swaps = swap_ctx.running_swaps.lock().unwrap(); - let mut uuids = vec![]; - for swap in swaps.iter() { - if let Some(swap) = swap.upgrade() { - uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE)) - } - } - - drop(swaps); + let mut uuids: Vec<_> = swap_ctx + .running_swaps + .lock() + .unwrap() + .keys() + .map(|uuid| (*uuid, LEGACY_SWAP_TYPE)) + .collect(); let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap(); uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type))); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 0eb72b8a71..b720fd6b98 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -2095,10 +2095,14 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { }; } let running_swap = Arc::new(swap); - let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); + // Register the swap in the running swaps map. + swap_ctx + .running_swaps + .lock() + .unwrap() + .insert(uuid, running_swap.clone()); let mut swap_fut = Box::pin( async move { let mut events; @@ -2162,6 +2166,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { _swap = swap_fut => (), // swap finished normally _touch = touch_loop => unreachable!("Touch loop can not stop!"), }; + // Remove the swap from the running swaps map. + swap_ctx.running_swaps.lock().unwrap().remove(&uuid); } pub struct MakerSwapPreparedParams { diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index c7b1cf59a9..f1e7b44f13 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -458,14 +458,17 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { let ctx = swap.ctx.clone(); subscribe_to_topic(&ctx, swap_topic(&swap.uuid)); let mut status = ctx.log.status_handle(); - let uuid = swap.uuid.to_string(); + let uuid_str = uuid.to_string(); let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy()); let running_swap = Arc::new(swap); - let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); - + // Register the swap in the running swaps map. + swap_ctx + .running_swaps + .lock() + .unwrap() + .insert(uuid, running_swap.clone()); let mut swap_fut = Box::pin( async move { let mut events; @@ -491,10 +494,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { } if event.is_error() { - error!("[swap uuid={uuid}] {event:?}"); + error!("[swap uuid={uuid_str}] {event:?}"); } - status.status(&[&"swap", &("uuid", uuid.as_str())], &event.status_str()); + status.status(&[&"swap", &("uuid", uuid_str.as_str())], &event.status_str()); running_swap.apply_event(event); } match res.0 { @@ -503,12 +506,12 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { }, None => { if let Err(e) = mark_swap_as_finished(ctx.clone(), running_swap.uuid).await { - error!("!mark_swap_finished({}): {}", uuid, e); + error!("!mark_swap_finished({}): {}", uuid_str, e); } if to_broadcast { if let Err(e) = broadcast_my_swap_status(&ctx, running_swap.uuid).await { - error!("!broadcast_my_swap_status({}): {}", uuid, e); + error!("!broadcast_my_swap_status({}): {}", uuid_str, e); } } break; @@ -522,6 +525,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { _swap = swap_fut => (), // swap finished normally _touch = touch_loop => unreachable!("Touch loop can not stop!"), }; + // Remove the swap from the running swaps map. + swap_ctx.running_swaps.lock().unwrap().remove(&uuid); } #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] @@ -3276,8 +3281,8 @@ mod taker_swap_tests { .unwrap(); let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap(); let arc = Arc::new(swap); - let weak_ref = Arc::downgrade(&arc); - swaps_ctx.running_swaps.lock().unwrap().push(weak_ref); + // Create a dummy abort handle as if it was a running swap. + swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc); let actual = get_locked_amount(&ctx, "RICK"); assert_eq!(actual, MmNumber::from(0)); From fbd5e9fa9eb0e2c8104fa5fc89429f9bad4a50b6 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Mon, 30 Dec 2024 10:10:10 +0100 Subject: [PATCH 2/6] remove irrelevant comment oops --- mm2src/mm2_main/src/lp_swap/taker_swap.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index f1e7b44f13..1b199508aa 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -3281,7 +3281,6 @@ mod taker_swap_tests { .unwrap(); let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap(); let arc = Arc::new(swap); - // Create a dummy abort handle as if it was a running swap. swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc); let actual = get_locked_amount(&ctx, "RICK"); From f5ac03d8d0d242c06577af4919e37bba924c5ff0 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 8 Jan 2025 12:42:15 +0100 Subject: [PATCH 3/6] test fix: don't wait for running swaps THERE ARE NO RUNNING SWAPS --- mm2src/mm2_main/src/lp_native_dex.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 1e5f7feff0..4dd4a49e85 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -536,13 +536,6 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes Timer::sleep(0.2).await } - // wait for swaps to stop - loop { - if running_swaps_num(&ctx) == 0 { - break; - }; - Timer::sleep(0.2).await - } Ok(()) } From 6f157d6c6625b234d5bb379548a6af38d94dacf3 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 8 Jan 2025 13:08:22 +0100 Subject: [PATCH 4/6] test fix: proactively clear the running swaps map theoretically we don't need to clear it on native targets since the executable will terminate even without clearing, but for wasm the story isn't exactly the same, i presume --- mm2src/mm2_main/src/lp_native_dex.rs | 4 +++- mm2src/mm2_main/src/lp_swap.rs | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 4dd4a49e85..3af6d231f1 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -52,7 +52,8 @@ use crate::lp_message_service::{init_message_service, InitMessageServiceError}; use crate::lp_network::{lp_network_ports, p2p_event_process_loop, subscribe_to_topic, NetIdError}; use crate::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError}; -use crate::lp_swap::{running_swaps_num, swap_kick_starts}; +use crate::lp_swap; +use crate::lp_swap::swap_kick_starts; use crate::lp_wallet::{initialize_wallet_passphrase, WalletInitError}; use crate::rpc::spawn_rpc; @@ -535,6 +536,7 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes }; Timer::sleep(0.2).await } + lp_swap::clear_running_swaps(&ctx); Ok(()) } diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index f64b64e13c..d2990e6e25 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -655,10 +655,9 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { } /// Get number of currently running swaps -pub fn running_swaps_num(ctx: &MmArc) -> u64 { +pub fn clear_running_swaps(ctx: &MmArc) { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); - let count = swap_ctx.running_swaps.lock().unwrap().len(); - count as u64 + swap_ctx.running_swaps.lock().unwrap().clear(); } /// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid From dd5418fc2444679a57acdb5370788b2ac1fcc1da Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Mon, 20 Jan 2025 11:59:17 +0100 Subject: [PATCH 5/6] fix clear running_swaps doc comment --- mm2src/mm2_main/src/lp_swap.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index d2990e6e25..16ebcee5ae 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -654,7 +654,9 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { locked } -/// Get number of currently running swaps +/// Clear up all the running swaps. +/// +/// This also auto-stops any running swaps since their abort handles will get triggered (assuming these abort handles aren't already triggered by other means). pub fn clear_running_swaps(ctx: &MmArc) { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); swap_ctx.running_swaps.lock().unwrap().clear(); From 64a00120808cec2f8d074c012e10a2ab780171ff Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 22 Jan 2025 11:46:11 +0100 Subject: [PATCH 6/6] clear up the doc comment confusion --- mm2src/mm2_main/src/lp_native_dex.rs | 1 + mm2src/mm2_main/src/lp_swap.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 3af6d231f1..594daeb7d2 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -536,6 +536,7 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes }; Timer::sleep(0.2).await } + // Clearing up the running swaps removes any circular references that might prevent the context from being dropped. lp_swap::clear_running_swaps(&ctx); Ok(()) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 16ebcee5ae..f9baf2ca4d 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -656,7 +656,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { /// Clear up all the running swaps. /// -/// This also auto-stops any running swaps since their abort handles will get triggered (assuming these abort handles aren't already triggered by other means). +/// This doesn't mean that these swaps will be stopped. They can only be stopped from the abortable systems they are running on top of. pub fn clear_running_swaps(ctx: &MmArc) { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); swap_ctx.running_swaps.lock().unwrap().clear();