SOON #18
Pull request overview
This PR implements RTT (Round Trip Time) tracking for peers and introduces the "SOON" optimization for the gossip protocol. The changes add infrastructure to measure and store smoothed RTT values using an EWMA (Exponentially Weighted Moving Average), refactor the ping protocol to support on-demand pings with RTT measurement, and enable RTT-based peer selection in gossip message broadcasting.
- Adds RttRepository interface and InmemRttRepository implementation with EWMA smoothing
- Refactors ping protocol to extract reusable ping methods and measure RTT
- Implements SOON feature to prioritize low-latency peers in gossip mesh broadcasting
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| src/protocol/ping.cpp | Refactors ping into pingLoop and adds new public/private ping methods that measure and return RTT |
| src/protocol/gossip/gossip.cpp | Implements SOON optimization to sort and select mesh peers by RTT when broadcasting |
| src/peer/rtt_repository/inmem_rtt_repository.cpp | Implements in-memory RTT storage with EWMA smoothing and garbage collection |
| src/peer/rtt_repository/CMakeLists.txt | Adds build configuration for RTT repository library |
| src/peer/peer_repository.cpp | Integrates RttRepository into PeerRepository with getter method |
| src/peer/CMakeLists.txt | Adds rtt_repository subdirectory to build |
| src/CMakeLists.txt | Links p2p_inmem_rtt_repository library |
| include/libp2p/protocol/ping.hpp | Adds public ping API and changes private ping to pingLoop, updates forward declaration |
| include/libp2p/protocol/gossip/config.hpp | Adds "soon" boolean configuration flag |
| include/libp2p/peer/rtt_repository/inmem_rtt_repository.hpp | Declares InmemRttRepository class with PeerRtt storage structure |
| include/libp2p/peer/rtt_repository.hpp | Defines RttRepository interface for RTT operations |
| include/libp2p/peer/peer_repository.hpp | Adds RttRepository member and getter to PeerRepository |
| include/libp2p/injector/host_injector.hpp | Registers RttRepository binding in dependency injection |
    size_t opportunistic_graft_ticks = 60;
    size_t opportunistic_graft_peers = 2;

Copilot AI, Dec 23, 2025
Missing documentation for the new "soon" configuration field. This field should have a comment explaining what it does, specifically that it enables RTT-based peer selection for mesh broadcasting where peers with lower latency are preferred when the mesh size exceeds the configured mesh_n threshold.
Suggested change:

    /// Enables RTT-based peer selection for mesh broadcasting. When enabled,
    /// peers with lower latency are preferred as mesh peers once the mesh size
    /// exceeds the configured mesh_n threshold.
    // EWMA: SRTT = (1 - alpha) * SRTT + alpha * RTT
    // alpha = 0.125 (1/8) is standard for TCP
    // SRTT = (7 * SRTT + RTT) / 8
    it->second.value = (it->second.value * 7 + rtt) / 8;

Copilot AI, Dec 23, 2025
The EWMA calculation using integer division may lose precision. The expression (it->second.value * 7 + rtt) / 8 performs integer division on std::chrono::microseconds, which could result in rounding errors that accumulate over time. Consider using floating-point arithmetic for the calculation and then converting back to microseconds to maintain better accuracy in the smoothed RTT values.
Suggested change:

    - it->second.value = (it->second.value * 7 + rtt) / 8;
    + auto srtt_us = static_cast<double>(it->second.value.count());
    + auto rtt_us = static_cast<double>(rtt.count());
    + auto new_srtt_us = (7.0 * srtt_us + rtt_us) / 8.0;
    + it->second.value =
    +     std::chrono::microseconds{
    +         static_cast<std::chrono::microseconds::rep>(new_srtt_us)};
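The two update rules can be compared in isolation. The sketch below is not code from the PR: `ewmaTruncating`, `ewmaRounded`, and `ewmaDouble` are hypothetical helper names, and the rounding variant is only one possible alternative, assuming non-negative RTT values. It suggests that the floating-point form, whose final cast truncates, should match the integer form for realistic RTT magnitudes, while adding half the divisor before dividing rounds to the nearest microsecond without leaving integer arithmetic.

```cpp
#include <cassert>
#include <chrono>

using std::chrono::microseconds;

// Integer update as written in the PR: SRTT = (7 * SRTT + RTT) / 8.
// Integer division truncates the fractional part.
inline microseconds ewmaTruncating(microseconds srtt, microseconds rtt) {
  return (srtt * 7 + rtt) / 8;
}

// Hypothetical variant: add half the divisor before dividing, so the
// result is rounded to the nearest microsecond instead of truncated.
inline microseconds ewmaRounded(microseconds srtt, microseconds rtt) {
  assert(srtt.count() >= 0 && rtt.count() >= 0);  // assumption of this sketch
  return (srtt * 7 + rtt + microseconds{4}) / 8;
}

// Floating-point form from the suggested change. The cast back to the
// integer representation truncates toward zero.
inline microseconds ewmaDouble(microseconds srtt, microseconds rtt) {
  const double value = (7.0 * static_cast<double>(srtt.count())
                        + static_cast<double>(rtt.count())) / 8.0;
  return microseconds{static_cast<microseconds::rep>(value)};
}
```

For a smoothed value of 100 µs and a new sample of 107 µs, both truncating forms return 100 µs and the rounded form returns 101 µs.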
    /**
     * Performs a single ping on the given connection.
     * Opens a new stream for the ping.
     * @param conn Connection to ping.
     * @param timeout Timeout for the ping operation.
     * @return RTT of the ping.
     */
    CoroOutcome<std::chrono::microseconds> ping(
        std::shared_ptr<connection::CapableConnection> conn,
        std::chrono::milliseconds timeout);

Copilot AI, Dec 23, 2025
The public ping method documentation doesn't specify whether updateRtt is called automatically. Since the method updates RTT internally (lines 111-113), this side effect should be documented in the comment. Callers need to know that invoking this method will automatically update the RTT repository, especially since it might not be obvious from the return type alone.
@copilot open a new pull request to apply changes based on this feedback
    timer.async_wait([stream](boost::system::error_code ec) {
      if (not ec) {
        stream->reset();

Copilot AI, Dec 23, 2025
The timer's async callback captures the stream by value (shared_ptr), which could keep the stream alive even after the coroutine completes and returns. If the timer fires after the function returns but before timer.cancel() is reached (in error scenarios), it could call reset() on a stream that might be in an unexpected state. Consider capturing a weak_ptr instead and checking its validity before calling reset(), or ensure proper cancellation ordering to avoid potential race conditions.
Suggested change:

    - timer.async_wait([stream](boost::system::error_code ec) {
    -   if (not ec) {
    -     stream->reset();
    + auto weak_stream = std::weak_ptr<connection::Stream>{stream};
    + timer.async_wait([weak_stream](boost::system::error_code ec) {
    +   if (not ec) {
    +     if (auto locked_stream = weak_stream.lock()) {
    +       locked_stream->reset();
    +     }
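The ownership difference behind this suggestion can be shown without Boost. In the minimal sketch below, `FakeStream` and `makeTimeoutHandler` are hypothetical stand-ins (not types from the PR), and the `timed_out` flag plays the role of the `not ec` branch in the timer handler. The handler keeps only a weak reference, so it neither extends the stream's lifetime nor touches a stream that has already been destroyed.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for connection::Stream; only reset() matters here.
struct FakeStream {
  bool was_reset = false;
  void reset() { was_reset = true; }
};

// Builds a timeout handler that holds a weak reference to the stream.
// Returns true only when the timer expired and the stream was reset.
inline std::function<bool(bool)> makeTimeoutHandler(
    const std::shared_ptr<FakeStream> &stream) {
  assert(stream != nullptr);
  std::weak_ptr<FakeStream> weak_stream{stream};
  return [weak_stream](bool timed_out) {
    if (!timed_out) {
      return false;  // timer was cancelled, nothing to do
    }
    if (auto locked = weak_stream.lock()) {
      locked->reset();  // stream is still alive, abort the pending ping
      return true;
    }
    return false;  // stream already destroyed, skip the reset
  };
}
```

With a by-value `shared_ptr` capture, the handler would instead keep the stream alive until the timer object releases the handler.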
    // RttRepository doesn't have getPeers() yet, but it's fine.
    // Usually peers in RttRepository should be in others too.

Copilot AI, Dec 23, 2025
This comment acknowledges that RttRepository doesn't have a getPeers() method, which is inconsistent with the other repository interfaces (AddressRepository, KeyRepository, ProtocolRepository). For consistency and completeness of the repository pattern, consider implementing getPeers() in the RttRepository interface and InmemRttRepository class to maintain a uniform API across all repository types.
Suggested change:

    - // RttRepository doesn't have getPeers() yet, but it's fine.
    - // Usually peers in RttRepository should be in others too.
    + // TODO: extend RttRepository with getPeers() and merge its peers here as well.
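One possible shape for such an extension is sketched below. Everything in it is an assumption made for illustration: `SketchRttRepository` and `PeerKey` are hypothetical names, a plain string replaces the real peer ID type, and the return type of `getPeers()` is a guess rather than the signature used by the other repositories.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <optional>
#include <string>
#include <unordered_set>

using std::chrono::microseconds;
using PeerKey = std::string;  // hypothetical stand-in for the peer ID type

// Hypothetical in-memory RTT store that also exposes getPeers().
class SketchRttRepository {
 public:
  // Stores the first sample as-is, then smooths with EWMA (alpha = 1/8).
  void updateRtt(const PeerKey &peer, microseconds rtt) {
    assert(rtt.count() >= 0);
    auto [it, inserted] = rtts_.try_emplace(peer, rtt);
    if (!inserted) {
      it->second = (it->second * 7 + rtt) / 8;
    }
  }

  // Returns the smoothed RTT, or nullopt when the peer was never measured.
  std::optional<microseconds> getRtt(const PeerKey &peer) const {
    auto it = rtts_.find(peer);
    if (it == rtts_.end()) {
      return std::nullopt;
    }
    return it->second;
  }

  // Returns every peer that has at least one RTT sample.
  std::unordered_set<PeerKey> getPeers() const {
    std::unordered_set<PeerKey> peers;
    for (const auto &entry : rtts_) {
      peers.insert(entry.first);
    }
    return peers;
  }

 private:
  std::map<PeerKey, microseconds> rtts_;
};
```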
    if (res.has_value()) {
      host_->getPeerRepository().getRttRepository().updateRtt(
          conn->remotePeer(), res.value());
    }

Copilot AI, Dec 23, 2025
The RTT is being updated twice for the same ping operation. The ping method called on line 108 already updates the RTT on lines 111-113, and then it's updated again after this method returns in the calling code (e.g., in pingLoop at lines 92-93). This results in duplicate RTT updates with the same value, which could skew the EWMA calculation used in the RTT repository. Consider removing the RTT update from this method since the caller can handle it, or document this intentional double-update behavior if there's a specific reason for it.
Suggested change:

    - if (res.has_value()) {
    -   host_->getPeerRepository().getRttRepository().updateRtt(
    -       conn->remotePeer(), res.value());
    - }
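The effect of a duplicate update on the smoothed value can be worked through numerically. The sketch below reuses the PR's update rule, with `ewmaStep` and `applySample` as hypothetical helper names. Feeding the same sample twice raises its effective weight from 1/8 to 1 - (7/8)^2 = 15/64, so each measurement pulls the smoothed RTT almost twice as far as intended.

```cpp
#include <cassert>
#include <chrono>

using std::chrono::microseconds;

// Same smoothing step as in the PR: SRTT = (7 * SRTT + RTT) / 8.
inline microseconds ewmaStep(microseconds srtt, microseconds rtt) {
  return (srtt * 7 + rtt) / 8;
}

// Hypothetical helper: applies one sample `times` times in a row,
// which is what a duplicated updateRtt() call amounts to.
inline microseconds applySample(microseconds srtt, microseconds rtt,
                                int times) {
  assert(times >= 0);
  for (int i = 0; i < times; ++i) {
    srtt = ewmaStep(srtt, rtt);
  }
  return srtt;
}
```

Starting from 8000 µs with a 16000 µs sample, a single update gives 9000 µs, while the duplicated update gives 9875 µs.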
    std::sort(sorted_mesh_peers.begin(), sorted_mesh_peers.end(),
              [&](const PeerPtr &a, const PeerPtr &b) {
                auto rtt_a = rtt_repo.getRtt(a->peer_id_);
                auto rtt_b = rtt_repo.getRtt(b->peer_id_);
                auto val_a =
                    rtt_a.value_or(std::chrono::microseconds::max());
                auto val_b =
                    rtt_b.value_or(std::chrono::microseconds::max());
                return val_a < val_b;
              });

Copilot AI, Dec 23, 2025
The sorting operation creates a full copy of mesh_peers_ and sorts it on every broadcast, even when only mesh_n peers are needed. This could be inefficient for large mesh sizes. Consider using std::partial_sort instead, which places only the mesh_n smallest elements in sorted order at the front and leaves the rest unordered, providing better performance: std::partial_sort(sorted_mesh_peers.begin(), sorted_mesh_peers.begin() + mesh_n, sorted_mesh_peers.end(), [&](const PeerPtr &a, const PeerPtr &b) {...}).
Suggested change:

    - std::sort(sorted_mesh_peers.begin(), sorted_mesh_peers.end(),
    -           [&](const PeerPtr &a, const PeerPtr &b) {
    -             auto rtt_a = rtt_repo.getRtt(a->peer_id_);
    -             auto rtt_b = rtt_repo.getRtt(b->peer_id_);
    -             auto val_a =
    -                 rtt_a.value_or(std::chrono::microseconds::max());
    -             auto val_b =
    -                 rtt_b.value_or(std::chrono::microseconds::max());
    -             return val_a < val_b;
    -           });
    + std::partial_sort(
    +     sorted_mesh_peers.begin(),
    +     sorted_mesh_peers.begin() + mesh_n,
    +     sorted_mesh_peers.end(),
    +     [&](const PeerPtr &a, const PeerPtr &b) {
    +       auto rtt_a = rtt_repo.getRtt(a->peer_id_);
    +       auto rtt_b = rtt_repo.getRtt(b->peer_id_);
    +       auto val_a =
    +           rtt_a.value_or(std::chrono::microseconds::max());
    +       auto val_b =
    +           rtt_b.value_or(std::chrono::microseconds::max());
    +       return val_a < val_b;
    +     });
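A self-contained version of this selection is sketched below, assuming a hypothetical `PeerRtt` pair of peer ID and optional RTT in place of the PR's `PeerPtr` and repository lookup. Since `begin() + mesh_n` is only a valid iterator when mesh_n does not exceed the container size, the sketch clamps the count first. Peers without a measurement compare as the maximum duration and therefore sort last.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

using std::chrono::microseconds;

// Hypothetical stand-in for a mesh peer together with its smoothed RTT.
struct PeerRtt {
  std::string peer_id;
  std::optional<microseconds> rtt;  // empty when no RTT was measured yet
};

// Returns up to `count` peers with the smallest RTT, fastest first.
inline std::vector<PeerRtt> lowestRttPeers(std::vector<PeerRtt> peers,
                                           std::size_t count) {
  // Clamp so that the middle iterator never passes end().
  const std::size_t limit = std::min(count, peers.size());
  const auto middle = peers.begin() + static_cast<std::ptrdiff_t>(limit);
  std::partial_sort(peers.begin(), middle, peers.end(),
                    [](const PeerRtt &a, const PeerRtt &b) {
                      return a.rtt.value_or(microseconds::max())
                             < b.rtt.value_or(microseconds::max());
                    });
  peers.erase(middle, peers.end());  // drop the unsorted remainder
  assert(peers.size() == limit);
  return peers;
}
```

`std::partial_sort` performs roughly N log M comparisons for N peers and M selected peers, compared with N log N for a full sort, so the gain matters mostly when the mesh is much larger than the number of selected peers.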
      namespace libp2p::connection {
    -   class CapableConnection;
    +   struct CapableConnection;

Copilot AI, Dec 23, 2025
This change from "class" to "struct" for the forward declaration is inconsistent with the actual definition. Forward declarations should match the actual type declaration. Verify whether CapableConnection is defined as a class or struct, and ensure this forward declaration matches that definition to avoid potential issues with some compilers or coding standards.
Suggested change:

    - struct CapableConnection;
    + class CapableConnection;
src/protocol/gossip/gossip.cpp
Outdated
              rtt_b.value_or(std::chrono::microseconds::max());
          return val_a < val_b;
        });
    for (size_t i = 0; i < mesh_n; ++i) {

Copilot AI, Dec 23, 2025
The loop assumes that mesh_n is less than or equal to sorted_mesh_peers.size(), which the condition on line 735 currently guarantees. For defensive programming and clarity, consider adding a safety check or using std::min(mesh_n, sorted_mesh_peers.size()) in the loop condition to make this invariant explicit and prevent out-of-bounds access if the logic changes in the future.
Suggested change:

    - for (size_t i = 0; i < mesh_n; ++i) {
    + auto limit = std::min(mesh_n, sorted_mesh_peers.size());
    + for (size_t i = 0; i < limit; ++i) {
    size_t opportunistic_graft_ticks = 60;
    size_t opportunistic_graft_peers = 2;

    bool soon = false;

Copilot AI, Dec 23, 2025
The configuration field name "soon" is unclear and doesn't convey what this feature does. Based on the code, this appears to enable an optimization that selects mesh peers with the lowest RTT. A more descriptive name would improve code readability, such as "prefer_low_latency_peers", "optimize_mesh_by_rtt", or "mesh_rtt_optimization".
Suggested change:

    - bool soon = false;
    + /// Enable optimization to prefer mesh peers with lower RTT (lower latency).
    + bool prefer_low_latency_peers = false;
src/protocol/gossip/gossip.cpp
Outdated
    add_peer(peer);
    if (config_.soon) {
      auto mesh_n = config_.mesh_n_for_topic(topic.topic_hash_);
      if (topic.mesh_peers_.size() <= mesh_n) {

The research mentions a new config field Δ ≤ mesh_n.
Gossip tries to graft mesh_n peers when there are fewer than mesh_n_low mesh peers.
Mesh peers may grow up to mesh_n_high when other peers graft us, but that is not guaranteed.
Updated accordingly:
- replaced bool soon = false with int soon_delta = 1, which defines how many mesh peers, taken in order of lowest RTT, the peer propagates messages to
0fb1ae5 to f565a51
    /// Number of peers from the mesh to which we propagate messages.
    /// Peers are selected by the smallest RTT.
    /// If -1, propagate to all peers in the mesh.
    int soon_delta = -1;

Suggested change:

    - int soon_delta = -1;
    + std::optional<size_t> soon_delta;
https://ethresear.ch/t/selecting-optimal-outbound-neighbors-soon-for-fast-bandwidth-efficient-propagation-in-p2p-networks/23358
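How an optional `soon_delta` could be consumed is sketched below. `propagationFanout` is a hypothetical helper, not part of the PR, and the sketch assumes that an empty value means "propagate to the whole mesh", mirroring the documented meaning of -1.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical helper: number of mesh peers that receive a propagated message.
// An empty `soon_delta` selects the whole mesh, replacing the -1 sentinel.
inline std::size_t propagationFanout(std::optional<std::size_t> soon_delta,
                                     std::size_t mesh_size) {
  const std::size_t fanout =
      soon_delta.has_value() ? std::min(*soon_delta, mesh_size) : mesh_size;
  assert(fanout <= mesh_size);  // never index past the mesh
  return fanout;
}
```

Compared with a signed sentinel, the optional avoids signed/unsigned comparisons against container sizes and makes the "no limit" case explicit in the type.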