Skip to content

Commit

Permalink
Delay and use Result
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 30, 2023
1 parent 88b1092 commit 4d96c33
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 41 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/autonat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bytes = { version = "1", optional = true }
either = { version = "1.9.0", optional = true }
futures = "0.3"
futures-bounded = { workspace = true, optional = true }
futures-time = "3"
futures-timer = "3.0"
instant = "0.1"
libp2p-core = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/v1/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
NonZeroU8::new(1).expect("1 > 0"),
)
.addresses(addrs)
.allocate_new_port()
.build(),
},
])
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/v2/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ where
}

// FIXME: We don't want test-only APIs in our public API.
#[doc(hidden)]
pub fn validate_addr(&mut self, addr: &Multiaddr) {
if let Some(info) = self.address_candidates.get_mut(addr) {
info.status = TestStatus::Received(self.rng.next_u64());
Expand Down
22 changes: 0 additions & 22 deletions protocols/autonat/src/v2/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ mod tests {
mod_Message::OneOfmsg, DialDataResponse as GenDialDataResponse, Message,
};
use crate::v2::protocol::{Coder, DialDataResponse, Request};
use futures::io::Cursor;

use rand::{thread_rng, Rng};

Expand Down Expand Up @@ -328,25 +327,4 @@ mod tests {
let buf = quick_protobuf::serialize_into_vec(&dial_back_max_nonce).unwrap();
assert!(buf.len() <= super::DIAL_BACK_MAX_SIZE);
}

#[tokio::test]
async fn write_read_request() {
let mut buf = Cursor::new(Vec::new());
let mut coder = Coder::new(&mut buf);
let mut all_req = Vec::with_capacity(100);
for _ in 0..100 {
let data_request: Request = Request::Data(DialDataResponse {
data_count: thread_rng().gen_range(0..4000),
});
all_req.push(data_request.clone());
coder.send(data_request.clone()).await.unwrap();
}
let inner = coder.inner.into_inner();
inner.set_position(0);
let mut coder = Coder::new(inner);
for i in 0..100 {
let read_data_request: Request = coder.next().await.unwrap();
assert_eq!(read_data_request, all_req[i]);
}
}
}
2 changes: 1 addition & 1 deletion protocols/autonat/src/v2/server/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
if let Some(DialBackCommand { back_channel, .. }) =
self.dialing_dial_back.remove(&connection_id)
{
let _ = back_channel.send(DialBackStatus::DialErr);
let _ = back_channel.send(Err(DialBackStatus::DialErr));
}
}
_ => {}
Expand Down
6 changes: 3 additions & 3 deletions protocols/autonat/src/v2/server/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ConnectionHandler for Handler {
..
}) => {
if let Some(cmd) = self.requested_substream_nonce.take() {
let _ = cmd.back_channel.send(DialBackRes::DialBackErr);
let _ = cmd.back_channel.send(Err(DialBackRes::DialBackErr));
}
}
_ => {}
Expand All @@ -124,11 +124,11 @@ async fn perform_dial_back(
..
}: DialBackCommand,
) -> io::Result<()> {
futures_time::task::sleep(futures_time::time::Duration::from_millis(100)).await;
let res = dial_back(stream, nonce)
.await
.map_err(|_| DialBackRes::DialBackErr)
.map(|_| DialBackRes::Ok)
.unwrap_or_else(|e| e);
.map(|_| ());
back_channel
.send(res)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "send error"))?;
Expand Down
24 changes: 13 additions & 11 deletions protocols/autonat/src/v2/server/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ pub(crate) enum DialBackStatus {
DialErr,
/// Failure during dial back
DialBackErr,
Ok,
}

#[derive(Debug)]
pub struct DialBackCommand {
pub(crate) addr: Multiaddr,
pub(crate) nonce: Nonce,
pub(crate) back_channel: oneshot::Sender<DialBackStatus>,
pub(crate) back_channel: oneshot::Sender<Result<(), DialBackStatus>>,
}

pub struct Handler<R> {
Expand Down Expand Up @@ -172,7 +171,10 @@ enum HandleFail {
InternalError(usize),
RequestRejected,
DialRefused,
DialBack { idx: usize, err: DialBackStatus },
DialBack {
idx: usize,
result: Result<(), DialBackStatus>,
},
}

impl From<HandleFail> for DialResponse {
Expand All @@ -193,13 +195,13 @@ impl From<HandleFail> for DialResponse {
addr_idx: 0,
dial_status: DialStatus::UNUSED,
},
HandleFail::DialBack { idx, err } => Self {
HandleFail::DialBack { idx, result } => Self {
status: ResponseStatus::OK,
addr_idx: idx,
dial_status: match err {
DialBackStatus::DialErr => DialStatus::E_DIAL_ERROR,
DialBackStatus::DialBackErr => DialStatus::E_DIAL_BACK_ERROR,
DialBackStatus::Ok => DialStatus::OK,
dial_status: match result {
Err(DialBackStatus::DialErr) => DialStatus::E_DIAL_ERROR,
Err(DialBackStatus::DialBackErr) => DialStatus::E_DIAL_BACK_ERROR,
Ok(()) => DialStatus::OK,
},
},
}
Expand Down Expand Up @@ -267,14 +269,14 @@ where
.await
.map_err(|_| HandleFail::DialBack {
idx,
err: DialBackStatus::DialErr,
result: Err(DialBackStatus::DialErr),
})?;

let dial_back = rx.await.map_err(|_e| HandleFail::InternalError(idx))?;
if dial_back != DialBackStatus::Ok {
if let Err(err) = dial_back {
return Err(HandleFail::DialBack {
idx,
err: dial_back,
result: Err(err),
});
}
Ok(DialResponse {
Expand Down
11 changes: 7 additions & 4 deletions protocols/autonat/tests/autonatv2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ async fn confirm_successful() {
_ => None,
})
.await;
assert_eq!(tested_addr, alice_bob_external_addrs.get(0).cloned());
assert_eq!(
tested_addr,
alice_bob_external_addrs.get(0).cloned().unwrap()
);
assert_eq!(bytes_sent, 0);
assert_eq!(server, cor_server_peer);
assert!(result.is_ok(), "Result is {result:?}");
Expand Down Expand Up @@ -215,7 +218,7 @@ async fn dial_back_to_unsupported_protocol() {
let data_amount = bob
.wait(|event| match event {
SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event {
tested_addr: Some(tested_addr),
tested_addr,
bytes_sent,
server,
result: Err(_),
Expand Down Expand Up @@ -310,7 +313,7 @@ async fn dial_back_to_non_libp2p() {
let data_amount = bob
.wait(|event| match event {
SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event {
tested_addr: Some(tested_addr),
tested_addr,
bytes_sent,
server,
result: Err(_),
Expand Down Expand Up @@ -409,7 +412,7 @@ async fn dial_back_to_not_supporting() {
let bytes_sent = bob
.wait(|event| match event {
SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event {
tested_addr: Some(tested_addr),
tested_addr,
bytes_sent,
server,
result: Err(_),
Expand Down

0 comments on commit 4d96c33

Please sign in to comment.