Skip to content

Commit 7812b29

Browse files
committed
async kv store
1 parent 5688166 commit 7812b29

File tree

8 files changed

+353
-137
lines changed

8 files changed

+353
-137
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use lightning::sign::ChangeDestinationSource;
3838
#[cfg(feature = "std")]
3939
use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::OutputSpender;
41+
use lightning::util::async_poll::FutureSpawner;
4142
use lightning::util::logger::Logger;
4243
use lightning::util::persist::{KVStore, Persister};
4344
use lightning::util::sweep::OutputSweeper;
@@ -780,8 +781,9 @@ pub async fn process_events_async<
780781
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
781782
EventHandler: Fn(Event) -> EventHandlerFuture,
782783
PS: 'static + Deref + Send,
784+
FS: FutureSpawner,
783785
M: 'static
784-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
786+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>>
785787
+ Send
786788
+ Sync,
787789
CM: 'static + Deref,
@@ -977,7 +979,7 @@ impl BackgroundProcessor {
977979
EH: 'static + EventHandler + Send,
978980
PS: 'static + Deref + Send,
979981
M: 'static
980-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
982+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>>
981983
+ Send
982984
+ Sync,
983985
CM: 'static + Deref + Send,
@@ -992,6 +994,7 @@ impl BackgroundProcessor {
992994
O: 'static + Deref,
993995
K: 'static + Deref,
994996
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
997+
FS: FutureSpawner
995998
>(
996999
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
9971000
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,

lightning-persister/src/fs_store.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
33

44
use lightning::util::persist::{KVStore, MigratableKVStore};
55
use lightning::util::string::PrintableString;
6+
use lightning::util::async_poll::AsyncResult;
67

78
use std::collections::HashMap;
89
use std::fs;
@@ -11,6 +12,8 @@ use std::path::{Path, PathBuf};
1112
use std::sync::atomic::{AtomicUsize, Ordering};
1213
use std::sync::{Arc, Mutex, RwLock};
1314

15+
16+
1417
#[cfg(target_os = "windows")]
1518
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1619

@@ -329,6 +332,12 @@ impl KVStore for FilesystemStore {
329332

330333
Ok(keys)
331334
}
335+
336+
fn write_async(
337+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
338+
) -> AsyncResult<'static, ()> {
339+
todo!()
340+
}
332341
}
333342

334343
fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {

lightning/src/chain/chainmonitor.rs

Lines changed: 125 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3636
use crate::ln::types::ChannelId;
3737
use crate::sign::ecdsa::EcdsaChannelSigner;
3838
use crate::events::{self, Event, EventHandler, ReplayEvent};
39+
use crate::util::async_poll::{poll_or_spawn, AsyncResult, FutureSpawner};
3940
use crate::util::logger::{Logger, WithContext};
4041
use crate::util::errors::APIError;
4142
use crate::util::persist::MonitorName;
4243
use crate::util::wakers::{Future, Notifier};
4344
use crate::ln::channel_state::ChannelDetails;
45+
use crate::sync::{Arc};
4446

4547
use crate::prelude::*;
4648
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
@@ -122,7 +124,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
122124
///
123125
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
124126
/// [`Writeable::write`]: crate::util::ser::Writeable::write
125-
fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
127+
fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>) -> AsyncResult<'static, ()>;
126128

127129
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
128130
/// update.
@@ -161,7 +163,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
161163
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
162164
///
163165
/// [`Writeable::write`]: crate::util::ser::Writeable::write
164-
fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
166+
fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> AsyncResult<'static, ()>;
165167
/// Prevents the channel monitor from being loaded on startup.
166168
///
167169
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
@@ -233,31 +235,33 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
233235
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
234236
/// [module-level documentation]: crate::chain::chainmonitor
235237
/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
236-
pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
238+
pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
237239
where C::Target: chain::Filter,
238240
T::Target: BroadcasterInterface,
239241
F::Target: FeeEstimator,
240242
L::Target: Logger,
241243
P::Target: Persist<ChannelSigner>,
242244
{
243-
monitors: RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
245+
monitors: Arc<RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>>,
244246
chain_source: Option<C>,
245247
broadcaster: T,
246248
logger: L,
247249
fee_estimator: F,
248250
persister: P,
249251
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
250252
/// from the user and not from a [`ChannelMonitor`].
251-
pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
253+
pending_monitor_events: Arc<Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>>,
252254
/// The best block height seen, used as a proxy for the passage of time.
253255
highest_chain_height: AtomicUsize,
254256

255257
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
256258
/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
257-
event_notifier: Notifier,
259+
event_notifier: Arc<Notifier>,
260+
261+
future_spawner: Arc<FS>,
258262
}
259263

260-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
264+
impl<ChannelSigner: EcdsaChannelSigner + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner> ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
261265
where C::Target: chain::Filter,
262266
T::Target: BroadcasterInterface,
263267
F::Target: FeeEstimator,
@@ -347,18 +351,31 @@ where C::Target: chain::Filter,
347351
// `ChannelMonitorUpdate` after a channel persist for a channel with the same
348352
// `latest_update_id`.
349353
let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
350-
match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor) {
351-
ChannelMonitorUpdateStatus::Completed =>
352-
log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
353-
log_funding_info!(monitor)
354-
),
355-
ChannelMonitorUpdateStatus::InProgress => {
356-
log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
357-
}
358-
ChannelMonitorUpdateStatus::UnrecoverableError => {
359-
return Err(());
354+
let max_update_id = _pending_monitor_updates.iter().copied().max().unwrap_or(0);
355+
356+
let persist_res = self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor);
357+
358+
let monitors = self.monitors.clone();
359+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
360+
let event_notifier = self.event_notifier.clone();
361+
let future_spawner = self.future_spawner.clone();
362+
let channel_id = *channel_id;
363+
364+
match poll_or_spawn(persist_res, move || {
365+
// TODO: Log error if the monitor is not persisted.
366+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
367+
channel_id, max_update_id);
368+
}, future_spawner.deref()) {
369+
Ok(true) => {
370+
// log
371+
},
372+
Ok(false) => {
373+
// log
374+
}
375+
Err(_) => {
376+
return Err(());
377+
},
360378
}
361-
}
362379
}
363380

364381
// Register any new outputs with the chain source for filtering, storing any dependent
@@ -388,17 +405,18 @@ where C::Target: chain::Filter,
388405
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
389406
/// always need to fetch full blocks absent another means for determining which blocks contain
390407
/// transactions relevant to the watched channels.
391-
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
408+
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, future_spawner: FS) -> Self {
392409
Self {
393-
monitors: RwLock::new(new_hash_map()),
410+
monitors: Arc::new(RwLock::new(new_hash_map())),
394411
chain_source,
395412
broadcaster,
396413
logger,
397414
fee_estimator: feeest,
398415
persister,
399-
pending_monitor_events: Mutex::new(Vec::new()),
416+
pending_monitor_events: Arc::new(Mutex::new(Vec::new())),
400417
highest_chain_height: AtomicUsize::new(0),
401-
event_notifier: Notifier::new(),
418+
event_notifier: Arc::new(Notifier::new()),
419+
future_spawner: Arc::new(future_spawner),
402420
}
403421
}
404422

@@ -531,6 +549,40 @@ where C::Target: chain::Filter,
531549
Ok(())
532550
}
533551

552+
fn channel_monitor_updated_internal(
553+
monitors: &RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
554+
pending_monitor_events: &Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
555+
event_notifier: &Notifier,
556+
channel_id: ChannelId, completed_update_id: u64) -> Result<(), APIError> {
557+
let monitors = monitors.read().unwrap();
558+
let monitor_data = if let Some(mon) = monitors.get(&channel_id) { mon } else {
559+
return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching channel ID {} found", channel_id) });
560+
};
561+
let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
562+
pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
563+
564+
// Note that we only check for pending non-chainsync monitor updates and we don't track monitor
565+
// updates resulting from chainsync in `pending_monitor_updates`.
566+
let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
567+
568+
// TODO: Add logger
569+
570+
if monitor_is_pending_updates {
571+
// If there are still monitor updates pending, we cannot yet construct a
572+
// Completed event.
573+
return Ok(());
574+
}
575+
let funding_txo = monitor_data.monitor.get_funding_txo();
576+
pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
577+
funding_txo,
578+
channel_id,
579+
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
580+
}], monitor_data.monitor.get_counterparty_node_id()));
581+
582+
event_notifier.notify();
583+
Ok(())
584+
}
585+
534586
/// This wrapper avoids having to update some of our tests for now as they assume the direct
535587
/// chain::Watch API wherein we mark a monitor fully-updated by just calling
536588
/// channel_monitor_updated once with the highest ID.
@@ -669,8 +721,8 @@ where C::Target: chain::Filter,
669721
}
670722
}
671723

672-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
673-
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
724+
impl<ChannelSigner: EcdsaChannelSigner + Send + Sync + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
725+
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
674726
where
675727
C::Target: chain::Filter,
676728
T::Target: BroadcasterInterface,
@@ -698,8 +750,8 @@ where
698750
}
699751
}
700752

701-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
702-
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
753+
impl<ChannelSigner: EcdsaChannelSigner + Sync + Send + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
754+
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
703755
where
704756
C::Target: chain::Filter,
705757
T::Target: BroadcasterInterface,
@@ -752,8 +804,8 @@ where
752804
}
753805
}
754806

755-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
756-
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
807+
impl<ChannelSigner: EcdsaChannelSigner + Sync + Send + 'static, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref, FS: FutureSpawner + Clone>
808+
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
757809
where C::Target: chain::Filter,
758810
T::Target: BroadcasterInterface,
759811
F::Target: FeeEstimator,
@@ -774,15 +826,28 @@ where C::Target: chain::Filter,
774826
let update_id = monitor.get_latest_update_id();
775827
let mut pending_monitor_updates = Vec::new();
776828
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
777-
match persist_res {
778-
ChannelMonitorUpdateStatus::InProgress => {
779-
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
780-
pending_monitor_updates.push(update_id);
781-
},
782-
ChannelMonitorUpdateStatus::Completed => {
829+
830+
let update_status;
831+
let monitors = self.monitors.clone();
832+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
833+
let event_notifier = self.event_notifier.clone();
834+
let future_spawner = self.future_spawner.clone();
835+
836+
match poll_or_spawn(persist_res, move || {
837+
// TODO: Log error if the monitor is not persisted.
838+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
839+
channel_id, update_id);
840+
}, future_spawner.deref()) {
841+
Ok(true) => {
783842
log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
843+
update_status = ChannelMonitorUpdateStatus::Completed;
784844
},
785-
ChannelMonitorUpdateStatus::UnrecoverableError => {
845+
Ok(false) => {
846+
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
847+
pending_monitor_updates.push(update_id);
848+
update_status = ChannelMonitorUpdateStatus::InProgress;
849+
}
850+
Err(_) => {
786851
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
787852
log_error!(logger, "{}", err_str);
788853
panic!("{}", err_str);
@@ -795,7 +860,7 @@ where C::Target: chain::Filter,
795860
monitor,
796861
pending_monitor_updates: Mutex::new(pending_monitor_updates),
797862
});
798-
Ok(persist_res)
863+
Ok(update_status)
799864
}
800865

801866
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
@@ -840,27 +905,40 @@ where C::Target: chain::Filter,
840905
} else {
841906
self.persister.update_persisted_channel(monitor.persistence_key(), Some(update), monitor)
842907
};
843-
match persist_res {
844-
ChannelMonitorUpdateStatus::InProgress => {
845-
pending_monitor_updates.push(update_id);
908+
909+
let monitors = self.monitors.clone();
910+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
911+
let event_notifier = self.event_notifier.clone();
912+
let future_spawner = self.future_spawner.clone();
913+
914+
let update_status;
915+
match poll_or_spawn(persist_res, move || {
916+
// TODO: Log error if the monitor is not persisted.
917+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
918+
channel_id, update_id);
919+
}, future_spawner.deref()) {
920+
Ok(true) => {
846921
log_debug!(logger,
847-
"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
922+
"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
848923
update_id,
849924
log_funding_info!(monitor)
850925
);
926+
update_status = ChannelMonitorUpdateStatus::Completed;
851927
},
852-
ChannelMonitorUpdateStatus::Completed => {
928+
Ok(false) => {
853929
log_debug!(logger,
854-
"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
930+
"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
855931
update_id,
856932
log_funding_info!(monitor)
857933
);
858-
},
859-
ChannelMonitorUpdateStatus::UnrecoverableError => {
934+
pending_monitor_updates.push(update_id);
935+
update_status = ChannelMonitorUpdateStatus::InProgress;
936+
}
937+
Err(_) => {
860938
// Take the monitors lock for writing so that we poison it and any future
861939
// operations going forward fail immediately.
862940
core::mem::drop(pending_monitor_updates);
863-
core::mem::drop(monitors);
941+
// core::mem::drop(monitors);
864942
let _poison = self.monitors.write().unwrap();
865943
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
866944
log_error!(logger, "{}", err_str);
@@ -870,7 +948,7 @@ where C::Target: chain::Filter,
870948
if update_res.is_err() {
871949
ChannelMonitorUpdateStatus::InProgress
872950
} else {
873-
persist_res
951+
update_status
874952
}
875953
}
876954
}
@@ -891,7 +969,7 @@ where C::Target: chain::Filter,
891969
}
892970
}
893971

894-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
972+
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
895973
where C::Target: chain::Filter,
896974
T::Target: BroadcasterInterface,
897975
F::Target: FeeEstimator,
@@ -1138,4 +1216,3 @@ mod tests {
11381216
}).is_err());
11391217
}
11401218
}
1141-

0 commit comments

Comments
 (0)