diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 8f365a7ec..37ce1b1d9 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1294,7 +1294,7 @@ impl Connection { // A prior attempt to set the loss detection timer may have failed due to // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if // the server's first flight is lost. - self.set_loss_detection_timer(path_id, now); + self.set_loss_detection_timer(now, path_id); } } NewIdentifiers(ids, now, cid_len, cid_lifetime) => { @@ -1650,6 +1650,7 @@ impl Connection { self.inner_on_ack_received(now, space, path, ack) } + /// Handles an ACK frame acknowledging packets sent on *path*. fn inner_on_ack_received( &mut self, now: Instant, @@ -1758,7 +1759,7 @@ impl Connection { self.detect_lost_packets(now, space, path, true); if self.peer_completed_address_validation(path) { - self.spaces[space].for_path(path).pto_count = 0; + self.path_data_mut(path).pto_count = 0; } // Explicit congestion notification @@ -1782,7 +1783,7 @@ impl Connection { } } - self.set_loss_detection_timer(path, now); + self.set_loss_detection_timer(now, path); Ok(()) } @@ -1863,52 +1864,57 @@ impl Connection { .set(Timer::KeyDiscard, start + self.pto(space) * 3); } - fn on_loss_detection_timeout(&mut self, now: Instant, _path_id: PathId) { - // TODO(@divma): ignoring this (to me) bug for now: we know for which packet number - // space/path_id we need to detect lost packets, so we only need to find the space - // This might technically work as is now that there are as many calls to - // `on_loss_detection_timeout` as paths for which the timer ran out - if let Some((_, pn_space, path_id)) = self.loss_time_and_space() { + /// Handle a [`Timer::LossDetection`] timeout. + /// + /// This timer expires for two reasons: + /// - An ACK-eliciting packet we sent should be considered lost. + /// - The PTO may have expired and a tail-loss probe needs to be scheduled. + /// + /// The former needs us to schedule re-transmission of the lost data. + /// + /// The latter means we have not received an ACK for an ack-eliciting packet we sent + /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting + /// packet, to try and elicit new acknowledgements. These new acknowledgements will + /// indicate whether the previously sent packets were lost or not. + fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) { + if let Some((_, pn_space)) = self.loss_time_and_space(path_id) { // Time threshold loss Detection self.detect_lost_packets(now, pn_space, path_id, false); - self.set_loss_detection_timer(path_id, now); + self.set_loss_detection_timer(now, path_id); return; } - let (_, space, path) = match self.pto_time_and_space(now) { + let (_, space) = match self.pto_time_and_space(now, path_id) { Some(x) => x, None => { error!("PTO expired while unset"); return; } }; - let in_flight = self.path_data(path).in_flight.bytes; + let in_flight = self.path_data(path_id).in_flight.bytes; trace!( in_flight, - count = self.spaces[space].for_path(path).pto_count, + count = self.path_data(path_id).pto_count, ?space, + ?path_id, "PTO fired" ); - let count = match self.path_data(path).in_flight.ack_eliciting { + let count = match self.path_data(path_id).in_flight.ack_eliciting { // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification // deadlock preventions 0 => { - debug_assert!(!self.peer_completed_address_validation(path)); + debug_assert!(!self.peer_completed_address_validation(path_id)); 1 } // Conventional loss probe _ => 2, }; - self.spaces[space].for_path(path).loss_probes = self.spaces[space] - .for_path(path) - .loss_probes - .saturating_add(count); - self.spaces[space].for_path(path).pto_count = self.spaces[space] - .for_path(path) - .pto_count - .saturating_add(1); - self.set_loss_detection_timer(path, now); + let pns = self.spaces[space].for_path(path_id); + pns.loss_probes = pns.loss_probes.saturating_add(count); + let path_data = self.path_data_mut(path_id); + path_data.pto_count = path_data.pto_count.saturating_add(1); + self.set_loss_detection_timer(now, path_id); } // TODO(@divma): some docs wouldn't kill @@ -2065,72 +2071,77 @@ impl Connection { } } - /// Returns the earliest time packets should be declared lost on a path. - fn loss_time_and_space(&self) -> Option<(Instant, SpaceId, PathId)> { - let mut loss_times = Vec::new(); - for space_id in SpaceId::iter() { - let space = &self.spaces[space_id]; - for (path_id, number_space) in space.iter_paths() { - if let Some(loss_time) = number_space.loss_time { - loss_times.push((loss_time, space_id, *path_id)); - } - } - } - loss_times.into_iter().min_by_key(|&(when, _, _)| when) - } - - /// Returns the earliest next PTO for all paths. - // TODO(@divma): this needs to receive the PathId as param - fn pto_time_and_space(&mut self, now: Instant) -> Option<(Instant, SpaceId, PathId)> { - // TODO(flub): this will need more adjustments once we have multiple paths as well. - // Also ack-frequency needs to be checked. - // TODO(@divma): fixme - if self.path_data_mut(PathId(0)).in_flight.ack_eliciting == 0 { - // TODO(flub): This uses self.path so effectively PathId(0). Fix for multipath - debug_assert!(!self.peer_completed_address_validation(PathId(0))); + /// Returns the earliest time packets should be declared lost for all spaces on a path. + fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> { + SpaceId::iter() + .filter_map(|id| { + self.spaces[id] + .number_spaces + .get(&path_id) + .and_then(|pns| pns.loss_time) + .map(|time| (time, id)) + }) + .min_by_key(|&(time, _)| time) + } + + /// Returns the earliest next PTO should fire for all spaces on a path. + fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> { + let pto_count = self.path_data(path_id).pto_count; + let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT)); + let mut duration = self.path_data_mut(path_id).rtt.pto_base() * backoff; + + if path_id == PathId::ZERO + && self + .paths + .get(&PathId::ZERO) + .map(|path| path.data.in_flight.ack_eliciting) + == Some(0) + && !self.peer_completed_address_validation(PathId::ZERO) + { + // Address Validation during Connection Establishment: + // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a + // deadlock if an Initial or Handshake packet from the server is lost and the + // server can not send more due to its anti-amplification limit the client must + // send another packet on PTO. let space = match self.highest_space { SpaceId::Handshake => SpaceId::Handshake, _ => SpaceId::Initial, }; - let path_id = match space { - SpaceId::Initial | SpaceId::Handshake => PathId(0), - SpaceId::Data => self.spaces[space] - .iter_paths() - .min_by_key(|&(_, s)| s.pto_count) - .map(|(p, _)| *p) - .unwrap_or(PathId(0)), - }; - let pto_count = self.spaces[space].for_path(path_id).pto_count; - let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT)); - let duration = self.path_data_mut(path_id).rtt.pto_base() * backoff; - return Some((now + duration, space, path_id)); + + return Some((now + duration, space)); } let mut result = None; for space in SpaceId::iter() { - for (path_id, pn_space) in self.spaces[space].iter_paths() { - if pn_space.in_flight == 0 { - continue; + if self.spaces[space].for_path(path_id).in_flight == 0 { + continue; + } + if space == SpaceId::Data { + // Skip ApplicationData until handshake completes. + if self.is_handshaking() { + return result; } - let pto_count = pn_space.pto_count; - let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT)); - let mut duration = self.path_data(PathId(0)).rtt.pto_base() * backoff; - if space == SpaceId::Data { - // Skip ApplicationData until handshake completes. - if self.is_handshaking() { - return result; - } - // Include max_ack_delay and backoff for ApplicationData. - duration += self.ack_frequency.max_ack_delay_for_pto() * backoff; + // Include max_ack_delay and backoff for ApplicationData. + duration += self.ack_frequency.max_ack_delay_for_pto() * backoff; + } + let last_ack_eliciting = match self.spaces[space] + .for_path(path_id) + .time_of_last_ack_eliciting_packet + { + Some(time) => time, + None => continue, + }; + let pto = last_ack_eliciting + duration; + if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) { + if self.path_data(path_id).anti_amplification_blocked(1) { + // Nothing would be able to be sent. + continue; } - let last_ack_eliciting = match pn_space.time_of_last_ack_eliciting_packet { - Some(time) => time, - None => continue, - }; - let pto = last_ack_eliciting + duration; - if result.map_or(true, |(earliest_pto, _, _)| pto < earliest_pto) { - result = Some((pto, space, *path_id)); + if self.path_data(path_id).in_flight.ack_eliciting == 0 { + // Nothing ack-eliciting, no PTO to arm/fire. + continue; } + result = Some((pto, space)); } } result @@ -2158,7 +2169,14 @@ impl Connection { // when it shouldn't be possible. } - fn set_loss_detection_timer(&mut self, path_id: PathId, now: Instant) { + /// Resets the the [`Timer::LossDetection`] timer to the next instant it may be needed + /// + /// The timer must fire if either: + /// - An ack-eliciting packet we sent needs to be declared lost. + /// - A tail-loss probe needs to be sent. + /// + /// See [`Connection::on_loss_detection_timeout`] for details. + fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) { if self.state.is_closed() { // No loss detection takes place on closed connections, and `close_common` already // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a @@ -2166,34 +2184,15 @@ impl Connection { return; } - // TODO(@divma): question: use the path_id or trust number of calls = queued timers - if let Some((loss_time, _, path_id)) = self.loss_time_and_space() { + if let Some((loss_time, _)) = self.loss_time_and_space(path_id) { // Time threshold loss detection. - // TODO(@divma): at least not completely wrong self.timers.set(Timer::LossDetection(path_id), loss_time); return; } - if self.path_data(path_id).anti_amplification_blocked(1) { - // We wouldn't be able to send anything, so don't bother. - self.timers.stop(Timer::LossDetection(path_id)); - return; - } - - if self.path_data(path_id).in_flight.ack_eliciting == 0 - && self.peer_completed_address_validation(path_id) - { - // There is nothing to detect lost, so no timer is set. However, the client needs to arm - // the timer if the server might be blocked by the anti-amplification limit. - self.timers.stop(Timer::LossDetection(path_id)); - return; - } - // Determine which PN space to arm PTO for. // Calculate PTO duration - // TODO(@divma): both calls are wrong. We need to fin the time and space for this packet - // number space, not for any - if let Some((timeout, _, _)) = self.pto_time_and_space(now) { + if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) { self.timers.set(Timer::LossDetection(path_id), timeout); } else { self.timers.stop(Timer::LossDetection(path_id)); @@ -2490,8 +2489,6 @@ impl Connection { fn discard_space(&mut self, now: Instant, space_id: SpaceId) { debug_assert!(space_id != SpaceId::Data); - // other path ids exist only in the data space, in which we should no be - let path_id = PathId(0); trace!("discarding {:?} keys", space_id); if space_id == SpaceId::Initial { // No longer needed @@ -2501,15 +2498,15 @@ impl Connection { } let space = &mut self.spaces[space_id]; space.crypto = None; - let path_space = space.for_path(path_id); + let path_space = space.for_path(PathId::ZERO); path_space.time_of_last_ack_eliciting_packet = None; path_space.loss_time = None; path_space.in_flight = 0; let sent_packets = mem::take(&mut path_space.sent_packets); for (pn, packet) in sent_packets.into_iter() { - self.remove_in_flight(path_id, pn, &packet); + self.remove_in_flight(PathId::ZERO, pn, &packet); } - self.set_loss_detection_timer(path_id, now) + self.set_loss_detection_timer(now, PathId::ZERO) } fn handle_coalesced( diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index d95c47d06..421740df9 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -244,7 +244,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { } conn.permit_idle_reset = false; } - conn.set_loss_detection_timer(path_id, now); + conn.set_loss_detection_timer(now, path_id); conn.path_data_mut(path_id).pacing.on_transmit(size); } } diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 1d868f4dc..33912e0c7 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -103,6 +103,8 @@ pub(super) struct PathData { /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if /// a packet was sent on a later path. first_packet: Option, + /// The number of times a PTO has been sent without receiving an ack. + pub(super) pto_count: u32, } impl PathData { @@ -155,6 +157,7 @@ impl PathData { last_observed_addr_report: None, status: Default::default(), first_packet: None, + pto_count: 0, } } @@ -183,6 +186,7 @@ impl PathData { last_observed_addr_report: None, status: prev.status, first_packet: None, + pto_count: 0, } } diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index e464a0301..b2d84f3e9 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -83,10 +83,6 @@ impl PacketSpace { .or_insert_with(PacketNumberSpace::new_default) } - pub(super) fn iter_paths(&self) -> impl Iterator { - self.number_spaces.iter() - } - pub(super) fn iter_paths_mut(&mut self) -> impl Iterator { self.number_spaces.values_mut() } @@ -230,10 +226,6 @@ pub(super) struct PacketNumberSpace { // // Loss Detection // - /// The number of times a PTO has been sent without receiving an ack. - // TODO(flub): This used to be on the connection itself. Maybe it should be on the - // PathData or somewhere. Evaluate this again later on when we have more working. - pub(super) pto_count: u32, /// The time the most recently sent retransmittable packet was sent. pub(super) time_of_last_ack_eliciting_packet: Option, /// Earliest time when we might declare a packet lost. @@ -268,7 +260,6 @@ impl PacketNumberSpace { sent_with_keys: 0, ping_pending: false, immediate_ack_pending: false, - pto_count: 0, time_of_last_ack_eliciting_packet: None, loss_time: None, loss_probes: 0, @@ -295,7 +286,6 @@ impl PacketNumberSpace { sent_with_keys: 0, ping_pending: false, immediate_ack_pending: false, - pto_count: 0, time_of_last_ack_eliciting_packet: None, loss_time: None, loss_probes: 0, @@ -323,7 +313,6 @@ impl PacketNumberSpace { sent_with_keys: 0, ping_pending: false, immediate_ack_pending: false, - pto_count: 0, time_of_last_ack_eliciting_packet: None, loss_time: None, loss_probes: 0,