Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identify detects if address translation is required #4

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use libp2p_core::transport::PortUse;
use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::behaviour::{
ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
NewExternalAddrCandidateEndpoint,
};
use libp2p_swarm::{
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
Expand Down Expand Up @@ -53,6 +56,9 @@ pub struct Behaviour {
/// The address a remote observed for us.
our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

/// Established connections information (Listener / Dialer / port_reuse or not)
connections_endpoints: HashMap<ConnectionId, NewExternalAddrCandidateEndpoint>,

/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
Expand Down Expand Up @@ -154,6 +160,7 @@ impl Behaviour {
config,
connected: HashMap::new(),
our_observed_addresses: Default::default(),
connections_endpoints: Default::default(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand Down Expand Up @@ -222,11 +229,14 @@ impl NetworkBehaviour for Behaviour {

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
connection_id: ConnectionId,
peer: PeerId,
_: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.connections_endpoints
.insert(connection_id, NewExternalAddrCandidateEndpoint::Listener);

Ok(Handler::new(
self.config.interval,
peer,
Expand All @@ -240,12 +250,25 @@ impl NetworkBehaviour for Behaviour {

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
_: Endpoint,
_: PortUse,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
// Contrary to inbound events, outbound events are full-p2p qualified
// so we remove /p2p/ in order to be homogeneous
// this will avoid Autonatv2 to probe twice the same address (fully-p2p-qualified + not fully-p2p-qualified)
let mut addr = addr.clone();
if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
addr.pop();
}

self.connections_endpoints.insert(
connection_id,
NewExternalAddrCandidateEndpoint::Dialer { port_use },
);

Ok(Handler::new(
self.config.interval,
peer,
Expand Down Expand Up @@ -287,11 +310,18 @@ impl NetworkBehaviour for Behaviour {
}
}

let endpoint = self
.connections_endpoints
.get(&id)
.expect("we are connected");

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
not_yet_observed.insert(observed.clone());
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
self.events.push_back(ToSwarm::NewExternalAddrCandidate {
endpoint: *endpoint,
observed_addr: observed,
});
}
Entry::Occupied(already_observed) if already_observed.get() == &observed => {
// No-op, we already observed this address.
Expand All @@ -304,8 +334,10 @@ impl NetworkBehaviour for Behaviour {
);

*already_observed.get_mut() = observed.clone();
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
self.events.push_back(ToSwarm::NewExternalAddrCandidate {
endpoint: *endpoint,
observed_addr: observed,
});
}
}
}
Expand Down Expand Up @@ -396,6 +428,7 @@ impl NetworkBehaviour for Behaviour {
}

self.our_observed_addresses.remove(&connection_id);
self.connections_endpoints.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
Expand Down
29 changes: 26 additions & 3 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ pub trait NetworkBehaviour: 'static {
-> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>>;
}

#[derive(Debug, Copy, Clone)]
pub enum NewExternalAddrCandidateEndpoint {
/// no translation is required
Listener,
/// translation is required when dialed with PortUse::New
Dialer { port_use: PortUse },
}

/// A command issued from a [`NetworkBehaviour`] for the [`Swarm`].
///
/// [`Swarm`]: super::Swarm
Expand Down Expand Up @@ -282,7 +290,10 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
/// - A protocol such as identify obtained it from a remote.
/// - The user provided it based on configuration.
/// - We made an educated guess based on one of our listen addresses.
NewExternalAddrCandidate(Multiaddr),
NewExternalAddrCandidate {
endpoint: NewExternalAddrCandidateEndpoint,
observed_addr: Multiaddr,
},

/// Indicates to the [`Swarm`](crate::Swarm) that the provided address is confirmed to be externally reachable.
///
Expand Down Expand Up @@ -341,7 +352,13 @@ impl<TOutEvent, TInEventOld> ToSwarm<TOutEvent, TInEventOld> {
peer_id,
connection,
},
ToSwarm::NewExternalAddrCandidate(addr) => ToSwarm::NewExternalAddrCandidate(addr),
ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
} => ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
},
ToSwarm::ExternalAddrConfirmed(addr) => ToSwarm::ExternalAddrConfirmed(addr),
ToSwarm::ExternalAddrExpired(addr) => ToSwarm::ExternalAddrExpired(addr),
ToSwarm::NewExternalAddrOfPeer {
Expand Down Expand Up @@ -372,7 +389,13 @@ impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
handler,
event,
},
ToSwarm::NewExternalAddrCandidate(addr) => ToSwarm::NewExternalAddrCandidate(addr),
ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
} => ToSwarm::NewExternalAddrCandidate {
endpoint,
observed_addr: addr,
},
ToSwarm::ExternalAddrConfirmed(addr) => ToSwarm::ExternalAddrConfirmed(addr),
ToSwarm::ExternalAddrExpired(addr) => ToSwarm::ExternalAddrExpired(addr),
ToSwarm::CloseConnection {
Expand Down
86 changes: 59 additions & 27 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub mod derive_prelude {
pub use libp2p_identity::PeerId;
}

use behaviour::NewExternalAddrCandidateEndpoint;
pub use behaviour::{
AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
Expand All @@ -119,6 +120,7 @@ pub use handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_core::transport::PortUse;
#[cfg(feature = "macros")]
pub use libp2p_swarm_derive::NetworkBehaviour;
pub use listen_opts::ListenOpts;
Expand Down Expand Up @@ -1132,39 +1134,69 @@ where

self.pending_handler_event = Some((peer_id, handler, event));
}
ToSwarm::NewExternalAddrCandidate(addr) => {
ToSwarm::NewExternalAddrCandidate {
endpoint,
Copy link

@thomaseizinger thomaseizinger May 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of this API change. It feels like we are passing a lot of information back and forth here. If we don't actually need any state from the TCP transport to do the translation, perhaps this is the time to just remove Transport::address_translation entirely?

AFAIK, all transports just delegate to the generic address_translation function anyway! Plus, all behaviours get told about all listen addresses. So, can we instead:

  • Remember all listen addresses in the identify behaviour
  • Call the generic address_translation function from there (we might even be able to move it to the identify behaviour which would have the added benefit of it being removed from the public API of libp2p-core!)
  • In the identify behaviour, we know which addresses need translation, so we can call it only for those
  • All other addresses we can just emit as is

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advise Thomas !
I implemented the address translation without using Transport::address_translation
(I did not removed yet Transport::address_translation but it is not used anymore)

  • listen addresses are already stored in identify
  • translation is done when both conditions are meet:
    • a multiaddr of a transport requiring translation (use is_addr_tcp and is_quic_addr taken from the tcp/quic transport implementations)
    • connection was dialed without port reuse
  • call the address_translation function
  • If translation gave an address emit the ToSwarm::NewExternalAddrCandidate with the translated address, otherwise emit the ToSwarm::NewExternalAddrCandidate with the observed address,

If this approach is ok, i will :

  • remove Transport::address_translation function
  • remove the is_addr_tcp and is_quic_addr for the transports
  • keep the generic address_translation in lib_p2p_core because used in mdns (but not for nat purpose)

observed_addr,
} => {
// Apply address translation to the candidate address.
// For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
let translated_addresses = {
let mut addrs: Vec<_> = self
.listened_addrs
.values()
.flatten()
.filter_map(|server| self.transport.address_translation(server, &addr))
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
} else {
for addr in translated_addresses {
match endpoint {
NewExternalAddrCandidateEndpoint::Listener
| NewExternalAddrCandidateEndpoint::Dialer {
port_use: PortUse::Reuse,
} => {
// no translation is required : use the observed address
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
NewExternalAddrCandidate {
addr: &observed_addr,
},
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
.push_back(SwarmEvent::NewExternalAddrCandidate {
address: observed_addr,
});
}
NewExternalAddrCandidateEndpoint::Dialer {
port_use: PortUse::New,
} => {
let mut translated_addresses: Vec<_> = self
.listened_addrs
.values()
.flatten()
.filter_map(|server| {
self.transport.address_translation(server, &observed_addr)
})
.collect();

// remove duplicates
translated_addresses.sort_unstable();
translated_addresses.dedup();

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate {
addr: &observed_addr,
},
));
self.pending_swarm_events.push_back(
SwarmEvent::NewExternalAddrCandidate {
address: observed_addr,
},
);
} else {
for addr in translated_addresses {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events.push_back(
SwarmEvent::NewExternalAddrCandidate { address: addr },
);
}
}
}
}
}
Expand Down
31 changes: 2 additions & 29 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ use libp2p_core::{
use provider::{Incoming, Provider};
use socket2::{Domain, Socket, Type};
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashSet, VecDeque},
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
pin::Pin,
sync::{Arc, Mutex, RwLock},
sync::{Arc, RwLock},
task::{Context, Poll, Waker},
time::Duration,
};
Expand All @@ -75,8 +75,6 @@ struct PortReuse {
/// The addresses and ports of the listening sockets
/// registered as eligible for port reuse when dialing
listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
/// Contains a hashset of all multiaddr that where dialed from a reused port.
addr_dialed_from_reuse_poralways_reused_port: Arc<Mutex<HashMap<SocketAddr, bool>>>,
}

impl PortReuse {
Expand Down Expand Up @@ -129,27 +127,6 @@ impl PortReuse {

None
}

fn dialed_from_reuse_port(&self, addr: SocketAddr) {
self.addr_dialed_from_reuse_poralways_reused_port
.lock()
.expect("`dialed_as_listener` never panic while holding the lock")
.entry(addr)
.or_insert(true);
}

fn was_dialed_from_reuse_port(&self, addr: &Multiaddr) -> bool {
if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
*self
.addr_dialed_from_reuse_poralways_reused_port
.lock()
.expect("`already_dialed_as_listener` never panic while holding the lock")
.entry(socket_addr)
.or_insert(false)
} else {
false
}
}
}

impl Config {
Expand Down Expand Up @@ -398,7 +375,6 @@ where
socket
.bind(&socket_addr.into())
.map_err(TransportError::Other)?;
self.port_reuse.dialed_from_reuse_port(socket_addr);
}
_ => {}
}
Expand Down Expand Up @@ -444,9 +420,6 @@ where
if !is_tcp_addr(listen) || !is_tcp_addr(observed) {
return None;
}
if self.port_reuse.was_dialed_from_reuse_port(listen) {
return Some(observed.clone());
}

address_translation(listen, observed)
}
Expand Down
Loading