Skip to content

Handle events asynchronously in the BackgroundProcessor's async variant #1787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 10, 2022
100 changes: 47 additions & 53 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,49 +192,22 @@ where
}
}

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
'a,
E: EventHandler,
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
RGS: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
event_handler: E,
gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}

impl<
'a,
E: EventHandler,
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
RGS: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
L: Deref,
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
fn handle_event(&self, event: &Event) {
if let Some(network_graph) = self.gossip_sync.network_graph() {
network_graph.handle_event(event);
fn handle_network_graph_update<L: Deref>(
network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
if let Event::PaymentPathFailed { ref network_update, .. } = event {
if let Some(network_update) = network_update {
network_graph.handle_network_update(&network_update);
}
self.event_handler.handle_event(event);
}
}

macro_rules! define_run_body {
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr)
=> { {
let event_handler = DecoratingEventHandler {
event_handler: $event_handler,
gossip_sync: &$gossip_sync,
};

log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();

Expand All @@ -245,8 +218,8 @@ macro_rules! define_run_body {
let mut have_pruned = false;

loop {
$channel_manager.process_pending_events(&event_handler);
$chain_monitor.process_pending_events(&event_handler);
$process_channel_manager_events;
$process_chain_monitor_events;

// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
Expand Down Expand Up @@ -379,7 +352,8 @@ pub async fn process_events_async<
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
OMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
Expand All @@ -392,7 +366,7 @@ pub async fn process_events_async<
SleepFuture: core::future::Future<Output = bool>,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), std::io::Error>
Expand All @@ -412,7 +386,19 @@ where
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
let mut should_break = true;
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
let event_handler = &event_handler;
async move {
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
event_handler(event).await;
}
};
define_run_body!(persister,
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
select_biased! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
Expand Down Expand Up @@ -517,7 +503,15 @@ impl BackgroundProcessor {
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
let event_handler = |event| {
let network_graph = gossip_sync.network_graph();
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
event_handler.handle_event(event);
};
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
});
Expand Down Expand Up @@ -769,7 +763,7 @@ mod tests {
begin_open_channel!($node_a, $node_b, $channel_value);
let events = $node_a.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
tx
}}
Expand All @@ -786,7 +780,7 @@ mod tests {
macro_rules! handle_funding_generation_ready {
($event: expr, $channel_value: expr) => {{
match $event {
&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
assert_eq!(channel_value_satoshis, $channel_value);
assert_eq!(user_channel_id, 42);

Expand Down Expand Up @@ -847,7 +841,7 @@ mod tests {
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

macro_rules! check_persisted_data {
Expand Down Expand Up @@ -909,7 +903,7 @@ mod tests {
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
Expand All @@ -932,7 +926,7 @@ mod tests {

let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
Expand All @@ -949,7 +943,7 @@ mod tests {
let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
Expand All @@ -967,7 +961,7 @@ mod tests {
let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
Expand All @@ -988,7 +982,7 @@ mod tests {

// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| match event {
let event_handler = move |event: Event| match event {
Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
Event::ChannelReady { .. } => {},
_ => panic!("Unexpected event: {:?}", event),
Expand Down Expand Up @@ -1017,7 +1011,7 @@ mod tests {

// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| match event {
let event_handler = move |event: Event| match event {
Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
Event::ChannelReady { .. } => {},
Event::ChannelClosed { .. } => {},
Expand Down Expand Up @@ -1047,7 +1041,7 @@ mod tests {
let nodes = create_nodes(2, "test_scorer_persistence".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

loop {
Expand Down Expand Up @@ -1075,7 +1069,7 @@ mod tests {
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
assert_eq!(network_graph.read_only().channels().len(), 1);

let event_handler = |_: &_| {};
let event_handler = |_: _| {};
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

loop {
Expand Down Expand Up @@ -1128,7 +1122,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
assert!(bg_processor.stop().is_ok());
Expand Down
Loading