Skip to content

Commit 68f98f4

Browse files
committed
Implement async versions of process_pending_events
1 parent 04499c0 commit 68f98f4

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

lightning/src/chain/chainmonitor.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter;
3636
use crate::util::logger::Logger;
3737
use crate::util::errors::APIError;
3838
use crate::util::events;
39-
use crate::util::events::EventHandler;
39+
use crate::util::events::{Event, EventHandler};
4040
use crate::ln::channelmanager::ChannelDetails;
4141

4242
use crate::prelude::*;
@@ -479,6 +479,20 @@ where C::Target: chain::Filter,
479479
self.process_pending_events(&event_handler);
480480
events.into_inner()
481481
}
482+
483+
/// Processes any events asynchronously in the order they were generated since the last call
484+
/// using the given event handler.
485+
///
486+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
487+
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(&self, handler: H) {
488+
let mut pending_events = Vec::new();
489+
for monitor_state in self.monitors.read().unwrap().values() {
490+
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
491+
}
492+
for event in pending_events.drain(..) {
493+
handler(event).await;
494+
}
495+
}
482496
}
483497

484498
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

lightning/src/ln/channelmanager.rs

+32-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
5353
use crate::ln::wire::Encode;
5454
use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
5555
use crate::util::config::{UserConfig, ChannelConfig};
56-
use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
56+
use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5757
use crate::util::{byte_utils, events};
5858
use crate::util::wakers::{Future, Notifier};
5959
use crate::util::scid_utils::fake_scid;
@@ -5566,6 +5566,37 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
55665566
pub fn clear_pending_payments(&self) {
55675567
self.pending_outbound_payments.lock().unwrap().clear()
55685568
}
5569+
5570+
/// Processes any events asynchronously in the order they were generated since the last call
5571+
/// using the given event handler.
5572+
///
5573+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
5574+
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(&self, handler: H) {
5575+
// We'll acquire our total consistency lock until the returned future completes so that
5576+
// we can be sure no other persists happen while processing events.
5577+
let _read_guard = self.total_consistency_lock.read().unwrap();
5578+
5579+
let mut result = NotifyOption::SkipPersist;
5580+
5581+
// TODO: This behavior should be documented. It's unintuitive that we query
5582+
// ChannelMonitors when clearing other events.
5583+
if self.process_pending_monitor_events() {
5584+
result = NotifyOption::DoPersist;
5585+
}
5586+
5587+
let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
5588+
if !pending_events.is_empty() {
5589+
result = NotifyOption::DoPersist;
5590+
}
5591+
5592+
for event in pending_events.drain(..) {
5593+
handler(event).await;
5594+
}
5595+
5596+
if result == NotifyOption::DoPersist {
5597+
self.persistence_notifier.notify();
5598+
}
5599+
}
55695600
}
55705601

55715602
impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>

0 commit comments

Comments
 (0)