Skip to content

Commit 3dd50bd

Browse files
Improve substream management (#3261)
## Issue Addressed Which issue # does this PR address? ## Proposed Changes Please list or describe the changes introduced by this PR. ## Additional Info Please provide any additional information. For example, future considerations or information useful for reviewers.
1 parent 11d80a6 commit 3dd50bd

File tree

3 files changed

+35
-29
lines changed

3 files changed

+35
-29
lines changed

Diff for: beacon_node/lighthouse_network/src/behaviour/mod.rs

-3
Original file line numberDiff line numberDiff line change
@@ -1006,9 +1006,6 @@ where
10061006
proto,
10071007
error,
10081008
} => {
1009-
if matches!(error, RPCError::HandlerRejected) {
1010-
// this peer's request got canceled
1011-
}
10121009
// Inform the peer manager of the error.
10131010
// An inbound error here means we sent an error to the peer, or the stream
10141011
// timed out.

Diff for: beacon_node/lighthouse_network/src/peer_manager/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
457457
debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id);
458458
return;
459459
}
460-
RPCError::HandlerRejected => {
461-
// Our fault. Do nothing
462-
return;
463-
}
460+
RPCError::HandlerRejected => PeerAction::Fatal,
464461
RPCError::InvalidData(_) => {
465462
// Peer is not complying with the protocol. This is considered a malicious action
466463
PeerAction::Fatal

Diff for: beacon_node/lighthouse_network/src/rpc/handler.rs

+34-22
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ const IO_ERROR_RETRIES: u8 = 3;
4040
/// Maximum time given to the handler to perform shutdown operations.
4141
const SHUTDOWN_TIMEOUT_SECS: u8 = 15;
4242

43+
/// Maximum number of simultaneous inbound substreams we keep for this peer.
44+
const MAX_INBOUND_SUBSTREAMS: usize = 32;
45+
4346
/// Identifier of inbound and outbound substreams from the handler's perspective.
4447
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
4548
pub struct SubstreamId(usize);
@@ -241,7 +244,7 @@ where
241244
// We now drive to completion communications already dialed/established
242245
while let Some((id, req)) = self.dial_queue.pop() {
243246
self.events_out.push(Err(HandlerErr::Outbound {
244-
error: RPCError::HandlerRejected,
247+
error: RPCError::Disconnected,
245248
proto: req.protocol(),
246249
id,
247250
}));
@@ -265,7 +268,7 @@ where
265268
self.dial_queue.push((id, req));
266269
}
267270
_ => self.events_out.push(Err(HandlerErr::Outbound {
268-
error: RPCError::HandlerRejected,
271+
error: RPCError::Disconnected,
269272
proto: req.protocol(),
270273
id,
271274
})),
@@ -339,23 +342,32 @@ where
339342

340343
// store requests that expect responses
341344
if expected_responses > 0 {
342-
// Store the stream and tag the output.
343-
let delay_key = self.inbound_substreams_delay.insert(
344-
self.current_inbound_substream_id,
345-
Duration::from_secs(RESPONSE_TIMEOUT),
346-
);
347-
let awaiting_stream = InboundState::Idle(substream);
348-
self.inbound_substreams.insert(
349-
self.current_inbound_substream_id,
350-
InboundInfo {
351-
state: awaiting_stream,
352-
pending_items: VecDeque::with_capacity(expected_responses as usize),
353-
delay_key: Some(delay_key),
354-
protocol: req.protocol(),
355-
request_start_time: Instant::now(),
356-
remaining_chunks: expected_responses,
357-
},
358-
);
345+
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
346+
// Store the stream and tag the output.
347+
let delay_key = self.inbound_substreams_delay.insert(
348+
self.current_inbound_substream_id,
349+
Duration::from_secs(RESPONSE_TIMEOUT),
350+
);
351+
let awaiting_stream = InboundState::Idle(substream);
352+
self.inbound_substreams.insert(
353+
self.current_inbound_substream_id,
354+
InboundInfo {
355+
state: awaiting_stream,
356+
pending_items: VecDeque::with_capacity(expected_responses as usize),
357+
delay_key: Some(delay_key),
358+
protocol: req.protocol(),
359+
request_start_time: Instant::now(),
360+
remaining_chunks: expected_responses,
361+
},
362+
);
363+
} else {
364+
self.events_out.push(Err(HandlerErr::Inbound {
365+
id: self.current_inbound_substream_id,
366+
proto: req.protocol(),
367+
error: RPCError::HandlerRejected,
368+
}));
369+
return self.shutdown(None);
370+
}
359371
}
360372

361373
// If we received a goodbye, shutdown the connection.
@@ -382,7 +394,7 @@ where
382394
// accept outbound connections only if the handler is not deactivated
383395
if matches!(self.state, HandlerState::Deactivated) {
384396
self.events_out.push(Err(HandlerErr::Outbound {
385-
error: RPCError::HandlerRejected,
397+
error: RPCError::Disconnected,
386398
proto,
387399
id,
388400
}));
@@ -671,7 +683,7 @@ where
671683
{
672684
// if the request was still active, report back to cancel it
673685
self.events_out.push(Err(HandlerErr::Inbound {
674-
error: RPCError::HandlerRejected,
686+
error: RPCError::Disconnected,
675687
proto: info.protocol,
676688
id: *id,
677689
}));
@@ -803,7 +815,7 @@ where
803815
// the handler is deactivated. Close the stream
804816
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
805817
self.events_out.push(Err(HandlerErr::Outbound {
806-
error: RPCError::HandlerRejected,
818+
error: RPCError::Disconnected,
807819
proto: entry.get().proto,
808820
id: entry.get().req_id,
809821
}))

0 commit comments

Comments
 (0)