Skip to content
Open

SOON #18

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libp2p/injector/host_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <libp2p/peer/key_repository/inmem_key_repository.hpp>
#include <libp2p/peer/peer_repository.hpp>
#include <libp2p/peer/protocol_repository/inmem_protocol_repository.hpp>
#include <libp2p/peer/rtt_repository/inmem_rtt_repository.hpp>
#include <libp2p/protocol/gossip/config.hpp>
#include <libp2p/protocol_muxer/multiselect.hpp>

Expand Down Expand Up @@ -98,6 +99,7 @@ namespace libp2p::injector {
di::bind<peer::KeyRepository>.to<peer::InmemKeyRepository>(),
di::bind<peer::AddressRepository>.to<peer::InmemAddressRepository>(),
di::bind<peer::ProtocolRepository>.to<peer::InmemProtocolRepository>(),
di::bind<peer::RttRepository>.to<peer::InmemRttRepository>(),
di::bind<protocol_muxer::ProtocolMuxer>.to<protocol_muxer::multiselect::Multiselect>(),
di::bind<crypto::validator::KeyValidator>.to<crypto::validator::KeyValidatorImpl>(),
di::bind<crypto::CryptoProvider>.to<crypto::CryptoProviderImpl>(),
Expand Down
11 changes: 10 additions & 1 deletion include/libp2p/peer/peer_repository.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <libp2p/peer/peer_id.hpp>
#include <libp2p/peer/peer_info.hpp>
#include <libp2p/peer/protocol_repository.hpp>
#include <libp2p/peer/rtt_repository.hpp>

namespace libp2p::peer {
/**
Expand All @@ -23,7 +24,8 @@ namespace libp2p::peer {
public:
PeerRepository(std::shared_ptr<AddressRepository> addrRepo,
std::shared_ptr<KeyRepository> keyRepo,
std::shared_ptr<ProtocolRepository> protocolRepo);
std::shared_ptr<ProtocolRepository> protocolRepo,
std::shared_ptr<RttRepository> rttRepo);
/**
* @brief Getter for an address repository.
* @return associated instance of an address repository.
Expand All @@ -42,6 +44,12 @@ namespace libp2p::peer {
*/
ProtocolRepository &getProtocolRepository();

/**
* @brief Getter for an RTT repository.
* @return associated instance of an RTT repository.
*/
RttRepository &getRttRepository();

/**
* @brief Returns set of peer ids known by this peer repository.
* @return unordered set of peers
Expand All @@ -61,5 +69,6 @@ namespace libp2p::peer {
std::shared_ptr<AddressRepository> addr_;
std::shared_ptr<KeyRepository> key_;
std::shared_ptr<ProtocolRepository> proto_;
std::shared_ptr<RttRepository> rtt_;
};
} // namespace libp2p::peer
47 changes: 47 additions & 0 deletions include/libp2p/peer/rtt_repository.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <chrono>
#include <optional>

#include <libp2p/basic/garbage_collectable.hpp>
#include <libp2p/peer/peer_id.hpp>

namespace libp2p::peer {

/**
* @brief Repository to store Round Trip Time (RTT) for peers.
* It calculates smoothed RTT using EWMA.
*/
class RttRepository : public basic::GarbageCollectable {
public:
virtual ~RttRepository() = default;

/**
* @brief Update RTT for a peer.
* @param p PeerId
* @param rtt Measured RTT
*/
virtual void updateRtt(const PeerId &p, std::chrono::microseconds rtt) = 0;

/**
* @brief Get smoothed RTT for a peer.
* @param p PeerId
* @return Smoothed RTT if available, std::nullopt otherwise.
*/
virtual std::optional<std::chrono::microseconds> getRtt(
const PeerId &p) const = 0;

/**
* @brief Remove RTT information for a peer.
* @param p PeerId
*/
virtual void clear(const PeerId &p) = 0;
};

} // namespace libp2p::peer
35 changes: 35 additions & 0 deletions include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <libp2p/peer/rtt_repository.hpp>

#include <unordered_map>

namespace libp2p::peer {

class InmemRttRepository : public RttRepository {
public:
void updateRtt(const PeerId &p, std::chrono::microseconds rtt) override;

std::optional<std::chrono::microseconds> getRtt(
const PeerId &p) const override;

void clear(const PeerId &p) override;

void collectGarbage() override;

private:
struct PeerRtt {
std::chrono::microseconds value;
std::chrono::steady_clock::time_point last_updated;
};

std::unordered_map<PeerId, PeerRtt> rtt_map_;
};

} // namespace libp2p::peer
5 changes: 5 additions & 0 deletions include/libp2p/protocol/gossip/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ namespace libp2p::protocol::gossip {
size_t opportunistic_graft_ticks = 60;
size_t opportunistic_graft_peers = 2;

Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing documentation for the new "soon" configuration field. This field should have a comment explaining what it does, specifically that it enables RTT-based peer selection for mesh broadcasting where peers with lower latency are preferred when the mesh size exceeds the configured mesh_n threshold.

Suggested change
/// Enables RTT-based peer selection for mesh broadcasting. When enabled,
/// peers with lower latency are preferred as mesh peers once the mesh size
/// exceeds the configured mesh_n threshold.

Copilot uses AI. Check for mistakes.
/// Number of peers from the mesh to which we propagate messages.
/// Peers are selected by the smallest RTT.
/// If -1, propagate to all peers in the mesh.
int soon_delta = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int soon_delta = -1;
std::optional<size_t> soon_delta;


ScoreConfig score;
};
} // namespace libp2p::protocol::gossip
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ target_link_libraries(libp2p
p2p_inmem_address_repository
p2p_inmem_key_repository
p2p_inmem_protocol_repository
p2p_inmem_rtt_repository
p2p_key_validator
p2p_listener_manager
p2p_literals
Expand Down
1 change: 1 addition & 0 deletions src/peer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
add_subdirectory(address_repository)
add_subdirectory(key_repository)
add_subdirectory(protocol_repository)
add_subdirectory(rtt_repository)

libp2p_add_library(p2p_peer_errors
errors.cpp
Expand Down
13 changes: 11 additions & 2 deletions src/peer/peer_repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ namespace libp2p::peer {
PeerRepository::PeerRepository(
std::shared_ptr<AddressRepository> addr_repo,
std::shared_ptr<KeyRepository> key_repo,
std::shared_ptr<ProtocolRepository> protocol_repo)
std::shared_ptr<ProtocolRepository> protocol_repo,
std::shared_ptr<RttRepository> rtt_repo)
: addr_(std::move(addr_repo)),
key_(std::move(key_repo)),
proto_(std::move(protocol_repo)) {
proto_(std::move(protocol_repo)),
rtt_(std::move(rtt_repo)) {
BOOST_ASSERT(addr_ != nullptr);
BOOST_ASSERT(key_ != nullptr);
BOOST_ASSERT(proto_ != nullptr);
BOOST_ASSERT(rtt_ != nullptr);
}

AddressRepository &PeerRepository::getAddressRepository() {
Expand All @@ -44,11 +47,17 @@ namespace libp2p::peer {
return *proto_;
}

RttRepository &PeerRepository::getRttRepository() {
return *rtt_;
}

std::unordered_set<PeerId> PeerRepository::getPeers() const {
std::unordered_set<PeerId> peers;
merge_sets<PeerId>(peers, addr_->getPeers());
merge_sets<PeerId>(peers, key_->getPeers());
merge_sets<PeerId>(peers, proto_->getPeers());
// RttRepository doesn't have getPeers() yet, but it's fine.
// Usually peers in RttRepository should be in others too.
Comment on lines +59 to +60
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment acknowledges that RttRepository doesn't have a getPeers() method, which is inconsistent with the other repository interfaces (AddressRepository, KeyRepository, ProtocolRepository). For consistency and completeness of the repository pattern, consider implementing getPeers() in the RttRepository interface and InmemRttRepository class to maintain a uniform API across all repository types.

Suggested change
// RttRepository doesn't have getPeers() yet, but it's fine.
// Usually peers in RttRepository should be in others too.
// TODO: extend RttRepository with getPeers() and merge its peers here as well.

Copilot uses AI. Check for mistakes.
return peers;
}

Expand Down
13 changes: 13 additions & 0 deletions src/peer/rtt_repository/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# Copyright Quadrivium LLC
# All Rights Reserved
# SPDX-License-Identifier: Apache-2.0
#

libp2p_add_library(p2p_inmem_rtt_repository
inmem_rtt_repository.cpp
)
target_link_libraries(p2p_inmem_rtt_repository
p2p_peer_id
)

53 changes: 53 additions & 0 deletions src/peer/rtt_repository/inmem_rtt_repository.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include <libp2p/peer/rtt_repository/inmem_rtt_repository.hpp>

namespace libp2p::peer {

void InmemRttRepository::updateRtt(const PeerId &p,
std::chrono::microseconds rtt) {
auto now = std::chrono::steady_clock::now();
auto it = rtt_map_.find(p);
if (it == rtt_map_.end()) {
rtt_map_.emplace(p, PeerRtt{rtt, now});
} else {
// EWMA: SRTT = (1 - alpha) * SRTT + alpha * RTT
// alpha = 0.125 (1/8) is standard for TCP
// SRTT = (7 * SRTT + RTT) / 8
it->second.value = (it->second.value * 7 + rtt) / 8;
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EWMA calculation using integer division may lose precision. The expression (it->second.value * 7 + rtt) / 8 performs integer division on std::chrono::microseconds, which could result in rounding errors that accumulate over time. Consider using floating-point arithmetic for the calculation and then converting back to microseconds to maintain better accuracy in the smoothed RTT values.

Suggested change
it->second.value = (it->second.value * 7 + rtt) / 8;
auto srtt_us = static_cast<double>(it->second.value.count());
auto rtt_us = static_cast<double>(rtt.count());
auto new_srtt_us = (7.0 * srtt_us + rtt_us) / 8.0;
it->second.value =
std::chrono::microseconds{
static_cast<std::chrono::microseconds::rep>(new_srtt_us)};

Copilot uses AI. Check for mistakes.
it->second.last_updated = now;
}
}

std::optional<std::chrono::microseconds> InmemRttRepository::getRtt(
const PeerId &p) const {
auto it = rtt_map_.find(p);
if (it != rtt_map_.end()) {
return it->second.value;
}
return std::nullopt;
}

void InmemRttRepository::clear(const PeerId &p) {
rtt_map_.erase(p);
}

void InmemRttRepository::collectGarbage() {
auto now = std::chrono::steady_clock::now();
// Expire entries older than 1 hour
auto ttl = std::chrono::hours(1);

for (auto it = rtt_map_.begin(); it != rtt_map_.end();) {
if (now - it->second.last_updated > ttl) {
it = rtt_map_.erase(it);
} else {
++it;
}
}
}

} // namespace libp2p::peer
31 changes: 29 additions & 2 deletions src/protocol/gossip/gossip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <generated/protocol/gossip/gossip.pb.h>
#include <algorithm>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/endian/conversion.hpp>
Expand Down Expand Up @@ -729,8 +730,34 @@ namespace libp2p::protocol::gossip {
add_peer(peer);
}
}
for (auto &peer : topic.mesh_peers_) {
add_peer(peer);
if (config_.soon_delta >= 0) {
if (topic.mesh_peers_.size()
<= static_cast<size_t>(config_.soon_delta)) {
for (auto &peer : topic.mesh_peers_) {
add_peer(peer);
}
} else {
std::vector<PeerPtr> sorted_mesh_peers(topic.mesh_peers_.begin(),
topic.mesh_peers_.end());
auto &rtt_repo = host_->getPeerRepository().getRttRepository();
std::sort(sorted_mesh_peers.begin(), sorted_mesh_peers.end(),
[&](const PeerPtr &a, const PeerPtr &b) {
auto rtt_a = rtt_repo.getRtt(a->peer_id_);
auto rtt_b = rtt_repo.getRtt(b->peer_id_);
auto val_a =
rtt_a.value_or(std::chrono::microseconds::max());
auto val_b =
rtt_b.value_or(std::chrono::microseconds::max());
return val_a < val_b;
});
Comment on lines +743 to +752
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sorting operation creates a full copy of mesh_peers_ and sorts it on every broadcast, even when only mesh_n peers are needed. This could be inefficient for large mesh sizes. Consider using std::partial_sort instead, which only partially sorts the first mesh_n elements, providing better performance: std::partial_sort(sorted_mesh_peers.begin(), sorted_mesh_peers.begin() + mesh_n, sorted_mesh_peers.end(), & {...}).

Suggested change
std::sort(sorted_mesh_peers.begin(), sorted_mesh_peers.end(),
[&](const PeerPtr &a, const PeerPtr &b) {
auto rtt_a = rtt_repo.getRtt(a->peer_id_);
auto rtt_b = rtt_repo.getRtt(b->peer_id_);
auto val_a =
rtt_a.value_or(std::chrono::microseconds::max());
auto val_b =
rtt_b.value_or(std::chrono::microseconds::max());
return val_a < val_b;
});
std::partial_sort(
sorted_mesh_peers.begin(),
sorted_mesh_peers.begin() + mesh_n,
sorted_mesh_peers.end(),
[&](const PeerPtr &a, const PeerPtr &b) {
auto rtt_a = rtt_repo.getRtt(a->peer_id_);
auto rtt_b = rtt_repo.getRtt(b->peer_id_);
auto val_a =
rtt_a.value_or(std::chrono::microseconds::max());
auto val_b =
rtt_b.value_or(std::chrono::microseconds::max());
return val_a < val_b;
});

Copilot uses AI. Check for mistakes.
for (int i = 0; i < config_.soon_delta; ++i) {
add_peer(sorted_mesh_peers[i]);
}
}
} else {
for (auto &peer : topic.mesh_peers_) {
add_peer(peer);
}
}
if (publish) {
if (auto more =
Expand Down
18 changes: 13 additions & 5 deletions src/protocol/ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ namespace libp2p::protocol {
}
stream = stream_result.value();
}
auto r = co_await ping(
stream,
std::chrono::duration_cast<std::chrono::milliseconds>(config_.timeout));
auto r =
co_await ping(stream,
std::chrono::duration_cast<std::chrono::milliseconds>(
config_.timeout));
if (not r.has_value()) {
stream->reset();
stream.reset();
} else {
host_->getPeerRepository().getRttRepository().updateRtt(
connection->remotePeer(), r.value());
}
timer.expires_after(config_.interval);
co_await timer.async_wait(boost::asio::use_awaitable);
Expand All @@ -103,6 +107,10 @@ namespace libp2p::protocol {
auto stream = stream_result.value();
auto res = co_await ping(stream, timeout);
stream->close();
if (res.has_value()) {
host_->getPeerRepository().getRttRepository().updateRtt(
conn->remotePeer(), res.value());
}
Comment on lines +110 to +113
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RTT is being updated twice for the same ping operation. The ping method called on line 108 already updates the RTT on lines 111-113, and then it's updated again after this method returns in the calling code (e.g., in pingLoop at lines 92-93). This results in duplicate RTT updates with the same value, which could skew the EWMA calculation used in the RTT repository. Consider removing the RTT update from this method since the caller can handle it, or document this intentional double-update behavior if there's a specific reason for it.

Suggested change
if (res.has_value()) {
host_->getPeerRepository().getRttRepository().updateRtt(
conn->remotePeer(), res.value());
}

Copilot uses AI. Check for mistakes.
co_return res;
}

Expand Down Expand Up @@ -132,8 +140,8 @@ namespace libp2p::protocol {
auto end = std::chrono::steady_clock::now();
timer.cancel();
if (r.has_value()) {
co_return std::chrono::duration_cast<std::chrono::microseconds>(end -
start);
co_return std::chrono::duration_cast<std::chrono::microseconds>(end
- start);
}
co_return r.error();
}
Expand Down
Loading