Skip to content

Commit

Permalink
[p2p] Move peer connections to an event
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudhead committed Nov 11, 2023
1 parent d7e958f commit 71cd341
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 57 deletions.
2 changes: 1 addition & 1 deletion client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ impl<W: Waker> handle::Handle for Handle<W> {
event::wait(
&events,
|e| match e {
fsm::Event::PeerConnected { addr: a, link }
fsm::Event::PeerConnected { addr: a, link, .. }
if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
{
Some(link)
Expand Down
2 changes: 2 additions & 0 deletions client/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ mod test {
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.step();
client.received(&remote, version(42));
client.received(&remote, NetworkMessage::Verack);
client.step();
Expand All @@ -357,6 +358,7 @@ mod test {
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.step();
client.received(&remote, version(43));
client.received(&remote, NetworkMessage::Verack);
client.step();
Expand Down
6 changes: 0 additions & 6 deletions common/src/p2p/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,6 @@ impl KnownAddress {
pub trait AddressSource {
/// Sample a random peer address. Returns `None` if there are no addresses left.
fn sample(&mut self, services: ServiceFlags) -> Option<(Address, Source)>;
/// Record an address of ours as seen by a remote peer.
fn record_local_address(&mut self, addr: net::SocketAddr);
/// Return an iterator over random peer addresses.
fn iter(&mut self, services: ServiceFlags) -> Box<dyn Iterator<Item = (Address, Source)> + '_>;
}
Expand All @@ -365,10 +363,6 @@ pub mod test {
self.pop_front()
}

fn record_local_address(&mut self, _addr: net::SocketAddr) {
// Do nothing.
}

fn iter(
&mut self,
_services: ServiceFlags,
Expand Down
13 changes: 5 additions & 8 deletions p2p/src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use nakamoto_common::block::{BlockHash, Height};
use nakamoto_common::block::{BlockTime, Transaction};
use nakamoto_common::network;
use nakamoto_common::nonempty::NonEmpty;
use nakamoto_common::p2p::peer::AddressSource;
use nakamoto_common::p2p::{peer, Domain};
use nakamoto_net as traits;

Expand Down Expand Up @@ -630,8 +629,7 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMa
self.invmgr.received_event(e.clone(), &self.tree);
self.syncmgr.received_event(e.clone(), &mut self.tree);
self.addrmgr.received_event(e.clone(), &self.tree);
self.peermgr
.received_event(e, &self.tree, &mut self.addrmgr);
self.peermgr.received_event(e, &self.tree);
}

/// Process a user command.
Expand Down Expand Up @@ -792,6 +790,8 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits:
return;
}

// Nb. We only send this message internally, hence we don't
// push it to our outbox.
self.event(Event::MessageReceived {
from: addr,
message: Arc::new(msg.payload),
Expand All @@ -804,11 +804,8 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits:
}

fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
let height = self.tree.height();

self.addrmgr.record_local_address(*local_addr);
self.addrmgr.peer_connected(&addr);
self.peermgr.peer_connected(addr, *local_addr, link, height);
self.peermgr
.peer_connected(addr, *local_addr, link, self.tree.height());
}

fn disconnected(
Expand Down
15 changes: 9 additions & 6 deletions p2p/src/fsm/addrmgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,19 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
/// Event received.
pub fn received_event<T>(&mut self, event: Event, _tree: &T) {
match event {
Event::PeerConnected { addr, .. } => {
self.peer_connected(&addr);
}
Event::PeerNegotiated {
addr,
link,
services,
receiver,
..
} => {
if let Ok(addr) = receiver.socket_addr() {
self.local_addrs.insert(addr);
}
self.peer_negotiated(&addr, services, link);
}
Event::MessageReceived { from, message } => {
Expand Down Expand Up @@ -214,15 +221,15 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
}

/// Called when a peer has connected.
pub fn peer_connected(&mut self, addr: &net::SocketAddr) {
fn peer_connected(&mut self, addr: &net::SocketAddr) {
if !self::is_routable(&addr.ip()) || self::is_local(&addr.ip()) {
return;
}
self.connected.insert(addr.ip());
}

/// Called when a peer has handshaked.
pub fn peer_negotiated(&mut self, addr: &net::SocketAddr, services: ServiceFlags, link: Link) {
fn peer_negotiated(&mut self, addr: &net::SocketAddr, services: ServiceFlags, link: Link) {
let time = self.clock.local_time();

if !self.connected.contains(&addr.ip()) {
Expand Down Expand Up @@ -563,10 +570,6 @@ impl<P: Store, C: Clock> AddressSource for AddressManager<P, C> {
AddressManager::sample(self, services)
}

fn record_local_address(&mut self, addr: net::SocketAddr) {
self.local_addrs.insert(addr);
}

fn iter(&mut self, services: ServiceFlags) -> Box<dyn Iterator<Item = (Address, Source)> + '_> {
Box::new(AddressManager::iter(self, services))
}
Expand Down
7 changes: 6 additions & 1 deletion p2p/src/fsm/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::sync::Arc;
use std::{error, fmt, io, net};

use nakamoto_common::bitcoin::network::address::Address;
use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::network::message::NetworkMessage;
use nakamoto_common::bitcoin::{Transaction, Txid};
Expand Down Expand Up @@ -36,6 +37,8 @@ pub enum Event {
PeerConnected {
/// Peer address.
addr: PeerId,
/// Local address.
local_addr: net::SocketAddr,
/// Connection link.
link: Link,
},
Expand Down Expand Up @@ -80,6 +83,8 @@ pub enum Event {
persistent: bool,
/// Peer height.
height: Height,
/// Address of our node, as seen by remote.
receiver: Address,
/// Peer user agent.
user_agent: String,
/// Negotiated protocol version.
Expand Down Expand Up @@ -334,7 +339,7 @@ impl fmt::Display for Event {
write!(fmt, "Transaction {} status changed: {}", txid, status)
}
Self::Synced { height, .. } => write!(fmt, "filters synced up to height {}", height),
Self::PeerConnected { addr, link } => {
Self::PeerConnected { addr, link, .. } => {
write!(fmt, "Peer {} connected ({:?})", &addr, link)
}
Self::PeerConnectionFailed { addr, error } => {
Expand Down
60 changes: 26 additions & 34 deletions p2p/src/fsm/peermgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use nakamoto_common::p2p::Domain;
use nakamoto_common::block::time::{AdjustedClock, Clock, LocalDuration, LocalTime};
use nakamoto_common::block::Height;
use nakamoto_common::collections::{HashMap, HashSet};
use nakamoto_common::source;
use nakamoto_net as network;

use crate::fsm::addrmgr;
Expand Down Expand Up @@ -56,6 +55,13 @@ const MAX_STALE_HEIGHT_DIFFERENCE: Height = 2016;
/// A time offset, in seconds.
type TimeOffset = i64;

#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Connection to peer failed.
#[error("connection to {addr} failed")]
ConnectionFailed { addr: PeerId },
}

/// Peer manager configuration.
#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -140,6 +146,8 @@ pub struct PeerInfo {
/// An offset in seconds, between this peer's clock and ours.
/// A positive offset means the peer's clock is ahead of ours.
pub time_offset: TimeOffset,
/// Address of our node, as seen by remote.
pub receiver: Address,
/// Whether this peer relays transactions.
pub relay: bool,
/// Whether this peer supports BIP-339.
Expand Down Expand Up @@ -212,25 +220,15 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {

for addr in peers {
if !self.connect(&addr) {
// TODO: Return error here, or send event.
panic!(
"{}: unable to connect to persistent peer: {}",
source!(),
addr
);
self.outbox.error(Error::ConnectionFailed { addr });
}
}
self.outbox.set_timer(IDLE_TIMEOUT);
self.maintain_connections(addrs);
}

/// Event received.
pub fn received_event<T: BlockReader, A: AddressSource>(
&mut self,
event: Event,
tree: &T,
addrs: &mut A,
) {
pub fn received_event<T: BlockReader>(&mut self, event: Event, tree: &T) {
match event {
Event::PeerTimedOut { addr } => {
self.disconnect(addr, DisconnectReason::PeerTimeout("other"));
Expand All @@ -240,7 +238,7 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
}
Event::MessageReceived { from, message } => match message.as_ref() {
NetworkMessage::Version(msg) => {
self.received_version(&from, msg, tree.height(), addrs);
self.received_version(&from, msg, tree.height());
}
NetworkMessage::Verack => {
self.received_verack(&from);
Expand Down Expand Up @@ -349,7 +347,11 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
}
// Set a timeout for receiving the `version` message.
self.outbox.set_timer(HANDSHAKE_TIMEOUT);
self.outbox.event(Event::PeerConnected { addr, link });
self.outbox.event(Event::PeerConnected {
addr,
local_addr,
link,
});
}

/// Called when a peer disconnected.
Expand Down Expand Up @@ -408,24 +410,17 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
}

/// Called when a `version` message was received.
fn received_version<A: AddressSource>(
&mut self,
addr: &PeerId,
msg: &VersionMessage,
height: Height,
addrs: &mut A,
) {
if let Err(reason) = self.handle_version(addr, msg, height, addrs) {
fn received_version(&mut self, addr: &PeerId, msg: &VersionMessage, height: Height) {
if let Err(reason) = self.handle_version(addr, msg, height) {
self._disconnect(*addr, reason);
}
}

fn handle_version<A: AddressSource>(
fn handle_version(
&mut self,
addr: &PeerId,
msg: &VersionMessage,
height: Height,
addrs: &mut A,
) -> Result<(), DisconnectReason> {
let now = self.clock.local_time();

Expand Down Expand Up @@ -496,11 +491,6 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
return Err(DisconnectReason::Other(reason));
}

// Record the address this peer has of us.
if let Ok(addr) = receiver.socket_addr() {
addrs.record_local_address(addr);
}

match conn.link {
Link::Inbound => {
self.outbox
Expand Down Expand Up @@ -535,6 +525,7 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
services,
persistent,
user_agent,
receiver,
state: HandshakeState::ReceivedVersion { since: now },
relay,
wtxidrelay: false,
Expand All @@ -561,6 +552,7 @@ impl<C: AdjustedClock<PeerId>> PeerManager<C> {
services: peer.services,
persistent: peer.persistent,
user_agent: peer.user_agent.clone(),
receiver: peer.receiver.clone(),
height: peer.height,
version: peer.version,
relay: peer.relay,
Expand Down Expand Up @@ -988,7 +980,7 @@ mod tests {
peermgr.initialize(&mut addrs);
peermgr.connect(&remote);
peermgr.peer_connected(remote, local, Link::Outbound, height);
peermgr.received_version(&remote, &version, height, &mut addrs);
peermgr.received_version(&remote, &version, height);

assert_matches!(
peermgr.peers.get(&remote),
Expand Down Expand Up @@ -1024,7 +1016,7 @@ mod tests {
peermgr.initialize(&mut addrs);
peermgr.connect(&remote);
peermgr.peer_connected(remote, local, Link::Outbound, height);
peermgr.received_version(&remote, &version, height, &mut addrs);
peermgr.received_version(&remote, &version, height);
peermgr.received_verack(&remote);
peermgr.received_wtxidrelay(&remote);

Expand Down Expand Up @@ -1210,7 +1202,7 @@ mod tests {
peermgr.peer_connected(remote, local, Link::Outbound, height);
assert!(peermgr.peers.contains_key(&remote));

peermgr.received_version(&remote, &version, height, &mut addrs);
peermgr.received_version(&remote, &version, height);
assert!(peermgr.peers.contains_key(&remote));

peermgr.received_verack(&remote);
Expand All @@ -1230,7 +1222,7 @@ mod tests {
peermgr.peer_connected(remote, local, Link::Outbound, height);
assert!(peermgr.peers.contains_key(&remote));

peermgr.received_version(&remote, &version, height, &mut addrs);
peermgr.received_version(&remote, &version, height);
assert!(peermgr.peers.contains_key(&remote));

peermgr.received_verack(&remote);
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/fsm/syncmgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl<C: Clock> SyncManager<C> {
}

/// Called when a new peer was negotiated.
pub fn peer_negotiated<T: BlockReader>(
fn peer_negotiated<T: BlockReader>(
&mut self,
addr: PeerId,
height: Height,
Expand Down

0 comments on commit 71cd341

Please sign in to comment.