Skip to content

Fix the LossDetection timer for multipath #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 103 additions & 106 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ impl Connection {
// A prior attempt to set the loss detection timer may have failed due to
// anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
// the server's first flight is lost.
self.set_loss_detection_timer(path_id, now);
self.set_loss_detection_timer(now, path_id);
}
}
NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
Expand Down Expand Up @@ -1650,6 +1650,7 @@ impl Connection {
self.inner_on_ack_received(now, space, path, ack)
}

/// Handles an ACK frame acknowledging packets sent on *path*.
fn inner_on_ack_received(
&mut self,
now: Instant,
Expand Down Expand Up @@ -1758,7 +1759,7 @@ impl Connection {
self.detect_lost_packets(now, space, path, true);

if self.peer_completed_address_validation(path) {
self.spaces[space].for_path(path).pto_count = 0;
self.path_data_mut(path).pto_count = 0;
}

// Explicit congestion notification
Expand All @@ -1782,7 +1783,7 @@ impl Connection {
}
}

self.set_loss_detection_timer(path, now);
self.set_loss_detection_timer(now, path);
Ok(())
}

Expand Down Expand Up @@ -1863,52 +1864,57 @@ impl Connection {
.set(Timer::KeyDiscard, start + self.pto(space) * 3);
}

fn on_loss_detection_timeout(&mut self, now: Instant, _path_id: PathId) {
// TODO(@divma): ignoring this (to me) bug for now: we know for which packet number
// space/path_id we need to detect lost packets, so we only need to find the space
// This might technically work as is now that there are as many calls to
// `on_loss_detection_timeout` as paths for which the timer ran out
if let Some((_, pn_space, path_id)) = self.loss_time_and_space() {
/// Handle a [`Timer::LossDetection`] timeout.
///
/// This timer expires for two reasons:
/// - An ACK-eliciting packet we sent should be considered lost.
/// - The PTO may have expired and a tail-loss probe needs to be scheduled.
///
/// The former needs us to schedule re-transmission of the lost data.
///
/// The latter means we have not received an ACK for an ack-eliciting packet we sent
/// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
/// packet, to try and elicit new acknowledgements. These new acknowledgements will
/// indicate whether the previously sent packets were lost or not.
fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
// Time threshold loss Detection
self.detect_lost_packets(now, pn_space, path_id, false);
self.set_loss_detection_timer(path_id, now);
self.set_loss_detection_timer(now, path_id);
return;
}

let (_, space, path) = match self.pto_time_and_space(now) {
let (_, space) = match self.pto_time_and_space(now, path_id) {
Some(x) => x,
None => {
error!("PTO expired while unset");
return;
}
};
let in_flight = self.path_data(path).in_flight.bytes;
let in_flight = self.path_data(path_id).in_flight.bytes;
trace!(
in_flight,
count = self.spaces[space].for_path(path).pto_count,
count = self.path_data(path_id).pto_count,
?space,
?path_id,
"PTO fired"
);

let count = match self.path_data(path).in_flight.ack_eliciting {
let count = match self.path_data(path_id).in_flight.ack_eliciting {
// A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
// deadlock prevention
0 => {
debug_assert!(!self.peer_completed_address_validation(path));
debug_assert!(!self.peer_completed_address_validation(path_id));
1
}
// Conventional loss probe
_ => 2,
};
self.spaces[space].for_path(path).loss_probes = self.spaces[space]
.for_path(path)
.loss_probes
.saturating_add(count);
self.spaces[space].for_path(path).pto_count = self.spaces[space]
.for_path(path)
.pto_count
.saturating_add(1);
self.set_loss_detection_timer(path, now);
let pns = self.spaces[space].for_path(path_id);
pns.loss_probes = pns.loss_probes.saturating_add(count);
let path_data = self.path_data_mut(path_id);
path_data.pto_count = path_data.pto_count.saturating_add(1);
self.set_loss_detection_timer(now, path_id);
}

// TODO(@divma): some docs wouldn't kill
Expand Down Expand Up @@ -2065,72 +2071,77 @@ impl Connection {
}
}

/// Returns the earliest time packets should be declared lost on a path.
fn loss_time_and_space(&self) -> Option<(Instant, SpaceId, PathId)> {
let mut loss_times = Vec::new();
for space_id in SpaceId::iter() {
let space = &self.spaces[space_id];
for (path_id, number_space) in space.iter_paths() {
if let Some(loss_time) = number_space.loss_time {
loss_times.push((loss_time, space_id, *path_id));
}
}
}
loss_times.into_iter().min_by_key(|&(when, _, _)| when)
}

/// Returns the earliest next PTO for all paths.
// TODO(@divma): this needs to receive the PathId as param
fn pto_time_and_space(&mut self, now: Instant) -> Option<(Instant, SpaceId, PathId)> {
// TODO(flub): this will need more adjustments once we have multiple paths as well.
// Also ack-frequency needs to be checked.
// TODO(@divma): fixme
if self.path_data_mut(PathId(0)).in_flight.ack_eliciting == 0 {
// TODO(flub): This uses self.path so effectively PathId(0). Fix for multipath
debug_assert!(!self.peer_completed_address_validation(PathId(0)));
/// Returns the earliest time packets should be declared lost for all spaces on a path.
fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
SpaceId::iter()
.filter_map(|id| {
self.spaces[id]
.number_spaces
.get(&path_id)
.and_then(|pns| pns.loss_time)
.map(|time| (time, id))
})
.min_by_key(|&(time, _)| time)
}

/// Returns the earliest time at which the next PTO should fire across all spaces on a path.
fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
let pto_count = self.path_data(path_id).pto_count;
let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
let mut duration = self.path_data_mut(path_id).rtt.pto_base() * backoff;

if path_id == PathId::ZERO
&& self
.paths
.get(&PathId::ZERO)
.map(|path| path.data.in_flight.ack_eliciting)
== Some(0)
&& !self.peer_completed_address_validation(PathId::ZERO)
{
// Address Validation during Connection Establishment:
// https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
// deadlock if an Initial or Handshake packet from the server is lost and the
// server can not send more due to its anti-amplification limit the client must
// send another packet on PTO.
let space = match self.highest_space {
SpaceId::Handshake => SpaceId::Handshake,
_ => SpaceId::Initial,
};
let path_id = match space {
SpaceId::Initial | SpaceId::Handshake => PathId(0),
SpaceId::Data => self.spaces[space]
.iter_paths()
.min_by_key(|&(_, s)| s.pto_count)
.map(|(p, _)| *p)
.unwrap_or(PathId(0)),
};
let pto_count = self.spaces[space].for_path(path_id).pto_count;
let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
let duration = self.path_data_mut(path_id).rtt.pto_base() * backoff;
return Some((now + duration, space, path_id));

return Some((now + duration, space));
}

let mut result = None;
for space in SpaceId::iter() {
for (path_id, pn_space) in self.spaces[space].iter_paths() {
if pn_space.in_flight == 0 {
continue;
if self.spaces[space].for_path(path_id).in_flight == 0 {
continue;
}
if space == SpaceId::Data {
// Skip ApplicationData until handshake completes.
if self.is_handshaking() {
return result;
}
let pto_count = pn_space.pto_count;
let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
let mut duration = self.path_data(PathId(0)).rtt.pto_base() * backoff;
if space == SpaceId::Data {
// Skip ApplicationData until handshake completes.
if self.is_handshaking() {
return result;
}
// Include max_ack_delay and backoff for ApplicationData.
duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
// Include max_ack_delay and backoff for ApplicationData.
duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
}
let last_ack_eliciting = match self.spaces[space]
.for_path(path_id)
.time_of_last_ack_eliciting_packet
{
Some(time) => time,
None => continue,
};
let pto = last_ack_eliciting + duration;
if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
if self.path_data(path_id).anti_amplification_blocked(1) {
// Nothing would be able to be sent.
continue;
}
let last_ack_eliciting = match pn_space.time_of_last_ack_eliciting_packet {
Some(time) => time,
None => continue,
};
let pto = last_ack_eliciting + duration;
if result.map_or(true, |(earliest_pto, _, _)| pto < earliest_pto) {
result = Some((pto, space, *path_id));
if self.path_data(path_id).in_flight.ack_eliciting == 0 {
// Nothing ack-eliciting, no PTO to arm/fire.
continue;
}
result = Some((pto, space));
}
}
result
Expand Down Expand Up @@ -2158,42 +2169,30 @@ impl Connection {
// when it shouldn't be possible.
}

fn set_loss_detection_timer(&mut self, path_id: PathId, now: Instant) {
/// Resets the [`Timer::LossDetection`] timer to the next instant it may be needed
///
/// The timer must fire if either:
/// - An ack-eliciting packet we sent needs to be declared lost.
/// - A tail-loss probe needs to be sent.
///
/// See [`Connection::on_loss_detection_timeout`] for details.
fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
if self.state.is_closed() {
// No loss detection takes place on closed connections, and `close_common` already
// stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
// reordered packet being handled by state-insensitive code.
return;
}

// TODO(@divma): question: use the path_id or trust number of calls = queued timers
if let Some((loss_time, _, path_id)) = self.loss_time_and_space() {
if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
// Time threshold loss detection.
// TODO(@divma): at least not completely wrong
self.timers.set(Timer::LossDetection(path_id), loss_time);
return;
}

if self.path_data(path_id).anti_amplification_blocked(1) {
// We wouldn't be able to send anything, so don't bother.
self.timers.stop(Timer::LossDetection(path_id));
return;
}

if self.path_data(path_id).in_flight.ack_eliciting == 0
&& self.peer_completed_address_validation(path_id)
{
// There is nothing to detect lost, so no timer is set. However, the client needs to arm
// the timer if the server might be blocked by the anti-amplification limit.
self.timers.stop(Timer::LossDetection(path_id));
return;
}

// Determine which PN space to arm PTO for.
// Calculate PTO duration
// TODO(@divma): both calls are wrong. We need to find the time and space for this packet
// number space, not for any
if let Some((timeout, _, _)) = self.pto_time_and_space(now) {
if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
self.timers.set(Timer::LossDetection(path_id), timeout);
} else {
self.timers.stop(Timer::LossDetection(path_id));
Expand Down Expand Up @@ -2490,8 +2489,6 @@ impl Connection {

fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
debug_assert!(space_id != SpaceId::Data);
// other path ids exist only in the data space, in which we should not be
let path_id = PathId(0);
trace!("discarding {:?} keys", space_id);
if space_id == SpaceId::Initial {
// No longer needed
Expand All @@ -2501,15 +2498,15 @@ impl Connection {
}
let space = &mut self.spaces[space_id];
space.crypto = None;
let path_space = space.for_path(path_id);
let path_space = space.for_path(PathId::ZERO);
path_space.time_of_last_ack_eliciting_packet = None;
path_space.loss_time = None;
path_space.in_flight = 0;
let sent_packets = mem::take(&mut path_space.sent_packets);
for (pn, packet) in sent_packets.into_iter() {
self.remove_in_flight(path_id, pn, &packet);
self.remove_in_flight(PathId::ZERO, pn, &packet);
}
self.set_loss_detection_timer(path_id, now)
self.set_loss_detection_timer(now, PathId::ZERO)
}

fn handle_coalesced(
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
}
conn.permit_idle_reset = false;
}
conn.set_loss_detection_timer(path_id, now);
conn.set_loss_detection_timer(now, path_id);
conn.path_data_mut(path_id).pacing.on_transmit(size);
}
}
Expand Down
4 changes: 4 additions & 0 deletions quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub(super) struct PathData {
/// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
/// a packet was sent on a later path.
first_packet: Option<u64>,
/// The number of times a PTO has been sent without receiving an ack.
pub(super) pto_count: u32,
}

impl PathData {
Expand Down Expand Up @@ -155,6 +157,7 @@ impl PathData {
last_observed_addr_report: None,
status: Default::default(),
first_packet: None,
pto_count: 0,
}
}

Expand Down Expand Up @@ -183,6 +186,7 @@ impl PathData {
last_observed_addr_report: None,
status: prev.status,
first_packet: None,
pto_count: 0,
}
}

Expand Down
Loading
Loading