diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index e9663c4c39c..7ba03c38cc7 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## unreleased + +- Make `prometheus-client` an optional dependency with feature `metrics`. + See [PR 5711](https://github.com/libp2p/rust-libp2p/pull/5711). + ## 0.48.0 - Correct state inconsistencies with the mesh and fanout when unsubscribing. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index d48993b331e..2f80177ac5d 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [features] wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"] +metrics = ["prometheus-client"] [dependencies] async-channel = "2.3.1" @@ -38,7 +39,7 @@ sha2 = "0.10.8" tracing = { workspace = true } # Metrics dependencies -prometheus-client = { workspace = true } +prometheus-client = { workspace = true, optional = true } [dev-dependencies] libp2p-core = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 954e87ee470..3052a1cd98c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -42,18 +42,20 @@ use libp2p_swarm::{ ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; +#[cfg(feature = "metrics")] use prometheus_client::registry::Registry; use quick_protobuf::{MessageWrite, Writer}; use rand::{seq::SliceRandom, thread_rng}; use web_time::{Instant, SystemTime}; +#[cfg(feature = "metrics")] +use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::{ backoff::BackoffStorage, config::{Config, ValidationMode}, gossip_promises::GossipPromises, handler::{Handler, HandlerEvent, HandlerIn}, mcache::MessageCache, - metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}, peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}, protocol::SIGNING_PREFIX, rpc::Sender, @@ -327,6 +329,7 @@ pub struct Behaviour { data_transform: D, /// Keep track of a set of internal metrics relating to gossipsub. + #[cfg(feature = "metrics")] metrics: Option, /// Tracks the numbers of failed messages per peer-id. @@ -339,33 +342,27 @@ where F: TopicSubscriptionFilter + Default, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`]. This has no subscription filter and uses no compression. + /// [`Config`]. This has no subscription filter and uses no compression. + /// Metrics are disabled by default. pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, - None, F::default(), D::default(), ) } - /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`]. This has no subscription filter and uses no compression. + /// Allow the behaviour to also provide metric data. /// Metrics can be evaluated by passing a reference to a [`Registry`]. - pub fn new_with_metrics( - privacy: MessageAuthenticity, - config: Config, + #[cfg(feature = "metrics")] + pub fn with_metrics( + mut self, metrics_registry: &mut Registry, metrics_config: MetricsConfig, - ) -> Result { - Self::new_with_subscription_filter_and_transform( - privacy, - config, - Some((metrics_registry, metrics_config)), - F::default(), - D::default(), - ) + ) -> Self { + self.metrics = Some(Metrics::new(metrics_registry, metrics_config)); + self } } @@ -375,17 +372,16 @@ where F: TopicSubscriptionFilter, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`] and a custom subscription filter. + /// [`Config`] and a custom subscription filter. + /// Metrics are disabled by default. pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, - metrics, subscription_filter, D::default(), ) @@ -398,17 +394,16 @@ where F: TopicSubscriptionFilter + Default, { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a - /// [`Config`] and a custom data transform. + /// [`Config`] and a custom data transform. + /// Metrics are disabled by default. pub fn new_with_transform( privacy: MessageAuthenticity, config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, - metrics, F::default(), data_transform, ) @@ -422,10 +417,10 @@ where { /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a /// [`Config`] and a custom subscription filter and data transform. + /// Metrics are disabled by default. pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: Config, - metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, data_transform: D, ) -> Result { @@ -436,7 +431,8 @@ where validate_config(&privacy, config.validation_mode())?; Ok(Behaviour { - metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), + #[cfg(feature = "metrics")] + metrics: None, events: VecDeque::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), @@ -747,6 +743,7 @@ where tracing::debug!(message=%msg_id, "Published message"); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_published_message(&topic_hash); } @@ -790,6 +787,7 @@ where message=%msg_id, "Message not in cache. Ignoring forwarding" ); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.memcache_miss(); } @@ -797,6 +795,7 @@ where } }; + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_msg_validation(&raw_message.topic, &acceptance); } @@ -814,6 +813,7 @@ where }; if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) { + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_msg_validation(&raw_message.topic, &acceptance); } @@ -943,7 +943,7 @@ where } let mut added_peers = HashSet::new(); - + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.joined(topic_hash) } @@ -980,11 +980,14 @@ where // remove the last published time self.fanout_last_pub.remove(topic_hash); - } - let fanaout_added = added_peers.len(); - if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) + // Record how many peers we've added through fanout + #[cfg(feature = "metrics")] + if let Some(m) = self.metrics.as_mut() { + // The number can be lower than `add_peers` depending on the length of `peers` + let fanaout_added = added_peers.len(); + m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) + } } // check if we need to get more peers, which we randomly select @@ -1007,15 +1010,15 @@ where "JOIN: Inserting {:?} random peers into the mesh", new_peers.len() ); + // Record how many random peers we've added when we need more peers. + #[cfg(feature = "metrics")] + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Random, new_peers.len()) + } let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default(); mesh_peers.extend(new_peers); } - let random_added = added_peers.len() - fanaout_added; - if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Random, random_added) - } - for peer_id in added_peers { // Send a GRAFT control message tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); @@ -1039,9 +1042,12 @@ where ); } - let mesh_peers = self.mesh_peers(topic_hash).count(); - if let Some(m) = self.metrics.as_mut() { - m.set_mesh_peers(topic_hash, mesh_peers) + #[cfg(feature = "metrics")] + { + let mesh_peers = self.mesh_peers(topic_hash).count(); + if let Some(m) = self.metrics.as_mut() { + m.set_mesh_peers(topic_hash, mesh_peers) + } } tracing::debug!(topic=%topic_hash, "Completed JOIN for topic"); @@ -1114,6 +1120,7 @@ where // If our mesh contains the topic, send prune to peers and delete it from the mesh if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.left(topic_hash) } @@ -1242,6 +1249,7 @@ where // have not seen this message and are not currently requesting it if iwant_ids.insert(id) { // Register the IWANT metric + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_iwant(&topic); } @@ -1357,6 +1365,7 @@ where // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { if connected_peer.topics.insert(topic.clone()) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.inc_topic_peers(topic); } @@ -1373,6 +1382,8 @@ where } else { let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0); let now = Instant::now(); + #[cfg(feature = "metrics")] + let mut num_graft_backoff_penalties = 0; for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // if the peer is already in the mesh ignore the graft @@ -1395,8 +1406,9 @@ where ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty(Penalty::GraftBackoff); + #[cfg(feature = "metrics")] + { + num_graft_backoff_penalties += 1; } peer_score.add_penalty(peer_id, 1); @@ -1453,6 +1465,7 @@ where ); if peers.insert(*peer_id) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(&topic_hash, Inclusion::Subscribed, 1) } @@ -1482,6 +1495,10 @@ where continue; } } + #[cfg(feature = "metrics")] + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_score_penalty(Penalty::GraftBackoff, num_graft_backoff_penalties); + } } if !to_prune_topics.is_empty() { @@ -1504,7 +1521,8 @@ where tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } - fn remove_peer_from_mesh( + #[cfg(feature = "metrics")] + fn remove_peer_from_mesh_with_metric( &mut self, peer_id: &PeerId, topic_hash: &TopicHash, @@ -1512,7 +1530,25 @@ where always_update_backoff: bool, reason: Churn, ) { + let is_peer_removed = + self.remove_peer_from_mesh(peer_id, topic_hash, backoff, always_update_backoff); + if !is_peer_removed { + return; + } + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, reason, 1) + } + } + + fn remove_peer_from_mesh( + &mut self, + peer_id: &PeerId, + topic_hash: &TopicHash, + backoff: Option, + always_update_backoff: bool, + ) -> bool { let mut update_backoff = always_update_backoff; + let mut is_peer_removed = false; if let Some(peers) = self.mesh.get_mut(topic_hash) { // remove the peer if it exists in the mesh if peers.remove(peer_id) { @@ -1521,9 +1557,7 @@ where topic=%topic_hash, "PRUNE: Removing peer from the mesh for topic" ); - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, reason, 1) - } + is_peer_removed = true; if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer_id, topic_hash.clone()); @@ -1550,6 +1584,7 @@ where // is there a backoff specified by the peer? if so obey it. self.backoffs.update_backoff(topic_hash, peer_id, time); } + is_peer_removed } /// Handles PRUNE control messages. Removes peer from the mesh. @@ -1562,7 +1597,16 @@ where let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { - self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune); + #[cfg(feature = "metrics")] + self.remove_peer_from_mesh_with_metric( + peer_id, + &topic_hash, + backoff, + true, + Churn::Prune, + ); + #[cfg(not(feature = "metrics"))] + self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); if self.mesh.contains_key(&topic_hash) { // connect to px peers @@ -1709,6 +1753,7 @@ where propagation_source: &PeerId, ) { // Record the received metric + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len()); } @@ -1752,6 +1797,7 @@ where ); // Record the received message with the metrics + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.msg_recvd(&message.topic); } @@ -1803,6 +1849,7 @@ where reject_reason: RejectReason, ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.register_invalid_message(&raw_message.topic); } @@ -1883,6 +1930,7 @@ where "SUBSCRIPTION: Adding gossip peer to topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.inc_topic_peers(topic_hash); } @@ -1910,6 +1958,7 @@ where topic=%topic_hash, "SUBSCRIPTION: Adding peer to the mesh for topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Subscribed, 1) } @@ -1940,6 +1989,7 @@ where "SUBSCRIPTION: Removing gossip peer from topic" ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.dec_topic_peers(topic_hash); } @@ -1960,7 +2010,16 @@ where self.fanout .get_mut(&topic_hash) .map(|peers| peers.remove(&peer_id)); - self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub); + #[cfg(feature = "metrics")] + self.remove_peer_from_mesh_with_metric( + &peer_id, + &topic_hash, + None, + false, + Churn::Unsub, + ); + #[cfg(not(feature = "metrics"))] + self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); } // Potentially inform the handler if we have added this peer to a mesh for the first time. @@ -1995,11 +2054,14 @@ where /// Applies penalties to peers that did not respond to our IWANT requests. fn apply_iwant_penalties(&mut self) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - for (peer, count) in gossip_promises.get_broken_promises() { + let broken_promises = gossip_promises.get_broken_promises(); + #[cfg(feature = "metrics")] + if let Some(metrics) = self.metrics.as_mut() { + metrics + .register_score_penalty(Penalty::BrokenPromise, broken_promises.len() as u64); + } + for (peer, count) in broken_promises { peer_score.add_penalty(&peer, count); - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty(Penalty::BrokenPromise); - } } } } @@ -2007,11 +2069,13 @@ where /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { tracing::debug!("Starting heartbeat"); + #[cfg(feature = "metrics")] let start = Instant::now(); // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally // before we add all the gossip from this heartbeat in order to gain a true measure of // steady-state size of the queues. + #[cfg(feature = "metrics")] if let Some(m) = &mut self.metrics { for sender_queue in self.connected_peers.values().map(|v| &v.sender) { m.observe_priority_queue_size(sender_queue.priority_queue_len()); @@ -2046,9 +2110,13 @@ where let mut scores = HashMap::with_capacity(self.connected_peers.len()); if let Some((peer_score, ..)) = &self.peer_score { for peer_id in self.connected_peers.keys() { - scores - .entry(peer_id) - .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut())); + scores.entry(peer_id).or_insert_with(|| { + #[cfg(feature = "metrics")] + let score = peer_score.metric_score(peer_id, self.metrics.as_mut()); + #[cfg(not(feature = "metrics"))] + let score = peer_score.score(peer_id); + score + }); } } @@ -2066,6 +2134,7 @@ where let peer_score = *scores.get(peer_id).unwrap_or(&0.0); // Record the score per mesh + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.observe_mesh_peers_score(topic_hash, peer_score); } @@ -2085,6 +2154,7 @@ where } } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len()) } @@ -2116,6 +2186,7 @@ where } // update the mesh tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) } @@ -2177,6 +2248,7 @@ where removed += 1; } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic_hash, Churn::Excess, removed) } @@ -2204,6 +2276,7 @@ where } // update the mesh tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len()) } @@ -2274,6 +2347,7 @@ where "Opportunistically graft in topic with peers {:?}", peer_list ); + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) } @@ -2282,6 +2356,7 @@ where } } // Register the final count of peers in the mesh + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.set_mesh_peers(topic_hash, peers.len()) } @@ -2403,6 +2478,8 @@ where self.failed_messages.shrink_to_fit(); tracing::debug!("Completed Heartbeat"); + // Record how long we take to finish the heartbeat + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); metrics.observe_heartbeat_duration(duration); @@ -2723,6 +2800,7 @@ where /// sending the message failed due to the channel to the connection handler being /// full (which indicates a slow peer). fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc { // register bytes sent on the internal metrics. @@ -2891,6 +2969,7 @@ where if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it if mesh_peers.remove(&peer_id) { + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic, Churn::Dc, 1); m.set_mesh_peers(topic, mesh_peers.len()); @@ -2898,6 +2977,7 @@ where }; } + #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.dec_topic_peers(topic); } @@ -2913,6 +2993,7 @@ where self.outbound_peers.remove(&peer_id); // If metrics are enabled, register the disconnection of a peer based on its protocol. + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.peer_protocol_disconnected(connected_peer.kind.clone()); } @@ -3040,6 +3121,7 @@ where HandlerEvent::PeerKind(kind) => { // We have identified the protocol this peer is using + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { metrics.peer_protocol_connected(kind.clone()); } @@ -3087,6 +3169,7 @@ where } // Record metrics on the failure. + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { match rpc { RpcOut::Publish { message, .. } => { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index eaa983d214d..ba5d9abcdaa 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -70,7 +70,6 @@ where let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, - None, self.subscription_filter, self.data_transform, ) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 87db1b771d1..ae614367e14 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -101,6 +101,7 @@ mod error; mod gossip_promises; mod handler; mod mcache; +#[cfg(feature = "metrics")] mod metrics; mod peer_score; mod protocol; @@ -112,11 +113,13 @@ mod topic; mod transform; mod types; +#[cfg(feature = "metrics")] +pub use metrics::Config as MetricsConfig; + pub use self::{ behaviour::{Behaviour, Event, MessageAuthenticity}, config::{Config, ConfigBuilder, ValidationMode, Version}, error::{ConfigBuilderError, PublishError, SubscriptionError, ValidationError}, - metrics::Config as MetricsConfig, peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 2519da64b73..ee3af5cd002 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -485,10 +485,10 @@ impl Metrics { } /// Register a score penalty. - pub(crate) fn register_score_penalty(&mut self, penalty: Penalty) { + pub(crate) fn register_score_penalty(&mut self, penalty: Penalty, inc: u64) { self.scoring_penalties .get_or_create(&PenaltyLabel { penalty }) - .inc(); + .inc_by(inc); } /// Registers that a message was published on a specific topic. diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 33573ebeacc..599050b940b 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -29,11 +29,9 @@ use std::{ use libp2p_identity::PeerId; use web_time::Instant; -use crate::{ - metrics::{Metrics, Penalty}, - time_cache::TimeCache, - MessageId, TopicHash, -}; +#[cfg(feature = "metrics")] +use crate::metrics::{Metrics, Penalty}; +use crate::{time_cache::TimeCache, MessageId, TopicHash}; mod params; pub use params::{ @@ -225,16 +223,31 @@ impl PeerScore { /// Returns the score for a peer pub(crate) fn score(&self, peer_id: &PeerId) -> f64 { - self.metric_score(peer_id, None) + let (score, _, _) = self.calculate_score(peer_id); + score } /// Returns the score for a peer, logging metrics. This is called from the heartbeat and /// increments the metric counts for penalties. - pub(crate) fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 { + #[cfg(feature = "metrics")] + pub(crate) fn metric_score(&self, peer_id: &PeerId, metrics: Option<&mut Metrics>) -> f64 { + let (score, num_message_deficit, num_ip_colocation) = self.calculate_score(peer_id); + if let Some(m) = metrics { + m.register_score_penalty(Penalty::MessageDeficit, num_message_deficit); + m.register_score_penalty(Penalty::IPColocation, num_ip_colocation); + } + score + } + + fn calculate_score(&self, peer_id: &PeerId) -> (f64, u64, u64) { let Some(peer_stats) = self.peer_stats.get(peer_id) else { - return 0.0; + return (0.0, 0, 0); }; let mut score = 0.0; + #[cfg_attr(not(feature = "metrics"), allow(unused_mut))] + let mut num_message_deficit_penalties = 0; + #[cfg_attr(not(feature = "metrics"), allow(unused_mut))] + let mut num_ip_colocation_penalties = 0; // topic scores for (topic, topic_stats) in peer_stats.topics.iter() { @@ -279,8 +292,10 @@ impl PeerScore { - topic_stats.mesh_message_deliveries; let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; - if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty(Penalty::MessageDeficit); + + #[cfg(feature = "metrics")] + { + num_message_deficit_penalties += 1; } tracing::debug!( peer=%peer_id, @@ -332,8 +347,9 @@ impl PeerScore { if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; - if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty(Penalty::IPColocation); + #[cfg(feature = "metrics")] + { + num_ip_colocation_penalties += 1; } tracing::debug!( peer=%peer_id, @@ -359,7 +375,11 @@ impl PeerScore { score += excess * self.params.slow_peer_weight; } - score + ( + score, + num_message_deficit_penalties, + num_ip_colocation_penalties, + ) } pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index b5f05c7b2e5..05049d9983c 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -97,11 +97,13 @@ impl Sender { } /// Returns the current size of the priority queue. + #[cfg(feature = "metrics")] pub(crate) fn priority_queue_len(&self) -> usize { self.len.load(Ordering::Relaxed) } /// Returns the current size of the non-priority queue. + #[cfg(feature = "metrics")] pub(crate) fn non_priority_queue_len(&self) -> usize { self.non_priority_sender.len() } diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 4793c23a8e1..c0a03c0a5a7 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -21,7 +21,6 @@ use std::fmt; use base64::prelude::*; -use prometheus_client::encoding::EncodeLabelSet; use quick_protobuf::Writer; use sha2::{Digest, Sha256}; @@ -66,7 +65,11 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, EncodeLabelSet)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[cfg_attr( + feature = "metrics", + derive(prometheus_client::encoding::EncodeLabelSet) +)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index bcb1f279ae5..db465b54339 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -24,7 +24,6 @@ use std::{collections::BTreeSet, fmt, fmt::Debug}; use futures_timer::Delay; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; -use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -112,7 +111,11 @@ pub(crate) struct PeerConnections { } /// Describes the types of peers that can exist in the gossipsub context. -#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)] +#[derive(Debug, Clone, PartialEq, Hash, Eq)] +#[cfg_attr( + feature = "metrics", + derive(prometheus_client::encoding::EncodeLabelValue) +)] pub enum PeerKind { /// A gossipsub 1.1 peer. Gossipsubv1_1,