Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions include/libp2p/protocol/ping.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace boost::asio {
} // namespace boost::asio

namespace libp2p::connection {
class CapableConnection;
struct CapableConnection;
} // namespace libp2p::connection

namespace libp2p::crypto::random {
Expand Down Expand Up @@ -68,8 +68,23 @@ namespace libp2p::protocol {

void start();

/**
* Performs a single ping on the given connection.
* Opens a new stream for the ping.
* @param conn Connection to ping.
* @param timeout Timeout for the ping operation.
* @return RTT of the ping.
*/
CoroOutcome<std::chrono::microseconds> ping(
std::shared_ptr<connection::CapableConnection> conn,
std::chrono::milliseconds timeout);

private:
Coro<void> ping(std::shared_ptr<connection::CapableConnection> connection);
Coro<void> pingLoop(std::shared_ptr<connection::CapableConnection> connection);

CoroOutcome<std::chrono::microseconds> ping(
std::shared_ptr<connection::Stream> stream,
std::chrono::milliseconds timeout);

std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<host::BasicHost> host_;
Expand Down
71 changes: 50 additions & 21 deletions src/protocol/ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace libp2p::protocol {
WEAK_LOCK(connection);
WEAK_LOCK(self);
coroSpawn(*self->io_context_, [self, connection]() -> Coro<void> {
co_await self->ping(connection);
co_await self->pingLoop(connection);
});
};
on_peer_connected_sub_ =
Expand All @@ -67,7 +67,7 @@ namespace libp2p::protocol {
.subscribe(on_peer_connected);
}

Coro<void> Ping::ping(
Coro<void> Ping::pingLoop(
std::shared_ptr<connection::CapableConnection> connection) {
co_await coroYield();
boost::asio::steady_timer timer{*io_context_};
Expand All @@ -81,31 +81,60 @@ namespace libp2p::protocol {
}
stream = stream_result.value();
}
PingMessage message;
random_->fillRandomly(message);
timer.expires_after(config_.timeout);
timer.async_wait([stream](boost::system::error_code ec) {
if (not ec) {
stream->reset();
}
});
auto r = co_await write(stream, message);
if (r.has_value()) {
PingMessage reply;
r = co_await read(stream, reply);
if (r.has_value()) {
if (reply != message) {
r = Error::INVALID_RESPONSE;
}
}
}
auto r = co_await ping(
stream,
std::chrono::duration_cast<std::chrono::milliseconds>(config_.timeout));
if (not r.has_value()) {
stream->reset();
stream.reset();
}
timer.cancel();
timer.expires_after(config_.interval);
co_await timer.async_wait(boost::asio::use_awaitable);
}
}

CoroOutcome<std::chrono::microseconds> Ping::ping(
std::shared_ptr<connection::CapableConnection> conn,
std::chrono::milliseconds timeout) {
auto stream_result = co_await host_->newStream(conn, getProtocolIds());
if (not stream_result.has_value()) {
co_return stream_result.error();
}
auto stream = stream_result.value();
auto res = co_await ping(stream, timeout);
stream->close();
co_return res;
}

CoroOutcome<std::chrono::microseconds> Ping::ping(
std::shared_ptr<connection::Stream> stream,
std::chrono::milliseconds timeout) {
PingMessage message;
random_->fillRandomly(message);
boost::asio::steady_timer timer{*io_context_};
timer.expires_after(timeout);
timer.async_wait([stream](boost::system::error_code ec) {
if (not ec) {
stream->reset();
}
});
auto start = std::chrono::steady_clock::now();
auto r = co_await write(stream, message);
if (r.has_value()) {
PingMessage reply;
r = co_await read(stream, reply);
if (r.has_value()) {
if (reply != message) {
r = Error::INVALID_RESPONSE;
}
}
}
auto end = std::chrono::steady_clock::now();
timer.cancel();
if (r.has_value()) {
co_return std::chrono::duration_cast<std::chrono::microseconds>(end -
start);
}
co_return r.error();
}
} // namespace libp2p::protocol
Loading