88//! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
99//! and [`ChannelMonitor`] all in one place.
1010
11+ use core:: convert:: { TryFrom , TryInto } ;
1112use core:: ops:: Deref ;
1213use bitcoin:: hashes:: hex:: { FromHex , ToHex } ;
1314use bitcoin:: { BlockHash , Txid } ;
1415
1516use crate :: io;
17+ use crate :: ln:: msgs:: DecodeError ;
1618use crate :: prelude:: { Vec , String } ;
1719use crate :: routing:: scoring:: WriteableScore ;
1820
@@ -21,12 +23,12 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2123use crate :: chain:: chainmonitor:: { Persist , MonitorUpdateId } ;
2224use crate :: sign:: { EntropySource , NodeSigner , WriteableEcdsaChannelSigner , SignerProvider } ;
2325use crate :: chain:: transaction:: OutPoint ;
24- use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate } ;
26+ use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , CLOSED_CHANNEL_UPDATE_ID } ;
2527use crate :: ln:: channelmanager:: ChannelManager ;
2628use crate :: routing:: router:: Router ;
2729use crate :: routing:: gossip:: NetworkGraph ;
2830use crate :: util:: logger:: Logger ;
29- use crate :: util:: ser:: { ReadableArgs , Writeable } ;
31+ use crate :: util:: ser:: { Readable , ReadableArgs , Writeable } ;
3032
3133/// The namespace under which the [`ChannelManager`] will be persisted.
3234pub const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE : & str = "" ;
@@ -35,6 +37,8 @@ pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
3537
3638/// The namespace under which [`ChannelMonitor`]s will be persisted.
3739pub const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE : & str = "monitors" ;
40+ /// The namespace under which [`ChannelMonitorUpdate`]s will be persisted.
41+ pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE : & str = "monitors/updates" ;
3842
3943/// The namespace under which the [`NetworkGraph`] will be persisted.
4044pub const NETWORK_GRAPH_PERSISTENCE_NAMESPACE : & str = "" ;
@@ -126,28 +130,28 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der
126130 }
127131}
128132
129- impl < ChannelSigner : WriteableEcdsaChannelSigner , K : KVStore > Persist < ChannelSigner > for K {
130- // TODO: We really need a way for the persister to inform the user that its time to crash/shut
131- // down once these start returning failure.
132- // A PermanentFailure implies we should probably just shut down the node since we're
133- // force-closing channels without even broadcasting!
133+ // impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
134+ // // TODO: We really need a way for the persister to inform the user that its time to crash/shut
135+ // // down once these start returning failure.
136+ // // A PermanentFailure implies we should probably just shut down the node since we're
137+ // // force-closing channels without even broadcasting!
134138
135- fn persist_new_channel ( & self , funding_txo : OutPoint , monitor : & ChannelMonitor < ChannelSigner > , _update_id : MonitorUpdateId ) -> chain:: ChannelMonitorUpdateStatus {
136- let key = format ! ( "{}_{}" , funding_txo. txid. to_hex( ) , funding_txo. index) ;
137- match self . write ( CHANNEL_MONITOR_PERSISTENCE_NAMESPACE , & key, & monitor. encode ( ) ) {
138- Ok ( ( ) ) => chain:: ChannelMonitorUpdateStatus :: Completed ,
139- Err ( _) => chain:: ChannelMonitorUpdateStatus :: PermanentFailure ,
140- }
141- }
139+ // fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
140+ // let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
141+ // match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
142+ // Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
143+ // Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
144+ // }
145+ // }
142146
143- fn update_persisted_channel ( & self , funding_txo : OutPoint , _update : Option < & ChannelMonitorUpdate > , monitor : & ChannelMonitor < ChannelSigner > , _update_id : MonitorUpdateId ) -> chain:: ChannelMonitorUpdateStatus {
144- let key = format ! ( "{}_{}" , funding_txo. txid. to_hex( ) , funding_txo. index) ;
145- match self . write ( CHANNEL_MONITOR_PERSISTENCE_NAMESPACE , & key, & monitor. encode ( ) ) {
146- Ok ( ( ) ) => chain:: ChannelMonitorUpdateStatus :: Completed ,
147- Err ( _) => chain:: ChannelMonitorUpdateStatus :: PermanentFailure ,
148- }
149- }
150- }
147+ // fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
148+ // let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
149+ // match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
150+ // Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
151+ // Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
152+ // }
153+ // }
154+ // }
151155
152156/// Read previously persisted [`ChannelMonitor`]s from the store.
153157pub fn read_channel_monitors < K : Deref , ES : Deref , SP : Deref > (
@@ -194,3 +198,303 @@ where
194198 }
195199 Ok ( res)
196200}
201+
202+ enum KVStoreChannelMonitorReaderError {
203+ /// The monitor name was improperly formatted.
204+ BadMonitorName ( String , String ) ,
205+ /// The monitor could not be decoded.
206+ MonitorDecodeFailed ( DecodeError , String ) ,
207+ /// The update could not be decoded.
208+ UpdateDecodeFailed ( DecodeError , String ) ,
209+ /// Storage could not be read.
210+ StorageReadFailed ( io:: Error , String ) ,
211+ /// An update could not be applied to a monitor.
212+ UpdateFailed ( String , String ) ,
213+ }
214+
215+ impl From < KVStoreChannelMonitorReaderError > for io:: Error {
216+ fn from ( value : KVStoreChannelMonitorReaderError ) -> Self {
217+ match value {
218+ KVStoreChannelMonitorReaderError :: BadMonitorName ( reason, context) => {
219+ io:: Error :: new ( io:: ErrorKind :: InvalidInput , format ! ( "{reason}, context: {context}'" ) )
220+ } ,
221+ KVStoreChannelMonitorReaderError :: MonitorDecodeFailed ( reason, context) => {
222+ io:: Error :: new ( io:: ErrorKind :: InvalidData , format ! ( "{reason}, context: {context:?}'" ) )
223+ } ,
224+ KVStoreChannelMonitorReaderError :: UpdateDecodeFailed ( reason, context) => {
225+ io:: Error :: new ( io:: ErrorKind :: InvalidData , format ! ( "{reason}, context: {context:?}'" ) )
226+ } ,
227+ KVStoreChannelMonitorReaderError :: StorageReadFailed ( reason, context) => {
228+ io:: Error :: new ( io:: ErrorKind :: InvalidData , format ! ( "{reason}, context: {context:?}'" ) )
229+ } ,
230+ KVStoreChannelMonitorReaderError :: UpdateFailed ( reason, context) => {
231+ io:: Error :: new ( io:: ErrorKind :: InvalidData , format ! ( "{reason}, context: {context}'" ) )
232+ } ,
233+ }
234+ }
235+ }
236+
237+ /// A struct representing a name for a monitor.
238+ #[ derive( Clone , Debug ) ]
239+ pub struct MonitorName ( String ) ;
240+
241+ impl TryFrom < MonitorName > for OutPoint {
242+ type Error = std:: io:: Error ;
243+
244+ fn try_from ( value : MonitorName ) -> Result < Self , io:: Error > {
245+ let ( txid_hex, index) = value. 0 . split_once ( '_' ) . ok_or_else ( || {
246+ KVStoreChannelMonitorReaderError :: BadMonitorName ( "no underscore" . to_string ( ) , value. 0 . clone ( ) )
247+ } ) ?;
248+ let index = index. parse ( ) . map_err ( |e| {
249+ KVStoreChannelMonitorReaderError :: BadMonitorName (
250+ format ! ( "bad index value, caused by {e}" ) ,
251+ value. 0 . clone ( ) ,
252+ )
253+ } ) ?;
254+ let txid = Txid :: from_hex ( txid_hex) . map_err ( |e| {
255+ KVStoreChannelMonitorReaderError :: BadMonitorName (
256+ format ! ( "bad txid, caused by: {e}" ) ,
257+ value. 0 . clone ( ) ,
258+ )
259+ } ) ?;
260+ Ok ( OutPoint { txid, index } )
261+ }
262+ }
263+
264+ impl From < OutPoint > for MonitorName {
265+ fn from ( value : OutPoint ) -> Self {
266+ MonitorName ( format ! ( "{}_{}" , value. txid. to_hex( ) , value. index) )
267+ }
268+ }
269+
270+ /// A struct representing a name for an update.
271+ #[ derive( Clone , Debug ) ]
272+ pub struct UpdateName ( String ) ;
273+
274+ impl From < u64 > for UpdateName {
275+ fn from ( value : u64 ) -> Self {
276+ Self ( format ! ( "{:0>20}" , value) )
277+ }
278+ }
279+
280+ #[ allow( clippy:: type_complexity) ]
281+ pub trait KVStoreChannelMonitorReader < K : KVStore > {
282+ fn read_channelmonitors < ES : Deref + Clone , SP : Deref + Clone , B : Deref , F : Deref + Clone , L : Deref > (
283+ & self , entropy_source : ES , signer_provider : SP , broadcaster : & B , fee_estimator : F ,
284+ logger : & L ,
285+ ) -> std:: io:: Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) > >
286+ where
287+ ES :: Target : EntropySource + Sized ,
288+ SP :: Target : SignerProvider + Sized ,
289+ B :: Target : BroadcasterInterface ,
290+ F :: Target : FeeEstimator ,
291+ L :: Target : Logger ;
292+ /// List all the names of monitors.
293+ fn list_monitor_names ( & self ) -> io:: Result < Vec < MonitorName > > ;
294+ /// Key to a specific monitor.
295+ fn monitor_key ( & self , monitor_name : & MonitorName ) -> String ;
296+ /// Deserialize a channel monitor.
297+ fn deserialize_monitor < ES : Deref , SP : Deref > (
298+ & self , entropy_source : ES , signer_provider : SP , monitor_name : MonitorName ,
299+ ) -> io:: Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) >
300+ where
301+ ES :: Target : EntropySource + Sized ,
302+ SP :: Target : SignerProvider + Sized ;
303+ /// List all the names of updates corresponding to a given monitor name.
304+ fn list_update_names ( & self , monitor_name : & MonitorName ) -> io:: Result < Vec < UpdateName > > ;
305+ /// Path to corresponding update directory for a given monitor name.
306+ fn path_to_monitor_updates ( & self , monitor_name : & MonitorName ) -> String ;
307+ /// Deserialize a channel monitor update.
308+ fn deserialize_monitor_update (
309+ & self , monitor_name : & MonitorName , update_name : & UpdateName ,
310+ ) -> io:: Result < ChannelMonitorUpdate > ;
311+ /// Key to a specific update.
312+ fn update_key ( & self , monitor_name : & MonitorName , update_name : & UpdateName ) -> String ;
313+ /// Delete updates with an update_id lower than the given channel monitor.
314+ fn delete_stale_updates < ChannelSigner : WriteableEcdsaChannelSigner > (
315+ & self , channel_id : OutPoint , monitor : & ChannelMonitor < ChannelSigner > ,
316+ ) -> io:: Result < ( ) > ;
317+ }
318+
319+ impl < K : KVStore > KVStoreChannelMonitorReader < K > for K {
320+ fn read_channelmonitors <
321+ ES : Deref + Clone ,
322+ SP : Deref + Clone ,
323+ B : Deref ,
324+ F : Deref + Clone ,
325+ L : Deref ,
326+ > (
327+ & self , entropy_source : ES , signer_provider : SP , broadcaster : & B , fee_estimator : F ,
328+ logger : & L ,
329+ ) -> std:: io:: Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) > >
330+ where
331+ ES :: Target : EntropySource + Sized ,
332+ SP :: Target : SignerProvider + Sized ,
333+ B :: Target : BroadcasterInterface ,
334+ F :: Target : FeeEstimator ,
335+ L :: Target : Logger
336+ {
337+ let mut res = Vec :: new ( ) ;
338+ // for each monitor...
339+ for monitor_name in self . list_monitor_names ( ) ? {
340+ // ...parse the monitor
341+ let ( bh, monitor) = self . deserialize_monitor (
342+ entropy_source. clone ( ) ,
343+ signer_provider. clone ( ) ,
344+ monitor_name. clone ( ) ,
345+ ) ?;
346+ // ...parse and apply the updates with an id higher than the monitor.
347+ for update_name in self . list_update_names ( & monitor_name) ? {
348+ let update = self . deserialize_monitor_update ( & monitor_name, & update_name) ?;
349+ if update. update_id == CLOSED_CHANNEL_UPDATE_ID
350+ || update. update_id > monitor. get_latest_update_id ( )
351+ {
352+ monitor
353+ . update_monitor ( & update, broadcaster, fee_estimator. clone ( ) , logger)
354+ . map_err ( |_| {
355+ KVStoreChannelMonitorReaderError :: UpdateFailed (
356+ "update_monitor returned Err(())" . to_string ( ) ,
357+ format ! ( "monitor: {:?}" , monitor_name) ,
358+ )
359+ } ) ?;
360+ }
361+ }
362+ // ...push the result into the return vec
363+ res. push ( ( bh, monitor) )
364+ }
365+ Ok ( res)
366+ }
367+
368+ /// Key to a specific monitor.
369+ fn monitor_key ( & self , monitor_name : & MonitorName ) -> String {
370+ CHANNEL_MONITOR_PERSISTENCE_NAMESPACE . to_owned ( ) + & monitor_name. 0
371+ }
372+
373+ /// Key to a specific update.
374+ fn update_key ( & self , monitor_name : & MonitorName , update_name : & UpdateName ) -> String {
375+ self . path_to_monitor_updates ( monitor_name) + & update_name. 0
376+ }
377+
378+ /// List all the names of monitors.
379+ fn list_monitor_names ( & self ) -> io:: Result < Vec < MonitorName > > {
380+ Ok ( self . list ( CHANNEL_MONITOR_PERSISTENCE_NAMESPACE ) ?. into_iter ( ) . map ( MonitorName ) . collect ( ) )
381+ }
382+
383+ /// List all the names of updates corresponding to a given monitor name.
384+ fn list_update_names ( & self , monitor_name : & MonitorName ) -> io:: Result < Vec < UpdateName > > {
385+ let update_dir_path = self . path_to_monitor_updates ( monitor_name) ;
386+ Ok ( self . list ( & update_dir_path) ?. into_iter ( ) . map ( UpdateName ) . collect ( ) )
387+ }
388+
389+ /// Path to corresponding update directory for a given monitor name.
390+ fn path_to_monitor_updates ( & self , monitor_name : & MonitorName ) -> String {
391+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE . to_owned ( ) + & monitor_name. 0
392+ }
393+
394+ /// Deserialize a channel monitor.
395+ fn deserialize_monitor < ES : Deref , SP : Deref > (
396+ & self , entropy_source : ES , signer_provider : SP , monitor_name : MonitorName ,
397+ ) -> io:: Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) >
398+ where
399+ ES :: Target : EntropySource + Sized ,
400+ SP :: Target : SignerProvider + Sized
401+ {
402+ let key = self . monitor_key ( & monitor_name) ;
403+ let outpoint: OutPoint = monitor_name. try_into ( ) ?;
404+ match <( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) >:: read (
405+ & mut self . read ( CHANNEL_MONITOR_PERSISTENCE_NAMESPACE , & key)
406+ . map_err ( |e| KVStoreChannelMonitorReaderError :: StorageReadFailed ( e, key. to_owned ( ) ) ) ?,
407+ ( & * entropy_source, & * signer_provider) ,
408+ ) {
409+ Ok ( ( blockhash, channel_monitor) ) => {
410+ if channel_monitor. get_funding_txo ( ) . 0 . txid != outpoint. txid
411+ || channel_monitor. get_funding_txo ( ) . 0 . index != outpoint. index
412+ {
413+ return Err ( KVStoreChannelMonitorReaderError :: MonitorDecodeFailed (
414+ DecodeError :: InvalidValue ,
415+ key,
416+ )
417+ . into ( ) ) ;
418+ }
419+ Ok ( ( blockhash, channel_monitor) )
420+ }
421+ Err ( e) => Err ( KVStoreChannelMonitorReaderError :: MonitorDecodeFailed ( e, key) . into ( ) ) ,
422+ }
423+ }
424+
425+ /// Deserialize a channel monitor update.
426+ fn deserialize_monitor_update (
427+ & self , monitor_name : & MonitorName , update_name : & UpdateName ,
428+ ) -> io:: Result < ChannelMonitorUpdate >
429+ {
430+ let key = self . update_key ( monitor_name, update_name) ;
431+ Ok ( ChannelMonitorUpdate :: read ( & mut self
432+ . read ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE , & key)
433+ . map_err ( |e| KVStoreChannelMonitorReaderError :: StorageReadFailed ( e, key. to_owned ( ) ) ) ?)
434+ . map_err ( |e| KVStoreChannelMonitorReaderError :: UpdateDecodeFailed ( e, key) ) ?)
435+ }
436+
437+ /// Delete updates with an update_id lower than the given channel monitor.
438+ fn delete_stale_updates < ChannelSigner : WriteableEcdsaChannelSigner > (
439+ & self , channel_id : OutPoint , monitor : & ChannelMonitor < ChannelSigner > ,
440+ ) -> io:: Result < ( ) >
441+ {
442+ let monitor_name: MonitorName = channel_id. into ( ) ;
443+ let update_names =
444+ self . list_update_names ( & monitor_name) ?;
445+ for update_name in update_names {
446+ let update =
447+ self . deserialize_monitor_update ( & monitor_name, & update_name) ?;
448+ if update. update_id != CLOSED_CHANNEL_UPDATE_ID
449+ && update. update_id <= monitor. get_latest_update_id ( )
450+ {
451+ self . remove ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE , & self . update_key ( & monitor_name, & update_name) ) ?;
452+ }
453+ }
454+ Ok ( ( ) )
455+ }
456+ }
457+
458+ impl < ChannelSigner : WriteableEcdsaChannelSigner , K : KVStore + KVStoreChannelMonitorReader < K > > Persist < ChannelSigner > for K {
459+ // TODO: We really need a way for the persister to inform the user that its time to crash/shut
460+ // down once these start returning failure.
461+ // A PermanentFailure implies we should probably just shut down the node since we're
462+ // force-closing channels without even broadcasting!
463+ fn persist_new_channel ( & self , funding_txo : OutPoint , monitor : & ChannelMonitor < ChannelSigner > , _update_id : MonitorUpdateId ) -> chain:: ChannelMonitorUpdateStatus
464+ {
465+ let key = self . monitor_key ( & funding_txo. into ( ) ) ;
466+ match self . write ( CHANNEL_MONITOR_PERSISTENCE_NAMESPACE , & key, & monitor. encode ( ) ) {
467+ Ok ( ( ) ) => {
468+ if let Err ( _e) = self . delete_stale_updates ( funding_txo, monitor) {
469+ // TODO(domz): what to do? seems like an error or panic is needed, but OTOH cleanup is technically optional
470+ //log_error!(self.logger, "error cleaning up channel monitor updates! {}", e);
471+ } ;
472+ chain:: ChannelMonitorUpdateStatus :: Completed
473+ } ,
474+ Err ( _) => chain:: ChannelMonitorUpdateStatus :: PermanentFailure ,
475+ }
476+ }
477+
478+ fn update_persisted_channel (
479+ & self , funding_txo : OutPoint , update : Option < & ChannelMonitorUpdate > ,
480+ monitor : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ,
481+ ) -> chain:: ChannelMonitorUpdateStatus {
482+ match update {
483+ Some ( update) => {
484+ // This is an update to the monitor, which we persist to apply on restart.
485+ // IMPORTANT: update_id: MonitorUpdateId is not to be confused with ChannelMonitorUpdate.update_id.
486+ // The first is an opaque identifier for this call (used for calling back write completion). The second
487+ // is the channel update sequence number.
488+ let key = self . update_key ( & funding_txo. into ( ) , & update. update_id . into ( ) ) ;
489+ match self . write ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE , & key, & update. encode ( ) ) {
490+ Ok ( ( ) ) => chain:: ChannelMonitorUpdateStatus :: Completed ,
491+ Err ( _) => chain:: ChannelMonitorUpdateStatus :: PermanentFailure ,
492+ }
493+ }
494+ // A new block. Now we need to persist the entire new monitor and discard the old
495+ // updates.
496+ None => self . persist_new_channel ( funding_txo, monitor, update_id) ,
497+ }
498+ }
499+
500+ }
0 commit comments