From 21a7ffde32c4c3db4b71fa4048012c0e95de785e Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 10:25:23 +0800 Subject: [PATCH 01/15] feature gate metrics related code --- protocols/gossipsub/Cargo.toml | 3 +- protocols/gossipsub/src/behaviour.rs | 196 ++++++++++++++++++++- protocols/gossipsub/src/behaviour/tests.rs | 1 - protocols/gossipsub/src/lib.rs | 2 + protocols/gossipsub/src/peer_score.rs | 117 +++++++++++- protocols/gossipsub/src/topic.rs | 7 +- protocols/gossipsub/src/types.rs | 7 +- 7 files changed, 318 insertions(+), 15 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 665f757fcb3..75c4d5f927e 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [features] wasm-bindgen = ["getrandom/js"] +metrics = ["prometheus-client"] [dependencies] asynchronous-codec = { workspace = true } @@ -39,7 +40,7 @@ tracing = { workspace = true } void = "1.0.2" # Metrics dependencies -prometheus-client = { workspace = true } +prometheus-client = { workspace = true, optional = true } [dev-dependencies] async-std = { version = "1.6.3", features = ["unstable"] } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 16adb555a44..d7d54ec32e9 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -31,6 +31,7 @@ use std::{ use futures::StreamExt; use futures_ticker::Ticker; +#[cfg(feature = "metrics")] use prometheus_client::registry::Registry; use rand::{seq::SliceRandom, thread_rng}; @@ -52,7 +53,8 @@ use crate::config::{Config, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; -use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; +#[cfg(feature = "metrics")] +use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -326,6 +328,7 @@ pub struct Behaviour { data_transform: D, /// Keep track of a set of internal metrics relating to gossipsub. + #[cfg(feature = "metrics")] metrics: Option, } @@ -336,6 +339,7 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`]. This has no subscription filter and uses no compression. + #[cfg(feature = "metrics")] pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { Self::new_with_subscription_filter_and_transform( privacy, @@ -345,7 +349,17 @@ where D::default(), ) } + #[cfg(not(feature = "metrics"))] + pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + F::default(), + D::default(), + ) + } + #[cfg(feature = "metrics")] /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`]. This has no subscription filter and uses no compression. /// Metrics can be evaluated by passing a reference to a [`Registry`]. @@ -372,6 +386,7 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`] and a custom subscription filter. + #[cfg(feature = "metrics")] pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: Config, @@ -386,6 +401,19 @@ where D::default(), ) } + #[cfg(not(feature = "metrics"))] + pub fn new_with_subscription_filter( + privacy: MessageAuthenticity, + config: Config, + subscription_filter: F, + ) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + subscription_filter, + D::default(), + ) + } } impl Behaviour @@ -395,6 +423,7 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`] and a custom data transform. + #[cfg(feature = "metrics")] pub fn new_with_transform( privacy: MessageAuthenticity, config: Config, @@ -409,6 +438,19 @@ where data_transform, ) } + #[cfg(not(feature = "metrics"))] + pub fn new_with_transform( + privacy: MessageAuthenticity, + config: Config, + data_transform: D, + ) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + F::default(), + data_transform, + ) + } } impl Behaviour @@ -418,6 +460,7 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`] and a custom subscription filter and data transform. + #[cfg(feature = "metrics")] pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: Config, @@ -466,6 +509,53 @@ where data_transform, }) } + #[cfg(not(feature = "metrics"))] + pub fn new_with_subscription_filter_and_transform( + privacy: MessageAuthenticity, + config: Config, + subscription_filter: F, + data_transform: D, + ) -> Result { + // Set up the router given the configuration settings. + + // We do not allow configurations where a published message would also be rejected if it + // were received locally. + validate_config(&privacy, config.validation_mode())?; + + Ok(Behaviour { + events: VecDeque::new(), + control_pool: HashMap::new(), + publish_config: privacy.into(), + duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), + explicit_peers: HashSet::new(), + blacklisted_peers: HashSet::new(), + mesh: HashMap::new(), + fanout: HashMap::new(), + fanout_last_pub: HashMap::new(), + backoffs: BackoffStorage::new( + &config.prune_backoff(), + config.heartbeat_interval(), + config.backoff_slack(), + ), + mcache: MessageCache::new(config.history_gossip(), config.history_length()), + heartbeat: Ticker::new_with_next( + config.heartbeat_interval(), + config.heartbeat_initial_delay(), + ), + heartbeat_ticks: 0, + px_peers: HashSet::new(), + outbound_peers: HashSet::new(), + peer_score: None, + count_received_ihave: HashMap::new(), + count_sent_iwant: HashMap::new(), + pending_iwant_msgs: HashSet::new(), + connected_peers: HashMap::new(), + published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), + config, + subscription_filter, + data_transform, + }) + } } impl Behaviour @@ -731,6 +821,7 @@ where tracing::debug!(message=%msg_id, "Published message"); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_published_message(&topic_hash); } @@ -774,6 +865,7 @@ where message=%msg_id, "Message not in cache. Ignoring forwarding" ); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.memcache_miss(); } @@ -781,6 +873,7 @@ where } }; + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_msg_validation(&raw_message.topic, &acceptance); } @@ -798,6 +891,7 @@ where }; if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) { + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_msg_validation(&raw_message.topic, &acceptance); } @@ -927,7 +1021,7 @@ where } let mut added_peers = HashSet::new(); - + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.joined(topic_hash) } @@ -966,8 +1060,9 @@ where self.fanout_last_pub.remove(topic_hash); } - let fanaout_added = added_peers.len(); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { + let fanaout_added = added_peers.len(); m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) } @@ -995,8 +1090,9 @@ where mesh_peers.extend(new_peers); } - let random_added = added_peers.len() - fanaout_added; + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { + let random_added = added_peers.len() - fanaout_added; m.peers_included(topic_hash, Inclusion::Random, random_added) } @@ -1024,8 +1120,9 @@ where ); } - let mesh_peers = self.mesh_peers(topic_hash).count(); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { + let mesh_peers = self.mesh_peers(topic_hash).count(); m.set_mesh_peers(topic_hash, mesh_peers) } @@ -1099,6 +1196,7 @@ where // If our mesh contains the topic, send prune to peers and delete it from the mesh if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.left(topic_hash) } @@ -1228,8 +1326,10 @@ where for id in ids.into_iter().filter(want_message) { // have not seen this message and are not currently requesting it + if iwant_ids.insert(id) { // Register the IWANT metric + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_iwant(&topic); } @@ -1345,6 +1445,7 @@ where // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { if connected_peer.topics.insert(topic.clone()) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.inc_topic_peers(topic); } @@ -1383,6 +1484,7 @@ where ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_score_penalty(Penalty::GraftBackoff); } @@ -1440,6 +1542,7 @@ where ); if peers.insert(*peer_id) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(&topic_hash, Inclusion::Subscribed, 1) } @@ -1490,6 +1593,7 @@ where tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } + #[cfg(feature = "metrics")] fn remove_peer_from_mesh( &mut self, peer_id: &PeerId, @@ -1538,6 +1642,51 @@ where } } + #[cfg(not(feature = "metrics"))] + fn remove_peer_from_mesh( + &mut self, + peer_id: &PeerId, + topic_hash: &TopicHash, + backoff: Option, + always_update_backoff: bool, + ) { + let mut update_backoff = always_update_backoff; + if let Some(peers) = self.mesh.get_mut(topic_hash) { + // remove the peer if it exists in the mesh + if peers.remove(peer_id) { + tracing::debug!( + peer=%peer_id, + topic=%topic_hash, + "PRUNE: Removing peer from the mesh for topic" + ); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.prune(peer_id, topic_hash.clone()); + } + + update_backoff = true; + + // inform the handler + peer_removed_from_mesh( + *peer_id, + topic_hash, + &self.mesh, + &mut self.events, + &self.connected_peers, + ); + } + } + if update_backoff { + let time = if let Some(backoff) = backoff { + Duration::from_secs(backoff) + } else { + self.config.prune_backoff() + }; + // is there a backoff specified by the peer? if so obey it. + self.backoffs.update_backoff(topic_hash, peer_id, time); + } + } + /// Handles PRUNE control messages. Removes peer from the mesh. fn handle_prune( &mut self, @@ -1548,7 +1697,10 @@ where let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { + #[cfg(feature = "metrics")] self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune); + #[cfg(not(feature = "metrics"))] + self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); if self.mesh.contains_key(&topic_hash) { //connect to px peers @@ -1695,6 +1847,7 @@ where propagation_source: &PeerId, ) { // Record the received metric + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len()); } @@ -1738,6 +1891,7 @@ where ); // Record the received message with the metrics + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.msg_recvd(&message.topic); } @@ -1794,6 +1948,7 @@ where reject_reason: RejectReason, ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_invalid_message(&raw_message.topic); } @@ -1874,6 +2029,7 @@ where "SUBSCRIPTION: Adding gossip peer to topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.inc_topic_peers(topic_hash); } @@ -1901,6 +2057,7 @@ where topic=%topic_hash, "SUBSCRIPTION: Adding peer to the mesh for topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Subscribed, 1) } @@ -1931,6 +2088,7 @@ where "SUBSCRIPTION: Removing gossip peer from topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.dec_topic_peers(topic_hash); } @@ -1948,7 +2106,10 @@ where // remove unsubscribed peers from the mesh if it exists for (peer_id, topic_hash) in unsubscribed_peers { + #[cfg(feature = "metrics")] self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub); + #[cfg(not(feature = "metrics"))] + self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); } // Potentially inform the handler if we have added this peer to a mesh for the first time. @@ -1989,6 +2150,7 @@ where if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_score_penalty(Penalty::BrokenPromise); } @@ -1999,6 +2161,7 @@ where /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { tracing::debug!("Starting heartbeat"); + #[cfg(feature = "metrics")] let start = Instant::now(); self.heartbeat_ticks += 1; @@ -2028,9 +2191,13 @@ where let mut scores = HashMap::with_capacity(self.connected_peers.len()); if let Some((peer_score, ..)) = &self.peer_score { for peer_id in self.connected_peers.keys() { - scores - .entry(peer_id) - .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut())); + scores.entry(peer_id).or_insert_with(|| { + #[cfg(feature = "metrics")] + let score = peer_score.metric_score(peer_id, self.metrics.as_mut()); + #[cfg(not(feature = "metrics"))] + let score = peer_score.score(peer_id); + score + }); } } @@ -2048,6 +2215,7 @@ where let peer_score = *scores.get(peer_id).unwrap_or(&0.0); // Record the score per mesh + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.observe_mesh_peers_score(topic_hash, peer_score); } @@ -2067,6 +2235,7 @@ where } } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len()) } @@ -2098,6 +2267,7 @@ where } // update the mesh tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) } @@ -2159,6 +2329,7 @@ where removed += 1; } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic_hash, Churn::Excess, removed) } @@ -2186,6 +2357,7 @@ where } // update the mesh tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len()) } @@ -2256,6 +2428,7 @@ where "Opportunistically graft in topic with peers {:?}", peer_list ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) } @@ -2264,6 +2437,7 @@ where } } // Register the final count of peers in the mesh + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.set_mesh_peers(topic_hash, peers.len()) } @@ -2377,6 +2551,7 @@ where self.mcache.shift(); tracing::debug!("Completed Heartbeat"); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); metrics.observe_heartbeat_duration(duration); @@ -2708,6 +2883,7 @@ where /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it /// is not already an arc. fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { // register bytes sent on the internal metrics. @@ -2858,6 +3034,7 @@ where if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it if mesh_peers.remove(&peer_id) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic, Churn::Dc, 1); m.set_mesh_peers(topic, mesh_peers.len()); @@ -2865,6 +3042,7 @@ where }; } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.dec_topic_peers(topic); } @@ -2881,6 +3059,7 @@ where self.outbound_peers.remove(&peer_id); // If metrics are enabled, register the disconnection of a peer based on its protocol. + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { let peer_kind = &self .connected_peers @@ -2978,6 +3157,7 @@ where HandlerEvent::PeerKind(kind) => { // We have identified the protocol this peer is using + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.peer_protocol_connected(kind.clone()); } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index a74566a1308..103ac323be3 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -59,7 +59,6 @@ where let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, - None, self.subscription_filter, self.data_transform, ) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 3db2fa7ce51..0f3956f0ac5 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -100,6 +100,7 @@ mod error; mod gossip_promises; mod handler; mod mcache; +#[cfg(feature = "metrics")] mod metrics; mod peer_score; mod protocol; @@ -113,6 +114,7 @@ mod types; pub use self::behaviour::{Behaviour, Event, MessageAuthenticity}; pub use self::config::{Config, ConfigBuilder, ValidationMode, Version}; pub use self::error::{ConfigBuilderError, PublishError, SubscriptionError, ValidationError}; +#[cfg(feature = "metrics")] pub use self::metrics::Config as MetricsConfig; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index ac24fc91970..2d2e86f16b7 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -21,6 +21,7 @@ //! //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. +#[cfg(feature = "metrics")] use crate::metrics::{Metrics, Penalty}; use crate::time_cache::TimeCache; use crate::{MessageId, TopicHash}; @@ -214,11 +215,125 @@ impl PeerScore { /// Returns the score for a peer pub(crate) fn score(&self, peer_id: &PeerId) -> f64 { - self.metric_score(peer_id, None) + let Some(peer_stats) = self.peer_stats.get(peer_id) else { + return 0.0; + }; + let mut score = 0.0; + + // topic scores + for (topic, topic_stats) in peer_stats.topics.iter() { + // topic parameters + if let Some(topic_params) = self.params.topics.get(topic) { + // we are tracking the topic + + // the topic score + let mut topic_score = 0.0; + + // P1: time in mesh + if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status { + let p1 = { + let v = mesh_time.as_secs_f64() + / topic_params.time_in_mesh_quantum.as_secs_f64(); + if v < topic_params.time_in_mesh_cap { + v + } else { + topic_params.time_in_mesh_cap + } + }; + topic_score += p1 * topic_params.time_in_mesh_weight; + } + + // P2: first message deliveries + let p2 = { + let v = topic_stats.first_message_deliveries; + if v < topic_params.first_message_deliveries_cap { + v + } else { + topic_params.first_message_deliveries_cap + } + }; + topic_score += p2 * topic_params.first_message_deliveries_weight; + + // P3: mesh message deliveries + if topic_stats.mesh_message_deliveries_active + && topic_stats.mesh_message_deliveries + < topic_params.mesh_message_deliveries_threshold + { + let deficit = topic_params.mesh_message_deliveries_threshold + - topic_stats.mesh_message_deliveries; + let p3 = deficit * deficit; + topic_score += p3 * topic_params.mesh_message_deliveries_weight; + tracing::debug!( + peer=%peer_id, + %topic, + %deficit, + penalty=%topic_score, + "[Penalty] The peer has a mesh deliveries deficit and will be penalized" + ); + } + + // P3b: + // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts. + let p3b = topic_stats.mesh_failure_penalty; + topic_score += p3b * topic_params.mesh_failure_penalty_weight; + + // P4: invalid messages + // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so this detracts. + let p4 = + topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries; + topic_score += p4 * topic_params.invalid_message_deliveries_weight; + + // update score, mixing with topic weight + score += topic_score * topic_params.topic_weight; + } + } + + // apply the topic score cap, if any + if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap { + score = self.params.topic_score_cap; + } + + // P5: application-specific score + let p5 = peer_stats.application_score; + score += p5 * self.params.app_specific_weight; + + // P6: IP collocation factor + for ip in peer_stats.known_ips.iter() { + if self.params.ip_colocation_factor_whitelist.contains(ip) { + continue; + } + + // P6 has a cliff (ip_colocation_factor_threshold); it's only applied iff + // at least that many peers are connected to us from that source IP + // addr. It is quadratic, and the weight is negative (validated by + // peer_score_params.validate()). + if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) { + if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { + let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; + let p6 = surplus * surplus; + tracing::debug!( + peer=%peer_id, + surplus_ip=%ip, + surplus=%surplus, + "[Penalty] The peer gets penalized because of too many peers with the same ip" + ); + score += p6 * self.params.ip_colocation_factor_weight; + } + } + } + + // P7: behavioural pattern penalty + if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold { + let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold; + let p7 = excess * excess; + score += p7 * self.params.behaviour_penalty_weight; + } + score } /// Returns the score for a peer, logging metrics. This is called from the heartbeat and /// increments the metric counts for penalties. + #[cfg(feature = "metrics")] pub(crate) fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 { let Some(peer_stats) = self.peer_stats.get(peer_id) else { return 0.0; diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index a73496b53f2..c74fe836503 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -20,7 +20,6 @@ use crate::rpc_proto::proto; use base64::prelude::*; -use prometheus_client::encoding::EncodeLabelSet; use quick_protobuf::Writer; use sha2::{Digest, Sha256}; use std::fmt; @@ -64,7 +63,11 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, EncodeLabelSet)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[cfg_attr( + feature = "metrics", + derive(prometheus_client::encoding::EncodeLabelSet) +)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index a88f4822ac2..4a73c012ec8 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -22,7 +22,6 @@ use crate::TopicHash; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; -use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; use std::fmt::Debug; use std::{collections::BTreeSet, fmt}; @@ -82,7 +81,11 @@ pub(crate) struct PeerConnections { } /// Describes the types of peers that can exist in the gossipsub context. -#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)] +#[derive(Debug, Clone, PartialEq, Hash, Eq)] +#[cfg_attr( + feature = "metrics", + derive(prometheus_client::encoding::EncodeLabelValue) +)] pub enum PeerKind { /// A gossipsub 1.1 peer. Gossipsubv1_1, From 906e5a0ed7a0987b3051498dc1867b207244d1d0 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 10:33:25 +0800 Subject: [PATCH 02/15] reduce diff and fix test --- protocols/gossipsub/src/behaviour.rs | 1 - protocols/gossipsub/src/behaviour/tests.rs | 10 ++++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index d7d54ec32e9..152b8a94e0d 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1326,7 +1326,6 @@ where for id in ids.into_iter().filter(want_message) { // have not seen this message and are not currently requesting it - if iwant_ids.insert(id) { // Register the IWANT metric #[cfg(feature = "metrics")] diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 103ac323be3..a9239feaece 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -56,6 +56,16 @@ where pub(crate) fn create_network(self) -> (Behaviour, Vec, Vec) { let keypair = libp2p_identity::Keypair::generate_ed25519(); // create a gossipsub struct + #[cfg(feature = "metrics")] + let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( + MessageAuthenticity::Signed(keypair), + self.gs_config, + None, + self.subscription_filter, + self.data_transform, + ) + .unwrap(); + #[cfg(not(feature = "metrics"))] let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, From 252bbdf0d39255a06f667bb53c83f5713fa936fe Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 10:59:16 +0800 Subject: [PATCH 03/15] fix wrongly gated variable --- protocols/gossipsub/src/behaviour.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2e450d1baa3..06bb994bab0 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1071,10 +1071,11 @@ where // remove the last published time self.fanout_last_pub.remove(topic_hash); } - + + #[cfg(feature = "metrics")] + let fanaout_added = added_peers.len(); #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { - let fanaout_added = added_peers.len(); m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) } From a0970e32d155baaefe8dd29754af6a6aae74857f Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 11:01:58 +0800 Subject: [PATCH 04/15] reduce diff --- protocols/gossipsub/src/behaviour.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 06bb994bab0..cb2e716d390 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1071,7 +1071,6 @@ where // remove the last published time self.fanout_last_pub.remove(topic_hash); } - #[cfg(feature = "metrics")] let fanaout_added = added_peers.len(); #[cfg(feature = "metrics")] From b79e15d6122c95bf22adba0021b66738eb5762ac Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 11:08:07 +0800 Subject: [PATCH 05/15] fix borrowing rule violation --- protocols/gossipsub/src/behaviour.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index cb2e716d390..0ca79b55991 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1130,10 +1130,11 @@ where &self.connected_peers, ); } - + + #[cfg(feature = "metrics")] + let mesh_peers = self.mesh_peers(topic_hash).count(); #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { - let mesh_peers = self.mesh_peers(topic_hash).count(); m.set_mesh_peers(topic_hash, mesh_peers) } From 871e36a96e169bf5f302a472901a04078c241c8f Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 4 Dec 2024 11:12:25 +0800 Subject: [PATCH 06/15] formatting --- protocols/gossipsub/src/behaviour.rs | 6 +++--- protocols/gossipsub/src/lib.rs | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0ca79b55991..b8440c424f4 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -48,6 +48,8 @@ use quick_protobuf::{MessageWrite, Writer}; use rand::{seq::SliceRandom, thread_rng}; use web_time::{Instant, SystemTime}; +#[cfg(feature = "metrics")] +use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::{ backoff::BackoffStorage, config::{Config, ValidationMode}, @@ -68,8 +70,6 @@ use crate::{ }, FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError, }; -#[cfg(feature = "metrics")] -use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; #[cfg(test)] mod tests; @@ -1130,7 +1130,7 @@ where &self.connected_peers, ); } - + #[cfg(feature = "metrics")] let mesh_peers = self.mesh_peers(topic_hash).count(); #[cfg(feature = "metrics")] diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 9bbff04a1f3..ae614367e14 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -113,6 +113,9 @@ mod topic; mod transform; mod types; +#[cfg(feature = "metrics")] +pub use metrics::Config as MetricsConfig; + pub use self::{ behaviour::{Behaviour, Event, MessageAuthenticity}, config::{Config, ConfigBuilder, ValidationMode, Version}, @@ -130,8 +133,6 @@ pub use self::{ transform::{DataTransform, IdentityTransform}, types::{FailedMessages, Message, MessageAcceptance, MessageId, RawMessage}, }; -#[cfg(feature = "metrics")] -pub use metrics::Config as MetricsConfig; #[deprecated(note = "Will be removed from the public API.")] pub type Rpc = self::types::Rpc; From c6ac3f4435e90beb96c380c31ee67a6a22f2ee08 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 5 Dec 2024 18:13:45 +0800 Subject: [PATCH 07/15] sync new changes to scoring --- protocols/gossipsub/src/peer_score.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index bac0893663e..7995e08fba3 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -332,12 +332,19 @@ impl PeerScore { } } - // P7: behavioural pattern penalty + // P7: behavioural pattern penalty. if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold { let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold; let p7 = excess * excess; score += p7 * self.params.behaviour_penalty_weight; } + + // Slow peer weighting. + if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold { + let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold; + score += excess * self.params.slow_peer_weight; + } + score } From 7d71e9c08876c046b1e094e081706e6a9451e920 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 5 Dec 2024 23:51:52 +0800 Subject: [PATCH 08/15] refactor behaviour constructor --- protocols/gossipsub/src/behaviour.rs | 122 +++------------------ protocols/gossipsub/src/behaviour/tests.rs | 10 -- 2 files changed, 16 insertions(+), 116 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b8440c424f4..fa582ad6606 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -342,18 +342,8 @@ where F: TopicSubscriptionFilter + Default, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`]. This has no subscription filter and uses no compression. - #[cfg(feature = "metrics")] - pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { - Self::new_with_subscription_filter_and_transform( - privacy, - config, - None, - F::default(), - D::default(), - ) - } - #[cfg(not(feature = "metrics"))] + /// [`Config`]. This has no subscription filter and uses no compression. + /// Metrics are disabled by default. pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { Self::new_with_subscription_filter_and_transform( privacy, @@ -363,23 +353,16 @@ where ) } - #[cfg(feature = "metrics")] - /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`]. This has no subscription filter and uses no compression. + /// Allow the behaviour to also provide metric data. /// Metrics can be evaluated by passing a reference to a [`Registry`]. - pub fn new_with_metrics( - privacy: MessageAuthenticity, - config: Config, + #[cfg(feature = "metrics")] + pub fn with_metrics( + mut self, metrics_registry: &mut Registry, metrics_config: MetricsConfig, - ) -> Result { - Self::new_with_subscription_filter_and_transform( - privacy, - config, - Some((metrics_registry, metrics_config)), - F::default(), - D::default(), - ) + ) -> Self { + self.metrics = Some(Metrics::new(metrics_registry, metrics_config)); + self } } @@ -389,23 +372,8 @@ where F: TopicSubscriptionFilter, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`] and a custom subscription filter. - #[cfg(feature = "metrics")] - pub fn new_with_subscription_filter( - privacy: MessageAuthenticity, - config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, - subscription_filter: F, - ) -> Result { - Self::new_with_subscription_filter_and_transform( - privacy, - config, - metrics, - subscription_filter, - D::default(), - ) - } - #[cfg(not(feature = "metrics"))] + /// [`Config`] and a custom subscription filter. + /// Metrics are disabled by default. pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: Config, @@ -426,23 +394,8 @@ where F: TopicSubscriptionFilter + Default, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`] and a custom data transform. - #[cfg(feature = "metrics")] - pub fn new_with_transform( - privacy: MessageAuthenticity, - config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, - data_transform: D, - ) -> Result { - Self::new_with_subscription_filter_and_transform( - privacy, - config, - metrics, - F::default(), - data_transform, - ) - } - #[cfg(not(feature = "metrics"))] + /// [`Config`] and a custom data transform. + /// Metrics are disabled by default. pub fn new_with_transform( privacy: MessageAuthenticity, config: Config, @@ -464,52 +417,7 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`] and a custom subscription filter and data transform. - #[cfg(feature = "metrics")] - pub fn new_with_subscription_filter_and_transform( - privacy: MessageAuthenticity, - config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, - subscription_filter: F, - data_transform: D, - ) -> Result { - // Set up the router given the configuration settings. - - // We do not allow configurations where a published message would also be rejected if it - // were received locally. - validate_config(&privacy, config.validation_mode())?; - - Ok(Behaviour { - metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), - events: VecDeque::new(), - publish_config: privacy.into(), - duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - explicit_peers: HashSet::new(), - blacklisted_peers: HashSet::new(), - mesh: HashMap::new(), - fanout: HashMap::new(), - fanout_last_pub: HashMap::new(), - backoffs: BackoffStorage::new( - &config.prune_backoff(), - config.heartbeat_interval(), - config.backoff_slack(), - ), - mcache: MessageCache::new(config.history_gossip(), config.history_length()), - heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()), - heartbeat_ticks: 0, - px_peers: HashSet::new(), - outbound_peers: HashSet::new(), - peer_score: None, - count_received_ihave: HashMap::new(), - count_sent_iwant: HashMap::new(), - connected_peers: HashMap::new(), - published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), - config, - subscription_filter, - data_transform, - failed_messages: Default::default(), - }) - } - #[cfg(not(feature = "metrics"))] + /// Metrics are disabled by default. pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: Config, @@ -523,6 +431,8 @@ where validate_config(&privacy, config.validation_mode())?; Ok(Behaviour { + #[cfg(feature = "metrics")] + metrics: None, events: VecDeque::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index e0a85ccd912..ba5d9abcdaa 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -67,16 +67,6 @@ where ) { let keypair = libp2p_identity::Keypair::generate_ed25519(); // create a gossipsub struct - #[cfg(feature = "metrics")] - let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( - MessageAuthenticity::Signed(keypair), - self.gs_config, - None, - self.subscription_filter, - self.data_transform, - ) - .unwrap(); - #[cfg(not(feature = "metrics"))] let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, From efdf89abedebb2d6bc5c703f8ec552bd02515d39 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 6 Dec 2024 01:11:13 +0800 Subject: [PATCH 09/15] refactor scoring --- protocols/gossipsub/src/behaviour.rs | 22 ++-- protocols/gossipsub/src/metrics.rs | 4 +- protocols/gossipsub/src/peer_score.rs | 161 +++++--------------------- 3 files changed, 48 insertions(+), 139 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index fa582ad6606..704d0f2695c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1380,6 +1380,8 @@ where } else { let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0); let now = Instant::now(); + #[cfg(feature = "metrics")] + let mut num_graft_backoff_penalties = 0; for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // if the peer is already in the mesh ignore the graft @@ -1403,8 +1405,8 @@ where // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { #[cfg(feature = "metrics")] - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty(Penalty::GraftBackoff); + { + num_graft_backoff_penalties += 1; } peer_score.add_penalty(peer_id, 1); @@ -1491,6 +1493,10 @@ where continue; } } + #[cfg(feature = "metrics")] + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_score_penalty(Penalty::GraftBackoff, num_graft_backoff_penalties); + } } if !to_prune_topics.is_empty() { @@ -2062,12 +2068,14 @@ where /// Applies penalties to peers that did not respond to our IWANT requests. fn apply_iwant_penalties(&mut self) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - for (peer, count) in gossip_promises.get_broken_promises() { + let broken_promises = gossip_promises.get_broken_promises(); + #[cfg(feature = "metrics")] + if let Some(metrics) = self.metrics.as_mut() { + metrics + .register_score_penalty(Penalty::BrokenPromise, broken_promises.len() as u64); + } + for (peer, count) in broken_promises { peer_score.add_penalty(&peer, count); - #[cfg(feature = "metrics")] - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty(Penalty::BrokenPromise); - } } } } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 2519da64b73..ee3af5cd002 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -485,10 +485,10 @@ impl Metrics { } /// Register a score penalty. - pub(crate) fn register_score_penalty(&mut self, penalty: Penalty) { + pub(crate) fn register_score_penalty(&mut self, penalty: Penalty, inc: u64) { self.scoring_penalties .get_or_create(&PenaltyLabel { penalty }) - .inc(); + .inc_by(inc); } /// Registers that a message was published on a specific topic. diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 7995e08fba3..ad10dc0a714 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -223,139 +223,31 @@ impl PeerScore { /// Returns the score for a peer pub(crate) fn score(&self, peer_id: &PeerId) -> f64 { - let Some(peer_stats) = self.peer_stats.get(peer_id) else { - return 0.0; - }; - let mut score = 0.0; - - // topic scores - for (topic, topic_stats) in peer_stats.topics.iter() { - // topic parameters - if let Some(topic_params) = self.params.topics.get(topic) { - // we are tracking the topic - - // the topic score - let mut topic_score = 0.0; - - // P1: time in mesh - if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status { - let p1 = { - let v = mesh_time.as_secs_f64() - / topic_params.time_in_mesh_quantum.as_secs_f64(); - if v < topic_params.time_in_mesh_cap { - v - } else { - topic_params.time_in_mesh_cap - } - }; - topic_score += p1 * topic_params.time_in_mesh_weight; - } - - // P2: first message deliveries - let p2 = { - let v = topic_stats.first_message_deliveries; - if v < topic_params.first_message_deliveries_cap { - v - } else { - topic_params.first_message_deliveries_cap - } - }; - topic_score += p2 * topic_params.first_message_deliveries_weight; - - // P3: mesh message deliveries - if topic_stats.mesh_message_deliveries_active - && topic_stats.mesh_message_deliveries - < topic_params.mesh_message_deliveries_threshold - { - let deficit = topic_params.mesh_message_deliveries_threshold - - topic_stats.mesh_message_deliveries; - let p3 = deficit * deficit; - topic_score += p3 * topic_params.mesh_message_deliveries_weight; - tracing::debug!( - peer=%peer_id, - %topic, - %deficit, - penalty=%topic_score, - "[Penalty] The peer has a mesh deliveries deficit and will be penalized" - ); - } - - // P3b: - // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so - // this detracts. - let p3b = topic_stats.mesh_failure_penalty; - topic_score += p3b * topic_params.mesh_failure_penalty_weight; - - // P4: invalid messages - // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so - // this detracts. - let p4 = - topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries; - topic_score += p4 * topic_params.invalid_message_deliveries_weight; - - // update score, mixing with topic weight - score += topic_score * topic_params.topic_weight; - } - } - - // apply the topic score cap, if any - if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap { - score = self.params.topic_score_cap; - } - - // P5: application-specific score - let p5 = peer_stats.application_score; - score += p5 * self.params.app_specific_weight; - - // P6: IP collocation factor - for ip in peer_stats.known_ips.iter() { - if self.params.ip_colocation_factor_whitelist.contains(ip) { - continue; - } - - // P6 has a cliff (ip_colocation_factor_threshold); it's only applied iff - // at least that many peers are connected to us from that source IP - // addr. It is quadratic, and the weight is negative (validated by - // peer_score_params.validate()). - if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) { - if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { - let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; - let p6 = surplus * surplus; - tracing::debug!( - peer=%peer_id, - surplus_ip=%ip, - surplus=%surplus, - "[Penalty] The peer gets penalized because of too many peers with the same ip" - ); - score += p6 * self.params.ip_colocation_factor_weight; - } - } - } - - // P7: behavioural pattern penalty. - if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold { - let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold; - let p7 = excess * excess; - score += p7 * self.params.behaviour_penalty_weight; - } - - // Slow peer weighting. - if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold { - let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold; - score += excess * self.params.slow_peer_weight; - } - + let (score, _, _) = self.calculate_score(peer_id); score } /// Returns the score for a peer, logging metrics. This is called from the heartbeat and /// increments the metric counts for penalties. #[cfg(feature = "metrics")] - pub(crate) fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 { + pub(crate) fn metric_score(&self, peer_id: &PeerId, metrics: Option<&mut Metrics>) -> f64 { + let (score, num_message_deficit, num_ip_colocation) = self.calculate_score(peer_id); + if let Some(m) = metrics { + m.register_score_penalty(Penalty::MessageDeficit, num_message_deficit); + m.register_score_penalty(Penalty::IPColocation, num_ip_colocation); + } + score + } + + fn calculate_score(&self, peer_id: &PeerId) -> (f64, u64, u64) { let Some(peer_stats) = self.peer_stats.get(peer_id) else { - return 0.0; + return (0.0, 0, 0); }; let mut score = 0.0; + #[cfg(feature = "metrics")] + let mut num_message_deficit_penalties = 0; + #[cfg(feature = "metrics")] + let mut num_ip_colocation_penalties = 0; // topic scores for (topic, topic_stats) in peer_stats.topics.iter() { @@ -400,8 +292,10 @@ impl PeerScore { - topic_stats.mesh_message_deliveries; let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; - if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty(Penalty::MessageDeficit); + + #[cfg(feature = "metrics")] + { + num_message_deficit_penalties += 1; } tracing::debug!( peer=%peer_id, @@ -453,8 +347,9 @@ impl PeerScore { if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; - if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty(Penalty::IPColocation); + #[cfg(feature = "metrics")] + { + num_ip_colocation_penalties += 1; } tracing::debug!( peer=%peer_id, @@ -479,8 +374,14 @@ impl PeerScore { let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold; score += excess * self.params.slow_peer_weight; } - - score + #[cfg(feature = "metrics")] + return ( + score, + num_message_deficit_penalties, + num_ip_colocation_penalties, + ); + #[cfg(not(feature = "metrics"))] + return (score, 0, 0); } pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { From 07646288605f0f638b9d7f9476b2912189a66f44 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 14:21:07 +0800 Subject: [PATCH 10/15] remove unnecessary feature gates --- protocols/gossipsub/src/peer_score.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index ad10dc0a714..3374b2d3a50 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -244,9 +244,9 @@ impl PeerScore { return (0.0, 0, 0); }; let mut score = 0.0; - #[cfg(feature = "metrics")] + #[cfg_attr(not(feature = "metrics"), allow(unused_mut))] let mut num_message_deficit_penalties = 0; - #[cfg(feature = "metrics")] + #[cfg_attr(not(feature = "metrics"), allow(unused_mut))] let mut num_ip_colocation_penalties = 0; // topic scores @@ -374,14 +374,12 @@ impl PeerScore { let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold; score += excess * self.params.slow_peer_weight; } - #[cfg(feature = "metrics")] + return ( score, num_message_deficit_penalties, num_ip_colocation_penalties, ); - #[cfg(not(feature = "metrics"))] - return (score, 0, 0); } pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { From ee4b77f4f80d7c15d9302de2f108f049f0586873 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 22:14:57 +0800 Subject: [PATCH 11/15] remove unnecessary return --- protocols/gossipsub/src/peer_score.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 3374b2d3a50..599050b940b 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -375,11 +375,11 @@ impl PeerScore { score += excess * self.params.slow_peer_weight; } - return ( + ( score, num_message_deficit_penalties, num_ip_colocation_penalties, - ); + ) } pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { From 343c6ae8e0938f9190aeb8cc3ae12f4c400c765b Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 22:25:53 +0800 Subject: [PATCH 12/15] remove duplication of remove_peer_from_mesh --- protocols/gossipsub/src/behaviour.rs | 66 +++++++++++----------------- 1 file changed, 25 insertions(+), 41 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 704d0f2695c..444650a01fb 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1520,7 +1520,7 @@ where } #[cfg(feature = "metrics")] - fn remove_peer_from_mesh( + fn remove_peer_from_mesh_with_metric( &mut self, peer_id: &PeerId, topic_hash: &TopicHash, @@ -1528,55 +1528,25 @@ where always_update_backoff: bool, reason: Churn, ) { - let mut update_backoff = always_update_backoff; - if let Some(peers) = self.mesh.get_mut(topic_hash) { - // remove the peer if it exists in the mesh - if peers.remove(peer_id) { - tracing::debug!( - peer=%peer_id, - topic=%topic_hash, - "PRUNE: Removing peer from the mesh for topic" - ); - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, reason, 1) - } - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.prune(peer_id, topic_hash.clone()); - } - - update_backoff = true; - - // inform the handler - peer_removed_from_mesh( - *peer_id, - topic_hash, - &self.mesh, - &mut self.events, - &self.connected_peers, - ); - } + let is_peer_removed = + self.remove_peer_from_mesh(peer_id, topic_hash, backoff, always_update_backoff); + if !is_peer_removed { + return; } - if update_backoff { - let time = if let Some(backoff) = backoff { - Duration::from_secs(backoff) - } else { - self.config.prune_backoff() - }; - // is there a backoff specified by the peer? if so obey it. - self.backoffs.update_backoff(topic_hash, peer_id, time); + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, reason, 1) } } - #[cfg(not(feature = "metrics"))] fn remove_peer_from_mesh( &mut self, peer_id: &PeerId, topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, - ) { + ) -> bool { let mut update_backoff = always_update_backoff; + let mut is_peer_removed = false; if let Some(peers) = self.mesh.get_mut(topic_hash) { // remove the peer if it exists in the mesh if peers.remove(peer_id) { @@ -1585,6 +1555,7 @@ where topic=%topic_hash, "PRUNE: Removing peer from the mesh for topic" ); + is_peer_removed = true; if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer_id, topic_hash.clone()); @@ -1611,6 +1582,7 @@ where // is there a backoff specified by the peer? if so obey it. self.backoffs.update_backoff(topic_hash, peer_id, time); } + is_peer_removed } /// Handles PRUNE control messages. Removes peer from the mesh. @@ -1624,7 +1596,13 @@ where self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { #[cfg(feature = "metrics")] - self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune); + self.remove_peer_from_mesh_with_metric( + peer_id, + &topic_hash, + backoff, + true, + Churn::Prune, + ); #[cfg(not(feature = "metrics"))] self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); @@ -2031,7 +2009,13 @@ where .get_mut(&topic_hash) .map(|peers| peers.remove(&peer_id)); #[cfg(feature = "metrics")] - self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub); + self.remove_peer_from_mesh_with_metric( + &peer_id, + &topic_hash, + None, + false, + Churn::Unsub, + ); #[cfg(not(feature = "metrics"))] self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); } From 7add7b7be2f2c13702c552a9e6599226dc6416c3 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 22:29:29 +0800 Subject: [PATCH 13/15] reorder cfg flags --- protocols/gossipsub/src/rpc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index 675f37796d6..05049d9983c 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -96,14 +96,14 @@ impl Sender { sender.try_send(rpc).map_err(|err| err.into_inner()) } - #[cfg(feature = "metrics")] /// Returns the current size of the priority queue. + #[cfg(feature = "metrics")] pub(crate) fn priority_queue_len(&self) -> usize { self.len.load(Ordering::Relaxed) } - #[cfg(feature = "metrics")] /// Returns the current size of the non-priority queue. + #[cfg(feature = "metrics")] pub(crate) fn non_priority_queue_len(&self) -> usize { self.non_priority_sender.len() } From 51d3559151b7e60c153b0b29399d6e3c68b20ca1 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 22:49:23 +0800 Subject: [PATCH 14/15] remove unnecessary gates by reordering code --- protocols/gossipsub/src/behaviour.rs | 35 +++++++++++++++------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 444650a01fb..bcfb5dd5103 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -980,12 +980,14 @@ where // remove the last published time self.fanout_last_pub.remove(topic_hash); - } - #[cfg(feature = "metrics")] - let fanaout_added = added_peers.len(); - #[cfg(feature = "metrics")] - if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) + + // Record how many peers we've added through fanout + #[cfg(feature = "metrics")] + if let Some(m) = self.metrics.as_mut() { + // The number can be lower than `add_peers` depending on the length of `peers` + let fanaout_added = added_peers.len(); + m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) + } } // check if we need to get more peers, which we randomly select @@ -1008,16 +1010,15 @@ where "JOIN: Inserting {:?} random peers into the mesh", new_peers.len() ); + // Record how many random peers we've added when we need more peers. + #[cfg(feature = "metrics")] + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Random, new_peers.len()) + } let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default(); mesh_peers.extend(new_peers); } - #[cfg(feature = "metrics")] - if let Some(m) = self.metrics.as_mut() { - let random_added = added_peers.len() - fanaout_added; - m.peers_included(topic_hash, Inclusion::Random, random_added) - } - for peer_id in added_peers { // Send a GRAFT control message tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); @@ -1042,10 +1043,11 @@ where } #[cfg(feature = "metrics")] - let mesh_peers = self.mesh_peers(topic_hash).count(); - #[cfg(feature = "metrics")] - if let Some(m) = self.metrics.as_mut() { - m.set_mesh_peers(topic_hash, mesh_peers) + { + let mesh_peers = self.mesh_peers(topic_hash).count(); + if let Some(m) = self.metrics.as_mut() { + m.set_mesh_peers(topic_hash, mesh_peers) + } } tracing::debug!(topic=%topic_hash, "Completed JOIN for topic"); @@ -2476,6 +2478,7 @@ where self.failed_messages.shrink_to_fit(); tracing::debug!("Completed Heartbeat"); + // Record how long we take to finish the heartbeat #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); From 6aef7a5521e665d42a15553afacba46010fa1126 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 10 Dec 2024 08:21:34 +0800 Subject: [PATCH 15/15] changelog --- protocols/gossipsub/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 0bfee4d3e91..94bf9e77343 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## unreleased + +- Make `prometheus-client` an optional dependency with feature `metrics`. + See [PR 5711](https://github.com/libp2p/rust-libp2p/pull/5711). + ## 0.48.0 - Correct state inconsistencies with the mesh and fanout when unsubscribing.