Skip to content

Commit 5ba7c48

Browse files
authored
Permit concurrent dialing attempts per peer. (#1506)
* Permit concurrent dialing attempts per peer. This is a follow-up to #1440 and relates to #925. This change permits multiple dialing attempts per peer. Note though that `libp2p-swarm` does not yet make use of this ability, retaining the current behaviour. The essence of the changes are that the `Peer` API now provides `Peer::dial()`, i.e. regardless of the state in which the peer is. A dialing attempt is always made up of one or more addresses tried sequentially, as before, but now there can be multiple dialing attempts per peer. A configurable per-peer limit for outgoing connections and thus concurrent dialing attempts is also included. * Introduce `DialError` in `libp2p-swarm`. For a cleaner API and to treat the case of no addresses for a peer as an error, such that a `NetworkBehaviourAction::DialPeer` request is always matched up with either `inject_connection_established` or `inject_dial_error`. * Fix rustdoc link. * Add `DialPeerCondition::Always`. * Adapt to master. * Update changelog.
1 parent 44c0c76 commit 5ba7c48

File tree

7 files changed

+470
-334
lines changed

7 files changed

+470
-334
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Version ???
22

3+
- `libp2p-core`, `libp2p-swarm`: Added support for multiple dialing
4+
attempts per peer, with a configurable limit.
5+
[PR 1506](https://github.com/libp2p/rust-libp2p/pull/1506)
6+
37
- `libp2p-noise`: Added the `X25519Spec` protocol suite which uses
48
libp2p-noise-spec compliant signatures on static keys as well as the
59
`/noise` protocol upgrade, hence providing a libp2p-noise-spec compliant

core/src/connection/pool.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,7 @@ where
225225
TPeerId: Clone + Send + 'static,
226226
{
227227
let endpoint = info.to_connected_point();
228-
if let Some(limit) = self.limits.max_incoming {
229-
let current = self.iter_pending_incoming().count();
230-
if current >= limit {
231-
return Err(ConnectionLimit { limit, current })
232-
}
233-
}
228+
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
234229
Ok(self.add_pending(future, handler, endpoint, None))
235230
}
236231

@@ -267,6 +262,11 @@ where
267262
TPeerId: Clone + Send + 'static,
268263
{
269264
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
265+
266+
if let Some(peer) = &info.peer_id {
267+
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
268+
}
269+
270270
let endpoint = info.to_connected_point();
271271
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
272272
}
@@ -465,6 +465,13 @@ where
465465
self.established.get(peer).map_or(0, |conns| conns.len())
466466
}
467467

468+
/// Counts the number of pending outgoing connections to the given peer.
469+
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
470+
self.iter_pending_outgoing()
471+
.filter(|info| info.peer_id == Some(peer))
472+
.count()
473+
}
474+
468475
/// Returns an iterator over all established connections of `peer`.
469476
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
470477
-> EstablishedConnectionIter<'a,
@@ -837,6 +844,7 @@ pub struct PoolLimits {
837844
pub max_outgoing: Option<usize>,
838845
pub max_incoming: Option<usize>,
839846
pub max_established_per_peer: Option<usize>,
847+
pub max_outgoing_per_peer: Option<usize>,
840848
}
841849

842850
impl PoolLimits {
@@ -854,6 +862,20 @@ impl PoolLimits {
854862
Self::check(current, self.max_outgoing)
855863
}
856864

865+
fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
866+
where
867+
F: FnOnce() -> usize
868+
{
869+
Self::check(current, self.max_incoming)
870+
}
871+
872+
fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
873+
where
874+
F: FnOnce() -> usize
875+
{
876+
Self::check(current, self.max_outgoing_per_peer)
877+
}
878+
857879
fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
858880
where
859881
F: FnOnce() -> usize

core/src/network.rs

+44-30
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::{
5050
};
5151
use fnv::{FnvHashMap};
5252
use futures::{prelude::*, future};
53+
use smallvec::SmallVec;
5354
use std::{
5455
collections::hash_map,
5556
convert::TryFrom as _,
@@ -78,21 +79,17 @@ where
7879

7980
/// The ongoing dialing attempts.
8081
///
81-
/// The `Network` enforces a single ongoing dialing attempt per peer,
82-
/// even if multiple (established) connections per peer are allowed.
83-
/// However, a single dialing attempt operates on a list of addresses
84-
/// to connect to, which can be extended with new addresses while
85-
/// the connection attempt is still in progress. Thereby each
86-
/// dialing attempt is associated with a new connection and hence a new
87-
/// connection ID.
82+
/// There may be multiple ongoing dialing attempts to the same peer.
83+
/// Each dialing attempt is associated with a new connection and hence
84+
/// a new connection ID.
8885
///
8986
/// > **Note**: `dialing` must be consistent with the pending outgoing
9087
/// > connections in `pool`. That is, for every entry in `dialing`
9188
/// > there must exist a pending outgoing connection in `pool` with
9289
/// > the same connection ID. This is ensured by the implementation of
9390
/// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
94-
/// > together with the implementation of `DialingConnection::abort`.
95-
dialing: FnvHashMap<TPeerId, peer::DialingAttempt>,
91+
/// > together with the implementation of `DialingAttempt::abort`.
92+
dialing: FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
9693
}
9794

9895
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
@@ -381,8 +378,11 @@ where
381378
Poll::Pending => return Poll::Pending,
382379
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
383380
match self.dialing.entry(connection.peer_id().clone()) {
384-
hash_map::Entry::Occupied(e) if e.get().id == connection.id() => {
385-
e.remove();
381+
hash_map::Entry::Occupied(mut e) => {
382+
e.get_mut().retain(|s| s.current.0 != connection.id());
383+
if e.get().is_empty() {
384+
e.remove();
385+
}
386386
},
387387
_ => {}
388388
}
@@ -453,7 +453,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPee
453453
transport: TTrans,
454454
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
455455
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
456-
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
456+
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
457457
opts: DialingOpts<TPeerId, THandler>
458458
) -> Result<ConnectionId, ConnectionLimit>
459459
where
@@ -489,14 +489,12 @@ where
489489
};
490490

491491
if let Ok(id) = &result {
492-
let former = dialing.insert(opts.peer,
493-
peer::DialingAttempt {
494-
id: *id,
495-
current: opts.address,
496-
next: opts.remaining,
492+
dialing.entry(opts.peer).or_default().push(
493+
peer::DialingState {
494+
current: (*id, opts.address),
495+
remaining: opts.remaining,
497496
},
498497
);
499-
debug_assert!(former.is_none());
500498
}
501499

502500
result
@@ -508,7 +506,7 @@ where
508506
/// If the failed connection attempt was a dialing attempt and there
509507
/// are more addresses to try, new `DialingOpts` are returned.
510508
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
511-
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
509+
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
512510
id: ConnectionId,
513511
endpoint: ConnectedPoint,
514512
error: PendingConnectionError<TTrans::Error>,
@@ -521,27 +519,34 @@ where
521519
TPeerId: Eq + Hash + Clone,
522520
{
523521
// Check if the failed connection is associated with a dialing attempt.
524-
// TODO: could be more optimal than iterating over everything
525-
let dialing_peer = dialing.iter() // (1)
526-
.find(|(_, a)| a.id == id)
527-
.map(|(p, _)| p.clone());
522+
let dialing_failed = dialing.iter_mut()
523+
.find_map(|(peer, attempts)| {
524+
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
525+
let attempt = attempts.remove(pos);
526+
let last = attempts.is_empty();
527+
Some((peer.clone(), attempt, last))
528+
} else {
529+
None
530+
}
531+
});
528532

529-
if let Some(peer_id) = dialing_peer {
530-
// A pending outgoing connection to a known peer failed.
531-
let mut attempt = dialing.remove(&peer_id).expect("by (1)");
533+
if let Some((peer_id, mut attempt, last)) = dialing_failed {
534+
if last {
535+
dialing.remove(&peer_id);
536+
}
532537

533-
let num_remain = u32::try_from(attempt.next.len()).unwrap();
534-
let failed_addr = attempt.current.clone();
538+
let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
539+
let failed_addr = attempt.current.1.clone();
535540

536541
let (opts, attempts_remaining) =
537542
if num_remain > 0 {
538543
if let Some(handler) = handler {
539-
let next_attempt = attempt.next.remove(0);
544+
let next_attempt = attempt.remaining.remove(0);
540545
let opts = DialingOpts {
541546
peer: peer_id.clone(),
542547
handler,
543548
address: next_attempt,
544-
remaining: attempt.next
549+
remaining: attempt.remaining
545550
};
546551
(Some(opts), num_remain)
547552
} else {
@@ -581,9 +586,13 @@ where
581586
/// Information about the network obtained by [`Network::info()`].
582587
#[derive(Clone, Debug)]
583588
pub struct NetworkInfo {
589+
/// The total number of connected peers.
584590
pub num_peers: usize,
591+
/// The total number of connections, both established and pending.
585592
pub num_connections: usize,
593+
/// The total number of pending connections, both incoming and outgoing.
586594
pub num_connections_pending: usize,
595+
/// The total number of established connections.
587596
pub num_connections_established: usize,
588597
}
589598

@@ -633,4 +642,9 @@ impl NetworkConfig {
633642
self.pool_limits.max_established_per_peer = Some(n);
634643
self
635644
}
645+
646+
pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self {
647+
self.pool_limits.max_outgoing_per_peer = Some(n);
648+
self
649+
}
636650
}

0 commit comments

Comments
 (0)