Skip to content

Commit

Permalink
Implement all suggestions from the PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 22, 2023
1 parent dab99d2 commit da2a790
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 179 deletions.
8 changes: 2 additions & 6 deletions protocols/autonatv2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ quick-protobuf-codec = { workspace = true }
asynchronous-codec = "0.7"
libp2p-core = { workspace = true }
rand_core = "0.6"
rand = { version = "0.8", optional = true }
rand = "0.8"
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
futures-bounded = { workspace = true }
Expand All @@ -28,15 +28,11 @@ unsigned-varint = { workspace = true, features = ["futures"] }
futures-timer = "3.0.2"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
tokio = { version = "1", features = ["macros", "rt", "sync"] }
libp2p-swarm-test = { workspace = true }
libp2p-identify = { workspace = true }
libp2p-swarm = { workspace = true, features = ["macros"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"]}

[lints]
workspace = true

[features]
default = ["rand"]
rand = ["dep:rand"]
37 changes: 21 additions & 16 deletions protocols/autonatv2/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{global_only::IpExt, request_response::DialRequest};
use super::handler::{
dial_back,
dial_request::{self, StatusUpdate},
Handler, TestEnd,
TestEnd,
};

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -89,7 +89,7 @@ impl<R> NetworkBehaviour for Behaviour<R>
where
R: RngCore + 'static,
{
type ConnectionHandler = Handler;
type ConnectionHandler = Either<dial_request::Handler, dial_back::Handler>;

type ToSwarm = StatusUpdate;

Expand Down Expand Up @@ -123,9 +123,7 @@ where
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
if !self.already_tested.contains(addr) {
*self.address_candidates.entry(addr.clone()).or_default() += 1;
}
*self.address_candidates.entry(addr.clone()).or_default() += 1;
}
FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => {
self.address_candidates.remove(addr);
Expand All @@ -144,7 +142,6 @@ where
connection_id,
..
}) => {
tracing::trace!("connection with {peer_id:?} closed");
self.handle_no_connection(peer_id, connection_id);
}
FromSwarm::DialFailure(DialFailure {
Expand All @@ -163,24 +160,21 @@ where
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: <Handler as ConnectionHandler>::ToBehaviour,
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
if matches!(event, Either::Left(_)) {
self.peers_to_handlers
.entry(peer_id)
.or_insert(connection_id);
}
match event {
Either::Right(Ok(nonce)) => {
Either::Right(nonce) => {
if self.pending_nonces.remove(&nonce) {
tracing::trace!("Received pending nonce from {peer_id:?}");
} else {
tracing::warn!("Received unexpected nonce from {peer_id:?}, this means that another node tried to be reachable on an address this node is reachable on.");
}
}
Either::Right(Err(err)) => {
tracing::debug!("Dial back failed: {:?}", err);
}
Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => {
if !self.known_servers.contains(&peer_id) {
self.known_servers.push(peer_id);
Expand Down Expand Up @@ -228,7 +222,8 @@ where
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, <Handler as ConnectionHandler>::FromBehaviour>> {
) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
{
let pending_event = self.poll_pending_events();
if pending_event.is_ready() {
return pending_event;
Expand All @@ -237,12 +232,19 @@ where
&& !self.known_servers.is_empty()
&& !self.address_candidates.is_empty()
{
let mut entries = self.address_candidates.drain().collect::<Vec<_>>();
let mut entries = self
.address_candidates
.iter()
.filter(|(addr, _)| !self.already_tested.contains(addr))
.collect::<Vec<_>>();
if entries.is_empty() {
return Poll::Pending;
}
entries.sort_unstable_by_key(|(_, count)| *count);
let addrs = entries
.into_iter()
.rev()
.map(|(addr, _)| addr)
.map(|(addr, _)| addr.clone())
.take(self.config.max_addrs_count)
.collect::<Vec<_>>();
self.already_tested.extend(addrs.iter().cloned());
Expand Down Expand Up @@ -296,7 +298,7 @@ where
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::One(*conn_id),
event: Either::Left(dial_request::FromBehaviour::PerformRequest(req)),
event: Either::Left(req),
});
} else {
tracing::debug!(
Expand All @@ -317,7 +319,10 @@ where
fn poll_pending_events(
&mut self,
) -> Poll<
ToSwarm<<Self as NetworkBehaviour>::ToSwarm, <Handler as ConnectionHandler>::FromBehaviour>,
ToSwarm<
<Self as NetworkBehaviour>::ToSwarm,
<<Self as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::FromBehaviour,
>,
> {
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
Expand Down
3 changes: 0 additions & 3 deletions protocols/autonatv2/src/client/handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub(crate) mod dial_back;
pub(crate) mod dial_request;

use either::Either;
use std::{
fmt::{Display, Formatter},
sync::Arc,
Expand All @@ -15,8 +14,6 @@ use self::dial_request::InternalError;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONCURRENT_REQUESTS: usize = 10;

pub(crate) type Handler = Either<dial_request::Handler, dial_back::Handler>;

#[derive(Clone, Debug)]
pub struct Error {
pub(crate) internal: Arc<InternalError>,
Expand Down
28 changes: 12 additions & 16 deletions protocols/autonatv2/src/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
convert::identity,
io,
task::{Context, Poll},
};
Expand All @@ -11,13 +10,12 @@ use libp2p_swarm::{
handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError},
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
};
use void::Void;

use crate::{request_response::DialBack, Nonce};
use crate::{request_response::DialBack, Nonce, DIAL_BACK_PROTOCOL_NAME};

use super::{DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS};

pub(crate) type ToBehaviour = io::Result<Nonce>;

pub struct Handler {
inbound: FuturesSet<io::Result<Nonce>>,
}
Expand All @@ -31,15 +29,15 @@ impl Handler {
}

impl ConnectionHandler for Handler {
type FromBehaviour = ();
type ToBehaviour = ToBehaviour;
type FromBehaviour = Void;
type ToBehaviour = Nonce;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = DeniedUpgrade;
type InboundOpenInfo = ();
type OutboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(crate::DIAL_BACK_UPGRADE, ())
SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL_NAME), ())
}

fn poll(
Expand All @@ -49,11 +47,13 @@ impl ConnectionHandler for Handler {
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
if let Poll::Ready(result) = self.inbound.poll_unpin(cx) {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
result
.map_err(|timeout| io::Error::new(io::ErrorKind::TimedOut, timeout))
.and_then(identity),
));
match result {
Ok(Ok(nonce)) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(nonce))
}
Ok(Err(err)) => tracing::debug!("Dial back handler failed with: {err:?}"),
Err(err) => tracing::debug!("Dial back handler timed out with: {err:?}"),
}
}
Poll::Pending
}
Expand Down Expand Up @@ -83,10 +83,6 @@ impl ConnectionHandler for Handler {
_ => {}
}
}

fn connection_keep_alive(&self) -> bool {
false
}
}

async fn perform_dial_back(mut stream: impl AsyncRead + AsyncWrite + Unpin) -> io::Result<u64> {
Expand Down
42 changes: 17 additions & 25 deletions protocols/autonatv2/src/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
DialDataRequest, DialDataResponse, DialRequest, DialResponse, Request, Response,
DATA_FIELD_LEN_UPPER_BOUND, DATA_LEN_LOWER_BOUND, DATA_LEN_UPPER_BOUND,
},
REQUEST_PROTOCOL_NAME, REQUEST_UPGRADE,
REQUEST_PROTOCOL_NAME,
};

use super::{DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS};
Expand Down Expand Up @@ -89,11 +89,6 @@ pub struct StatusUpdate {
pub result: Result<(), crate::client::handler::Error>,
}

#[derive(Debug)]
pub enum FromBehaviour {
PerformRequest(DialRequest),
}

pub struct Handler {
queued_events: VecDeque<
ConnectionHandlerEvent<
Expand Down Expand Up @@ -134,7 +129,7 @@ impl Handler {
self.queued_streams.push_back(tx);
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(REQUEST_UPGRADE, ()),
protocol: SubstreamProtocol::new(ReadyUpgrade::new(REQUEST_PROTOCOL_NAME), ()),
});
if self
.outbound
Expand All @@ -152,16 +147,11 @@ impl Handler {
}

impl ConnectionHandler for Handler {
type FromBehaviour = FromBehaviour;

type FromBehaviour = DialRequest;
type ToBehaviour = ToBehaviour;

type InboundProtocol = DeniedUpgrade;

type OutboundProtocol = ReadyUpgrade<StreamProtocol>;

type InboundOpenInfo = ();

type OutboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
Expand Down Expand Up @@ -196,11 +186,7 @@ impl ConnectionHandler for Handler {
}

fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
FromBehaviour::PerformRequest(req) => {
self.perform_request(req);
}
}
self.perform_request(event);
}

fn on_connection_event(
Expand Down Expand Up @@ -272,6 +258,7 @@ async fn start_substream_handle(
let mut data_amount = 0;
let mut checked_addr_idx = None;
let addrs = dial_request.addrs.clone();
assert_ne!(addrs, vec![]);
let res = handle_substream(
dial_request,
substream,
Expand All @@ -298,10 +285,8 @@ async fn handle_substream(
checked_addr_idx: &mut Option<usize>,
) -> Result<TestEnd, InternalError> {
let mut coder = Coder::new(substream);
coder
.send_request(Request::Dial(dial_request.clone()))
.await?;
match coder.next_response().await? {
coder.send(Request::Dial(dial_request.clone())).await?;
match coder.next().await? {
Response::Data(DialDataRequest {
addr_idx,
num_bytes,
Expand All @@ -326,7 +311,7 @@ async fn handle_substream(
}
*checked_addr_idx = Some(addr_idx);
send_aap_data(&mut coder, num_bytes, data_amount).await?;
if let Response::Dial(dial_response) = coder.next_response().await? {
if let Response::Dial(dial_response) = coder.next().await? {
*checked_addr_idx = Some(dial_response.addr_idx);
coder.close().await?;
test_end_from_dial_response(dial_request, dial_response)
Expand Down Expand Up @@ -396,10 +381,17 @@ where
.take(count_full)
.chain(once(partial_len))
.filter(|e| *e > 0)
.map(|data_count| (data_count, Request::Data(DialDataResponse { data_count })))
.map(|data_count| {
(
data_count,
Request::Data(
DialDataResponse::new(data_count).expect("data count is unexpectedly too big"),
),
)
})
{
*data_amount += data_count;
substream.send_request(req).await?;
substream.send(req).await?;
}
Ok(())
}
7 changes: 1 addition & 6 deletions protocols/autonatv2/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_swarm::StreamProtocol;

pub mod client;
Expand All @@ -11,9 +10,5 @@ pub(crate) const REQUEST_PROTOCOL_NAME: StreamProtocol =
StreamProtocol::new("/libp2p/autonat/2/dial-request");
pub(crate) const DIAL_BACK_PROTOCOL_NAME: StreamProtocol =
StreamProtocol::new("/libp2p/autonat/2/dial-back");
pub(crate) const REQUEST_UPGRADE: ReadyUpgrade<StreamProtocol> =
ReadyUpgrade::new(REQUEST_PROTOCOL_NAME);
pub(crate) const DIAL_BACK_UPGRADE: ReadyUpgrade<StreamProtocol> =
ReadyUpgrade::new(DIAL_BACK_PROTOCOL_NAME);

pub type Nonce = u64;
type Nonce = u64;
Loading

0 comments on commit da2a790

Please sign in to comment.