Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

client/network: Add peers to DHT only if protocols match #6549

Merged
merged 14 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
Expand Down Expand Up @@ -364,16 +365,28 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<finality_requests::Even
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
let peer_info::PeerInfoEvent::Identified { peer_id, mut info } = event;
if info.listen_addrs.len() > 30 {
debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
info.agent_version
let peer_info::PeerInfoEvent::Identified {
peer_id,
info: IdentifyInfo {
protocol_version,
agent_version,
mut listen_addrs,
protocols,
..
},
} = event;

if listen_addrs.len() > 30 {
debug!(
target: "sub-libp2p",
"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
peer_id, protocol_version, agent_version
);
info.listen_addrs.truncate(30);
listen_addrs.truncate(30);
}
for addr in &info.listen_addrs {
self.discovery.add_self_reported_address(&peer_id, addr.clone());

for addr in listen_addrs {
self.discovery.add_self_reported_address(&peer_id, protocols.iter(), addr);
}
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
Expand Down
237 changes: 191 additions & 46 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ip_network::IpNetwork;
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::protocols_handler::multi::MultiHandler;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId;
Expand Down Expand Up @@ -137,17 +137,9 @@ impl DiscoveryConfig {
}

/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
let proto_name = {
let mut v = vec![b'/'];
v.extend_from_slice(p.as_bytes());
v.extend_from_slice(b"/kad");
v
};

self.add_kademlia(p, proto_name);
pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self {
let name = protocol_name_from_protocol_id(&id);
self.add_kademlia(id, name);
self
}

Expand All @@ -159,6 +151,10 @@ impl DiscoveryConfig {

let mut config = KademliaConfig::default();
config.set_protocol_name(proto_name);
// By default Kademlia attempts to insert all peers into its routing table once a dialing
// attempt succeeds. In order to control which peer is added, disable the auto-insertion and
// instead add peers manually.
config.set_kbucket_inserts(KademliaBucketInserts::Manual);

let store = MemoryStore::new(self.local_peer_id.clone());
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
Expand Down Expand Up @@ -259,17 +255,43 @@ impl DiscoveryBehaviour {
}
}

/// Call this method when a node reports an address for itself.
/// Add a self-reported address of a remote peer to the k-buckets of the supported
/// DHTs (`supported_protocols`).
///
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
/// not properly work.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) {
for k in self.kademlias.values_mut() {
k.add_address(peer_id, addr.clone());
/// **Note**: It is important that you call this method. The discovery mechanism will not
/// automatically add connecting peers to the Kademlia k-buckets.
pub fn add_self_reported_address(
&mut self,
peer_id: &PeerId,
supported_protocols: impl Iterator<Item = impl AsRef<[u8]>>,
addr: Multiaddr
) {
if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) {
log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id);
return
}

let mut added = false;
for protocol in supported_protocols {
for kademlia in self.kademlias.values_mut() {
if protocol.as_ref() == kademlia.protocol_name() {
log::trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT {}.",
addr, peer_id, String::from_utf8_lossy(kademlia.protocol_name()),
);
kademlia.add_address(peer_id, addr.clone());
added = true;
}
}
} else {
log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id);
}

if !added {
log::trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of any \
Kademlia DHTs supported by the local node.", addr, peer_id,
);
}
}

Expand Down Expand Up @@ -340,17 +362,21 @@ impl DiscoveryBehaviour {
}

/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
/// The address of a peer has been added to the Kademlia routing table.
///
/// Can be called multiple times with the same identity.
/// A connection to a peer has been established but the peer has not been
/// added to the routing table because [`KademliaBucketInserts::Manual`] is
/// configured. If the peer is to be included in the routing table, it must
/// be explicitly added via
/// [`DiscoveryBehaviour::add_self_reported_address`].
Discovered(PeerId),

/// A peer connected to this node for whom no listen address is known.
///
/// In order for the peer to be added to the Kademlia routing table, a known
/// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`],
/// e.g. obtained through the `identify` protocol.
/// listen address must be added via
/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
/// the `identify` protocol.
UnroutablePeer(PeerId),

/// The DHT yielded results for the record request, grouped in (key, value) pairs.
Expand Down Expand Up @@ -569,8 +595,12 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutablePeer { .. } | KademliaEvent::PendingRoutablePeer { .. } => {
// We are not interested in these events at the moment.
KademliaEvent::RoutablePeer { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PendingRoutablePeer { .. } => {
// We are not interested in this event at the moment.
}
KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => {
match res {
Expand Down Expand Up @@ -689,25 +719,36 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec<u8> {
let mut v = vec![b'/'];
v.extend_from_slice(id.as_bytes());
v.extend_from_slice(b"/kad");
v
}

#[cfg(test)]
mod tests {
use crate::config::ProtocolId;
use futures::prelude::*;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::{Multiaddr, PeerId};
use libp2p::core::upgrade;
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm;
use std::{collections::HashSet, task::Poll};
use super::{DiscoveryConfig, DiscoveryOut};
use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id};

#[test]
fn discovery_working() {
let mut user_defined = Vec::new();
let mut first_swarm_peer_id_and_addr = None;
let protocol_id = ProtocolId::from(b"dot".as_ref());

// Build swarms whose behaviour is `DiscoveryBehaviour`.
let mut swarms = (0..25).map(|_| {
// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
// the first swarm via `with_user_defined`.
let mut swarms = (0..25).map(|i| {
let keypair = Keypair::generate_ed25519();
let keypair2 = keypair.clone();

Expand All @@ -730,23 +771,21 @@ mod tests {
});

let behaviour = {
let protocol_id: &[u8] = b"/test/kad/1.0.0";

let mut config = DiscoveryConfig::new(keypair.public());
config.with_user_defined(user_defined.clone())
config.with_user_defined(first_swarm_peer_id_and_addr.clone())
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(ProtocolId::from(protocol_id));
.add_protocol(protocol_id.clone());

config.finish()
};

let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

if user_defined.is_empty() {
user_defined.push((keypair.public().into_peer_id(), listen_addr.clone()));
if i == 0 {
first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone()))
}

Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
Expand All @@ -755,7 +794,10 @@ mod tests {

// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
let mut to_discover = (0..swarms.len()).map(|n| {
(0..swarms.len()).filter(|p| *p != n)
(0..swarms.len())
// Skip the first swarm as all other swarms already know it.
.skip(1)
.filter(|p| *p != n)
.map(|p| Swarm::local_peer_id(&swarms[p].0).clone())
.collect::<HashSet<_>>()
}).collect::<Vec<_>>();
Expand All @@ -766,7 +808,7 @@ mod tests {
match swarms[swarm_n].0.poll_next_unpin(cx) {
Poll::Ready(Some(e)) => {
match e {
DiscoveryOut::UnroutablePeer(other) => {
DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => {
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter().find_map(|(s, a)|
if s.local_peer_id == other {
Expand All @@ -775,12 +817,16 @@ mod tests {
None
})
.unwrap();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
},
DiscoveryOut::Discovered(other) => {
swarms[swarm_n].0.add_self_reported_address(
&other,
[protocol_name_from_protocol_id(&protocol_id)].iter(),
addr,
);

to_discover[swarm_n].remove(&other);
}
_ => {}
},
DiscoveryOut::RandomKademliaStarted(_) => {},
e => {panic!("Unexpected event: {:?}", e)},
}
continue 'polling
}
Expand All @@ -799,4 +845,103 @@ mod tests {

futures::executor::block_on(fut);
}

#[test]
fn discovery_ignores_peers_with_unknown_protocols() {
let supported_protocol_id = ProtocolId::from(b"a".as_ref());
let unsupported_protocol_id = ProtocolId::from(b"b".as_ref());

let mut discovery = {
let keypair = Keypair::generate_ed25519();
let mut config = DiscoveryConfig::new(keypair.public());
config.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(supported_protocol_id.clone());
config.finish()
};

let remote_peer_id = PeerId::random();
let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

// Add remote peer with unsupported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&unsupported_protocol_id)].iter(),
remote_addr.clone(),
);

for kademlia in discovery.kademlias.values_mut() {
assert!(
kademlia.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect peer with unsupported protocol not to be added."
);
}

// Add remote peer with supported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&supported_protocol_id)].iter(),
remote_addr.clone(),
);

for kademlia in discovery.kademlias.values_mut() {
assert_eq!(
1,
kademlia.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expect peer with supported protocol to be added."
);
}
}

#[test]
fn discovery_adds_peer_to_kademlia_of_same_protocol_only() {
let protocol_a = ProtocolId::from(b"a".as_ref());
let protocol_b = ProtocolId::from(b"b".as_ref());

let mut discovery = {
let keypair = Keypair::generate_ed25519();
let mut config = DiscoveryConfig::new(keypair.public());
config.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(protocol_a.clone())
.add_protocol(protocol_b.clone());
config.finish()
};

let remote_peer_id = PeerId::random();
let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

// Add remote peer with `protocol_a` only.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&protocol_a)].iter(),
remote_addr.clone(),
);

assert_eq!(
1,
discovery.kademlias.get_mut(&protocol_a)
.expect("Kademlia instance to exist.")
.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expected remote peer to be added to `protocol_a` Kademlia instance.",

);

assert!(
discovery.kademlias.get_mut(&protocol_b)
.expect("Kademlia instance to exist.")
.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expected remote peer not to be added to `protocol_b` Kademlia instance.",
);
}
}