Skip to content

Commit cc33eb5

Browse files
authored
fix: cap pending connection backlog (#1993)
1 parent 4ee72ac commit cc33eb5

File tree

1 file changed

+82
-5
lines changed

1 file changed

+82
-5
lines changed

crates/core/src/ring/mod.rs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ impl Ring {
482482
}
483483

484484
let current_connections = self.connection_manager.get_open_connections();
485+
let pending_connection_targets = pending_conn_adds.len();
485486
let neighbor_locations = {
486487
let peers = self.connection_manager.get_connections_by_location();
487488
tracing::debug!(
@@ -520,18 +521,39 @@ impl Ring {
520521
?adjustment,
521522
current_connections,
522523
is_gateway,
523-
pending_adds = pending_conn_adds.len(),
524+
pending_adds = pending_connection_targets,
524525
"Topology adjustment result"
525526
);
526527

527528
match adjustment {
528529
TopologyAdjustment::AddConnections(target_locs) => {
529-
tracing::info!(
530-
"Adding {} locations to pending connections (total pending: {})",
530+
let allowed = calculate_allowed_connection_additions(
531+
current_connections,
532+
pending_connection_targets,
533+
self.connection_manager.min_connections,
534+
self.connection_manager.max_connections,
531535
target_locs.len(),
532-
pending_conn_adds.len() + target_locs.len()
533536
);
534-
pending_conn_adds.extend(target_locs);
537+
538+
if allowed == 0 {
539+
tracing::debug!(
540+
requested = target_locs.len(),
541+
current_connections,
542+
pending = pending_connection_targets,
543+
min_connections = self.connection_manager.min_connections,
544+
max_connections = self.connection_manager.max_connections,
545+
"Skipping queuing new connection targets – backlog already satisfies capacity constraints"
546+
);
547+
} else {
548+
let total_pending_after = pending_connection_targets + allowed;
549+
tracing::info!(
550+
requested = target_locs.len(),
551+
allowed,
552+
total_pending_after,
553+
"Queuing additional connection targets"
554+
);
555+
pending_conn_adds.extend(target_locs.into_iter().take(allowed));
556+
}
535557
}
536558
TopologyAdjustment::RemoveConnections(mut should_disconnect_peers) => {
537559
for peer in should_disconnect_peers.drain(..) {
@@ -670,6 +692,61 @@ impl Ring {
670692
}
671693
}
672694

695+
fn calculate_allowed_connection_additions(
696+
current_connections: usize,
697+
pending_connections: usize,
698+
min_connections: usize,
699+
max_connections: usize,
700+
requested: usize,
701+
) -> usize {
702+
if requested == 0 {
703+
return 0;
704+
}
705+
706+
let effective_connections = current_connections.saturating_add(pending_connections);
707+
if effective_connections >= max_connections {
708+
return 0;
709+
}
710+
711+
let mut available_capacity = max_connections - effective_connections;
712+
713+
if current_connections < min_connections {
714+
let deficit_to_min = min_connections.saturating_sub(effective_connections);
715+
available_capacity = available_capacity.min(deficit_to_min);
716+
}
717+
718+
available_capacity.min(requested)
719+
}
720+
721+
#[cfg(test)]
722+
mod pending_additions_tests {
723+
use super::calculate_allowed_connection_additions;
724+
725+
#[test]
726+
fn respects_minimum_when_backlog_exists() {
727+
let allowed = calculate_allowed_connection_additions(1, 24, 25, 200, 24);
728+
assert_eq!(allowed, 0, "Backlog should satisfy minimum deficit");
729+
}
730+
731+
#[test]
732+
fn permits_requests_until_minimum_is_met() {
733+
let allowed = calculate_allowed_connection_additions(1, 0, 25, 200, 24);
734+
assert_eq!(allowed, 24);
735+
}
736+
737+
#[test]
738+
fn caps_additions_at_available_capacity() {
739+
let allowed = calculate_allowed_connection_additions(190, 5, 25, 200, 10);
740+
assert_eq!(allowed, 5);
741+
}
742+
743+
#[test]
744+
fn respects_requested_when_capacity_allows() {
745+
let allowed = calculate_allowed_connection_additions(50, 0, 25, 200, 3);
746+
assert_eq!(allowed, 3);
747+
}
748+
}
749+
673750
#[derive(thiserror::Error, Debug)]
674751
pub(crate) enum RingError {
675752
#[error(transparent)]

0 commit comments

Comments
 (0)