Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: broadcasting idontwant for published messages #5773

Merged
merged 11 commits into from
Jan 4, 2025
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## 0.48.0

- Broadcast `IDONTWANT` on publish to avoid downloading data that is already available.
jxs marked this conversation as resolved.
Show resolved Hide resolved
See [PR 5773](https://github.com/libp2p/rust-libp2p/pull/5773)

- Add configurable `idontwant_message_size_threshold` parameter.
See [PR 5770](https://github.com/libp2p/rust-libp2p/pull/5770)

Expand Down
15 changes: 11 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,13 @@ where
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
}

// Broadcast IDONTWANT messages
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
&& self.config.idontwant_on_publish()
{
self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
}

tracing::debug!(message=%msg_id, "Published message");

if let Some(metrics) = self.metrics.as_mut() {
Expand Down Expand Up @@ -1741,7 +1748,7 @@ where

// Broadcast IDONTWANT messages
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
self.send_idontwant(&raw_message, &msg_id, propagation_source);
self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
}

// Check the validity of the message
Expand Down Expand Up @@ -2600,7 +2607,7 @@ where
&mut self,
message: &RawMessage,
msg_id: &MessageId,
propagation_source: &PeerId,
propagation_source: Option<&PeerId>,
) {
let Some(mesh_peers) = self.mesh.get(&message.topic) else {
return;
Expand All @@ -2611,8 +2618,8 @@ where
let recipient_peers: Vec<PeerId> = mesh_peers
.iter()
.chain(iwant_peers.iter())
.filter(|peer_id| {
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
.filter(|&peer_id| {
Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
})
.cloned()
.collect();
Expand Down
42 changes: 30 additions & 12 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub struct Config {
connection_handler_publish_duration: Duration,
connection_handler_forward_duration: Duration,
idontwant_message_size_threshold: usize,
idontwant_on_publish: bool,
}

impl Config {
Expand Down Expand Up @@ -373,15 +374,22 @@ impl Config {
self.connection_handler_forward_duration
}

// The message size threshold for which IDONTWANT messages are sent.
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
// default is 1kB
/// The message size threshold for which IDONTWANT messages are sent.
/// Sending IDONTWANT messages for small messages can have a negative effect to the overall
/// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
/// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
/// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
/// default is 1kB
pub fn idontwant_message_size_threshold(&self) -> usize {
self.idontwant_message_size_threshold
}

/// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
/// to avoid bandwidth consumption by downloading the published message over gossip.
/// default is false
jxs marked this conversation as resolved.
Show resolved Hide resolved
pub fn idontwant_on_publish(&self) -> bool {
self.idontwant_on_publish
}
}

impl Default for Config {
Expand Down Expand Up @@ -455,6 +463,7 @@ impl Default for ConfigBuilder {
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_secs(1),
idontwant_message_size_threshold: 1000,
idontwant_on_publish: false,
},
invalid_protocol: false,
}
Expand Down Expand Up @@ -841,17 +850,25 @@ impl ConfigBuilder {
self
}

// The message size threshold for which IDONTWANT messages are sent.
// Sending IDONTWANT messages for small messages can have a negative effect to the overall
// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
// (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message)
// default is 1kB
/// The message size threshold for which IDONTWANT messages are sent.
/// Sending IDONTWANT messages for small messages can have a negative effect to the overall
/// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
/// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
/// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
/// default is 1kB
pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self {
self.config.idontwant_message_size_threshold = size;
self
}

/// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
/// to avoid bandwidth consumption by downloading the published message over gossip.
/// default is false
jxs marked this conversation as resolved.
Show resolved Hide resolved
pub fn idontwant_on_publish(&mut self, idontwant_on_publish: bool) -> &mut Self {
self.config.idontwant_on_publish = idontwant_on_publish;
self
}

/// Constructs a [`Config`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
// check all constraints on config
Expand Down Expand Up @@ -926,6 +943,7 @@ impl std::fmt::Debug for Config {
"idontwant_message_size_threhold",
&self.idontwant_message_size_threshold,
);
let _ = builder.field("idontwant_on_publish", &self.idontwant_on_publish);
builder.finish()
}
}
Expand Down
Loading