Skip to content

Commit 9e9615c

Browse files
committed
Extend EventsProvider to handle events asynchronously
Unfortunately, for both ChannelManager and ChainMonitor, we were not able to reuse the existing process_pending_events logic (and instead had to copy them verbatim for the async variant) due to async closures not being supported on stable at this time.
1 parent f4f1093 commit 9e9615c

File tree

3 files changed

+93
-2
lines changed

3 files changed

+93
-2
lines changed

lightning/src/chain/chainmonitor.rs

+22-1
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ use crate::util::atomic_counter::AtomicCounter;
3636
use crate::util::logger::Logger;
3737
use crate::util::errors::APIError;
3838
use crate::util::events;
39-
use crate::util::events::EventHandler;
39+
use crate::util::events::{AsyncEventHandler, EventHandler};
4040
use crate::ln::channelmanager::ChannelDetails;
4141

4242
use crate::prelude::*;
4343
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
44+
use core::future::Future as StdFuture;
4445
use core::ops::Deref;
46+
use core::pin::Pin;
4547
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4648
use bitcoin::secp256k1::PublicKey;
4749

@@ -723,6 +725,7 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
723725
handler.handle_event(&event);
724726
}
725727
}
728+
726729
#[cfg(anchors)]
727730
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
728731
///
@@ -746,6 +749,24 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
746749
handler.handle_event(&event);
747750
}
748751
}
752+
753+
/// Processes any events asynchronously in the order they were generated since the last call
754+
/// using the given event handler.
755+
///
756+
/// See the trait-level documentation for requirements.
757+
fn process_pending_events_async<'a, H: Deref + 'a>(
758+
&'a self, handler: H
759+
) -> Pin<Box<dyn StdFuture<Output = ()> + '_>> where H::Target: AsyncEventHandler {
760+
Box::pin(async move {
761+
let mut pending_events = Vec::new();
762+
for monitor_state in self.monitors.read().unwrap().values() {
763+
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
764+
}
765+
for event in pending_events.drain(..) {
766+
handler.handle_event_async(&event).await;
767+
}
768+
})
769+
}
749770
}
750771

751772
#[cfg(test)]

lightning/src/ln/channelmanager.rs

+37-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
5353
use crate::ln::wire::Encode;
5454
use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Recipient};
5555
use crate::util::config::{UserConfig, ChannelConfig};
56-
use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
56+
use crate::util::events::{AsyncEventHandler, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5757
use crate::util::{byte_utils, events};
5858
use crate::util::wakers::{Future, Notifier};
5959
use crate::util::scid_utils::fake_scid;
@@ -67,9 +67,11 @@ use core::{cmp, mem};
6767
use core::cell::RefCell;
6868
use crate::io::Read;
6969
use crate::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
70+
use core::future::Future as StdFuture;
7071
use core::sync::atomic::{AtomicUsize, Ordering};
7172
use core::time::Duration;
7273
use core::ops::Deref;
74+
use core::pin::Pin;
7375

7476
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
7577
//
@@ -5641,6 +5643,40 @@ where
56415643
result
56425644
});
56435645
}
5646+
5647+
/// Processes any events asynchronously in the order they were generated since the last call
5648+
/// using the given event handler.
5649+
///
5650+
/// See the trait-level documentation for requirements.
5651+
fn process_pending_events_async<'a, H: Deref + 'a>(
5652+
&'a self, handler: H
5653+
) -> Pin<Box<dyn StdFuture<Output = ()> + '_>> where H::Target: AsyncEventHandler {
5654+
// We can't use `optionally_notify` with an async closure as required by the
5655+
// `handle_event_async` call since it's not supported on stable.
5656+
Box::pin(async move {
5657+
let _read_guard = self.total_consistency_lock.read().unwrap();
5658+
let mut result = NotifyOption::SkipPersist;
5659+
5660+
// TODO: This behavior should be documented. It's unintuitive that we query
5661+
// ChannelMonitors when clearing other events.
5662+
if self.process_pending_monitor_events() {
5663+
result = NotifyOption::DoPersist;
5664+
}
5665+
5666+
let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
5667+
if !pending_events.is_empty() {
5668+
result = NotifyOption::DoPersist;
5669+
}
5670+
5671+
for event in pending_events.drain(..) {
5672+
handler.handle_event_async(&event).await;
5673+
}
5674+
5675+
if result == NotifyOption::DoPersist {
5676+
self.persistence_notifier.notify();
5677+
}
5678+
})
5679+
}
56445680
}
56455681

56465682
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> chain::Listen for ChannelManager<Signer, M, T, K, F, L>

lightning/src/util/events.rs

+34
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ use bitcoin::hashes::sha256::Hash as Sha256;
3333
use bitcoin::secp256k1::PublicKey;
3434
use crate::io;
3535
use crate::prelude::*;
36+
use core::future::Future as StdFuture;
3637
use core::time::Duration;
3738
use core::ops::Deref;
39+
use core::pin::Pin;
3840
use crate::sync::Arc;
3941

4042
/// Some information provided on receipt of payment depends on whether the payment received is a
@@ -1368,6 +1370,14 @@ pub trait EventsProvider {
13681370
///
13691371
/// See the trait-level documentation for requirements.
13701372
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
1373+
1374+
/// Processes any events asynchronously in the order they were generated since the last call
1375+
/// using the given event handler.
1376+
///
1377+
/// See the trait-level documentation for requirements.
1378+
fn process_pending_events_async<'a, H: Deref + 'a>(
1379+
&'a self, handler: H
1380+
) -> Pin<Box<dyn StdFuture<Output = ()> + 'a>> where H::Target: AsyncEventHandler;
13711381
}
13721382

13731383
/// A trait implemented for objects handling events from [`EventsProvider`].
@@ -1389,3 +1399,27 @@ impl<T: EventHandler> EventHandler for Arc<T> {
13891399
self.deref().handle_event(event)
13901400
}
13911401
}
1402+
1403+
/// A trait implemented for objects handling events from [`EventsProvider`] asynchronously.
1404+
pub trait AsyncEventHandler {
1405+
/// Handles the given [`Event`] asynchronously.
1406+
///
1407+
/// See [`EventsProvider`] for details that must be considered when implementing this method.
1408+
fn handle_event_async<'a>(&'a self, event: &'a Event) -> Pin<Box<dyn StdFuture<Output = ()> + 'a>>;
1409+
}
1410+
1411+
impl<F> AsyncEventHandler for F where F: Fn(&Event) {
1412+
fn handle_event_async<'a>(&'a self, event: &'a Event) -> Pin<Box<dyn StdFuture<Output = ()> + 'a>> {
1413+
Box::pin(async move {
1414+
self(event);
1415+
})
1416+
}
1417+
}
1418+
1419+
impl<T: AsyncEventHandler> AsyncEventHandler for Arc<T> {
1420+
fn handle_event_async<'a>(&'a self, event: &'a Event) -> Pin<Box<dyn StdFuture<Output = ()> + 'a>> {
1421+
Box::pin(async move {
1422+
self.deref().handle_event_async(event).await
1423+
})
1424+
}
1425+
}

0 commit comments

Comments
 (0)