Skip to content

Commit 74f5af1

Browse files
committed
rewrite sync bg processor
1 parent 4cb0b33 commit 74f5af1

File tree

3 files changed

+76
-72
lines changed

3 files changed

+76
-72
lines changed

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ bitcoin-io = { version = "0.1.2", default-features = false }
2525
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
2626
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
2727
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }
28+
futures = "0.3.31"
2829

2930
[dev-dependencies]
3031
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }

lightning-background-processor/src/lib.rs

Lines changed: 30 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use lightning::util::persist::{KVStore, Persister};
4343
use lightning::util::sweep::OutputSweeper;
4444
#[cfg(feature = "std")]
4545
use lightning::util::sweep::OutputSweeperSync;
46+
use lightning::util::wakers::Sleep;
4647
#[cfg(feature = "std")]
4748
use lightning::util::wakers::Sleeper;
4849
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -981,7 +982,7 @@ impl BackgroundProcessor {
981982
D: 'static + Deref,
982983
O: 'static + Deref,
983984
K: 'static + Deref,
984-
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
985+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send,
985986
>(
986987
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
987988
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
@@ -999,79 +1000,33 @@ impl BackgroundProcessor {
9991000
OM::Target: AOnionMessenger,
10001001
PM::Target: APeerManager,
10011002
LM::Target: ALiquidityManager,
1002-
D::Target: ChangeDestinationSourceSync,
1003+
D::Target: ChangeDestinationSource,
10031004
O::Target: 'static + OutputSpender,
10041005
K::Target: 'static + KVStore,
10051006
{
10061007
let stop_thread = Arc::new(AtomicBool::new(false));
10071008
let stop_thread_clone = stop_thread.clone();
10081009
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
1009-
let event_handler = |event| {
1010-
let network_graph = gossip_sync.network_graph();
1011-
if let Some(network_graph) = network_graph {
1012-
handle_network_graph_update(network_graph, &event)
1013-
}
1014-
if let Some(ref scorer) = scorer {
1015-
use std::time::SystemTime;
1016-
let duration_since_epoch = SystemTime::now()
1017-
.duration_since(SystemTime::UNIX_EPOCH)
1018-
.expect("Time should be sometime after 1970");
1019-
if update_scorer(scorer, &event, duration_since_epoch) {
1020-
log_trace!(logger, "Persisting scorer after update");
1021-
if let Err(e) = persister.persist_scorer(&scorer) {
1022-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1023-
}
1024-
}
1025-
}
1026-
event_handler.handle_event(event)
1027-
};
1028-
define_run_body!(
1010+
let fut = process_events_async(
10291011
persister,
1012+
|event| async { event_handler.handle_event(event) },
10301013
chain_monitor,
1031-
chain_monitor.process_pending_events(&event_handler),
10321014
channel_manager,
1033-
channel_manager.get_cm().process_pending_events(&event_handler),
10341015
onion_messenger,
1035-
if let Some(om) = &onion_messenger {
1036-
om.get_om().process_pending_events(&event_handler)
1037-
},
1038-
peer_manager,
10391016
gossip_sync,
1040-
{
1041-
if let Some(ref sweeper) = sweeper {
1042-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1043-
}
1044-
},
1017+
peer_manager,
1018+
liquidity_manager,
1019+
sweeper,
10451020
logger,
10461021
scorer,
1047-
stop_thread.load(Ordering::Acquire),
1048-
{
1049-
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1050-
(Some(om), Some(lm)) => Sleeper::from_four_futures(
1051-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1052-
&chain_monitor.get_update_future(),
1053-
&om.get_om().get_update_future(),
1054-
&lm.get_lm().get_pending_msgs_future(),
1055-
),
1056-
(Some(om), None) => Sleeper::from_three_futures(
1057-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1058-
&chain_monitor.get_update_future(),
1059-
&om.get_om().get_update_future(),
1060-
),
1061-
(None, Some(lm)) => Sleeper::from_three_futures(
1062-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1063-
&chain_monitor.get_update_future(),
1064-
&lm.get_lm().get_pending_msgs_future(),
1065-
),
1066-
(None, None) => Sleeper::from_two_futures(
1067-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1068-
&chain_monitor.get_update_future(),
1069-
),
1070-
};
1071-
sleeper.wait_timeout(Duration::from_millis(100));
1022+
move |dur: Duration| {
1023+
let stop_thread_clone = stop_thread.clone();
1024+
1025+
Box::pin(async move {
1026+
Sleep::new(dur).await;
1027+
stop_thread_clone.load(Ordering::Acquire)
1028+
})
10721029
},
1073-
|_| Instant::now(),
1074-
|time: &Instant, dur| time.elapsed().as_secs() > dur,
10751030
false,
10761031
|| {
10771032
use std::time::SystemTime;
@@ -1081,7 +1036,10 @@ impl BackgroundProcessor {
10811036
.expect("Time should be sometime after 1970"),
10821037
)
10831038
},
1084-
)
1039+
);
1040+
1041+
// TODO: Implement simple executor in utils.
1042+
futures::executor::block_on(fut).map_err(Into::into)
10851043
});
10861044
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
10871045
}
@@ -1925,7 +1883,7 @@ mod tests {
19251883
nodes[0].p2p_gossip_sync(),
19261884
nodes[0].peer_manager.clone(),
19271885
Some(Arc::clone(&nodes[0].liquidity_manager)),
1928-
Some(nodes[0].sweeper.clone()),
1886+
Some(nodes[0].sweeper.sweeper_async().clone()),
19291887
nodes[0].logger.clone(),
19301888
Some(nodes[0].scorer.clone()),
19311889
);
@@ -2020,7 +1978,7 @@ mod tests {
20201978
nodes[0].no_gossip_sync(),
20211979
nodes[0].peer_manager.clone(),
20221980
Some(Arc::clone(&nodes[0].liquidity_manager)),
2023-
Some(nodes[0].sweeper.clone()),
1981+
Some(nodes[0].sweeper.sweeper_async().clone()),
20241982
nodes[0].logger.clone(),
20251983
Some(nodes[0].scorer.clone()),
20261984
);
@@ -2064,7 +2022,7 @@ mod tests {
20642022
nodes[0].no_gossip_sync(),
20652023
nodes[0].peer_manager.clone(),
20662024
Some(Arc::clone(&nodes[0].liquidity_manager)),
2067-
Some(nodes[0].sweeper.clone()),
2025+
Some(nodes[0].sweeper.sweeper_async().clone()),
20682026
nodes[0].logger.clone(),
20692027
Some(nodes[0].scorer.clone()),
20702028
);
@@ -2135,7 +2093,7 @@ mod tests {
21352093
nodes[0].p2p_gossip_sync(),
21362094
nodes[0].peer_manager.clone(),
21372095
Some(Arc::clone(&nodes[0].liquidity_manager)),
2138-
Some(nodes[0].sweeper.clone()),
2096+
Some(nodes[0].sweeper.sweeper_async().clone()),
21392097
nodes[0].logger.clone(),
21402098
Some(nodes[0].scorer.clone()),
21412099
);
@@ -2166,7 +2124,7 @@ mod tests {
21662124
nodes[0].no_gossip_sync(),
21672125
nodes[0].peer_manager.clone(),
21682126
Some(Arc::clone(&nodes[0].liquidity_manager)),
2169-
Some(nodes[0].sweeper.clone()),
2127+
Some(nodes[0].sweeper.sweeper_async().clone()),
21702128
nodes[0].logger.clone(),
21712129
Some(nodes[0].scorer.clone()),
21722130
);
@@ -2214,7 +2172,7 @@ mod tests {
22142172
nodes[0].no_gossip_sync(),
22152173
nodes[0].peer_manager.clone(),
22162174
Some(Arc::clone(&nodes[0].liquidity_manager)),
2217-
Some(nodes[0].sweeper.clone()),
2175+
Some(nodes[0].sweeper.sweeper_async().clone()),
22182176
nodes[0].logger.clone(),
22192177
Some(nodes[0].scorer.clone()),
22202178
);
@@ -2278,7 +2236,7 @@ mod tests {
22782236
nodes[0].no_gossip_sync(),
22792237
nodes[0].peer_manager.clone(),
22802238
Some(Arc::clone(&nodes[0].liquidity_manager)),
2281-
Some(nodes[0].sweeper.clone()),
2239+
Some(nodes[0].sweeper.sweeper_async().clone()),
22822240
nodes[0].logger.clone(),
22832241
Some(nodes[0].scorer.clone()),
22842242
);
@@ -2443,7 +2401,7 @@ mod tests {
24432401
nodes[0].no_gossip_sync(),
24442402
nodes[0].peer_manager.clone(),
24452403
Some(Arc::clone(&nodes[0].liquidity_manager)),
2446-
Some(nodes[0].sweeper.clone()),
2404+
Some(nodes[0].sweeper.sweeper_async().clone()),
24472405
nodes[0].logger.clone(),
24482406
Some(nodes[0].scorer.clone()),
24492407
);
@@ -2474,7 +2432,7 @@ mod tests {
24742432
nodes[0].no_gossip_sync(),
24752433
nodes[0].peer_manager.clone(),
24762434
Some(Arc::clone(&nodes[0].liquidity_manager)),
2477-
Some(nodes[0].sweeper.clone()),
2435+
Some(nodes[0].sweeper.sweeper_async().clone()),
24782436
nodes[0].logger.clone(),
24792437
Some(nodes[0].scorer.clone()),
24802438
);
@@ -2571,7 +2529,7 @@ mod tests {
25712529
nodes[0].rapid_gossip_sync(),
25722530
nodes[0].peer_manager.clone(),
25732531
Some(Arc::clone(&nodes[0].liquidity_manager)),
2574-
Some(nodes[0].sweeper.clone()),
2532+
Some(nodes[0].sweeper.sweeper_async().clone()),
25752533
nodes[0].logger.clone(),
25762534
Some(nodes[0].scorer.clone()),
25772535
);
@@ -2768,7 +2726,7 @@ mod tests {
27682726
nodes[0].no_gossip_sync(),
27692727
nodes[0].peer_manager.clone(),
27702728
Some(Arc::clone(&nodes[0].liquidity_manager)),
2771-
Some(nodes[0].sweeper.clone()),
2729+
Some(nodes[0].sweeper.sweeper_async().clone()),
27722730
nodes[0].logger.clone(),
27732731
Some(nodes[0].scorer.clone()),
27742732
);

lightning/src/util/wakers.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ use std::time::Duration;
2828
use core::future::Future as StdFuture;
2929
use core::pin::Pin;
3030
use core::task::{Context, Poll};
31+
use std::{
32+
sync::atomic::{AtomicBool, Ordering},
33+
thread,
34+
};
3135

3236
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3337
///
@@ -340,6 +344,47 @@ impl Sleeper {
340344
}
341345
}
342346

347+
pub struct Sleep {
348+
is_done: Arc<AtomicBool>,
349+
waker: Arc<Mutex<Option<Waker>>>,
350+
}
351+
352+
impl Sleep {
353+
pub fn new(duration: Duration) -> Self {
354+
let is_done = Arc::new(AtomicBool::new(false));
355+
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
356+
357+
let is_done_clone = is_done.clone();
358+
let waker_clone = waker.clone();
359+
360+
thread::spawn(move || {
361+
thread::sleep(duration);
362+
is_done_clone.store(true, Ordering::SeqCst);
363+
364+
if let Some(w) = waker_clone.lock().unwrap().take() {
365+
w.wake();
366+
}
367+
});
368+
369+
Self { is_done, waker }
370+
}
371+
}
372+
373+
impl core::future::Future for Sleep {
374+
type Output = ();
375+
376+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
377+
if self.is_done.load(Ordering::SeqCst) {
378+
Poll::Ready(())
379+
} else {
380+
let mut waker_lock = self.waker.lock().unwrap();
381+
// Store latest waker in case the task is moved or re-polled
382+
*waker_lock = Some(cx.waker().clone());
383+
Poll::Pending
384+
}
385+
}
386+
}
387+
343388
#[cfg(test)]
344389
mod tests {
345390
use super::*;

0 commit comments

Comments
 (0)