Skip to content

Commit e551842

Browse files
committed
async read
1 parent c10d7c3 commit e551842

File tree

2 files changed

+76
-55
lines changed

2 files changed

+76
-55
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,31 @@ impl FilesystemStore {
9494
}
9595

9696
impl FilesystemStore {
97+
fn read(
98+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
99+
) -> lightning::io::Result<Vec<u8>> {
100+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
101+
102+
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
103+
dest_file_path.push(key);
104+
105+
let mut buf = Vec::new();
106+
{
107+
let inner_lock_ref = {
108+
let mut outer_lock = self.locks.lock().unwrap();
109+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
110+
};
111+
let _guard = inner_lock_ref.read().unwrap();
112+
113+
let mut f = fs::File::open(dest_file_path)?;
114+
f.read_to_end(&mut buf)?;
115+
}
116+
117+
self.garbage_collect_locks();
118+
119+
Ok(buf)
120+
}
121+
97122
fn write(
98123
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
99124
) -> Result<(), lightning::io::Error> {
@@ -185,27 +210,9 @@ impl FilesystemStore {
185210
impl KVStore for FilesystemStore {
186211
fn read(
187212
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
188-
) -> lightning::io::Result<Vec<u8>> {
189-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
190-
191-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
192-
dest_file_path.push(key);
193-
194-
let mut buf = Vec::new();
195-
{
196-
let inner_lock_ref = {
197-
let mut outer_lock = self.locks.lock().unwrap();
198-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
199-
};
200-
let _guard = inner_lock_ref.read().unwrap();
201-
202-
let mut f = fs::File::open(dest_file_path)?;
203-
f.read_to_end(&mut buf)?;
204-
}
205-
206-
self.garbage_collect_locks();
207-
208-
Ok(buf)
213+
) -> AsyncResultType<'static, Vec<u8>, lightning::io::Error> {
214+
let res = self.read(primary_namespace, secondary_namespace, key);
215+
Box::pin(async move { res })
209216
}
210217

211218
fn write(

lightning/src/util/persist.rs

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub trait KVStore {
131131
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
132132
fn read(
133133
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
134-
) -> Result<Vec<u8>, io::Error>;
134+
) -> AsyncResultType<'static, Vec<u8>, io::Error>;
135135
/// Persists the given data under the given `key`.
136136
///
137137
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
@@ -194,7 +194,7 @@ pub async fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
194194
let keys_to_migrate = source_store.list_all_keys()?;
195195

196196
for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
197-
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
197+
let data = source_store.read(primary_namespace, secondary_namespace, key).await?;
198198
target_store.write(primary_namespace, secondary_namespace, key, &data).await.map_err(
199199
|_| {
200200
io::Error::new(
@@ -327,11 +327,14 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'sta
327327

328328
Box::pin(async move {
329329
let monitor_key = monitor_name.to_string();
330-
let monitor = match kv_store.read(
331-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
332-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
333-
monitor_key.as_str(),
334-
) {
330+
let monitor = match kv_store
331+
.read(
332+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
333+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
334+
monitor_key.as_str(),
335+
)
336+
.await
337+
{
335338
Ok(monitor) => monitor,
336339
Err(_) => return,
337340
};
@@ -358,7 +361,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'sta
358361
}
359362

360363
/// Read previously persisted [`ChannelMonitor`]s from the store.
361-
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
364+
pub async fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
362365
kv_store: K, entropy_source: ES, signer_provider: SP,
363366
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
364367
where
@@ -373,11 +376,15 @@ where
373376
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
374377
)? {
375378
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
376-
&mut io::Cursor::new(kv_store.read(
377-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
378-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
379-
&stored_key,
380-
)?),
379+
&mut io::Cursor::new(
380+
kv_store
381+
.read(
382+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
383+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
384+
&stored_key,
385+
)
386+
.await?,
387+
),
381388
(&*entropy_source, &*signer_provider),
382389
) {
383390
Ok((block_hash, channel_monitor)) => {
@@ -563,7 +570,7 @@ where
563570
/// It is extremely important that your [`KVStore::read`] implementation uses the
564571
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
565572
/// documentation for [`MonitorUpdatingPersister`].
566-
pub fn read_all_channel_monitors_with_updates(
573+
pub async fn read_all_channel_monitors_with_updates(
567574
&self,
568575
) -> Result<
569576
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
@@ -575,7 +582,7 @@ where
575582
)?;
576583
let mut res = Vec::with_capacity(monitor_list.len());
577584
for monitor_key in monitor_list {
578-
res.push(self.read_channel_monitor_with_updates(monitor_key.as_str())?)
585+
res.push(self.read_channel_monitor_with_updates(monitor_key.as_str()).await?)
579586
}
580587
Ok(res)
581588
}
@@ -599,20 +606,20 @@ where
599606
///
600607
/// Loading a large number of monitors will be faster if done in parallel. You can use this
601608
/// function to accomplish this. Take care to limit the number of parallel readers.
602-
pub fn read_channel_monitor_with_updates(
609+
pub async fn read_channel_monitor_with_updates(
603610
&self, monitor_key: &str,
604611
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
605612
{
606613
let monitor_name = MonitorName::from_str(monitor_key)?;
607-
let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key)?;
614+
let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key).await?;
608615
let mut current_update_id = monitor.get_latest_update_id();
609616
loop {
610617
current_update_id = match current_update_id.checked_add(1) {
611618
Some(next_update_id) => next_update_id,
612619
None => break,
613620
};
614621
let update_name = UpdateName::from(current_update_id);
615-
let update = match self.read_monitor_update(monitor_key, &update_name) {
622+
let update = match self.read_monitor_update(monitor_key, &update_name).await {
616623
Ok(update) => update,
617624
Err(err) if err.kind() == io::ErrorKind::NotFound => {
618625
// We can't find any more updates, so we are done.
@@ -638,15 +645,19 @@ where
638645
}
639646

640647
/// Read a channel monitor.
641-
fn read_monitor(
648+
async fn read_monitor(
642649
&self, monitor_name: &MonitorName, monitor_key: &str,
643650
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
644651
{
645-
let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
646-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
647-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
648-
monitor_key,
649-
)?);
652+
let mut monitor_cursor = io::Cursor::new(
653+
self.kv_store
654+
.read(
655+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
656+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
657+
monitor_key,
658+
)
659+
.await?,
660+
);
650661
// Discard the sentinel bytes if found.
651662
if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
652663
monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
@@ -683,14 +694,17 @@ where
683694
}
684695

685696
/// Read a channel monitor update.
686-
fn read_monitor_update(
697+
async fn read_monitor_update(
687698
&self, monitor_key: &str, update_name: &UpdateName,
688699
) -> Result<ChannelMonitorUpdate, io::Error> {
689-
let update_bytes = self.kv_store.read(
690-
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
691-
monitor_key,
692-
update_name.as_str(),
693-
)?;
700+
let update_bytes = self
701+
.kv_store
702+
.read(
703+
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
704+
monitor_key,
705+
update_name.as_str(),
706+
)
707+
.await?;
694708
ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| {
695709
log_error!(
696710
self.logger,
@@ -710,14 +724,14 @@ where
710724
/// updates. The updates that have an `update_id` less than or equal to than the stored monitor
711725
/// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
712726
/// be passed to [`KVStore::remove`].
713-
pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
727+
pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
714728
let monitor_keys = self.kv_store.list(
715729
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
716730
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
717731
)?;
718732
for monitor_key in monitor_keys {
719733
let monitor_name = MonitorName::from_str(&monitor_key)?;
720-
let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key)?;
734+
let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?;
721735
let updates = self
722736
.kv_store
723737
.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str())?;
@@ -909,7 +923,7 @@ where
909923
let maybe_old_monitor = match monitor_latest_update_id {
910924
LEGACY_CLOSED_CHANNEL_UPDATE_ID => {
911925
let monitor_key = monitor_name.to_string();
912-
self.read_monitor(&monitor_name, &monitor_key).ok()
926+
self.read_monitor(&monitor_name, &monitor_key).await.ok()
913927
},
914928
_ => None,
915929
};
@@ -953,7 +967,7 @@ where
953967

954968
async fn archive_persisted_channel(&self, monitor_name: MonitorName) {
955969
let monitor_key = monitor_name.to_string();
956-
let monitor = match self.read_channel_monitor_with_updates(&monitor_key) {
970+
let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await {
957971
Ok((_block_hash, monitor)) => monitor,
958972
Err(_) => return,
959973
};

0 commit comments

Comments
 (0)