Skip to content

Commit

Permalink
Implement old suggestions i overlooked
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 27, 2023
1 parent 101a325 commit bf2662a
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 115 deletions.
158 changes: 82 additions & 76 deletions protocols/autonatv2/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use rand_core::{OsRng, RngCore};
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use crate::client::handler::dial_request::InternalError;
use crate::{global_only::IpExt, request_response::DialRequest};
use crate::{client::handler::dial_request::InternalError, Nonce};
use crate::{global_only::IpExt, protocol::DialRequest};

use super::handler::{
dial_back,
Expand Down Expand Up @@ -126,7 +126,7 @@ where
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
*self.address_candidates.entry(addr.clone()).or_default() += 1;
self.inject_address_candiate(addr.clone())
}
FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => {
self.address_candidates.remove(addr);
Expand Down Expand Up @@ -179,12 +179,10 @@ where
}
}
Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => {
if !self.known_servers.contains(&peer_id) {
self.known_servers.push(peer_id);
}
self.inject_kown_server(peer_id);
}
Either::Left(dial_request::ToBehaviour::TestCompleted(Ok(TestEnd {
dial_request: DialRequest { nonce, addrs },
dial_request: DialRequest { nonce, .. },
reachable_addr,
}))) => {
if self.pending_nonces.remove(&nonce) {
Expand All @@ -193,12 +191,6 @@ where
);
return;
}
self.pending_events.extend(
addrs
.into_iter()
.take_while(|addr| addr != &reachable_addr)
.map(ToSwarm::ExternalAddrExpired),
);
self.pending_events
.push_back(ToSwarm::ExternalAddrConfirmed(reachable_addr));
}
Expand All @@ -211,6 +203,15 @@ where
self.pending_events
.push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
}
dial_request::InternalError::InternalServer
| dial_request::InternalError::DataRequestTooLarge { .. }
| dial_request::InternalError::DataRequestTooSmall { .. }
| dial_request::InternalError::InvalidResponse
| dial_request::InternalError::ServerRejectedDialRequest
| dial_request::InternalError::InvalidReferencedAddress { .. }
| dial_request::InternalError::ServerChoseNotToDialAnyAddress => {
self.handle_no_connection(peer_id, connection_id);
}
_ => {
tracing::debug!("Test failed: {:?}", err);
}
Expand All @@ -227,50 +228,13 @@ where
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
{
let pending_event = self.poll_pending_events();
if pending_event.is_ready() {
return pending_event;
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
if self.next_tick.poll_unpin(cx).is_ready()
&& !self.known_servers.is_empty()
&& !self.address_candidates.is_empty()
{
let mut entries = self
.address_candidates
.iter()
.filter(|(addr, _)| !self.already_tested.contains(addr))
.collect::<Vec<_>>();
if entries.is_empty() {
return Poll::Pending;
}
entries.sort_unstable_by_key(|(_, count)| *count);
let addrs = entries
.into_iter()
.rev()
.map(|(addr, _)| addr.clone())
.take(self.config.max_addrs_count)
.collect::<Vec<_>>();
self.already_tested.extend(addrs.iter().cloned());
let peers = if self.known_servers.len() < self.config.test_server_count {
self.known_servers.clone()
} else {
self.known_servers
.choose_multiple(&mut self.rng, self.config.test_server_count)
.copied()
.collect()
};
for peer in peers {
let nonce = self.rng.gen();
let req = DialRequest {
nonce,
addrs: addrs.clone(),
};
self.pending_nonces.insert(nonce);
self.submit_req_for_peer(peer, req);
}
let pending_event = self.poll_pending_events();
if pending_event.is_ready() {
return pending_event;
if self.next_tick.poll_unpin(cx).is_ready() {
self.inject_address_candiate_test();
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
}
Poll::Pending
Expand All @@ -296,18 +260,74 @@ where
}
}

fn submit_req_for_peer(&mut self, peer: PeerId, req: DialRequest) {
/// Injects a known server into the behaviour. It's mostly useful if you are not using identify
/// or for testing purposes.
pub fn inject_kown_server(&mut self, peer: PeerId) {
if !self.known_servers.contains(&peer) {
self.known_servers.push(peer);
}
}

/// Inject a new address candidate into the behaviour.
pub fn inject_address_candiate(&mut self, addr: Multiaddr) {
*self.address_candidates.entry(addr).or_default() += 1;
}

/// Inject an immediate test for all pending address candidates.
pub fn inject_address_candiate_test(&mut self) {
if self.known_servers.is_empty() || self.address_candidates.is_empty() {
return;
}
let mut entries = self
.address_candidates
.iter()
.filter(|(addr, _)| !self.already_tested.contains(addr))
.map(|(addr, count)| (addr.clone(), *count))
.collect::<Vec<_>>();
entries.sort_unstable_by_key(|(_, count)| *count);
let addrs = entries
.iter()
.rev()
.map(|(addr, _)| addr)
.take(self.config.max_addrs_count)
.cloned()
.collect::<Vec<_>>();
self.already_tested.extend(addrs.iter().cloned());
let peers = if self.known_servers.len() < self.config.test_server_count {
self.known_servers.clone()
} else {
self.known_servers
.choose_multiple(&mut self.rng, self.config.test_server_count)
.copied()
.collect()
};
for peer in peers {
let mut remaining_entries = entries.clone();
let mut addrs = Vec::with_capacity(self.config.max_addrs_count);
while !remaining_entries.is_empty() && addrs.len() < self.config.max_addrs_count {
let addr = remaining_entries
.choose_weighted(&mut self.rng, |item| item.1)
.unwrap()
.0
.clone();
remaining_entries.retain(|(a, _)| a != &addr);
addrs.push(addr);
}
let nonce = self.rng.gen();
let req = DialRequest { nonce, addrs };
self.submit_req_for_peer(peer, req, nonce);
}
self.next_tick.reset(self.config.recheck_interval);
}

fn submit_req_for_peer(&mut self, peer: PeerId, req: DialRequest, nonce: Nonce) {
self.pending_nonces.insert(nonce);
if let Some(conn_id) = self.peers_to_handlers.get(&peer) {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::One(*conn_id),
event: Either::Left(req),
});
} else {
tracing::debug!(
"There should be a connection to {:?}, but there isn't",
peer
);
}
}

Expand All @@ -318,20 +338,6 @@ where
}
self.known_servers.retain(|p| p != &peer_id);
}

fn poll_pending_events(
&mut self,
) -> Poll<
ToSwarm<
<Self as NetworkBehaviour>::ToSwarm,
<<Self as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::FromBehaviour,
>,
> {
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}

impl Default for Behaviour<OsRng> {
Expand Down
4 changes: 0 additions & 4 deletions protocols/autonatv2/src/client/handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
pub(crate) mod dial_back;
pub(crate) mod dial_request;

Check failure on line 2 in protocols/autonatv2/src/client/handler.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/protocols/autonatv2/src/client/handler.rs

use std::time::Duration;

pub(crate) use dial_request::TestEnd;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONCURRENT_REQUESTS: usize = 10;
9 changes: 4 additions & 5 deletions protocols/autonatv2/src/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
io,
task::{Context, Poll},
time::Duration,
};

use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
Expand All @@ -12,9 +13,7 @@ use libp2p_swarm::{
};
use void::Void;

use crate::{request_response::DialBack, Nonce, DIAL_BACK_PROTOCOL_NAME};

use super::{DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS};
use crate::{protocol::DialBack, Nonce, DIAL_BACK_PROTOCOL_NAME};

pub struct Handler {
inbound: FuturesSet<io::Result<Nonce>>,
Expand All @@ -23,7 +22,7 @@ pub struct Handler {
impl Handler {
pub(crate) fn new() -> Self {
Self {
inbound: FuturesSet::new(DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS),
inbound: FuturesSet::new(Duration::from_secs(5), 2),
}
}
}
Expand Down Expand Up @@ -78,7 +77,7 @@ impl ConnectionHandler for Handler {
}
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { error, .. }) => {
tracing::debug!("Dial back request failed: {:?}", error);
void::unreachable(error);
}
_ => {}
}
Expand Down
Loading

0 comments on commit bf2662a

Please sign in to comment.