Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use crate::lp_message_service::{init_message_service, InitMessageServiceError};
use crate::lp_network::{lp_network_ports, p2p_event_process_loop, subscribe_to_topic, NetIdError};
use crate::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context,
lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError};
use crate::lp_swap::{running_swaps_num, swap_kick_starts};
use crate::lp_swap;
use crate::lp_swap::swap_kick_starts;
use crate::lp_wallet::{initialize_wallet_passphrase, WalletInitError};
use crate::rpc::spawn_rpc;

Expand Down Expand Up @@ -535,14 +536,9 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes
};
Timer::sleep(0.2).await
}
// Clearing up the running swaps removes any circular references that might prevent the context from being dropped.
lp_swap::clear_running_swaps(&ctx);

// wait for swaps to stop
loop {
if running_swaps_num(&ctx) == 0 {
break;
};
Timer::sleep(0.2).await
}
Ok(())
}

Expand Down
73 changes: 33 additions & 40 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use std::convert::TryFrom;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -518,7 +518,7 @@ struct LockedAmountInfo {
}

struct SwapsContext {
running_swaps: Mutex<Vec<Weak<dyn AtomicSwap>>>,
running_swaps: Mutex<HashMap<Uuid, Arc<dyn AtomicSwap>>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we intentionally chose a weak reference instead of Arc as it might avoid cyclic references (similar to how it's done for MmArc with MmWeak in coins) 🤔

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not that i can trivially find the AtomicSwap pointing back to it self (points to mmctx tho: https://github.com/KomodoPlatform/komodo-defi-framework/blob/0a9410258c7bf58e78d26b33b7a0ffb749c7ba16/mm2src/mm2_main/src/lp_swap/maker_swap.rs#L213)
you see, such questions are really hard to answer in such a codebase and require digging xD
it's much easier/managable to avoid the arc/weak model and use our abortable systems to make sure cyclic refs aren't problematic.

i went for arc here tho since we already manually remove the swap after it finishes or after stopping it. since we drop the thing manually we don't need to complicate it with weak refs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth to test that change (by using test_mm2_stops_immediately test) to make sure that's not the case.

active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand All @@ -534,7 +534,7 @@ impl SwapsContext {
fn from_ctx(ctx: &MmArc) -> Result<Arc<SwapsContext>, String> {
Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || {
Ok(SwapsContext {
running_swaps: Mutex::new(vec![]),
running_swaps: Mutex::new(HashMap::new()),
active_swaps_v2_infos: Mutex::new(HashMap::new()),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -619,21 +619,21 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

let mut locked = swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
let mut locked =
swap_lock
.values()
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
}
total_amount
});
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
}
}
total_amount
});
drop(swap_lock);

let locked_amounts = swap_ctx.locked_amounts.lock().unwrap();
Expand All @@ -654,14 +654,12 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
locked
}

/// Get number of currently running swaps
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
/// Clear up all the running swaps.
///
/// This doesn't mean that these swaps will be stopped. They can only be stopped from the abortable systems they are running on top of.
pub fn clear_running_swaps(ctx: &MmArc) {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, swap| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
swap_ctx.running_swaps.lock().unwrap().clear();
Comment on lines +660 to +662

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does clearing the map stops swap processes?

Copy link
Collaborator Author

@mariocynicys mariocynicys Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup. the store of running_swaps stores the swaps (AtomicSwap object) along with the abort handles (which will trigger on Drop) they are running on.
messed things, look at: #2301 (comment)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please mention that on the code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah didn't see that. I was referring to caller side but it's fine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

big ooops:

yup. the store of running_swaps stores the swaps (AtomicSwap object) along with the abort handles (which will trigger on Drop) they are running on.

this is completely wrong. this feat is in another PR. i mixed things up.

no, clearing the running swaps will not stop these swaps. (though could be stopped via other means, e.g. abort_all from a bigger subsystem).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok that is def. worth to mention that on the line.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this code, looks like clear_running_swaps fn is needed to clear swaps_ctx in MmCtx to allow to release ctx so stop_and_wait_for_ctx_is_dropped fn can finish (used in wasm tests).

(this note to memorise)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the doc comment accordingly till the other PR that stores the abort handle lands in.

}

/// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid
Expand All @@ -670,8 +668,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.values()
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
Expand All @@ -691,11 +688,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
for swap in swaps.values() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
}
drop(swaps);
Expand All @@ -711,15 +706,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<

pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
}

drop(swaps);
let mut uuids: Vec<_> = swap_ctx
.running_swaps
.lock()
.unwrap()
.keys()
.map(|uuid| (*uuid, LEGACY_SWAP_TYPE))
.collect();

let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap();
uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type)));
Expand Down
10 changes: 8 additions & 2 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2095,10 +2095,14 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
};
}
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);
// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand Down Expand Up @@ -2162,6 +2166,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

pub struct MakerSwapPreparedParams {
Expand Down
24 changes: 14 additions & 10 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,17 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let ctx = swap.ctx.clone();
subscribe_to_topic(&ctx, swap_topic(&swap.uuid));
let mut status = ctx.log.status_handle();
let uuid = swap.uuid.to_string();
let uuid_str = uuid.to_string();
let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy());
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);

// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand All @@ -491,10 +494,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}

if event.is_error() {
error!("[swap uuid={uuid}] {event:?}");
error!("[swap uuid={uuid_str}] {event:?}");
}

status.status(&[&"swap", &("uuid", uuid.as_str())], &event.status_str());
status.status(&[&"swap", &("uuid", uuid_str.as_str())], &event.status_str());
running_swap.apply_event(event);
}
match res.0 {
Expand All @@ -503,12 +506,12 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
},
None => {
if let Err(e) = mark_swap_as_finished(ctx.clone(), running_swap.uuid).await {
error!("!mark_swap_finished({}): {}", uuid, e);
error!("!mark_swap_finished({}): {}", uuid_str, e);
}

if to_broadcast {
if let Err(e) = broadcast_my_swap_status(&ctx, running_swap.uuid).await {
error!("!broadcast_my_swap_status({}): {}", uuid, e);
error!("!broadcast_my_swap_status({}): {}", uuid_str, e);
}
}
break;
Expand All @@ -522,6 +525,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -3276,8 +3281,7 @@ mod taker_swap_tests {
.unwrap();
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
swaps_ctx.running_swaps.lock().unwrap().push(weak_ref);
swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc);

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down
Loading