From 71cd341b5fb2566bba9c69864f3c01132f93e03f Mon Sep 17 00:00:00 2001 From: Alexis Sellier Date: Sat, 11 Nov 2023 13:19:13 +0100 Subject: [PATCH] [p2p] Move peer connections to an event --- client/src/client.rs | 2 +- client/src/event.rs | 2 ++ common/src/p2p/peer.rs | 6 ----- p2p/src/fsm.rs | 13 ++++----- p2p/src/fsm/addrmgr.rs | 15 ++++++----- p2p/src/fsm/event.rs | 7 ++++- p2p/src/fsm/peermgr.rs | 60 ++++++++++++++++++------------------------ p2p/src/fsm/syncmgr.rs | 2 +- 8 files changed, 50 insertions(+), 57 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index bb60bd6f..edb971b4 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -574,7 +574,7 @@ impl handle::Handle for Handle { event::wait( &events, |e| match e { - fsm::Event::PeerConnected { addr: a, link } + fsm::Event::PeerConnected { addr: a, link, .. } if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) => { Some(link) diff --git a/client/src/event.rs b/client/src/event.rs index 4ec25c5e..0d571c83 100644 --- a/client/src/event.rs +++ b/client/src/event.rs @@ -343,6 +343,7 @@ mod test { client .protocol .connected(remote, &local_addr, Link::Inbound); + client.step(); client.received(&remote, version(42)); client.received(&remote, NetworkMessage::Verack); client.step(); @@ -357,6 +358,7 @@ mod test { client .protocol .connected(remote, &local_addr, Link::Inbound); + client.step(); client.received(&remote, version(43)); client.received(&remote, NetworkMessage::Verack); client.step(); diff --git a/common/src/p2p/peer.rs b/common/src/p2p/peer.rs index 1949f39f..37bf3ef4 100644 --- a/common/src/p2p/peer.rs +++ b/common/src/p2p/peer.rs @@ -350,8 +350,6 @@ impl KnownAddress { pub trait AddressSource { /// Sample a random peer address. Returns `None` if there are no addresses left. fn sample(&mut self, services: ServiceFlags) -> Option<(Address, Source)>; - /// Record an address of ours as seen by a remote peer. - fn record_local_address(&mut self, addr: net::SocketAddr); /// Return an iterator over random peer addresses. fn iter(&mut self, services: ServiceFlags) -> Box + '_>; } @@ -365,10 +363,6 @@ pub mod test { self.pop_front() } - fn record_local_address(&mut self, _addr: net::SocketAddr) { - // Do nothing. - } - fn iter( &mut self, _services: ServiceFlags, diff --git a/p2p/src/fsm.rs b/p2p/src/fsm.rs index bcf796bd..014f1c4d 100644 --- a/p2p/src/fsm.rs +++ b/p2p/src/fsm.rs @@ -56,7 +56,6 @@ use nakamoto_common::block::{BlockHash, Height}; use nakamoto_common::block::{BlockTime, Transaction}; use nakamoto_common::network; use nakamoto_common::nonempty::NonEmpty; -use nakamoto_common::p2p::peer::AddressSource; use nakamoto_common::p2p::{peer, Domain}; use nakamoto_net as traits; @@ -630,8 +629,7 @@ impl> StateMa self.invmgr.received_event(e.clone(), &self.tree); self.syncmgr.received_event(e.clone(), &mut self.tree); self.addrmgr.received_event(e.clone(), &self.tree); - self.peermgr - .received_event(e, &self.tree, &mut self.addrmgr); + self.peermgr.received_event(e, &self.tree); } /// Process a user command. @@ -792,6 +790,8 @@ impl> traits: return; } + // Nb. We only send this message internally, hence we don't + // push it to our outbox. self.event(Event::MessageReceived { from: addr, message: Arc::new(msg.payload), @@ -804,11 +804,8 @@ impl> traits: } fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) { - let height = self.tree.height(); - - self.addrmgr.record_local_address(*local_addr); - self.addrmgr.peer_connected(&addr); - self.peermgr.peer_connected(addr, *local_addr, link, height); + self.peermgr + .peer_connected(addr, *local_addr, link, self.tree.height()); } fn disconnected( diff --git a/p2p/src/fsm/addrmgr.rs b/p2p/src/fsm/addrmgr.rs index b7497d7e..e6c13f71 100644 --- a/p2p/src/fsm/addrmgr.rs +++ b/p2p/src/fsm/addrmgr.rs @@ -136,12 +136,19 @@ impl AddressManager { /// Event received. pub fn received_event(&mut self, event: Event, _tree: &T) { match event { + Event::PeerConnected { addr, .. } => { + self.peer_connected(&addr); + } Event::PeerNegotiated { addr, link, services, + receiver, .. } => { + if let Ok(addr) = receiver.socket_addr() { + self.local_addrs.insert(addr); + } self.peer_negotiated(&addr, services, link); } Event::MessageReceived { from, message } => { @@ -214,7 +221,7 @@ impl AddressManager { } /// Called when a peer has connected. - pub fn peer_connected(&mut self, addr: &net::SocketAddr) { + fn peer_connected(&mut self, addr: &net::SocketAddr) { if !self::is_routable(&addr.ip()) || self::is_local(&addr.ip()) { return; } @@ -222,7 +229,7 @@ impl AddressManager { } /// Called when a peer has handshaked. - pub fn peer_negotiated(&mut self, addr: &net::SocketAddr, services: ServiceFlags, link: Link) { + fn peer_negotiated(&mut self, addr: &net::SocketAddr, services: ServiceFlags, link: Link) { let time = self.clock.local_time(); if !self.connected.contains(&addr.ip()) { @@ -563,10 +570,6 @@ impl AddressSource for AddressManager { AddressManager::sample(self, services) } - fn record_local_address(&mut self, addr: net::SocketAddr) { - self.local_addrs.insert(addr); - } - fn iter(&mut self, services: ServiceFlags) -> Box + '_> { Box::new(AddressManager::iter(self, services)) } diff --git a/p2p/src/fsm/event.rs b/p2p/src/fsm/event.rs index 975ce837..7b036c69 100644 --- a/p2p/src/fsm/event.rs +++ b/p2p/src/fsm/event.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::{error, fmt, io, net}; +use nakamoto_common::bitcoin::network::address::Address; use nakamoto_common::bitcoin::network::constants::ServiceFlags; use nakamoto_common::bitcoin::network::message::NetworkMessage; use nakamoto_common::bitcoin::{Transaction, Txid}; @@ -36,6 +37,8 @@ pub enum Event { PeerConnected { /// Peer address. addr: PeerId, + /// Local address. + local_addr: net::SocketAddr, /// Connection link. link: Link, }, @@ -80,6 +83,8 @@ pub enum Event { persistent: bool, /// Peer height. height: Height, + /// Address of our node, as seen by remote. + receiver: Address, /// Peer user agent. user_agent: String, /// Negotiated protocol version. @@ -334,7 +339,7 @@ impl fmt::Display for Event { write!(fmt, "Transaction {} status changed: {}", txid, status) } Self::Synced { height, .. } => write!(fmt, "filters synced up to height {}", height), - Self::PeerConnected { addr, link } => { + Self::PeerConnected { addr, link, .. } => { write!(fmt, "Peer {} connected ({:?})", &addr, link) } Self::PeerConnectionFailed { addr, error } => { diff --git a/p2p/src/fsm/peermgr.rs b/p2p/src/fsm/peermgr.rs index a976bb54..d530f9d9 100644 --- a/p2p/src/fsm/peermgr.rs +++ b/p2p/src/fsm/peermgr.rs @@ -28,7 +28,6 @@ use nakamoto_common::p2p::Domain; use nakamoto_common::block::time::{AdjustedClock, Clock, LocalDuration, LocalTime}; use nakamoto_common::block::Height; use nakamoto_common::collections::{HashMap, HashSet}; -use nakamoto_common::source; use nakamoto_net as network; use crate::fsm::addrmgr; @@ -56,6 +55,13 @@ const MAX_STALE_HEIGHT_DIFFERENCE: Height = 2016; /// A time offset, in seconds. type TimeOffset = i64; +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Connection to peer failed. + #[error("connection to {addr} failed")] + ConnectionFailed { addr: PeerId }, +} + /// Peer manager configuration. #[derive(Debug, Clone)] pub struct Config { @@ -140,6 +146,8 @@ pub struct PeerInfo { /// An offset in seconds, between this peer's clock and ours. /// A positive offset means the peer's clock is ahead of ours. pub time_offset: TimeOffset, + /// Address of our node, as seen by remote. + pub receiver: Address, /// Whether this peer relays transactions. pub relay: bool, /// Whether this peer supports BIP-339. @@ -212,12 +220,7 @@ impl> PeerManager { for addr in peers { if !self.connect(&addr) { - // TODO: Return error here, or send event. - panic!( - "{}: unable to connect to persistent peer: {}", - source!(), - addr - ); + self.outbox.error(Error::ConnectionFailed { addr }); } } self.outbox.set_timer(IDLE_TIMEOUT); @@ -225,12 +228,7 @@ impl> PeerManager { } /// Event received. - pub fn received_event( - &mut self, - event: Event, - tree: &T, - addrs: &mut A, - ) { + pub fn received_event(&mut self, event: Event, tree: &T) { match event { Event::PeerTimedOut { addr } => { self.disconnect(addr, DisconnectReason::PeerTimeout("other")); @@ -240,7 +238,7 @@ impl> PeerManager { } Event::MessageReceived { from, message } => match message.as_ref() { NetworkMessage::Version(msg) => { - self.received_version(&from, msg, tree.height(), addrs); + self.received_version(&from, msg, tree.height()); } NetworkMessage::Verack => { self.received_verack(&from); @@ -349,7 +347,11 @@ impl> PeerManager { } // Set a timeout for receiving the `version` message. self.outbox.set_timer(HANDSHAKE_TIMEOUT); - self.outbox.event(Event::PeerConnected { addr, link }); + self.outbox.event(Event::PeerConnected { + addr, + local_addr, + link, + }); } /// Called when a peer disconnected. @@ -408,24 +410,17 @@ impl> PeerManager { } /// Called when a `version` message was received. - fn received_version( - &mut self, - addr: &PeerId, - msg: &VersionMessage, - height: Height, - addrs: &mut A, - ) { - if let Err(reason) = self.handle_version(addr, msg, height, addrs) { + fn received_version(&mut self, addr: &PeerId, msg: &VersionMessage, height: Height) { + if let Err(reason) = self.handle_version(addr, msg, height) { self._disconnect(*addr, reason); } } - fn handle_version( + fn handle_version( &mut self, addr: &PeerId, msg: &VersionMessage, height: Height, - addrs: &mut A, ) -> Result<(), DisconnectReason> { let now = self.clock.local_time(); @@ -496,11 +491,6 @@ impl> PeerManager { return Err(DisconnectReason::Other(reason)); } - // Record the address this peer has of us. - if let Ok(addr) = receiver.socket_addr() { - addrs.record_local_address(addr); - } - match conn.link { Link::Inbound => { self.outbox @@ -535,6 +525,7 @@ impl> PeerManager { services, persistent, user_agent, + receiver, state: HandshakeState::ReceivedVersion { since: now }, relay, wtxidrelay: false, @@ -561,6 +552,7 @@ impl> PeerManager { services: peer.services, persistent: peer.persistent, user_agent: peer.user_agent.clone(), + receiver: peer.receiver.clone(), height: peer.height, version: peer.version, relay: peer.relay, @@ -988,7 +980,7 @@ mod tests { peermgr.initialize(&mut addrs); peermgr.connect(&remote); peermgr.peer_connected(remote, local, Link::Outbound, height); - peermgr.received_version(&remote, &version, height, &mut addrs); + peermgr.received_version(&remote, &version, height); assert_matches!( peermgr.peers.get(&remote), @@ -1024,7 +1016,7 @@ mod tests { peermgr.initialize(&mut addrs); peermgr.connect(&remote); peermgr.peer_connected(remote, local, Link::Outbound, height); - peermgr.received_version(&remote, &version, height, &mut addrs); + peermgr.received_version(&remote, &version, height); peermgr.received_verack(&remote); peermgr.received_wtxidrelay(&remote); @@ -1210,7 +1202,7 @@ mod tests { peermgr.peer_connected(remote, local, Link::Outbound, height); assert!(peermgr.peers.contains_key(&remote)); - peermgr.received_version(&remote, &version, height, &mut addrs); + peermgr.received_version(&remote, &version, height); assert!(peermgr.peers.contains_key(&remote)); peermgr.received_verack(&remote); @@ -1230,7 +1222,7 @@ mod tests { peermgr.peer_connected(remote, local, Link::Outbound, height); assert!(peermgr.peers.contains_key(&remote)); - peermgr.received_version(&remote, &version, height, &mut addrs); + peermgr.received_version(&remote, &version, height); assert!(peermgr.peers.contains_key(&remote)); peermgr.received_verack(&remote); diff --git a/p2p/src/fsm/syncmgr.rs b/p2p/src/fsm/syncmgr.rs index 382728df..5c071e7d 100644 --- a/p2p/src/fsm/syncmgr.rs +++ b/p2p/src/fsm/syncmgr.rs @@ -187,7 +187,7 @@ impl SyncManager { } /// Called when a new peer was negotiated. - pub fn peer_negotiated( + fn peer_negotiated( &mut self, addr: PeerId, height: Height,