Skip to content

Commit 6774e46

Browse files
committed
Handle events asynchronously in the BackgroundProcessor's async variant
1 parent 4ea3415 commit 6774e46

File tree

1 file changed

+30
-12
lines changed
  • lightning-background-processor/src

1 file changed

+30
-12
lines changed

lightning-background-processor/src/lib.rs

+30-12
Original file line numberDiff line numberDiff line change
@@ -230,15 +230,11 @@ where A::Target: chain::Access, L::Target: Logger {
230230
}
231231

232232
macro_rules! define_run_body {
233-
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
233+
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
234+
$channel_manager: ident, $process_channel_manager_events: expr,
234235
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
235236
$loop_exit_check: expr, $await: expr)
236237
=> { {
237-
let event_handler = DecoratingEventHandler {
238-
event_handler: $event_handler,
239-
gossip_sync: &$gossip_sync,
240-
};
241-
242238
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
243239
$channel_manager.timer_tick_occurred();
244240

@@ -249,8 +245,8 @@ macro_rules! define_run_body {
249245
let mut have_pruned = false;
250246

251247
loop {
252-
$channel_manager.process_pending_events(&event_handler);
253-
$chain_monitor.process_pending_events(&event_handler);
248+
$process_channel_manager_events;
249+
$process_chain_monitor_events;
254250

255251
// Note that the PeerManager::process_events may block on ChannelManager's locks,
256252
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -383,7 +379,8 @@ pub async fn process_events_async<
383379
CMH: 'static + Deref + Send + Sync,
384380
RMH: 'static + Deref + Send + Sync,
385381
OMH: 'static + Deref + Send + Sync,
386-
EH: 'static + EventHandler + Send,
382+
EventHandlerFuture: core::future::Future<Output = ()>,
383+
EventHandler: Fn(Event) -> EventHandlerFuture,
387384
PS: 'static + Deref + Send,
388385
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
389386
CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
@@ -396,7 +393,7 @@ pub async fn process_events_async<
396393
SleepFuture: core::future::Future<Output = bool>,
397394
Sleeper: Fn(Duration) -> SleepFuture
398395
>(
399-
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
396+
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
400397
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
401398
sleeper: Sleeper,
402399
) -> Result<(), std::io::Error>
@@ -416,7 +413,23 @@ where
416413
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
417414
{
418415
let mut should_continue = true;
419-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
416+
let async_event_handler = |event| -> core::pin::Pin<Box<dyn core::future::Future<Output = ()>>> {
417+
let network_graph = gossip_sync.network_graph();
418+
let event_handler = &event_handler;
419+
Box::pin(async move {
420+
if let Some(network_graph) = network_graph {
421+
if let Event::PaymentPathFailed { ref network_update, .. } = event {
422+
if let Some(network_update) = network_update {
423+
network_graph.handle_network_update(&network_update);
424+
}
425+
}
426+
}
427+
event_handler(event).await;
428+
})
429+
};
430+
define_run_body!(persister,
431+
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
432+
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
420433
gossip_sync, peer_manager, logger, scorer, should_continue, {
421434
select_biased! {
422435
_ = channel_manager.get_persistable_update_future().fuse() => true,
@@ -521,7 +534,12 @@ impl BackgroundProcessor {
521534
let stop_thread = Arc::new(AtomicBool::new(false));
522535
let stop_thread_clone = stop_thread.clone();
523536
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
524-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
537+
let event_handler = DecoratingEventHandler {
538+
event_handler,
539+
gossip_sync: &gossip_sync,
540+
};
541+
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
542+
channel_manager, channel_manager.process_pending_events(&event_handler),
525543
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
526544
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
527545
});

0 commit comments

Comments
 (0)