diff --git a/include/libp2p/protocol/ping.hpp b/include/libp2p/protocol/ping.hpp index c520790..29bb150 100644 --- a/include/libp2p/protocol/ping.hpp +++ b/include/libp2p/protocol/ping.hpp @@ -15,7 +15,7 @@ namespace boost::asio { } // namespace boost::asio namespace libp2p::connection { - class CapableConnection; + struct CapableConnection; } // namespace libp2p::connection namespace libp2p::crypto::random { @@ -68,8 +68,23 @@ namespace libp2p::protocol { void start(); + /** + * Performs a single ping on the given connection. + * Opens a new stream for the ping. + * @param conn Connection to ping. + * @param timeout Timeout for the ping operation. + * @return RTT of the ping. + */ + CoroOutcome ping( + std::shared_ptr conn, + std::chrono::milliseconds timeout); + private: - Coro ping(std::shared_ptr connection); + Coro pingLoop(std::shared_ptr connection); + + CoroOutcome ping( + std::shared_ptr stream, + std::chrono::milliseconds timeout); std::shared_ptr io_context_; std::shared_ptr host_; diff --git a/src/protocol/ping.cpp b/src/protocol/ping.cpp index f6036f1..f99090d 100644 --- a/src/protocol/ping.cpp +++ b/src/protocol/ping.cpp @@ -58,7 +58,7 @@ namespace libp2p::protocol { WEAK_LOCK(connection); WEAK_LOCK(self); coroSpawn(*self->io_context_, [self, connection]() -> Coro { - co_await self->ping(connection); + co_await self->pingLoop(connection); }); }; on_peer_connected_sub_ = @@ -67,7 +67,7 @@ namespace libp2p::protocol { .subscribe(on_peer_connected); } - Coro Ping::ping( + Coro Ping::pingLoop( std::shared_ptr connection) { co_await coroYield(); boost::asio::steady_timer timer{*io_context_}; @@ -81,31 +81,60 @@ namespace libp2p::protocol { } stream = stream_result.value(); } - PingMessage message; - random_->fillRandomly(message); - timer.expires_after(config_.timeout); - timer.async_wait([stream](boost::system::error_code ec) { - if (not ec) { - stream->reset(); - } - }); - auto r = co_await write(stream, message); - if (r.has_value()) { - PingMessage reply; - r = co_await read(stream, reply); - if (r.has_value()) { - if (reply != message) { - r = Error::INVALID_RESPONSE; - } - } - } + auto r = co_await ping( + stream, + std::chrono::duration_cast(config_.timeout)); if (not r.has_value()) { stream->reset(); stream.reset(); } - timer.cancel(); timer.expires_after(config_.interval); co_await timer.async_wait(boost::asio::use_awaitable); } } + + CoroOutcome Ping::ping( + std::shared_ptr conn, + std::chrono::milliseconds timeout) { + auto stream_result = co_await host_->newStream(conn, getProtocolIds()); + if (not stream_result.has_value()) { + co_return stream_result.error(); + } + auto stream = stream_result.value(); + auto res = co_await ping(stream, timeout); + stream->close(); + co_return res; + } + + CoroOutcome Ping::ping( + std::shared_ptr stream, + std::chrono::milliseconds timeout) { + PingMessage message; + random_->fillRandomly(message); + boost::asio::steady_timer timer{*io_context_}; + timer.expires_after(timeout); + timer.async_wait([stream](boost::system::error_code ec) { + if (not ec) { + stream->reset(); + } + }); + auto start = std::chrono::steady_clock::now(); + auto r = co_await write(stream, message); + if (r.has_value()) { + PingMessage reply; + r = co_await read(stream, reply); + if (r.has_value()) { + if (reply != message) { + r = Error::INVALID_RESPONSE; + } + } + } + auto end = std::chrono::steady_clock::now(); + timer.cancel(); + if (r.has_value()) { + co_return std::chrono::duration_cast(end - + start); + } + co_return r.error(); + } } // namespace libp2p::protocol