Skip to content

Commit

Permalink
feat(nat): get rid of Transport::address_translation
Browse files Browse the repository at this point in the history
  • Loading branch information
stormshield-ebzh committed May 22, 2024
1 parent 235447b commit 6103cd8
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 123 deletions.
140 changes: 110 additions & 30 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@

use crate::handler::{self, Handler, InEvent};
use crate::protocol::{Info, UpgradeError};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::transport::PortUse;
use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_core::{address_translation, multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{
ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
NewExternalAddrCandidateEndpoint,
};
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
Expand All @@ -43,6 +41,50 @@ use std::{
time::Duration,
};

/// Whether an [`Multiaddr`] is a valid for the QUIC transport.
fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
use Protocol::*;
let mut iter = addr.iter();
let Some(first) = iter.next() else {
return false;
};
let Some(second) = iter.next() else {
return false;
};
let Some(third) = iter.next() else {
return false;
};
let fourth = iter.next();
let fifth = iter.next();

matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
&& matches!(second, Udp(_))
&& if v1 {
matches!(third, QuicV1)
} else {
matches!(third, Quic)
}
&& matches!(fourth, Some(P2p(_)) | None)
&& fifth.is_none()
}

fn is_tcp_addr(addr: &Multiaddr) -> bool {
use Protocol::*;

let mut iter = addr.iter();

let first = match iter.next() {
None => return false,
Some(p) => p,
};
let second = match iter.next() {
None => return false,
Some(p) => p,
};

matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
}

/// Network behaviour that automatically identifies nodes periodically, returns information
/// about them, and answers identify queries from other nodes.
///
Expand All @@ -56,8 +98,8 @@ pub struct Behaviour {
/// The address a remote observed for us.
our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

/// Established connections information (Listener / Dialer / port_reuse or not)
connections_endpoints: HashMap<ConnectionId, NewExternalAddrCandidateEndpoint>,
/// The outbound connections established without port reuse (require translation)
outbound_connections_without_port_reuse: HashSet<ConnectionId>,

/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
Expand Down Expand Up @@ -160,7 +202,7 @@ impl Behaviour {
config,
connected: HashMap::new(),
our_observed_addresses: Default::default(),
connections_endpoints: Default::default(),
outbound_connections_without_port_reuse: Default::default(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand Down Expand Up @@ -221,6 +263,57 @@ impl Behaviour {
.cloned()
.collect()
}

fn emit_new_external_addr_candidate_event(
&mut self,
connection_id: ConnectionId,
observed: &Multiaddr,
) {
if self
.outbound_connections_without_port_reuse
.contains(&connection_id)
{
// Apply address translation to the candidate address.
// For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
let translated_addresses = {
let mut addrs: Vec<_> = self
.listen_addresses
.iter()
.filter_map(|server| {
if (is_tcp_addr(server) && is_tcp_addr(&observed))

Check failure on line 283 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 283 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
|| (is_quic_addr(server, true) && is_quic_addr(&observed, true))

Check failure on line 284 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 284 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
|| (is_quic_addr(server, false) && is_quic_addr(&observed, false))

Check failure on line 285 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 285 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
{
address_translation(server, &observed)

Check failure on line 287 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 287 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
} else {
None
}
})
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
} else {
for addr in translated_addresses {
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(addr));
}
}
} else {
// outgoing connection dialed with port reuse
// incomming connection
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
}
}
}

impl NetworkBehaviour for Behaviour {
Expand All @@ -229,14 +322,11 @@ impl NetworkBehaviour for Behaviour {

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
_: ConnectionId,
peer: PeerId,
_: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.connections_endpoints
.insert(connection_id, NewExternalAddrCandidateEndpoint::Listener);

Ok(Handler::new(
self.config.interval,
peer,
Expand Down Expand Up @@ -264,10 +354,10 @@ impl NetworkBehaviour for Behaviour {
addr.pop();
}

self.connections_endpoints.insert(
connection_id,
NewExternalAddrCandidateEndpoint::Dialer { port_use },
);
if port_use == PortUse::New {
self.outbound_connections_without_port_reuse
.insert(connection_id);
}

Ok(Handler::new(
self.config.interval,
Expand Down Expand Up @@ -310,18 +400,10 @@ impl NetworkBehaviour for Behaviour {
}
}

let endpoint = self
.connections_endpoints
.get(&id)
.expect("we are connected");

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
not_yet_observed.insert(observed.clone());
self.events.push_back(ToSwarm::NewExternalAddrCandidate {
endpoint: *endpoint,
observed_addr: observed,
});
self.emit_new_external_addr_candidate_event(id, &observed);
}
Entry::Occupied(already_observed) if already_observed.get() == &observed => {
// No-op, we already observed this address.
Expand All @@ -334,10 +416,7 @@ impl NetworkBehaviour for Behaviour {
);

*already_observed.get_mut() = observed.clone();
self.events.push_back(ToSwarm::NewExternalAddrCandidate {
endpoint: *endpoint,
observed_addr: observed,
});
self.emit_new_external_addr_candidate_event(id, &observed);
}
}
}
Expand Down Expand Up @@ -428,7 +507,8 @@ impl NetworkBehaviour for Behaviour {
}

self.our_observed_addresses.remove(&connection_id);
self.connections_endpoints.remove(&connection_id);
self.outbound_connections_without_port_reuse
.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
Expand Down
29 changes: 3 additions & 26 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,6 @@ pub trait NetworkBehaviour: 'static {
-> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>>;
}

#[derive(Debug, Copy, Clone)]
pub enum NewExternalAddrCandidateEndpoint {
/// no translation is required
Listener,
/// translation is required when dialed with PortUse::New
Dialer { port_use: PortUse },
}

/// A command issued from a [`NetworkBehaviour`] for the [`Swarm`].
///
/// [`Swarm`]: super::Swarm
Expand Down Expand Up @@ -290,10 +282,7 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
/// - A protocol such as identify obtained it from a remote.
/// - The user provided it based on configuration.
/// - We made an educated guess based on one of our listen addresses.
NewExternalAddrCandidate {
endpoint: NewExternalAddrCandidateEndpoint,
observed_addr: Multiaddr,
},
NewExternalAddrCandidate(Multiaddr),

/// Indicates to the [`Swarm`](crate::Swarm) that the provided address is confirmed to be externally reachable.
///
Expand Down Expand Up @@ -352,13 +341,7 @@ impl<TOutEvent, TInEventOld> ToSwarm<TOutEvent, TInEventOld> {
peer_id,
connection,
},
ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
} => ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
},
ToSwarm::NewExternalAddrCandidate(addr) => ToSwarm::NewExternalAddrCandidate(addr),
ToSwarm::ExternalAddrConfirmed(addr) => ToSwarm::ExternalAddrConfirmed(addr),
ToSwarm::ExternalAddrExpired(addr) => ToSwarm::ExternalAddrExpired(addr),
ToSwarm::NewExternalAddrOfPeer {
Expand Down Expand Up @@ -389,13 +372,7 @@ impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
handler,
event,
},
ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
} => ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
},
ToSwarm::NewExternalAddrCandidate(addr) => ToSwarm::NewExternalAddrCandidate(addr),
ToSwarm::ExternalAddrConfirmed(addr) => ToSwarm::ExternalAddrConfirmed(addr),
ToSwarm::ExternalAddrExpired(addr) => ToSwarm::ExternalAddrExpired(addr),
ToSwarm::CloseConnection {
Expand Down
74 changes: 7 additions & 67 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ pub mod derive_prelude {
pub use libp2p_identity::PeerId;
}

use behaviour::NewExternalAddrCandidateEndpoint;
pub use behaviour::{
AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
Expand All @@ -120,7 +119,6 @@ pub use handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_core::transport::PortUse;
#[cfg(feature = "macros")]
pub use libp2p_swarm_derive::NetworkBehaviour;
pub use listen_opts::ListenOpts;
Expand Down Expand Up @@ -1134,71 +1132,13 @@ where

self.pending_handler_event = Some((peer_id, handler, event));
}
ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr,
} => {
// Apply address translation to the candidate address.
// For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
match endpoint {
NewExternalAddrCandidateEndpoint::Listener
| NewExternalAddrCandidateEndpoint::Dialer {
port_use: PortUse::Reuse,
} => {
// no translation is required : use the observed address
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate {
addr: &observed_addr,
},
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate {
address: observed_addr,
});
}
NewExternalAddrCandidateEndpoint::Dialer {
port_use: PortUse::New,
} => {
let mut translated_addresses: Vec<_> = self
.listened_addrs
.values()
.flatten()
.filter_map(|server| {
self.transport.address_translation(server, &observed_addr)
})
.collect();

// remove duplicates
translated_addresses.sort_unstable();
translated_addresses.dedup();

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate {
addr: &observed_addr,
},
));
self.pending_swarm_events.push_back(
SwarmEvent::NewExternalAddrCandidate {
address: observed_addr,
},
);
} else {
for addr in translated_addresses {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events.push_back(
SwarmEvent::NewExternalAddrCandidate { address: addr },
);
}
}
}
}
ToSwarm::NewExternalAddrCandidate(addr) => {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
}
ToSwarm::ExternalAddrConfirmed(addr) => {
self.add_external_address(addr.clone());
Expand Down

0 comments on commit 6103cd8

Please sign in to comment.