Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identify detects if address translation is required #4

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 120 additions & 7 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

use crate::handler::{self, Handler, InEvent};
use crate::protocol::{Info, UpgradeError};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::transport::PortUse;
use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_core::{address_translation, multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
Expand All @@ -40,6 +41,50 @@
time::Duration,
};

/// Whether an [`Multiaddr`] is a valid for the QUIC transport.
fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
use Protocol::*;
let mut iter = addr.iter();
let Some(first) = iter.next() else {
return false;
};
let Some(second) = iter.next() else {
return false;
};
let Some(third) = iter.next() else {
return false;
};
let fourth = iter.next();
let fifth = iter.next();

matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
&& matches!(second, Udp(_))
&& if v1 {
matches!(third, QuicV1)
} else {
matches!(third, Quic)
}
&& matches!(fourth, Some(P2p(_)) | None)
&& fifth.is_none()
}
Comment on lines +44 to +69

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this came up another time already. Why do we need to perform address translation for QUIC addresses? QUIC never uses ephemeral ports (to my knowledge).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Autonat server should use ephemeral port when dialing back a peer.
otherwise the server dial-back will always succeed (and will report a false positive to the peer) because the peer has already opened the states in the firewalls when dialing the server.
=> translation for QUIC addresses is required

@umgefahren : When dialing back, the server should use a dedicated quic endpoint (not localy binded) and should not reuse a listner endpoint (this is now not the case)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Autonat server should use ephemeral port when dialing back a peer. otherwise the server dial-back will always succeed (and will report a false positive to the peer) because the peer has already opened the states in the firewalls when dialing the server. => translation for QUIC addresses is required

@umgefahren : When dialing back, the server should use a dedicated quic endpoint (not localy binded) and should not reuse a listner endpoint (this is now not the case)

You are right. Thank you for laying it out for me. We need to do address translation because we (libp2p) decide to explicitly use an ephemeral port for a new QUIC connection to avoid accidental hole-punching despite QUIC itself not needing it.

I think this is worthwhile documenting here because anybody coming across this will think "hey, QUIC always uses stable ports so why would be bother with address translation for QUIC". (Also cc @mxinden, I know you too have been confused by this in the past)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Autonat server should use ephemeral port when dialing back a peer. otherwise the server dial-back will always succeed (and will report a false positive to the peer) because the peer has already opened the states in the firewalls when dialing the server. => translation for QUIC addresses is required

@umgefahren : When dialing back, the server should use a dedicated quic endpoint (not localy binded) and should not reuse a listner endpoint (this is now not the case)

I will change it so that when Endpoint is Dialer and PortUse::Reuse we dial a dedicated quic endpoint.


fn is_tcp_addr(addr: &Multiaddr) -> bool {
use Protocol::*;

let mut iter = addr.iter();

let first = match iter.next() {
None => return false,
Some(p) => p,
};
let second = match iter.next() {
None => return false,
Some(p) => p,
};

matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
}

/// Network behaviour that automatically identifies nodes periodically, returns information
/// about them, and answers identify queries from other nodes.
///
Expand All @@ -53,6 +98,9 @@
/// The address a remote observed for us.
our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

/// The outbound connections established without port reuse (require translation)
outbound_connections_without_port_reuse: HashSet<ConnectionId>,
stormshield-ebzh marked this conversation as resolved.
Show resolved Hide resolved

/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
Expand Down Expand Up @@ -154,6 +202,7 @@
config,
connected: HashMap::new(),
our_observed_addresses: Default::default(),
outbound_connections_without_port_reuse: Default::default(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand Down Expand Up @@ -214,6 +263,57 @@
.cloned()
.collect()
}

fn emit_new_external_addr_candidate_event(
&mut self,
connection_id: ConnectionId,
observed: &Multiaddr,
) {
if self
stormshield-ebzh marked this conversation as resolved.
Show resolved Hide resolved
.outbound_connections_without_port_reuse
.contains(&connection_id)
{
// Apply address translation to the candidate address.
// For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
let translated_addresses = {
let mut addrs: Vec<_> = self
.listen_addresses
.iter()
.filter_map(|server| {
if (is_tcp_addr(server) && is_tcp_addr(&observed))

Check failure on line 283 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 283 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
|| (is_quic_addr(server, true) && is_quic_addr(&observed, true))

Check failure on line 284 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 284 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
|| (is_quic_addr(server, false) && is_quic_addr(&observed, false))

Check failure on line 285 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 285 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
{
address_translation(server, &observed)

Check failure on line 287 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (beta)

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 287 in protocols/identify/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (1.78.0)

this expression creates a reference which is immediately dereferenced by the compiler
} else {
None
}
})
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
} else {
for addr in translated_addresses {
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(addr));
}
}
} else {
// outgoing connection dialed with port reuse
// incomming connection
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
}
}
}

impl NetworkBehaviour for Behaviour {
Expand All @@ -240,12 +340,25 @@

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
_: Endpoint,
_: PortUse,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
// Contrary to inbound events, outbound events are full-p2p qualified
// so we remove /p2p/ in order to be homogeneous
// this will avoid Autonatv2 to probe twice the same address (fully-p2p-qualified + not fully-p2p-qualified)
let mut addr = addr.clone();
if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
addr.pop();
}

if port_use == PortUse::New {
self.outbound_connections_without_port_reuse
.insert(connection_id);
}

Ok(Handler::new(
self.config.interval,
peer,
Expand Down Expand Up @@ -290,8 +403,7 @@
match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
not_yet_observed.insert(observed.clone());
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
self.emit_new_external_addr_candidate_event(id, &observed);
}
Entry::Occupied(already_observed) if already_observed.get() == &observed => {
// No-op, we already observed this address.
Expand All @@ -304,8 +416,7 @@
);

*already_observed.get_mut() = observed.clone();
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
self.emit_new_external_addr_candidate_event(id, &observed);
}
}
}
Expand Down Expand Up @@ -396,6 +507,8 @@
}

self.our_observed_addresses.remove(&connection_id);
self.outbound_connections_without_port_reuse
.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
Expand Down
40 changes: 6 additions & 34 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,40 +1133,12 @@ where
self.pending_handler_event = Some((peer_id, handler, event));
}
ToSwarm::NewExternalAddrCandidate(addr) => {
// Apply address translation to the candidate address.
// For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
let translated_addresses = {
let mut addrs: Vec<_> = self
.listened_addrs
.values()
.flatten()
.filter_map(|server| self.transport.address_translation(server, &addr))
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};

// If address translation yielded nothing, broadcast the original candidate address.
if translated_addresses.is_empty() {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
} else {
for addr in translated_addresses {
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
}
}
self.behaviour
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr: &addr },
));
self.pending_swarm_events
.push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
}
ToSwarm::ExternalAddrConfirmed(addr) => {
self.add_external_address(addr.clone());
Expand Down
31 changes: 2 additions & 29 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ use libp2p_core::{
use provider::{Incoming, Provider};
use socket2::{Domain, Socket, Type};
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashSet, VecDeque},
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
pin::Pin,
sync::{Arc, Mutex, RwLock},
sync::{Arc, RwLock},
task::{Context, Poll, Waker},
time::Duration,
};
Expand All @@ -75,8 +75,6 @@ struct PortReuse {
/// The addresses and ports of the listening sockets
/// registered as eligible for port reuse when dialing
listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
/// Contains a hashset of all multiaddr that where dialed from a reused port.
addr_dialed_from_reuse_poralways_reused_port: Arc<Mutex<HashMap<SocketAddr, bool>>>,
}

impl PortReuse {
Expand Down Expand Up @@ -129,27 +127,6 @@ impl PortReuse {

None
}

fn dialed_from_reuse_port(&self, addr: SocketAddr) {
self.addr_dialed_from_reuse_poralways_reused_port
.lock()
.expect("`dialed_as_listener` never panic while holding the lock")
.entry(addr)
.or_insert(true);
}

fn was_dialed_from_reuse_port(&self, addr: &Multiaddr) -> bool {
if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
*self
.addr_dialed_from_reuse_poralways_reused_port
.lock()
.expect("`already_dialed_as_listener` never panic while holding the lock")
.entry(socket_addr)
.or_insert(false)
} else {
false
}
}
}

impl Config {
Expand Down Expand Up @@ -398,7 +375,6 @@ where
socket
.bind(&socket_addr.into())
.map_err(TransportError::Other)?;
self.port_reuse.dialed_from_reuse_port(socket_addr);
}
_ => {}
}
Expand Down Expand Up @@ -444,9 +420,6 @@ where
if !is_tcp_addr(listen) || !is_tcp_addr(observed) {
return None;
}
if self.port_reuse.was_dialed_from_reuse_port(listen) {
return Some(observed.clone());
}

address_translation(listen, observed)
}
Expand Down
Loading