Skip to content

Commit 0d1e08e

Browse files
peer handling: friendlier error message on invalid address, more RPC output, introduce peer state concept (#27)
* multi: add the concept of 'connecting' vs. 'connected' peers, add to RPC * net: provide friendlier error message for invalid peer * net: update peer connection state upon succesfull connection * multi: shuffle around peer type and friends * net: adjust logs * net: handle multiple futures within net task * net: match on PeerConnectionState * net: match instead of inspect * Use AtomicBool to indicate connection status (#28) * net: delete unused function push_request * net: remove Error::PeerNotConnected * net: adjust logs * rpc-api: un-public, delete uneeded type --------- Co-authored-by: Ash-L2L <[email protected]>
1 parent 5b42f6a commit 0d1e08e

File tree

12 files changed

+172
-51
lines changed

12 files changed

+172
-51
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ parking_lot = "0.12.1"
2929
prost = "0.13.3"
3030
serde = "1.0.179"
3131
serde_json = "1.0.113"
32+
strum = { version = "0.26.3", features = ["derive"] }
3233
thiserror = "2.0.11"
3334
tiny-bip39 = "2.0.0"
3435
tokio = { version = "1.29.1", default-features = false, features = ["signal"] }

app/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ poll-promise = { version = "0.3.0", features = ["tokio"] }
2222
rustreexo = { workspace = true }
2323
serde = { workspace = true, features = ["derive"] }
2424
shlex = "1.3.0"
25-
strum = { version = "0.26.2", features = ["derive"] }
25+
strum = { workspace = true }
2626
thiserror = { workspace = true }
2727
thunder = { path = "../lib" }
2828
thunder_app_cli = { path = "../cli" }

app/rpc_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use jsonrpsee::{
77
types::ErrorObject,
88
};
99
use thunder::{
10+
net::Peer,
1011
types::{Address, PointedOutput, Txid, WithdrawalBundle},
1112
wallet::Balance,
1213
};
@@ -138,7 +139,7 @@ impl RpcServer for RpcServerImpl {
138139
Ok(height)
139140
}
140141

141-
async fn list_peers(&self) -> RpcResult<Vec<SocketAddr>> {
142+
async fn list_peers(&self) -> RpcResult<Vec<Peer>> {
142143
let peers = self.app.node.get_active_peers();
143144
Ok(peers)
144145
}

integration_tests/ibd.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,14 @@ async fn check_peer_connection(
7878
thunder_setup: &PostSetup,
7979
expected_peer: SocketAddr,
8080
) -> anyhow::Result<()> {
81-
let peers = thunder_setup.rpc_client.list_peers().await?;
81+
let peers = thunder_setup
82+
.rpc_client
83+
.list_peers()
84+
.await?
85+
.iter()
86+
.map(|p| p.address)
87+
.collect::<Vec<_>>();
88+
8289
if peers.contains(&expected_peer) {
8390
Ok(())
8491
} else {

lib/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ semver = { version = "1.0.25", features = ["serde"] }
4141
serde = { workspace = true, features = ["derive"] }
4242
serde_json = { workspace = true }
4343
serde_with = { version = "3.4.0" }
44+
strum = { workspace = true }
4445
thiserror = { workspace = true }
4546
tiny-bip39 = { workspace = true }
4647
tokio = { workspace = true, features = ["rt-multi-thread", "sync"] }
@@ -55,4 +56,4 @@ workspace = true
5556

5657
[lib]
5758
name = "thunder"
58-
path = "lib.rs"
59+
path = "lib.rs"

lib/net/mod.rs

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
collections::{hash_map, HashMap, HashSet},
3-
net::SocketAddr,
3+
net::{IpAddr, SocketAddr},
44
sync::Arc,
55
};
66

@@ -13,6 +13,7 @@ use heed::{
1313
use parking_lot::RwLock;
1414
use quinn::{ClientConfig, Endpoint, ServerConfig};
1515
use tokio_stream::StreamNotifyClose;
16+
use tracing::instrument;
1617

1718
use crate::{
1819
archive::Archive,
@@ -28,8 +29,8 @@ use peer::{
2829
};
2930
pub use peer::{
3031
ConnectionError as PeerConnectionError, Info as PeerConnectionInfo,
31-
InternalMessage as PeerConnectionMessage, PeerStateId,
32-
Request as PeerRequest, Response as PeerResponse,
32+
InternalMessage as PeerConnectionMessage, Peer, PeerConnectionStatus,
33+
PeerStateId, Request as PeerRequest, Response as PeerResponse,
3334
};
3435

3536
#[derive(Debug, thiserror::Error)]
@@ -54,6 +55,10 @@ pub enum Error {
5455
Io(#[from] std::io::Error),
5556
#[error("peer connection not found for {0}")]
5657
MissingPeerConnection(SocketAddr),
58+
/// Unspecified peer IP addresses cannot be connected to.
59+
/// `0.0.0.0` is one example of an "unspecified" IP.
60+
#[error("unspecified peer ip address (cannot connect to '{0}')")]
61+
UnspecfiedPeerIP(IpAddr),
5762
#[error(transparent)]
5863
NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
5964
#[error("peer connection")]
@@ -169,11 +174,13 @@ fn configure_server() -> Result<(ServerConfig, Vec<u8>), Error> {
169174
///
170175
/// - a stream of incoming QUIC connections
171176
/// - server certificate serialized into DER format
172-
#[allow(unused)]
173177
pub fn make_server_endpoint(
174178
bind_addr: SocketAddr,
175179
) -> Result<(Endpoint, Vec<u8>), Error> {
176180
let (server_config, server_cert) = configure_server()?;
181+
182+
tracing::info!("creating server endpoint: binding to {bind_addr}",);
183+
177184
let mut endpoint = Endpoint::server(server_config, bind_addr)?;
178185
let client_cfg = configure_client()?;
179186
endpoint.set_default_client_config(client_cfg);
@@ -184,9 +191,6 @@ pub fn make_server_endpoint(
184191
pub type PeerInfoRx =
185192
mpsc::UnboundedReceiver<(SocketAddr, Option<PeerConnectionInfo>)>;
186193

187-
// State.
188-
// Archive.
189-
190194
// Keep track of peer state
191195
// Exchange metadata
192196
// Bulk download
@@ -218,7 +222,6 @@ impl Net {
218222
peer_connection_handle: PeerConnectionHandle,
219223
) -> Result<(), Error> {
220224
tracing::trace!(%addr, "add active peer: starting");
221-
222225
let mut active_peers_write = self.active_peers.write();
223226
match active_peers_write.entry(addr) {
224227
hash_map::Entry::Occupied(_) => {
@@ -241,20 +244,37 @@ impl Net {
241244
}
242245
}
243246

244-
// TODO: This should have more context. Last received message, connection state, etc.
245-
pub fn get_active_peers(&self) -> Vec<SocketAddr> {
246-
self.active_peers.read().keys().copied().collect()
247+
// TODO: This should have more context.
248+
// Last received message, connection state, etc.
249+
pub fn get_active_peers(&self) -> Vec<Peer> {
250+
self.active_peers
251+
.read()
252+
.iter()
253+
.map(|(addr, conn_handle)| Peer {
254+
address: *addr,
255+
status: conn_handle.connection_status(),
256+
})
257+
.collect()
247258
}
248259

260+
#[instrument(skip_all, fields(addr), err(Debug))]
249261
pub fn connect_peer(
250262
&self,
251263
env: heed::Env,
252264
addr: SocketAddr,
253265
) -> Result<(), Error> {
254266
if self.active_peers.read().contains_key(&addr) {
255-
tracing::error!(%addr, "connect peer: already connected");
267+
tracing::error!("connect peer: already connected");
256268
return Err(Error::AlreadyConnected(addr));
257269
}
270+
271+
// This check happens within Quinn with a
272+
// generic "invalid remote address". We run the
273+
// same check, and provide a friendlier error
274+
// message.
275+
if addr.ip().is_unspecified() {
276+
return Err(Error::UnspecfiedPeerIP(addr.ip()));
277+
}
258278
let connecting = self.server.connect(addr, "localhost")?;
259279
let mut rwtxn = env.write_txn()?;
260280
self.known_peers.put(&mut rwtxn, &addr, &())?;
@@ -264,9 +284,10 @@ impl Net {
264284
archive: self.archive.clone(),
265285
state: self.state.clone(),
266286
};
287+
267288
let (connection_handle, info_rx) =
268289
peer::connect(connecting, connection_ctxt);
269-
tracing::trace!(%addr, "connect peer: spawning info rx");
290+
tracing::trace!("connect peer: spawning info rx");
270291
tokio::spawn({
271292
let info_rx = StreamNotifyClose::new(info_rx)
272293
.map(move |info| Ok((addr, info)));
@@ -277,7 +298,8 @@ impl Net {
277298
}
278299
}
279300
});
280-
tracing::trace!(%addr, "connect peer: adding to active peers");
301+
302+
tracing::trace!("connect peer: adding to active peers");
281303
self.add_active_peer(addr, connection_handle)?;
282304
Ok(())
283305
}
@@ -426,6 +448,7 @@ impl Net {
426448
}
427449
}
428450
});
451+
// TODO: is this the right state?
429452
self.add_active_peer(addr, connection_handle)?;
430453
Ok(Some(addr))
431454
}
@@ -437,9 +460,10 @@ impl Net {
437460
addr: SocketAddr,
438461
) -> Result<(), Error> {
439462
let active_peers_read = self.active_peers.read();
440-
let Some(peer_connection_handle) = active_peers_read.get(&addr) else {
441-
return Err(Error::MissingPeerConnection(addr));
442-
};
463+
let peer_connection_handle = active_peers_read
464+
.get(&addr)
465+
.ok_or_else(|| Error::MissingPeerConnection(addr))?;
466+
443467
if let Err(send_err) = peer_connection_handle
444468
.internal_message_tx
445469
.unbounded_send(message)
@@ -450,29 +474,6 @@ impl Net {
450474
Ok(())
451475
}
452476

453-
// Push a request to the specified peers
454-
pub fn push_request(
455-
&self,
456-
request: PeerRequest,
457-
peers: &HashSet<SocketAddr>,
458-
) {
459-
let active_peers_read = self.active_peers.read();
460-
for addr in peers {
461-
let Some(peer_connection_handle) = active_peers_read.get(addr)
462-
else {
463-
continue;
464-
};
465-
if let Err(_send_err) = peer_connection_handle
466-
.internal_message_tx
467-
.unbounded_send(request.clone().into())
468-
{
469-
tracing::warn!(
470-
"Failed to push request to peer at {addr}: {request:?}"
471-
)
472-
}
473-
}
474-
}
475-
476477
/// Push a tx to all active peers, except those in the provided set
477478
pub fn push_tx(
478479
&self,
@@ -484,6 +485,13 @@ impl Net {
484485
.iter()
485486
.filter(|(addr, _)| !exclude.contains(addr))
486487
.for_each(|(addr, peer_connection_handle)| {
488+
match peer_connection_handle.connection_status() {
489+
PeerConnectionStatus::Connecting => {
490+
tracing::trace!(%addr, "skipping peer at {addr} because it is not fully connected");
491+
return;
492+
}
493+
PeerConnectionStatus::Connected => {}
494+
}
487495
let request = PeerRequest::PushTransaction {
488496
transaction: tx.clone(),
489497
};

0 commit comments

Comments
 (0)