Skip to content

Commit 14134bb

Browse files
committed
Async KVStore
1 parent 543aaed commit 14134bb

File tree

10 files changed

+898
-270
lines changed

10 files changed

+898
-270
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@ use lightning::sign::ChangeDestinationSource;
3939
use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
42+
use lightning::util::async_poll::FutureSpawner;
4243
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStore, Persister};
44+
use lightning::util::persist::{
45+
KVStore, KVStoreSync, KVStoreSyncWrapper, Persister, PersisterSync,
46+
};
4447
use lightning::util::sweep::OutputSweeper;
4548
#[cfg(feature = "std")]
4649
use lightning::util::sweep::OutputSweeperSync;
@@ -311,6 +314,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311314
true
312315
}
313316

317+
macro_rules! maybe_await {
318+
(true, $e:expr) => {
319+
$e.await
320+
};
321+
(false, $e:expr) => {
322+
$e
323+
};
324+
}
325+
314326
macro_rules! define_run_body {
315327
(
316328
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +331,7 @@ macro_rules! define_run_body {
319331
$peer_manager: ident, $gossip_sync: ident,
320332
$process_sweeper: expr,
321333
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
334+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323335
) => { {
324336
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325337
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +387,7 @@ macro_rules! define_run_body {
375387

376388
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377389
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
390+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
379391
log_trace!($logger, "Done persisting ChannelManager.");
380392
}
381393
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +448,7 @@ macro_rules! define_run_body {
436448
log_trace!($logger, "Persisting network graph.");
437449
}
438450

439-
if let Err(e) = $persister.persist_graph(network_graph) {
451+
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
440452
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441453
}
442454

@@ -464,7 +476,7 @@ macro_rules! define_run_body {
464476
} else {
465477
log_trace!($logger, "Persisting scorer");
466478
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
479+
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
468480
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469481
}
470482
}
@@ -487,16 +499,16 @@ macro_rules! define_run_body {
487499
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488500
// some races where users quit while channel updates were in-flight, with
489501
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
502+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
491503

492504
// Persist Scorer on exit
493505
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
506+
maybe_await!($async, $persister.persist_scorer(&scorer))?;
495507
}
496508

497509
// Persist NetworkGraph on exit
498510
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
511+
maybe_await!($async, $persister.persist_graph(network_graph))?;
500512
}
501513

502514
Ok(())
@@ -782,8 +794,11 @@ pub async fn process_events_async<
782794
EventHandler: Fn(Event) -> EventHandlerFuture,
783795
PS: 'static + Deref + Send,
784796
ES: 'static + Deref + Send,
797+
FS: FutureSpawner,
785798
M: 'static
786-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
799+
+ Deref<
800+
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES, FS>,
801+
>
787802
+ Send
788803
+ Sync,
789804
CM: 'static + Deref,
@@ -841,7 +856,7 @@ where
841856
if let Some(duration_since_epoch) = fetch_time() {
842857
if update_scorer(scorer, &event, duration_since_epoch) {
843858
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
859+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845860
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846861
// We opt not to abort early on persistence failure here as persisting
847862
// the scorer is non-critical and we still hope that it will have
@@ -919,6 +934,7 @@ where
919934
},
920935
mobile_interruptable_platform,
921936
fetch_time,
937+
true,
922938
)
923939
}
924940

@@ -982,7 +998,16 @@ impl BackgroundProcessor {
982998
ES: 'static + Deref + Send,
983999
M: 'static
9841000
+ Deref<
985-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1001+
Target = ChainMonitor<
1002+
<CM::Target as AChannelManager>::Signer,
1003+
CF,
1004+
T,
1005+
F,
1006+
L,
1007+
P,
1008+
ES,
1009+
FS,
1010+
>,
9861011
>
9871012
+ Send
9881013
+ Sync,
@@ -998,6 +1023,7 @@ impl BackgroundProcessor {
9981023
O: 'static + Deref,
9991024
K: 'static + Deref,
10001025
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1026+
FS: FutureSpawner,
10011027
>(
10021028
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
10031029
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
@@ -1010,15 +1036,15 @@ impl BackgroundProcessor {
10101036
F::Target: 'static + FeeEstimator,
10111037
L::Target: 'static + Logger,
10121038
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1013-
PS::Target: 'static + Persister<'a, CM, L, S>,
1039+
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
10141040
ES::Target: 'static + EntropySource,
10151041
CM::Target: AChannelManager,
10161042
OM::Target: AOnionMessenger,
10171043
PM::Target: APeerManager,
10181044
LM::Target: ALiquidityManager,
10191045
D::Target: ChangeDestinationSourceSync,
10201046
O::Target: 'static + OutputSpender,
1021-
K::Target: 'static + KVStore,
1047+
K::Target: 'static + KVStoreSync,
10221048
{
10231049
let stop_thread = Arc::new(AtomicBool::new(false));
10241050
let stop_thread_clone = stop_thread.clone();
@@ -1098,6 +1124,7 @@ impl BackgroundProcessor {
10981124
.expect("Time should be sometime after 1970"),
10991125
)
11001126
},
1127+
false,
11011128
)
11021129
});
11031130
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning-persister/src/fs_store.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Objects related to [`FilesystemStore`] live here.
22
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
33

4+
use lightning::util::async_poll::{AsyncResult, AsyncResultType};
45
use lightning::util::persist::{KVStore, MigratableKVStore};
56
use lightning::util::string::PrintableString;
67

@@ -92,7 +93,7 @@ impl FilesystemStore {
9293
}
9394
}
9495

95-
impl KVStore for FilesystemStore {
96+
impl FilesystemStore {
9697
fn read(
9798
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
9899
) -> lightning::io::Result<Vec<u8>> {
@@ -120,7 +121,7 @@ impl KVStore for FilesystemStore {
120121

121122
fn write(
122123
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
123-
) -> lightning::io::Result<()> {
124+
) -> Result<(), lightning::io::Error> {
124125
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125126

126127
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
@@ -204,6 +205,23 @@ impl KVStore for FilesystemStore {
204205

205206
res
206207
}
208+
}
209+
210+
impl KVStore for FilesystemStore {
211+
fn read(
212+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
213+
) -> AsyncResultType<'static, Vec<u8>, lightning::io::Error> {
214+
let res = self.read(primary_namespace, secondary_namespace, key);
215+
Box::pin(async move { res })
216+
}
217+
218+
fn write(
219+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
220+
) -> AsyncResultType<'static, (), lightning::io::Error> {
221+
let res = self.write(primary_namespace, secondary_namespace, key, buf);
222+
223+
Box::pin(async move { res })
224+
}
207225

208226
fn remove(
209227
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,

0 commit comments

Comments
 (0)