From d8b4fb65b8b1d0e7eaae8afb0af824ca27200019 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 23 Dec 2025 14:09:00 +0300 Subject: [PATCH 1/4] feat: add in-memory RTT repository implementation --- include/libp2p/injector/host_injector.hpp | 2 + include/libp2p/peer/peer_repository.hpp | 11 +++- include/libp2p/peer/rtt_repository.hpp | 47 ++++++++++++++++ .../rtt_repository/inmem_rtt_repository.hpp | 35 ++++++++++++ src/CMakeLists.txt | 1 + src/peer/CMakeLists.txt | 1 + src/peer/peer_repository.cpp | 13 ++++- src/peer/rtt_repository/CMakeLists.txt | 13 +++++ .../rtt_repository/inmem_rtt_repository.cpp | 53 +++++++++++++++++++ 9 files changed, 173 insertions(+), 3 deletions(-) create mode 100644 include/libp2p/peer/rtt_repository.hpp create mode 100644 include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp create mode 100644 src/peer/rtt_repository/CMakeLists.txt create mode 100644 src/peer/rtt_repository/inmem_rtt_repository.cpp diff --git a/include/libp2p/injector/host_injector.hpp b/include/libp2p/injector/host_injector.hpp index f960593..c72b614 100644 --- a/include/libp2p/injector/host_injector.hpp +++ b/include/libp2p/injector/host_injector.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -98,6 +99,7 @@ namespace libp2p::injector { di::bind.to(), di::bind.to(), di::bind.to(), + di::bind.to(), di::bind.to(), di::bind.to(), di::bind.to(), diff --git a/include/libp2p/peer/peer_repository.hpp b/include/libp2p/peer/peer_repository.hpp index d43c722..3412a58 100644 --- a/include/libp2p/peer/peer_repository.hpp +++ b/include/libp2p/peer/peer_repository.hpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace libp2p::peer { /** @@ -23,7 +24,8 @@ namespace libp2p::peer { public: PeerRepository(std::shared_ptr addrRepo, std::shared_ptr keyRepo, - std::shared_ptr protocolRepo); + std::shared_ptr protocolRepo, + std::shared_ptr rttRepo); /** * @brief Getter for an address repository. * @return associated instance of an address repository. @@ -42,6 +44,12 @@ namespace libp2p::peer { */ ProtocolRepository &getProtocolRepository(); + /** + * @brief Getter for an RTT repository. + * @return associated instance of an RTT repository. + */ + RttRepository &getRttRepository(); + /** * @brief Returns set of peer ids known by this peer repository. * @return unordered set of peers @@ -61,5 +69,6 @@ namespace libp2p::peer { std::shared_ptr addr_; std::shared_ptr key_; std::shared_ptr proto_; + std::shared_ptr rtt_; }; } // namespace libp2p::peer diff --git a/include/libp2p/peer/rtt_repository.hpp b/include/libp2p/peer/rtt_repository.hpp new file mode 100644 index 0000000..aa7170d --- /dev/null +++ b/include/libp2p/peer/rtt_repository.hpp @@ -0,0 +1,47 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include +#include + +namespace libp2p::peer { + + /** + * @brief Repository to store Round Trip Time (RTT) for peers. + * It calculates smoothed RTT using EWMA. + */ + class RttRepository : public basic::GarbageCollectable { + public: + virtual ~RttRepository() = default; + + /** + * @brief Update RTT for a peer. + * @param p PeerId + * @param rtt Measured RTT + */ + virtual void updateRtt(const PeerId &p, std::chrono::microseconds rtt) = 0; + + /** + * @brief Get smoothed RTT for a peer. + * @param p PeerId + * @return Smoothed RTT if available, std::nullopt otherwise. + */ + virtual std::optional getRtt( + const PeerId &p) const = 0; + + /** + * @brief Remove RTT information for a peer. + * @param p PeerId + */ + virtual void clear(const PeerId &p) = 0; + }; + +} // namespace libp2p::peer diff --git a/include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp b/include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp new file mode 100644 index 0000000..3a13a41 --- /dev/null +++ b/include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp @@ -0,0 +1,35 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include + +namespace libp2p::peer { + + class InmemRttRepository : public RttRepository { + public: + void updateRtt(const PeerId &p, std::chrono::microseconds rtt) override; + + std::optional getRtt( + const PeerId &p) const override; + + void clear(const PeerId &p) override; + + void collectGarbage() override; + + private: + struct PeerRtt { + std::chrono::microseconds value; + std::chrono::steady_clock::time_point last_updated; + }; + + std::unordered_map rtt_map_; + }; + +} // namespace libp2p::peer diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 158598f..372278e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries(libp2p p2p_inmem_address_repository p2p_inmem_key_repository p2p_inmem_protocol_repository + p2p_inmem_rtt_repository p2p_key_validator p2p_listener_manager p2p_literals diff --git a/src/peer/CMakeLists.txt b/src/peer/CMakeLists.txt index f77815d..5a847f7 100644 --- a/src/peer/CMakeLists.txt +++ b/src/peer/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(address_repository) add_subdirectory(key_repository) add_subdirectory(protocol_repository) +add_subdirectory(rtt_repository) libp2p_add_library(p2p_peer_errors errors.cpp diff --git a/src/peer/peer_repository.cpp b/src/peer/peer_repository.cpp index 563c785..6e80bcb 100644 --- a/src/peer/peer_repository.cpp +++ b/src/peer/peer_repository.cpp @@ -23,13 +23,16 @@ namespace libp2p::peer { PeerRepository::PeerRepository( std::shared_ptr addr_repo, std::shared_ptr key_repo, - std::shared_ptr protocol_repo) + std::shared_ptr protocol_repo, + std::shared_ptr rtt_repo) : addr_(std::move(addr_repo)), key_(std::move(key_repo)), - proto_(std::move(protocol_repo)) { + proto_(std::move(protocol_repo)), + rtt_(std::move(rtt_repo)) { BOOST_ASSERT(addr_ != nullptr); BOOST_ASSERT(key_ != nullptr); BOOST_ASSERT(proto_ != nullptr); + BOOST_ASSERT(rtt_ != nullptr); } AddressRepository &PeerRepository::getAddressRepository() { @@ -44,11 +47,17 @@ namespace libp2p::peer { return *proto_; } + RttRepository &PeerRepository::getRttRepository() { + return *rtt_; + } + std::unordered_set PeerRepository::getPeers() const { std::unordered_set peers; merge_sets(peers, addr_->getPeers()); merge_sets(peers, key_->getPeers()); merge_sets(peers, proto_->getPeers()); + // RttRepository doesn't have getPeers() yet, but it's fine. + // Usually peers in RttRepository should be in others too. return peers; } diff --git a/src/peer/rtt_repository/CMakeLists.txt b/src/peer/rtt_repository/CMakeLists.txt new file mode 100644 index 0000000..b03aeca --- /dev/null +++ b/src/peer/rtt_repository/CMakeLists.txt @@ -0,0 +1,13 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +libp2p_add_library(p2p_inmem_rtt_repository + inmem_rtt_repository.cpp + ) +target_link_libraries(p2p_inmem_rtt_repository + p2p_peer_id + ) + diff --git a/src/peer/rtt_repository/inmem_rtt_repository.cpp b/src/peer/rtt_repository/inmem_rtt_repository.cpp new file mode 100644 index 0000000..5d7db95 --- /dev/null +++ b/src/peer/rtt_repository/inmem_rtt_repository.cpp @@ -0,0 +1,53 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +namespace libp2p::peer { + + void InmemRttRepository::updateRtt(const PeerId &p, + std::chrono::microseconds rtt) { + auto now = std::chrono::steady_clock::now(); + auto it = rtt_map_.find(p); + if (it == rtt_map_.end()) { + rtt_map_.emplace(p, PeerRtt{rtt, now}); + } else { + // EWMA: SRTT = (1 - alpha) * SRTT + alpha * RTT + // alpha = 0.125 (1/8) is standard for TCP + // SRTT = (7 * SRTT + RTT) / 8 + it->second.value = (it->second.value * 7 + rtt) / 8; + it->second.last_updated = now; + } + } + + std::optional InmemRttRepository::getRtt( + const PeerId &p) const { + auto it = rtt_map_.find(p); + if (it != rtt_map_.end()) { + return it->second.value; + } + return std::nullopt; + } + + void InmemRttRepository::clear(const PeerId &p) { + rtt_map_.erase(p); + } + + void InmemRttRepository::collectGarbage() { + auto now = std::chrono::steady_clock::now(); + // Expire entries older than 1 hour + auto ttl = std::chrono::hours(1); + + for (auto it = rtt_map_.begin(); it != rtt_map_.end();) { + if (now - it->second.last_updated > ttl) { + it = rtt_map_.erase(it); + } else { + ++it; + } + } + } + +} // namespace libp2p::peer From 1dc4258d8199879b92ed509efc94597b9c974d8f Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 23 Dec 2025 14:26:01 +0300 Subject: [PATCH 2/4] fix: update RTT repository with ping results --- src/protocol/ping.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/protocol/ping.cpp b/src/protocol/ping.cpp index f99090d..275b6dc 100644 --- a/src/protocol/ping.cpp +++ b/src/protocol/ping.cpp @@ -81,12 +81,16 @@ namespace libp2p::protocol { } stream = stream_result.value(); } - auto r = co_await ping( - stream, - std::chrono::duration_cast(config_.timeout)); + auto r = + co_await ping(stream, + std::chrono::duration_cast( + config_.timeout)); if (not r.has_value()) { stream->reset(); stream.reset(); + } else { + host_->getPeerRepository().getRttRepository().updateRtt( + connection->remotePeer(), r.value()); } timer.expires_after(config_.interval); co_await timer.async_wait(boost::asio::use_awaitable); @@ -103,6 +107,10 @@ namespace libp2p::protocol { auto stream = stream_result.value(); auto res = co_await ping(stream, timeout); stream->close(); + if (res.has_value()) { + host_->getPeerRepository().getRttRepository().updateRtt( + conn->remotePeer(), res.value()); + } co_return res; } @@ -132,8 +140,8 @@ namespace libp2p::protocol { auto end = std::chrono::steady_clock::now(); timer.cancel(); if (r.has_value()) { - co_return std::chrono::duration_cast(end - - start); + co_return std::chrono::duration_cast(end + - start); } co_return r.error(); } From f565a5132ea8aff612c6183acc19bc8229b1503d Mon Sep 17 00:00:00 2001 From: kamilsa Date: Tue, 23 Dec 2025 15:35:54 +0300 Subject: [PATCH 3/4] feat: add soon config to sort peers by rtt --- include/libp2p/protocol/gossip/config.hpp | 2 ++ src/protocol/gossip/gossip.cpp | 31 +++++++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/include/libp2p/protocol/gossip/config.hpp b/include/libp2p/protocol/gossip/config.hpp index f61e7f5..aa9af00 100644 --- a/include/libp2p/protocol/gossip/config.hpp +++ b/include/libp2p/protocol/gossip/config.hpp @@ -192,6 +192,8 @@ namespace libp2p::protocol::gossip { size_t opportunistic_graft_ticks = 60; size_t opportunistic_graft_peers = 2; + bool soon = false; + ScoreConfig score; }; } // namespace libp2p::protocol::gossip diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 9ff8dfa..8967e5c 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #include #include @@ -729,8 +730,34 @@ namespace libp2p::protocol::gossip { add_peer(peer); } } - for (auto &peer : topic.mesh_peers_) { - add_peer(peer); + if (config_.soon) { + auto mesh_n = config_.mesh_n_for_topic(topic.topic_hash_); + if (topic.mesh_peers_.size() <= mesh_n) { + for (auto &peer : topic.mesh_peers_) { + add_peer(peer); + } + } else { + std::vector sorted_mesh_peers(topic.mesh_peers_.begin(), + topic.mesh_peers_.end()); + auto &rtt_repo = host_->getPeerRepository().getRttRepository(); + std::sort(sorted_mesh_peers.begin(), sorted_mesh_peers.end(), + [&](const PeerPtr &a, const PeerPtr &b) { + auto rtt_a = rtt_repo.getRtt(a->peer_id_); + auto rtt_b = rtt_repo.getRtt(b->peer_id_); + auto val_a = + rtt_a.value_or(std::chrono::microseconds::max()); + auto val_b = + rtt_b.value_or(std::chrono::microseconds::max()); + return val_a < val_b; + }); + for (size_t i = 0; i < mesh_n; ++i) { + add_peer(sorted_mesh_peers[i]); + } + } + } else { + for (auto &peer : topic.mesh_peers_) { + add_peer(peer); + } } if (publish) { if (auto more = From f4ad9c4f04b28670463bdd0296fd849d33efffad Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 24 Dec 2025 11:16:38 +0300 Subject: [PATCH 4/4] feat: update soon configuration to use soon_delta for peer propagation --- include/libp2p/protocol/gossip/config.hpp | 5 ++++- src/protocol/gossip/gossip.cpp | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/include/libp2p/protocol/gossip/config.hpp b/include/libp2p/protocol/gossip/config.hpp index aa9af00..3092072 100644 --- a/include/libp2p/protocol/gossip/config.hpp +++ b/include/libp2p/protocol/gossip/config.hpp @@ -192,7 +192,10 @@ namespace libp2p::protocol::gossip { size_t opportunistic_graft_ticks = 60; size_t opportunistic_graft_peers = 2; - bool soon = false; + /// Number of peers from the mesh to which we propagate messages. + /// Peers are selected by the smallest RTT. + /// If -1, propagate to all peers in the mesh. + int soon_delta = -1; ScoreConfig score; }; diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 8967e5c..13786e7 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -730,9 +730,9 @@ namespace libp2p::protocol::gossip { add_peer(peer); } } - if (config_.soon) { - auto mesh_n = config_.mesh_n_for_topic(topic.topic_hash_); - if (topic.mesh_peers_.size() <= mesh_n) { + if (config_.soon_delta >= 0) { + if (topic.mesh_peers_.size() + <= static_cast(config_.soon_delta)) { for (auto &peer : topic.mesh_peers_) { add_peer(peer); } @@ -750,7 +750,7 @@ namespace libp2p::protocol::gossip { rtt_b.value_or(std::chrono::microseconds::max()); return val_a < val_b; }); - for (size_t i = 0; i < mesh_n; ++i) { + for (int i = 0; i < config_.soon_delta; ++i) { add_peer(sorted_mesh_peers[i]); } }