diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 431f1597c17..bbdd7528465 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -57,13 +57,14 @@ use crate::routing::router::{Path, CandidateRouteHop, PublicHopCandidate};
 use crate::routing::log_approx;
 use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::logger::Logger;
-
 use crate::prelude::*;
-use core::{cmp, fmt};
+use crate::prelude::hash_map::Entry;
+use core::{cmp, fmt, mem};
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 use crate::io::{self, Read};
 use crate::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};
 #[cfg(not(c_bindings))]
 use {
     core::cell::{RefCell, RefMut, Ref},
@@ -474,7 +475,87 @@ where L::Target: Logger {
     decay_params: ProbabilisticScoringDecayParameters,
     network_graph: G,
     logger: L,
-    channel_liquidities: HashMap<u64, ChannelLiquidity>,
+    channel_liquidities: ChannelLiquidities,
+}
+
+/// Container for live and historical liquidity bounds for each channel.
+#[derive(Clone)]
+pub struct ChannelLiquidities(HashMap<u64, ChannelLiquidity>);
+
+impl ChannelLiquidities {
+    fn new() -> Self {
+        Self(new_hash_map())
+    }
+
+    fn time_passed(&mut self, duration_since_epoch: Duration, decay_params: ProbabilisticScoringDecayParameters) {
+        self.0.retain(|_scid, liquidity| {
+            liquidity.min_liquidity_offset_msat =
+                liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
+            liquidity.max_liquidity_offset_msat =
+                liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
+            liquidity.last_updated = duration_since_epoch;
+
+            // Only decay the historical buckets if there hasn't been new data for a while. This ties back to our
+            // earlier conclusion that fixed half-lives for scoring data are inherently flawed - they tend to be
+            // either too fast or too slow. Ideally, historical buckets should only decay as new data is added.
+            // However, scoring a channel based on month-old data the same as one with minute-old data is
+            // problematic. To address this, we introduce a decay mechanism here, but it runs very slowly and only
+            // activates when no new data has been received for a while, as our preference is still to decay based
+            // on incoming data.
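+            // For example, if the configured `historical_no_updates_half_life` is 14 days and a channel has seen
+            // no updates for 28 days, `decay_buckets` below is called with two half-lives and every historical
+            // bucket is scaled by roughly 0.5^2 = 0.25 (in fixed point).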
+            let elapsed_time =
+                duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
+            if elapsed_time > decay_params.historical_no_updates_half_life {
+                let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
+                if half_life != 0.0 {
+                    liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
+                    liquidity.offset_history_last_updated = duration_since_epoch;
+                }
+            }
+            liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
+                liquidity.liquidity_history.has_datapoints()
+        });
+    }
+
+    fn get(&self, short_channel_id: &u64) -> Option<&ChannelLiquidity> {
+        self.0.get(short_channel_id)
+    }
+
+    fn insert(&mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Option<ChannelLiquidity> {
+        self.0.insert(short_channel_id, liquidity)
+    }
+
+    fn iter(&self) -> impl Iterator<Item = (&u64, &ChannelLiquidity)> {
+        self.0.iter()
+    }
+
+    fn entry(&mut self, short_channel_id: u64) -> Entry {
+        self.0.entry(short_channel_id)
+    }
+
+    #[cfg(test)]
+    fn get_mut(&mut self, short_channel_id: &u64) -> Option<&mut ChannelLiquidity> {
+        self.0.get_mut(short_channel_id)
+    }
+}
+
+impl Readable for ChannelLiquidities {
+    #[inline]
+    fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+        let mut channel_liquidities = new_hash_map();
+        read_tlv_fields!(r, {
+            (0, channel_liquidities, required),
+        });
+        Ok(ChannelLiquidities(channel_liquidities))
+    }
+}
+
+impl Writeable for ChannelLiquidities {
+    #[inline]
+    fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+        write_tlv_fields!(w, {
+            (0, self.0, required),
+        });
+        Ok(())
+    }
 }
 
 /// Parameters for configuring [`ProbabilisticScorer`].
@@ -804,6 +885,7 @@ impl ProbabilisticScoringDecayParameters {
 /// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
 /// offset fields gives the opposite direction.
 #[repr(C)] // Force the fields in memory to be in the order we specify
+#[derive(Clone)]
 struct ChannelLiquidity {
     /// Lower channel liquidity bound in terms of an offset from zero.
     min_liquidity_offset_msat: u64,
@@ -849,7 +931,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
             decay_params,
             network_graph,
             logger,
-            channel_liquidities: new_hash_map(),
+            channel_liquidities: ChannelLiquidities::new(),
         }
     }
 
@@ -1061,6 +1143,16 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
         }
         None
     }
+
+    /// Overwrite the scorer state with the given external scores.
+    pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
+        _ = mem::replace(&mut self.channel_liquidities, external_scores);
+    }
+
+    /// Returns the current scores.
+    pub fn scores(&self) -> &ChannelLiquidities {
+        &self.channel_liquidities
+    }
 }
 
 impl ChannelLiquidity {
@@ -1074,6 +1166,15 @@ impl ChannelLiquidity {
         }
     }
 
+    fn merge(&mut self, other: &Self) {
+        // Take the average of the min/max liquidity offsets.
+        self.min_liquidity_offset_msat = (self.min_liquidity_offset_msat + other.min_liquidity_offset_msat) / 2;
+        self.max_liquidity_offset_msat = (self.max_liquidity_offset_msat + other.max_liquidity_offset_msat) / 2;
+
+        // Merge historical liquidity data.
+        self.liquidity_history.merge(&other.liquidity_history);
+    }
+
     /// Returns a view of the channel liquidity directed from `source` to `target` assuming
     /// `capacity_msat`.
     fn as_directed(
@@ -1603,26 +1704,105 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
     }
 
     fn time_passed(&mut self, duration_since_epoch: Duration) {
-        let decay_params = self.decay_params;
-        self.channel_liquidities.retain(|_scid, liquidity| {
-            liquidity.min_liquidity_offset_msat =
-                liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
-            liquidity.max_liquidity_offset_msat =
-                liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
-            liquidity.last_updated = duration_since_epoch;
+        self.channel_liquidities.time_passed(duration_since_epoch, self.decay_params);
+    }
+}
 
-            let elapsed_time =
-                duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
-            if elapsed_time > decay_params.historical_no_updates_half_life {
-                let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
-                if half_life != 0.0 {
-                    liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
-                    liquidity.offset_history_last_updated = duration_since_epoch;
-                }
+/// A probabilistic scorer that combines local and external information to score channels. It keeps a shadow copy
+/// of the purely local scores so that external scores can be cleanly merged in when they become available.
+///
+/// This is useful for nodes that have a limited local view of the network and need to augment it with scores from
+/// an external source to improve payment reliability. The external source may use something like background
+/// probing to gather a more complete view of the network. Merging reduces the likelihood of losing unique local
+/// data on particular channels.
+///
+/// Note that only the locally acquired data is persisted. After a restart, the external scores will be lost and
+/// must be resupplied.
+pub struct CombinedScorer<G: Deref<Target = NetworkGraph<L>>, L: Deref> where L::Target: Logger {
+    local_only_scorer: ProbabilisticScorer<G, L>,
+    scorer: ProbabilisticScorer<G, L>,
+}
+
+impl<G: Deref<Target = NetworkGraph<L>> + Clone, L: Deref + Clone> CombinedScorer<G, L> where L::Target: Logger {
+    /// Create a new combined scorer with the given local scorer.
+    pub fn new(local_scorer: ProbabilisticScorer<G, L>) -> Self {
+        let decay_params = local_scorer.decay_params;
+        let network_graph = local_scorer.network_graph.clone();
+        let logger = local_scorer.logger.clone();
+        let mut scorer = ProbabilisticScorer::new(decay_params, network_graph, logger);
+
+        scorer.channel_liquidities = local_scorer.channel_liquidities.clone();
+
+        Self {
+            local_only_scorer: local_scorer,
+            scorer,
+        }
+    }
+
+    /// Merge external channel liquidity information into the scorer.
+    pub fn merge(&mut self, mut external_scores: ChannelLiquidities, duration_since_epoch: Duration) {
+        // Decay both sets of scores to make them comparable and mergeable.
+        self.local_only_scorer.time_passed(duration_since_epoch);
+        external_scores.time_passed(duration_since_epoch, self.local_only_scorer.decay_params);
+
+        let local_scores = &self.local_only_scorer.channel_liquidities;
+
+        // For each channel, merge the external liquidity information with the isolated local liquidity information.
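+        // Channels that only have local data keep their existing entry in `self.scorer`: it was copied from the
+        // local scorer at construction time and is kept current by the `ScoreUpdate` handlers below, so only
+        // channels present in `external_scores` need to be (re)inserted here.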
+        for (scid, mut liquidity) in external_scores.0 {
+            if let Some(local_liquidity) = local_scores.get(&scid) {
+                liquidity.merge(local_liquidity);
             }
-            liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
-                liquidity.liquidity_history.has_datapoints()
-        });
+            self.scorer.channel_liquidities.insert(scid, liquidity);
+        }
+    }
+
+    /// Overwrite the scorer state with the given external scores.
+    pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
+        self.scorer.set_scores(external_scores);
+    }
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreLookUp for CombinedScorer<G, L> where L::Target: Logger {
+    type ScoreParams = ProbabilisticScoringFeeParameters;
+
+    fn channel_penalty_msat(
+        &self, candidate: &CandidateRouteHop, usage: ChannelUsage, score_params: &ProbabilisticScoringFeeParameters
+    ) -> u64 {
+        self.scorer.channel_penalty_msat(candidate, usage, score_params)
+    }
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for CombinedScorer<G, L> where L::Target: Logger {
+    fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) {
+        self.local_only_scorer.payment_path_failed(path, short_channel_id, duration_since_epoch);
+        self.scorer.payment_path_failed(path, short_channel_id, duration_since_epoch);
+    }
+
+    fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
+        self.local_only_scorer.payment_path_successful(path, duration_since_epoch);
+        self.scorer.payment_path_successful(path, duration_since_epoch);
+    }
+
+    fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) {
+        self.local_only_scorer.probe_failed(path, short_channel_id, duration_since_epoch);
+        self.scorer.probe_failed(path, short_channel_id, duration_since_epoch);
+    }
+
+    fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
+        self.local_only_scorer.probe_successful(path, duration_since_epoch);
+        self.scorer.probe_successful(path, duration_since_epoch);
+    }
+
+    fn time_passed(&mut self, duration_since_epoch: Duration) {
+        self.local_only_scorer.time_passed(duration_since_epoch);
+        self.scorer.time_passed(duration_since_epoch);
+    }
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for CombinedScorer<G, L> where L::Target: Logger {
+    fn write<W: Writer>(&self, writer: &mut W) -> Result<(), crate::io::Error> {
+        self.local_only_scorer.write(writer)
     }
 }
 
@@ -1805,13 +1985,28 @@ mod bucketed_history {
                 self.buckets[bucket] = self.buckets[bucket].saturating_add(BUCKET_FIXED_POINT_ONE);
             }
         }
+
+        /// Merges the given tracker into this one by averaging each pair of corresponding buckets.
+        pub(crate) fn merge(&mut self, other: &Self) {
+            for (bucket, other_bucket) in self.buckets.iter_mut().zip(other.buckets.iter()) {
+                *bucket = ((*bucket as u32 + *other_bucket as u32) / 2) as u16;
+            }
+        }
+
+        /// Applies decay at the given half-life to all buckets.
+        fn decay(&mut self, half_lives: f64) {
+            let factor = (1024.0 * powf64(0.5, half_lives)) as u64;
+            for bucket in self.buckets.iter_mut() {
+                *bucket = ((*bucket as u64) * factor / 1024) as u16;
+            }
+        }
     }
 
     impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
     impl_writeable_tlv_based!(LegacyHistoricalBucketRangeTracker, { (0, buckets, required) });
 
     #[derive(Clone, Copy)]
-    #[repr(C)] // Force the fields in memory to be in the order we specify.
+    #[repr(C)] // Force the fields in memory to be in the order we specify.
     pub(super) struct HistoricalLiquidityTracker {
         // This struct sits inside a `(u64, ChannelLiquidity)` in memory, and we first read the
         // liquidity offsets in `ChannelLiquidity` when calculating the non-historical score. This
@@ -1859,13 +2054,8 @@ mod bucketed_history {
         }
 
         pub(super) fn decay_buckets(&mut self, half_lives: f64) {
-            let divisor = powf64(2048.0, half_lives) as u64;
-            for bucket in self.min_liquidity_offset_history.buckets.iter_mut() {
-                *bucket = ((*bucket as u64) * 1024 / divisor) as u16;
-            }
-            for bucket in self.max_liquidity_offset_history.buckets.iter_mut() {
-                *bucket = ((*bucket as u64) * 1024 / divisor) as u16;
-            }
+            self.min_liquidity_offset_history.decay(half_lives);
+            self.max_liquidity_offset_history.decay(half_lives);
             self.recalculate_valid_point_count();
         }
 
@@ -1901,6 +2091,13 @@ mod bucketed_history {
             -> DirectedHistoricalLiquidityTracker<&'a mut HistoricalLiquidityTracker> {
             DirectedHistoricalLiquidityTracker { source_less_than_target, tracker: self }
         }
+
+        /// Merges the historical liquidity data from another tracker into this one.
+        pub fn merge(&mut self, other: &Self) {
+            self.min_liquidity_offset_history.merge(&other.min_liquidity_offset_history);
+            self.max_liquidity_offset_history.merge(&other.max_liquidity_offset_history);
+            self.recalculate_valid_point_count();
+        }
     }
 
     /// A set of buckets representing the history of where we've seen the minimum- and maximum-
@@ -2059,16 +2256,100 @@ mod bucketed_history {
             Some((cumulative_success_prob * (1024.0 * 1024.0 * 1024.0)) as u64)
         }
     }
+
+    #[cfg(test)]
+    mod tests {
+        use crate::routing::scoring::ProbabilisticScoringFeeParameters;
+
+        use super::{HistoricalBucketRangeTracker, HistoricalLiquidityTracker};
+
+        #[test]
+        fn historical_liquidity_bucket_merge() {
+            let mut bucket1 = HistoricalBucketRangeTracker::new();
+            bucket1.track_datapoint(100, 1000);
+            assert_eq!(
+                bucket1.buckets,
+                [
+                    0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0
+                ]
+            );
+
+            let mut bucket2 = HistoricalBucketRangeTracker::new();
+            bucket2.track_datapoint(0, 1000);
+            assert_eq!(
+                bucket2.buckets,
+                [
+                    32u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0
+                ]
+            );
+
+            bucket1.merge(&bucket2);
+            assert_eq!(
+                bucket1.buckets,
+                [
+                    16u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0
+                ]
+            );
+        }
+
+        #[test]
+        fn historical_liquidity_bucket_decay() {
+            let mut bucket = HistoricalBucketRangeTracker::new();
+            bucket.track_datapoint(100, 1000);
+            assert_eq!(
+                bucket.buckets,
+                [
+                    0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0
+                ]
+            );
+
+            bucket.decay(2.0);
+            assert_eq!(
+                bucket.buckets,
+                [
+                    0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0
+                ]
+            );
+        }
+
+        #[test]
+        fn historical_liquidity_tracker_merge() {
+            let params = ProbabilisticScoringFeeParameters::default();
+
+            let probability1: Option<u64>;
+            let mut tracker1 = HistoricalLiquidityTracker::new();
+            {
+                let mut directed_tracker1 = tracker1.as_directed_mut(true);
+                directed_tracker1.track_datapoint(100, 200, 1000);
+                probability1 = directed_tracker1
+                    .calculate_success_probability_times_billion(&params, 500, 1000);
+            }
+
+            let mut tracker2 = HistoricalLiquidityTracker::new();
+            {
+                let mut directed_tracker2 = tracker2.as_directed_mut(true);
+                directed_tracker2.track_datapoint(200, 300, 1000);
+            }
+
+            tracker1.merge(&tracker2);
+
+            let directed_tracker1 = tracker1.as_directed(true);
+            let probability =
+                directed_tracker1.calculate_success_probability_times_billion(&params, 500, 1000);
+
+            assert_ne!(probability1, probability);
+        }
+    }
 }
-use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};
 
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for ProbabilisticScorer<G, L> where L::Target: Logger {
     #[inline]
     fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
-        write_tlv_fields!(w, {
-            (0, self.channel_liquidities, required),
-        });
-        Ok(())
+        self.channel_liquidities.write(w)
     }
 }
 
@@ -2079,10 +2360,7 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
         r: &mut R, args: (ProbabilisticScoringDecayParameters, G, L)
     ) -> Result<Self, DecodeError> {
         let (decay_params, network_graph, logger) = args;
-        let mut channel_liquidities = new_hash_map();
-        read_tlv_fields!(r, {
-            (0, channel_liquidities, required),
-        });
+        let channel_liquidities = ChannelLiquidities::read(r)?;
         Ok(Self {
             decay_params,
             network_graph,
@@ -2159,7 +2437,7 @@ impl Readable for ChannelLiquidity {
 
 #[cfg(test)]
 mod tests {
-    use super::{ChannelLiquidity, HistoricalLiquidityTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorer};
+    use super::{ChannelLiquidity, HistoricalLiquidityTracker, ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters};
     use crate::blinded_path::BlindedHop;
     use crate::util::config::UserConfig;
 
@@ -2167,7 +2445,7 @@ mod tests {
     use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
     use crate::routing::gossip::{EffectiveCapacity, NetworkGraph, NodeId};
     use crate::routing::router::{BlindedTail, Path, RouteHop, CandidateRouteHop, PublicHopCandidate};
-    use crate::routing::scoring::{ChannelUsage, ScoreLookUp, ScoreUpdate};
+    use crate::routing::scoring::{ChannelLiquidities, ChannelUsage, CombinedScorer, ScoreLookUp, ScoreUpdate};
     use crate::util::ser::{ReadableArgs, Writeable};
     use crate::util::test_utils::{self, TestLogger};
 
@@ -2177,6 +2455,7 @@ mod tests {
     use bitcoin::network::Network;
     use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
     use core::time::Duration;
+    use std::rc::Rc;
     use crate::io;
 
     fn source_privkey() -> SecretKey {
@@ -3668,6 +3947,110 @@ mod tests {
 
         assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, amount_msat, &params, false), Some(0.0));
     }
+
+    #[test]
+    fn get_scores() {
+        let logger = TestLogger::new();
+        let network_graph = network_graph(&logger);
+        let params = ProbabilisticScoringFeeParameters {
+            liquidity_penalty_multiplier_msat: 1_000,
+            ..ProbabilisticScoringFeeParameters::zero_penalty()
+        };
+        let mut scorer = ProbabilisticScorer::new(ProbabilisticScoringDecayParameters::default(), &network_graph, &logger);
+        let source = source_node_id();
+        let usage = ChannelUsage {
+            amount_msat: 500,
+            inflight_htlc_msat: 0,
+            effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_000, htlc_maximum_msat: 1_000 },
+        };
+        let successful_path = payment_path_for_amount(200);
+        let channel = &network_graph.read_only().channel(42).unwrap().to_owned();
+        let (info, _) = channel.as_directed_from(&source).unwrap();
+        let candidate = CandidateRouteHop::PublicHop(PublicHopCandidate {
+            info,
+            short_channel_id: 41,
+        });
+
+        scorer.payment_path_successful(&successful_path, Duration::ZERO);
+        assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 301);
+
+        // Get the scores and assert that both channels are present in the returned struct.
+        let scores = scorer.scores();
+        assert_eq!(scores.iter().count(), 2);
+    }
+
+    #[test]
+    fn combined_scorer() {
+        let logger = TestLogger::new();
+        let network_graph = network_graph(&logger);
+        let params = ProbabilisticScoringFeeParameters::default();
+        let mut scorer = ProbabilisticScorer::new(
+            ProbabilisticScoringDecayParameters::default(),
+            &network_graph,
+            &logger,
+        );
+        scorer.payment_path_failed(&payment_path_for_amount(600), 42, Duration::ZERO);
+
+        let mut combined_scorer = CombinedScorer::new(scorer);
+
+        // Verify that the combined_scorer has the correct liquidity range after a failed 600 msat payment.
+        let liquidity_range =
+            combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+        assert_eq!(liquidity_range.unwrap(), (0, 600));
+
+        let source = source_node_id();
+        let usage = ChannelUsage {
+            amount_msat: 750,
+            inflight_htlc_msat: 0,
+            effective_capacity: EffectiveCapacity::Total {
+                capacity_msat: 1_000,
+                htlc_maximum_msat: 1_000,
+            },
+        };
+
+        let logger_rc = Rc::new(&logger);
+
+        let mut external_liquidity = ChannelLiquidity::new(Duration::ZERO);
+        external_liquidity.as_directed_mut(&source_node_id(), &target_node_id(), 1_000).successful(
+            1000,
+            Duration::ZERO,
+            format_args!("test channel"),
+            logger_rc.as_ref(),
+        );
+
+        let mut external_scores = ChannelLiquidities::new();
+        external_scores.insert(42, external_liquidity);
+
+        {
+            let network_graph = network_graph.read_only();
+            let channel = network_graph.channel(42).unwrap();
+            let (info, _) = channel.as_directed_from(&source).unwrap();
+            let candidate =
+                CandidateRouteHop::PublicHop(PublicHopCandidate { info, short_channel_id: 42 });
+
+            let penalty = combined_scorer.channel_penalty_msat(&candidate, usage, &params);
+
+            combined_scorer.merge(external_scores.clone(), Duration::ZERO);
+
+            let penalty_after_merge =
+                combined_scorer.channel_penalty_msat(&candidate, usage, &params);
+
+            // Since the external source observed a successful payment, the penalty should be lower after the merge.
+            assert!(penalty_after_merge < penalty);
+        }
+
+        // Verify that the merge with the externally observed success lowered the upper bound of the estimated
+        // liquidity range.
+        let liquidity_range =
+            combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+        assert_eq!(liquidity_range.unwrap(), (0, 300));
+
+        // Now set (overwrite) the scorer state with the external data, which should tighten the estimate even
+        // further because only the success from the external source is now considered.
+        combined_scorer.set_scores(external_scores);
+        let liquidity_range =
+            combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+        assert_eq!(liquidity_range.unwrap(), (0, 0));
+    }
 }
 
 #[cfg(ldk_bench)]
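
Taken together, the pieces added above are intended to be used roughly as follows. This is a minimal sketch, not part of the diff: the surrounding setup (`decay_params`, `network_graph`, `logger`, `now`, the `serialized_scores` byte buffer and the `persister` writer) is assumed to exist, and how the external blob is obtained is left abstract.

// Wrap the usual local scorer so that external scores can be merged in later.
let local_scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger);
let mut scorer = CombinedScorer::new(local_scorer);

// Whenever a fresh snapshot arrives, decode it and merge it with the locally
// observed data. `ChannelLiquidities` implements `Readable`/`Writeable`.
let external: ChannelLiquidities = Readable::read(&mut &serialized_scores[..])?;
scorer.merge(external.clone(), now);

// Alternatively, drop the merged view and score purely from the external data.
scorer.set_scores(external);

// Persisting the combined scorer only writes the locally acquired data, so the
// external scores have to be resupplied after a restart.
scorer.write(&mut persister)?;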