Skip to content

Commit

Permalink
Merge branch 'master' into gossipsub-invalid-message
Browse files Browse the repository at this point in the history
  • Loading branch information
drHuangMHT authored Jan 5, 2025
2 parents 135cc97 + 7e3086d commit d592634
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 66 deletions.
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## 0.48.0

- Allow broadcasting `IDONTWANT` messages when publishing to avoid downloading data that is already available.
See [PR 5773](https://github.com/libp2p/rust-libp2p/pull/5773)

- Add configurable `idontwant_message_size_threshold` parameter.
See [PR 5770](https://github.com/libp2p/rust-libp2p/pull/5770)

Expand Down
15 changes: 11 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,13 @@ where
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
}

// Broadcast IDONTWANT messages
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
&& self.config.idontwant_on_publish()
{
self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
}

tracing::debug!(message=%msg_id, "Published message");

if let Some(metrics) = self.metrics.as_mut() {
Expand Down Expand Up @@ -1748,7 +1755,7 @@ where

// Broadcast IDONTWANT messages
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
self.send_idontwant(&raw_message, &msg_id, propagation_source);
self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
}

// Check the validity of the message
Expand Down Expand Up @@ -2594,7 +2601,7 @@ where
&mut self,
message: &RawMessage,
msg_id: &MessageId,
propagation_source: &PeerId,
propagation_source: Option<&PeerId>,
) {
let Some(mesh_peers) = self.mesh.get(&message.topic) else {
return;
Expand All @@ -2605,8 +2612,8 @@ where
let recipient_peers: Vec<PeerId> = mesh_peers
.iter()
.chain(iwant_peers.iter())
.filter(|peer_id| {
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
.filter(|&peer_id| {
Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
})
.cloned()
.collect();
Expand Down
42 changes: 30 additions & 12 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub struct Config {
connection_handler_publish_duration: Duration,
connection_handler_forward_duration: Duration,
idontwant_message_size_threshold: usize,
idontwant_on_publish: bool,
}

impl Config {
Expand Down Expand Up @@ -373,15 +374,22 @@ impl Config {
self.connection_handler_forward_duration
}

// The message size threshold for which IDONTWANT messages are sent.
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
// default is 1kB
/// The message size threshold for which IDONTWANT messages are sent.
/// Sending IDONTWANT messages for small messages can have a negative effect to the overall
/// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
/// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
/// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
/// default is 1kB
pub fn idontwant_message_size_threshold(&self) -> usize {
self.idontwant_message_size_threshold
}

/// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
/// to avoid bandwidth consumption by downloading the published message over gossip.
/// By default it is false.
pub fn idontwant_on_publish(&self) -> bool {
self.idontwant_on_publish
}
}

impl Default for Config {
Expand Down Expand Up @@ -455,6 +463,7 @@ impl Default for ConfigBuilder {
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_secs(1),
idontwant_message_size_threshold: 1000,
idontwant_on_publish: false,
},
invalid_protocol: false,
}
Expand Down Expand Up @@ -841,17 +850,25 @@ impl ConfigBuilder {
self
}

// The message size threshold for which IDONTWANT messages are sent.
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
// default is 1kB
/// The message size threshold for which IDONTWANT messages are sent.
/// Sending IDONTWANT messages for small messages can have a negative effect to the overall
/// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
/// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
/// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
/// default is 1kB
pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self {
self.config.idontwant_message_size_threshold = size;
self
}

/// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
/// to avoid bandwidth consumption by downloading the published message over gossip.
/// By default it is false.
pub fn idontwant_on_publish(&mut self, idontwant_on_publish: bool) -> &mut Self {
self.config.idontwant_on_publish = idontwant_on_publish;
self
}

/// Constructs a [`Config`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
// check all constraints on config
Expand Down Expand Up @@ -926,6 +943,7 @@ impl std::fmt::Debug for Config {
"idontwant_message_size_threhold",
&self.idontwant_message_size_threshold,
);
let _ = builder.field("idontwant_on_publish", &self.idontwant_on_publish);
builder.finish()
}
}
Expand Down
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
See [PR 5705](https://github.com/libp2p/rust-libp2p/pull/5705).
- Fix systematic memory allocation when iterating over `KBuckets`.
See [PR 5715](https://github.com/libp2p/rust-libp2p/pull/5715).
- Remove deprecated default constructor for `ProtocolConfig`.
See [PR 5774](https://github.com/libp2p/rust-libp2p/pull/5774).

## 0.46.2

Expand Down
23 changes: 0 additions & 23 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,29 +238,6 @@ impl Config {
}
}

/// Returns the default configuration.
#[deprecated(note = "Use `Config::new` instead")]
#[allow(clippy::should_implement_trait)]
pub fn default() -> Self {
Default::default()
}

/// Sets custom protocol names.
///
/// Kademlia nodes only communicate with other nodes using the same protocol
/// name. Using custom name(s) therefore allows to segregate the DHT from
/// others, if that is desired.
///
/// More than one protocol name can be supplied. In this case the node will
/// be able to talk to other nodes supporting any of the provided names.
/// Multiple names must be used with caution to avoid network partitioning.
#[deprecated(note = "Use `Config::new` instead")]
#[allow(deprecated)]
pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
self.protocol_config.set_protocol_names(names);
self
}

/// Sets the timeout for a single query.
///
/// > **Note**: A single query usually comprises at least as many requests
Expand Down
28 changes: 1 addition & 27 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! to poll the underlying transport for incoming messages, and the `Sink` component
//! is used to send messages to remote peers.
use std::{io, iter, marker::PhantomData, time::Duration};
use std::{io, marker::PhantomData, time::Duration};

use asynchronous_codec::{Decoder, Encoder, Framed};
use bytes::BytesMut;
Expand Down Expand Up @@ -156,43 +156,17 @@ impl ProtocolConfig {
}
}

/// Returns the default configuration.
#[deprecated(note = "Use `ProtocolConfig::new` instead")]
#[allow(clippy::should_implement_trait)]
pub fn default() -> Self {
Default::default()
}

/// Returns the configured protocol name.
pub fn protocol_names(&self) -> &[StreamProtocol] {
&self.protocol_names
}

/// Modifies the protocol names used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
#[deprecated(note = "Use `ProtocolConfig::new` instead")]
pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) {
self.protocol_names = names;
}

/// Modifies the maximum allowed size of a single Kademlia packet.
pub fn set_max_packet_size(&mut self, size: usize) {
self.max_packet_size = size;
}
}

impl Default for ProtocolConfig {
/// Returns the default configuration.
///
/// Deprecated: use `ProtocolConfig::new` instead.
fn default() -> Self {
ProtocolConfig {
protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(),
max_packet_size: DEFAULT_MAX_PACKET_SIZE,
}
}
}

impl UpgradeInfo for ProtocolConfig {
type Info = StreamProtocol;
type InfoIter = std::vec::IntoIter<Self::Info>;
Expand Down

0 comments on commit d592634

Please sign in to comment.