From 6e869d4bb500869b4c591c4dd53c109e02cba0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 May 2025 12:47:43 +0200 Subject: [PATCH 01/16] Turn `UdpPoller` into `UdpSender` --- quinn/src/connection.rs | 13 +++++---- quinn/src/endpoint.rs | 48 ++++++++++++++++++++++--------- quinn/src/lib.rs | 2 +- quinn/src/runtime.rs | 35 +++++++++++++--------- quinn/src/runtime/tokio.rs | 59 +++++++++++++++++++++++++++++++------- 5 files changed, 112 insertions(+), 45 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 828c34dad7..4bc410a2e6 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -19,7 +19,7 @@ use tracing::{debug_span, Instrument, Span}; use crate::{ mutex::Mutex, recv_stream::RecvStream, - runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller}, + runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSender}, send_stream::SendStream, udp_transmit, ConnectionEvent, Duration, Instant, VarInt, }; @@ -897,7 +897,7 @@ impl ConnectionRef { stopped: FxHashMap::default(), error: None, ref_count: 0, - io_poller: socket.clone().create_io_poller(), + udp_sender: socket.clone().create_sender(), socket, runtime, send_buffer: Vec::new(), @@ -1018,7 +1018,7 @@ pub(crate) struct State { /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: usize, socket: Arc, - io_poller: Pin>, + udp_sender: Pin>, runtime: Arc, send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block @@ -1057,7 +1057,7 @@ impl State { } }; - if self.io_poller.as_mut().poll_writable(cx)?.is_pending() { + if self.udp_sender.as_mut().poll_writable(cx)?.is_pending() { // Retry after a future wakeup self.buffered_transmit = Some(t); return Ok(false); @@ -1065,7 +1065,8 @@ impl State { let len = t.size; let retry = match self - .socket + .udp_sender + .as_mut() .try_send(&udp_transmit(&t, &self.send_buffer[..len])) { Ok(()) => false, @@ -1110,7 +1111,7 @@ impl State { match self.conn_events.poll_recv(cx) { Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => { self.socket = socket; - self.io_poller = self.socket.clone().create_io_poller(); + self.udp_sender = self.socket.clone().create_sender(); self.inner.local_address_changed(); } Poll::Ready(Some(ConnectionEvent::Proto(event))) => { diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index f70be9ad4f..f1f75b58be 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -15,7 +15,7 @@ use std::{ #[cfg(all(not(wasm_browser), any(feature = "aws-lc-rs", feature = "ring")))] use crate::runtime::default_runtime; use crate::{ - runtime::{AsyncUdpSocket, Runtime}, + runtime::{AsyncUdpSocket, Runtime, UdpSender}, udp_transmit, Instant, }; use bytes::{Bytes, BytesMut}; @@ -429,7 +429,7 @@ impl EndpointInner { } Err(error) => { if let Some(transmit) = error.response { - respond(transmit, &response_buffer, &*state.socket); + respond(transmit, &response_buffer, &mut state.sender); } Err(error.cause) } @@ -441,14 +441,14 @@ impl EndpointInner { state.stats.refused_handshakes += 1; let mut response_buffer = Vec::new(); let transmit = state.inner.refuse(incoming, &mut response_buffer); - respond(transmit, &response_buffer, &*state.socket); + respond(transmit, &response_buffer, &mut state.sender); } pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> { let mut state = self.state.lock().unwrap(); let mut response_buffer = Vec::new(); let transmit = state.inner.retry(incoming, &mut response_buffer)?; - respond(transmit, &response_buffer, &*state.socket); + respond(transmit, &response_buffer, &mut state.sender); Ok(()) } @@ -462,6 +462,7 @@ impl EndpointInner { #[derive(Debug)] pub(crate) struct State { socket: Arc, + sender: Pin>, /// During an active migration, abandoned_socket receives traffic /// until the first packet arrives on the new socket. prev_socket: Option>, @@ -489,16 +490,26 @@ impl State { self.recv_state.recv_limiter.start_cycle(get_time); if let Some(socket) = &self.prev_socket { // We don't care about the `PollProgress` from old sockets. - let poll_res = - self.recv_state - .poll_socket(cx, &mut self.inner, &**socket, &*self.runtime, now); + let poll_res = self.recv_state.poll_socket( + cx, + &mut self.inner, + &**socket, + &mut self.sender, + &*self.runtime, + now, + ); if poll_res.is_err() { self.prev_socket = None; } }; - let poll_res = - self.recv_state - .poll_socket(cx, &mut self.inner, &*self.socket, &*self.runtime, now); + let poll_res = self.recv_state.poll_socket( + cx, + &mut self.inner, + &*self.socket, + &mut self.sender, + &*self.runtime, + now, + ); self.recv_state.recv_limiter.finish_cycle(get_time); let poll_res = poll_res?; if poll_res.received_connection_packet { @@ -550,7 +561,11 @@ impl Drop for State { } } -fn respond(transmit: proto::Transmit, response_buffer: &[u8], socket: &dyn AsyncUdpSocket) { +fn respond( + transmit: proto::Transmit, + response_buffer: &[u8], + sender: &mut Pin>, +) { // Send if there's kernel buffer space; otherwise, drop it // // As an endpoint-generated packet, we know this is an @@ -571,7 +586,9 @@ fn respond(transmit: proto::Transmit, response_buffer: &[u8], socket: &dyn Async // to transmit. This is morally equivalent to the packet getting // lost due to congestion further along the link, which // similarly relies on peer retries for recovery. - _ = socket.try_send(&udp_transmit(&transmit, &response_buffer[..transmit.size])); + _ = sender + .as_mut() + .try_send(&udp_transmit(&transmit, &response_buffer[..transmit.size])); } #[inline] @@ -676,6 +693,7 @@ impl EndpointRef { ) -> Self { let (sender, events) = mpsc::unbounded_channel(); let recv_state = RecvState::new(sender, socket.max_receive_segments(), &inner); + let sender = socket.clone().create_sender(); Self(Arc::new(EndpointInner { shared: Shared { incoming: Notify::new(), @@ -683,6 +701,7 @@ impl EndpointRef { }, state: Mutex::new(State { socket, + sender, prev_socket: None, inner, ipv6, @@ -765,6 +784,7 @@ impl RecvState { cx: &mut Context, endpoint: &mut proto::Endpoint, socket: &dyn AsyncUdpSocket, + sender: &mut Pin>, runtime: &dyn Runtime, now: Instant, ) -> Result { @@ -804,7 +824,7 @@ impl RecvState { } else { let transmit = endpoint.refuse(incoming, &mut response_buffer); - respond(transmit, &response_buffer, socket); + respond(transmit, &response_buffer, sender); } } Some(DatagramEvent::ConnectionEvent(handle, event)) => { @@ -818,7 +838,7 @@ impl RecvState { .send(ConnectionEvent::Proto(event)); } Some(DatagramEvent::Response(transmit)) => { - respond(transmit, &response_buffer, socket); + respond(transmit, &response_buffer, sender); } None => {} } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index d971574e67..bd85567052 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -92,7 +92,7 @@ pub use crate::runtime::AsyncStdRuntime; pub use crate::runtime::SmolRuntime; #[cfg(feature = "runtime-tokio")] pub use crate::runtime::TokioRuntime; -pub use crate::runtime::{default_runtime, AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller}; +pub use crate::runtime::{default_runtime, AsyncTimer, AsyncUdpSocket, Runtime, UdpSender}; pub use crate::send_stream::{SendStream, StoppedError, WriteError}; #[cfg(test)] diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 9471c1834e..7869baf88e 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -48,14 +48,7 @@ pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { /// [`Waker`]. /// /// [`Waker`]: std::task::Waker - fn create_io_poller(self: Arc) -> Pin>; - - /// Send UDP datagrams from `transmits`, or return `WouldBlock` and clear the underlying - /// socket's readiness, or return an I/O error - /// - /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called - /// to register the calling task to be woken when a send should be attempted again. - fn try_send(&self, transmit: &Transmit) -> io::Result<()>; + fn create_sender(self: Arc) -> Pin>; /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future fn poll_recv( @@ -91,7 +84,7 @@ pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { /// /// Any number of `UdpPoller`s may exist for a single [`AsyncUdpSocket`]. Each `UdpPoller` is /// responsible for notifying at most one task when that socket becomes writable. -pub trait UdpPoller: Send + Sync + Debug + 'static { +pub trait UdpSender: Send + Sync + Debug + 'static { /// Check whether the associated socket is likely to be writable /// /// Must be called after [`AsyncUdpSocket::try_send`] returns [`io::ErrorKind::WouldBlock`] to @@ -99,19 +92,27 @@ pub trait UdpPoller: Send + Sync + Debug + 'static { /// again. Unlike in [`Future::poll`], a [`UdpPoller`] may be reused indefinitely no matter how /// many times `poll_writable` returns [`Poll::Ready`]. fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll>; + + /// Send UDP datagrams from `transmits`, or return `WouldBlock` and clear the underlying + /// socket's readiness, or return an I/O error + /// + /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called + /// to register the calling task to be woken when a send should be attempted again. + fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>; } pin_project_lite::pin_project! { /// Helper adapting a function `MakeFut` that constructs a single-use future `Fut` into a /// [`UdpPoller`] that may be reused indefinitely - struct UdpPollHelper { + struct UdpPollHelper { + try_send: TrySend, make_fut: MakeFut, #[pin] fut: Option, } } -impl UdpPollHelper { +impl UdpPollHelper { /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until /// it yields [`Poll::Ready`], then creating a new one on the next /// [`poll_writable`](UdpPoller::poll_writable) @@ -120,16 +121,18 @@ impl UdpPollHelper { feature = "runtime-smol", feature = "runtime-tokio" ))] - fn new(make_fut: MakeFut) -> Self { + fn new(try_send: TrySend, make_fut: MakeFut) -> Self { Self { + try_send, make_fut, fut: None, } } } -impl UdpPoller for UdpPollHelper +impl UdpSender for UdpPollHelper where + TrySend: for<'a> Fn(&'a Transmit) -> io::Result<()> + Send + Sync + 'static, MakeFut: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + Sync + 'static, { @@ -150,9 +153,13 @@ where } result } + + fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()> { + (self.try_send)(transmit) + } } -impl Debug for UdpPollHelper { +impl Debug for UdpPollHelper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("UdpPollHelper").finish_non_exhaustive() } diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index ad321a240c..b660e919c2 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -1,4 +1,5 @@ use std::{ + fmt::Debug, future::Future, io, pin::Pin, @@ -12,7 +13,7 @@ use tokio::{ time::{sleep_until, Sleep}, }; -use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper}; +use super::{AsyncTimer, AsyncUdpSocket, Runtime}; /// A Quinn runtime for Tokio #[derive(Debug)] @@ -54,17 +55,55 @@ struct UdpSocket { inner: udp::UdpSocketState, } -impl AsyncUdpSocket for UdpSocket { - fn create_io_poller(self: Arc) -> Pin> { - Box::pin(UdpPollHelper::new(move || { - let socket = self.clone(); - async move { socket.io.writable().await } - })) +pin_project_lite::pin_project! { + struct UdpSender { + #[pin] + fut: Option> + Send + Sync + 'static>>>, + inner: Arc, + } +} + +impl Debug for UdpSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("UdpSender") } +} - fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> { - self.io.try_io(Interest::WRITABLE, || { - self.inner.send((&self.io).into(), transmit) +impl super::UdpSender for UdpSender { + fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + if this.fut.is_none() { + this.fut.set(Some(Box::pin({ + let socket = this.inner.clone(); + async move { socket.io.writable().await } + }))); + } + // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely + // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, + // and if we didn't store it then we wouldn't be able to keep it alive between + // `poll_writable` calls. + let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx); + if result.is_ready() { + // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for + // a new `Future` to be created on the next call. + this.fut.set(None); + } + result + } + + fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { + let socket = &self.inner; + socket.io.try_io(Interest::WRITABLE, || { + socket.inner.send((&socket.io).into(), transmit) + }) + } +} + +impl AsyncUdpSocket for UdpSocket { + fn create_sender(self: Arc) -> Pin> { + Box::pin(UdpSender { + fut: None, + inner: self, }) } From 8a2dd46fc33f8d932c779935ccad6ca3facc3c4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 May 2025 13:08:17 +0200 Subject: [PATCH 02/16] Implement a `poll_send` instead of `poll_writable`. --- quinn/src/connection.rs | 59 +++++++++++--------------------- quinn/src/runtime.rs | 70 ++++++++++++++++++++++---------------- quinn/src/runtime/tokio.rs | 51 +++++++++++++++++++-------- 3 files changed, 96 insertions(+), 84 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 4bc410a2e6..10d64785ba 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -901,7 +901,6 @@ impl ConnectionRef { socket, runtime, send_buffer: Vec::new(), - buffered_transmit: None, observed_external_addr: watch::Sender::new(None), }), shared: Shared::default(), @@ -1021,8 +1020,6 @@ pub(crate) struct State { udp_sender: Pin>, runtime: Arc, send_buffer: Vec, - /// We buffer a transmit when the underlying I/O would block - buffered_transmit: Option, /// Our last external address reported by the peer. pub(crate) observed_external_addr: watch::Sender>, } @@ -1035,51 +1032,33 @@ impl State { let max_datagrams = self.socket.max_transmit_segments(); loop { - // Retry the last transmit, or get a new one. - let t = match self.buffered_transmit.take() { - Some(t) => t, - None => { - self.send_buffer.clear(); - self.send_buffer.reserve(self.inner.current_mtu() as usize); - match self - .inner - .poll_transmit(now, max_datagrams, &mut self.send_buffer) - { - Some(t) => { - transmits += match t.segment_size { - None => 1, - Some(s) => (t.size + s - 1) / s, // round up - }; - t - } - None => break, + let t = { + self.send_buffer.clear(); + self.send_buffer.reserve(self.inner.current_mtu() as usize); + match self + .inner + .poll_transmit(now, max_datagrams, &mut self.send_buffer) + { + Some(t) => { + transmits += match t.segment_size { + None => 1, + Some(s) => (t.size + s - 1) / s, // round up + }; + t } + None => break, } }; - if self.udp_sender.as_mut().poll_writable(cx)?.is_pending() { - // Retry after a future wakeup - self.buffered_transmit = Some(t); - return Ok(false); - } - let len = t.size; - let retry = match self + // TODO(matheus23): What to do with the poll result? Is `return Ok(true)` fine? + match self .udp_sender .as_mut() - .try_send(&udp_transmit(&t, &self.send_buffer[..len])) + .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx) { - Ok(()) => false, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true, - Err(e) => return Err(e), - }; - if retry { - // We thought the socket was writable, but it wasn't. Retry so that either another - // `poll_writable` call determines that the socket is indeed not writable and - // registers us for a wakeup, or the send succeeds if this really was just a - // transient failure. - self.buffered_transmit = Some(t); - continue; + Poll::Pending => return Ok(true), + Poll::Ready(result) => result?, // propagate errors } if transmits >= MAX_TRANSMIT_DATAGRAMS { diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 7869baf88e..83e574997b 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -91,13 +91,23 @@ pub trait UdpSender: Send + Sync + Debug + 'static { /// register the task associated with `cx` to be woken when a send should be attempted /// again. Unlike in [`Future::poll`], a [`UdpPoller`] may be reused indefinitely no matter how /// many times `poll_writable` returns [`Poll::Ready`]. - fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll>; - + /// + /// // TODO(matheus23): Fix weird documentation merge + /// /// Send UDP datagrams from `transmits`, or return `WouldBlock` and clear the underlying /// socket's readiness, or return an I/O error /// /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called /// to register the calling task to be woken when a send should be attempted again. + fn poll_send( + self: Pin<&mut Self>, + transmit: &Transmit, + cx: &mut Context, + ) -> Poll>; + + /// TODO(matheus23): Docs + /// Last ditch/best effort of sending a transmit. + /// Used by the endpoint for resets / close frames when dropped, etc. fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>; } @@ -130,34 +140,34 @@ impl UdpPollHelper { } } -impl UdpSender for UdpPollHelper -where - TrySend: for<'a> Fn(&'a Transmit) -> io::Result<()> + Send + Sync + 'static, - MakeFut: Fn() -> Fut + Send + Sync + 'static, - Fut: Future> + Send + Sync + 'static, -{ - fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut this = self.project(); - if this.fut.is_none() { - this.fut.set(Some((this.make_fut)())); - } - // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely - // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, - // and if we didn't store it then we wouldn't be able to keep it alive between - // `poll_writable` calls. - let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx); - if result.is_ready() { - // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for - // a new `Future` to be created on the next call. - this.fut.set(None); - } - result - } - - fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()> { - (self.try_send)(transmit) - } -} +// impl UdpSender for UdpPollHelper +// where +// TrySend: for<'a> Fn(&'a Transmit) -> io::Result<()> + Send + Sync + 'static, +// MakeFut: Fn() -> Fut + Send + Sync + 'static, +// Fut: Future> + Send + Sync + 'static, +// { +// fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { +// let mut this = self.project(); +// if this.fut.is_none() { +// this.fut.set(Some((this.make_fut)())); +// } +// // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely +// // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, +// // and if we didn't store it then we wouldn't be able to keep it alive between +// // `poll_writable` calls. +// let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx); +// if result.is_ready() { +// // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for +// // a new `Future` to be created on the next call. +// this.fut.set(None); +// } +// result +// } + +// fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()> { +// (self.try_send)(transmit) +// } +// } impl Debug for UdpPollHelper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index b660e919c2..0881ef5acf 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -70,25 +70,47 @@ impl Debug for UdpSender { } impl super::UdpSender for UdpSender { - fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_send( + self: Pin<&mut Self>, + transmit: &udp::Transmit, + cx: &mut Context, + ) -> Poll> { let mut this = self.project(); - if this.fut.is_none() { - this.fut.set(Some(Box::pin({ - let socket = this.inner.clone(); - async move { socket.io.writable().await } - }))); - } - // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely - // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, - // and if we didn't store it then we wouldn't be able to keep it alive between - // `poll_writable` calls. - let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx); - if result.is_ready() { + loop { + if this.fut.is_none() { + this.fut.set(Some(Box::pin({ + let socket = this.inner.clone(); + async move { socket.io.writable().await } + }))); + } + // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely + // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, + // and if we didn't store it then we wouldn't be able to keep it alive between + // `poll_writable` calls. + let result = ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx)); + // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for // a new `Future` to be created on the next call. this.fut.set(None); + + // If .writable() fails, propagate the error + result?; + + let socket = &this.inner; + let result = socket.io.try_io(Interest::WRITABLE, || { + socket.inner.send((&socket.io).into(), transmit) + }); + + match result { + // We thought the socket was writable, but it wasn't, then retry so that either another + // `writable().await` call determines that the socket is indeed not writable and + // registers us for a wakeup, or the send succeeds if this really was just a + // transient failure. + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + // In all other cases, either propagate the error or we're Ok + _ => return Poll::Ready(result), + } } - result } fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { @@ -101,6 +123,7 @@ impl super::UdpSender for UdpSender { impl AsyncUdpSocket for UdpSocket { fn create_sender(self: Arc) -> Pin> { + // TODO(matheus23): There's probably a way to get rid of the double-boxing here (and the box inside UdpSender) Box::pin(UdpSender { fut: None, inner: self, From c8253835ece61c329c50fdc85cb60db83080dec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 May 2025 13:16:05 +0200 Subject: [PATCH 03/16] Don't store `Arc` in connection state anymore --- quinn/src/connection.rs | 9 +++------ quinn/src/endpoint.rs | 4 +++- quinn/src/lib.rs | 4 ++-- quinn/src/runtime.rs | 10 +++++----- quinn/src/runtime/tokio.rs | 8 ++++---- 5 files changed, 17 insertions(+), 18 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 10d64785ba..170dda1501 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -898,7 +898,6 @@ impl ConnectionRef { error: None, ref_count: 0, udp_sender: socket.clone().create_sender(), - socket, runtime, send_buffer: Vec::new(), observed_external_addr: watch::Sender::new(None), @@ -1016,7 +1015,6 @@ pub(crate) struct State { pub(crate) error: Option, /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: usize, - socket: Arc, udp_sender: Pin>, runtime: Arc, send_buffer: Vec, @@ -1029,7 +1027,7 @@ impl State { let now = self.runtime.now(); let mut transmits = 0; - let max_datagrams = self.socket.max_transmit_segments(); + let max_datagrams = self.udp_sender.max_transmit_segments(); loop { let t = { @@ -1088,9 +1086,8 @@ impl State { ) -> Result<(), ConnectionError> { loop { match self.conn_events.poll_recv(cx) { - Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => { - self.socket = socket; - self.udp_sender = self.socket.clone().create_sender(); + Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => { + self.udp_sender = sender; self.inner.local_address_changed(); } Poll::Ready(Some(ConnectionEvent::Proto(event))) => { diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index f1f75b58be..b151b97ebf 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -255,7 +255,9 @@ impl Endpoint { // Update connection socket references for sender in inner.recv_state.connections.senders.values() { // Ignoring errors from dropped connections - let _ = sender.send(ConnectionEvent::Rebind(inner.socket.clone())); + let _ = sender.send(ConnectionEvent::Rebind( + inner.socket.clone().create_sender(), + )); } Ok(()) diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index bd85567052..4e5166d7f0 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -41,7 +41,7 @@ #![warn(unreachable_pub)] #![warn(clippy::use_self)] -use std::sync::Arc; +use std::pin::Pin; macro_rules! ready { ($e:expr $(,)?) => { @@ -105,7 +105,7 @@ enum ConnectionEvent { reason: bytes::Bytes, }, Proto(proto::ConnectionEvent), - Rebind(Arc), + Rebind(Pin>), } fn udp_transmit<'a>(t: &proto::Transmit, buffer: &'a [u8]) -> udp::Transmit<'a> { diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 83e574997b..b5071af974 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -61,11 +61,6 @@ pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { /// Look up the local IP address and port used by this socket fn local_addr(&self) -> io::Result; - /// Maximum number of datagrams that a [`Transmit`] may encode - fn max_transmit_segments(&self) -> usize { - 1 - } - /// Maximum number of datagrams that might be described by a single [`RecvMeta`] fn max_receive_segments(&self) -> usize { 1 @@ -105,6 +100,11 @@ pub trait UdpSender: Send + Sync + Debug + 'static { cx: &mut Context, ) -> Poll>; + /// Maximum number of datagrams that a [`Transmit`] may encode + fn max_transmit_segments(&self) -> usize { + 1 + } + /// TODO(matheus23): Docs /// Last ditch/best effort of sending a transmit. /// Used by the endpoint for resets / close frames when dropped, etc. diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index 0881ef5acf..2d6597a54c 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -113,6 +113,10 @@ impl super::UdpSender for UdpSender { } } + fn max_transmit_segments(&self) -> usize { + self.inner.inner.max_gso_segments() + } + fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { let socket = &self.inner; socket.io.try_io(Interest::WRITABLE, || { @@ -154,10 +158,6 @@ impl AsyncUdpSocket for UdpSocket { self.inner.may_fragment() } - fn max_transmit_segments(&self) -> usize { - self.inner.max_gso_segments() - } - fn max_receive_segments(&self) -> usize { self.inner.gro_segments() } From 0aaea29db580a940aa0dfd3ff65979dcc02eddb5 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 21 May 2025 13:39:31 +0200 Subject: [PATCH 04/16] avoid doubule box --- quinn/src/runtime/tokio.rs | 40 +++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index 2d6597a54c..b511588f44 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -56,20 +56,35 @@ struct UdpSocket { } pin_project_lite::pin_project! { - struct UdpSender { - #[pin] - fut: Option> + Send + Sync + 'static>>>, + struct UdpSender { inner: Arc, + make_fut: MakeFut, + #[pin] + fut: Option, } } -impl Debug for UdpSender { +impl Debug for UdpSender { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("UdpSender") } } -impl super::UdpSender for UdpSender { +impl UdpSender { + fn new(inner: Arc, make_fut: MakeFut) -> Self { + Self { + inner, + fut: None, + make_fut, + } + } +} + +impl super::UdpSender for UdpSender +where + MakeFut: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ fn poll_send( self: Pin<&mut Self>, transmit: &udp::Transmit, @@ -78,10 +93,7 @@ impl super::UdpSender for UdpSender { let mut this = self.project(); loop { if this.fut.is_none() { - this.fut.set(Some(Box::pin({ - let socket = this.inner.clone(); - async move { socket.io.writable().await } - }))); + this.fut.set(Some((this.make_fut)())); } // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, @@ -127,11 +139,11 @@ impl super::UdpSender for UdpSender { impl AsyncUdpSocket for UdpSocket { fn create_sender(self: Arc) -> Pin> { - // TODO(matheus23): There's probably a way to get rid of the double-boxing here (and the box inside UdpSender) - Box::pin(UdpSender { - fut: None, - inner: self, - }) + let socket = self.clone(); + Box::pin(UdpSender::new(self, move || { + let socket = socket.clone(); + async move { socket.io.writable().await } + })) } fn poll_recv( From c346dd9aeadbb53c8c16f91579363731916af6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 May 2025 13:42:44 +0200 Subject: [PATCH 05/16] Make `poll_recv` take `&mut self` --- quinn/src/connection.rs | 10 +++++----- quinn/src/endpoint.rs | 36 +++++++++++++++++------------------- quinn/src/runtime.rs | 6 +++--- quinn/src/runtime/tokio.rs | 25 +++++++++++++------------ 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 170dda1501..dc982d2b3a 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -19,7 +19,7 @@ use tracing::{debug_span, Instrument, Span}; use crate::{ mutex::Mutex, recv_stream::RecvStream, - runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSender}, + runtime::{AsyncTimer, Runtime, UdpSender}, send_stream::SendStream, udp_transmit, ConnectionEvent, Duration, Instant, VarInt, }; @@ -42,7 +42,7 @@ impl Connecting { conn: proto::Connection, endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>, conn_events: mpsc::UnboundedReceiver, - socket: Arc, + sender: Pin>, runtime: Arc, ) -> Self { let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel(); @@ -54,7 +54,7 @@ impl Connecting { conn_events, on_handshake_data_send, on_connected_send, - socket, + sender, runtime.clone(), ); @@ -877,7 +877,7 @@ impl ConnectionRef { conn_events: mpsc::UnboundedReceiver, on_handshake_data: oneshot::Sender<()>, on_connected: oneshot::Sender, - socket: Arc, + sender: Pin>, runtime: Arc, ) -> Self { Self(Arc::new(ConnectionInner { @@ -897,7 +897,7 @@ impl ConnectionRef { stopped: FxHashMap::default(), error: None, ref_count: 0, - udp_sender: socket.clone().create_sender(), + udp_sender: sender, runtime, send_buffer: Vec::new(), observed_external_addr: watch::Sender::new(None), diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index b151b97ebf..c2de97a1e2 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -129,7 +129,7 @@ impl Endpoint { pub fn new_with_abstract_socket( config: EndpointConfig, server_config: Option, - socket: Arc, + socket: Box, runtime: Arc, ) -> io::Result { let addr = socket.local_addr()?; @@ -224,12 +224,12 @@ impl Endpoint { .inner .connect(self.runtime.now(), config, addr, server_name)?; - let socket = endpoint.socket.clone(); + let sender = endpoint.socket.create_sender(); endpoint.stats.outgoing_handshakes += 1; Ok(endpoint .recv_state .connections - .insert(ch, conn, socket, self.runtime.clone())) + .insert(ch, conn, sender, self.runtime.clone())) } /// Switch to a new UDP socket @@ -246,7 +246,7 @@ impl Endpoint { /// connections and connections to servers unreachable from the new address will be lost. /// /// On error, the old UDP socket is retained. - pub fn rebind_abstract(&self, socket: Arc) -> io::Result<()> { + pub fn rebind_abstract(&self, socket: Box) -> io::Result<()> { let addr = socket.local_addr()?; let mut inner = self.inner.state.lock().unwrap(); inner.prev_socket = Some(mem::replace(&mut inner.socket, socket)); @@ -255,9 +255,7 @@ impl Endpoint { // Update connection socket references for sender in inner.recv_state.connections.senders.values() { // Ignoring errors from dropped connections - let _ = sender.send(ConnectionEvent::Rebind( - inner.socket.clone().create_sender(), - )); + let _ = sender.send(ConnectionEvent::Rebind(inner.socket.create_sender())); } Ok(()) @@ -422,12 +420,12 @@ impl EndpointInner { { Ok((handle, conn)) => { state.stats.accepted_handshakes += 1; - let socket = state.socket.clone(); + let sender = state.socket.create_sender(); let runtime = state.runtime.clone(); Ok(state .recv_state .connections - .insert(handle, conn, socket, runtime)) + .insert(handle, conn, sender, runtime)) } Err(error) => { if let Some(transmit) = error.response { @@ -463,11 +461,11 @@ impl EndpointInner { #[derive(Debug)] pub(crate) struct State { - socket: Arc, + socket: Box, sender: Pin>, /// During an active migration, abandoned_socket receives traffic /// until the first packet arrives on the new socket. - prev_socket: Option>, + prev_socket: Option>, inner: proto::Endpoint, recv_state: RecvState, driver: Option, @@ -490,12 +488,12 @@ impl State { fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result { let get_time = || self.runtime.now(); self.recv_state.recv_limiter.start_cycle(get_time); - if let Some(socket) = &self.prev_socket { + if let Some(socket) = &mut self.prev_socket { // We don't care about the `PollProgress` from old sockets. let poll_res = self.recv_state.poll_socket( cx, &mut self.inner, - &**socket, + &mut *socket, &mut self.sender, &*self.runtime, now, @@ -507,7 +505,7 @@ impl State { let poll_res = self.recv_state.poll_socket( cx, &mut self.inner, - &*self.socket, + &mut self.socket, &mut self.sender, &*self.runtime, now, @@ -617,7 +615,7 @@ impl ConnectionSet { &mut self, handle: ConnectionHandle, conn: proto::Connection, - socket: Arc, + sender: Pin>, runtime: Arc, ) -> Connecting { let (send, recv) = mpsc::unbounded_channel(); @@ -629,7 +627,7 @@ impl ConnectionSet { .unwrap(); } self.senders.insert(handle, send); - Connecting::new(handle, conn, self.sender.clone(), recv, socket, runtime) + Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime) } fn is_empty(&self) -> bool { @@ -688,14 +686,14 @@ pub(crate) struct EndpointRef(Arc); impl EndpointRef { pub(crate) fn new( - socket: Arc, + socket: Box, inner: proto::Endpoint, ipv6: bool, runtime: Arc, ) -> Self { let (sender, events) = mpsc::unbounded_channel(); let recv_state = RecvState::new(sender, socket.max_receive_segments(), &inner); - let sender = socket.clone().create_sender(); + let sender = socket.create_sender(); Self(Arc::new(EndpointInner { shared: Shared { incoming: Notify::new(), @@ -785,7 +783,7 @@ impl RecvState { &mut self, cx: &mut Context, endpoint: &mut proto::Endpoint, - socket: &dyn AsyncUdpSocket, + socket: &mut Box, sender: &mut Pin>, runtime: &dyn Runtime, now: Instant, diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index b5071af974..3b98e39f3c 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -20,7 +20,7 @@ pub trait Runtime: Send + Sync + Debug + 'static { fn spawn(&self, future: Pin + Send>>); /// Convert `t` into the socket type used by this runtime #[cfg(not(wasm_browser))] - fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result>; + fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result>; /// Look up the current time /// /// Allows simulating the flow of time for testing. @@ -48,11 +48,11 @@ pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { /// [`Waker`]. /// /// [`Waker`]: std::task::Waker - fn create_sender(self: Arc) -> Pin>; + fn create_sender(&self) -> Pin>; /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future fn poll_recv( - &self, + &mut self, cx: &mut Context, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index b511588f44..6f83ae5c7f 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -28,10 +28,10 @@ impl Runtime for TokioRuntime { tokio::spawn(future); } - fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result> { - Ok(Arc::new(UdpSocket { - inner: udp::UdpSocketState::new((&sock).into())?, - io: tokio::net::UdpSocket::from_std(sock)?, + fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result> { + Ok(Box::new(UdpSocket { + inner: Arc::new(udp::UdpSocketState::new((&sock).into())?), + io: Arc::new(tokio::net::UdpSocket::from_std(sock)?), })) } @@ -49,15 +49,15 @@ impl AsyncTimer for Sleep { } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct UdpSocket { - io: tokio::net::UdpSocket, - inner: udp::UdpSocketState, + io: Arc, + inner: Arc, } pin_project_lite::pin_project! { struct UdpSender { - inner: Arc, + inner: UdpSocket, make_fut: MakeFut, #[pin] fut: Option, @@ -71,7 +71,7 @@ impl Debug for UdpSender { } impl UdpSender { - fn new(inner: Arc, make_fut: MakeFut) -> Self { + fn new(inner: UdpSocket, make_fut: MakeFut) -> Self { Self { inner, fut: None, @@ -138,22 +138,23 @@ where } impl AsyncUdpSocket for UdpSocket { - fn create_sender(self: Arc) -> Pin> { + fn create_sender(&self) -> Pin> { let socket = self.clone(); - Box::pin(UdpSender::new(self, move || { + Box::pin(UdpSender::new(self.clone(), move || { let socket = socket.clone(); async move { socket.io.writable().await } })) } fn poll_recv( - &self, + &mut self, cx: &mut Context, bufs: &mut [std::io::IoSliceMut<'_>], meta: &mut [udp::RecvMeta], ) -> Poll> { loop { ready!(self.io.poll_recv_ready(cx))?; + // TODO(matheus23) I think this should actually propagate errors that aren't `WouldBlock` if let Ok(res) = self.io.try_io(Interest::READABLE, || { self.inner.recv((&self.io).into(), bufs, meta) }) { From c5b17259f695cad85faa65320806ab759ef2d615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 May 2025 13:57:02 +0200 Subject: [PATCH 06/16] Cleanup `connection::State::drive_transmit` a bit --- quinn/src/connection.rs | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index dc982d2b3a..166b5fe107 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -1030,22 +1030,18 @@ impl State { let max_datagrams = self.udp_sender.max_transmit_segments(); loop { - let t = { - self.send_buffer.clear(); - self.send_buffer.reserve(self.inner.current_mtu() as usize); - match self - .inner - .poll_transmit(now, max_datagrams, &mut self.send_buffer) - { - Some(t) => { - transmits += match t.segment_size { - None => 1, - Some(s) => (t.size + s - 1) / s, // round up - }; - t - } - None => break, - } + self.send_buffer.clear(); + self.send_buffer.reserve(self.inner.current_mtu() as usize); + let Some(t) = self + .inner + .poll_transmit(now, max_datagrams, &mut self.send_buffer) + else { + break; + }; + + transmits += match t.segment_size { + None => 1, + Some(s) => (t.size + s - 1) / s, // round up }; let len = t.size; From ed27c9818c4c30521b97ce9cfbb318c05050a1d5 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 21 May 2025 14:23:47 +0200 Subject: [PATCH 07/16] 2 --- quinn/src/runtime.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 3b98e39f3c..8111cdb4fb 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -122,23 +122,23 @@ pin_project_lite::pin_project! { } } -impl UdpPollHelper { - /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until - /// it yields [`Poll::Ready`], then creating a new one on the next - /// [`poll_writable`](UdpPoller::poll_writable) - #[cfg(any( - feature = "runtime-async-std", - feature = "runtime-smol", - feature = "runtime-tokio" - ))] - fn new(try_send: TrySend, make_fut: MakeFut) -> Self { - Self { - try_send, - make_fut, - fut: None, - } - } -} +// impl UdpPollHelper { +// /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until +// /// it yields [`Poll::Ready`], then creating a new one on the next +// /// [`poll_writable`](UdpPoller::poll_writable) +// #[cfg(any( +// feature = "runtime-async-std", +// feature = "runtime-smol", +// feature = "runtime-tokio" +// ))] +// fn new(try_send: TrySend, make_fut: MakeFut) -> Self { +// Self { +// try_send, +// make_fut, +// fut: None, +// } +// } +// } // impl UdpSender for UdpPollHelper // where From ada30059656cdd316949394d34549d2285f878b6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 22 May 2025 11:38:09 +0200 Subject: [PATCH 08/16] bring back buffering --- quinn/src/connection.rs | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 166b5fe107..589e27c09c 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -900,6 +900,7 @@ impl ConnectionRef { udp_sender: sender, runtime, send_buffer: Vec::new(), + buffered_transmit: None, observed_external_addr: watch::Sender::new(None), }), shared: Shared::default(), @@ -1018,6 +1019,8 @@ pub(crate) struct State { udp_sender: Pin>, runtime: Arc, send_buffer: Vec, + /// We buffer a transmit when the underlying I/O would block + buffered_transmit: Option, /// Our last external address reported by the peer. pub(crate) observed_external_addr: watch::Sender>, } @@ -1030,18 +1033,26 @@ impl State { let max_datagrams = self.udp_sender.max_transmit_segments(); loop { - self.send_buffer.clear(); - self.send_buffer.reserve(self.inner.current_mtu() as usize); - let Some(t) = self - .inner - .poll_transmit(now, max_datagrams, &mut self.send_buffer) - else { - break; - }; - - transmits += match t.segment_size { - None => 1, - Some(s) => (t.size + s - 1) / s, // round up + // Retry the last transmit, or get a new one. + let t = match self.buffered_transmit.take() { + Some(t) => t, + None => { + self.send_buffer.clear(); + self.send_buffer.reserve(self.inner.current_mtu() as usize); + match self + .inner + .poll_transmit(now, max_datagrams, &mut self.send_buffer) + { + Some(t) => { + transmits += match t.segment_size { + None => 1, + Some(s) => (t.size + s - 1) / s, // round up + }; + t + } + None => break, + } + } }; let len = t.size; @@ -1051,7 +1062,10 @@ impl State { .as_mut() .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx) { - Poll::Pending => return Ok(true), + Poll::Pending => { + self.buffered_transmit = Some(t); + return Ok(false); + } Poll::Ready(result) => result?, // propagate errors } From 54043cafe70969fb15703352092b34bbdf8104a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 May 2025 10:40:53 +0200 Subject: [PATCH 09/16] Bump sccache version --- .github/workflows/rust.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b8b3fdbbf1..0132b6de18 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.rust }} @@ -180,7 +180,7 @@ jobs: SCCACHE_GHA_ENABLED: "on" steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@1.71.0 - run: cargo check --lib --all-features -p iroh-quinn-udp -p iroh-quinn-proto -p iroh-quinn @@ -191,7 +191,7 @@ jobs: SCCACHE_GHA_ENABLED: "on" steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@stable with: components: rustfmt, clippy From f9e1e876304d6ef0990341f72738330f4893bad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 May 2025 11:12:17 +0200 Subject: [PATCH 10/16] Remove UdpPollHelper, simlify tokio::UdpSender --- quinn/src/connection.rs | 14 ++++----- quinn/src/runtime.rs | 64 -------------------------------------- quinn/src/runtime/tokio.rs | 23 ++++++-------- 3 files changed, 17 insertions(+), 84 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 589e27c09c..ee7bf02ab0 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -897,7 +897,7 @@ impl ConnectionRef { stopped: FxHashMap::default(), error: None, ref_count: 0, - udp_sender: sender, + sender, runtime, send_buffer: Vec::new(), buffered_transmit: None, @@ -1016,7 +1016,7 @@ pub(crate) struct State { pub(crate) error: Option, /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: usize, - udp_sender: Pin>, + sender: Pin>, runtime: Arc, send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block @@ -1030,7 +1030,7 @@ impl State { let now = self.runtime.now(); let mut transmits = 0; - let max_datagrams = self.udp_sender.max_transmit_segments(); + let max_datagrams = self.sender.max_transmit_segments(); loop { // Retry the last transmit, or get a new one. @@ -1056,9 +1056,8 @@ impl State { }; let len = t.size; - // TODO(matheus23): What to do with the poll result? Is `return Ok(true)` fine? match self - .udp_sender + .sender .as_mut() .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx) { @@ -1066,7 +1065,8 @@ impl State { self.buffered_transmit = Some(t); return Ok(false); } - Poll::Ready(result) => result?, // propagate errors + Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Ok(())) => {} } if transmits >= MAX_TRANSMIT_DATAGRAMS { @@ -1097,7 +1097,7 @@ impl State { loop { match self.conn_events.poll_recv(cx) { Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => { - self.udp_sender = sender; + self.sender = sender; self.inner.local_address_changed(); } Poll::Ready(Some(ConnectionEvent::Proto(event))) => { diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 8111cdb4fb..507c1555f7 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -111,70 +111,6 @@ pub trait UdpSender: Send + Sync + Debug + 'static { fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>; } -pin_project_lite::pin_project! { - /// Helper adapting a function `MakeFut` that constructs a single-use future `Fut` into a - /// [`UdpPoller`] that may be reused indefinitely - struct UdpPollHelper { - try_send: TrySend, - make_fut: MakeFut, - #[pin] - fut: Option, - } -} - -// impl UdpPollHelper { -// /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until -// /// it yields [`Poll::Ready`], then creating a new one on the next -// /// [`poll_writable`](UdpPoller::poll_writable) -// #[cfg(any( -// feature = "runtime-async-std", -// feature = "runtime-smol", -// feature = "runtime-tokio" -// ))] -// fn new(try_send: TrySend, make_fut: MakeFut) -> Self { -// Self { -// try_send, -// make_fut, -// fut: None, -// } -// } -// } - -// impl UdpSender for UdpPollHelper -// where -// TrySend: for<'a> Fn(&'a Transmit) -> io::Result<()> + Send + Sync + 'static, -// MakeFut: Fn() -> Fut + Send + Sync + 'static, -// Fut: Future> + Send + Sync + 'static, -// { -// fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { -// let mut this = self.project(); -// if this.fut.is_none() { -// this.fut.set(Some((this.make_fut)())); -// } -// // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely -// // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, -// // and if we didn't store it then we wouldn't be able to keep it alive between -// // `poll_writable` calls. -// let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx); -// if result.is_ready() { -// // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for -// // a new `Future` to be created on the next call. -// this.fut.set(None); -// } -// result -// } - -// fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()> { -// (self.try_send)(transmit) -// } -// } - -impl Debug for UdpPollHelper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("UdpPollHelper").finish_non_exhaustive() - } -} - /// Automatically select an appropriate runtime from those enabled at compile time /// /// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context, diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index 6f83ae5c7f..f3e61a1869 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -57,7 +57,7 @@ struct UdpSocket { pin_project_lite::pin_project! { struct UdpSender { - inner: UdpSocket, + socket: UdpSocket, make_fut: MakeFut, #[pin] fut: Option, @@ -73,7 +73,7 @@ impl Debug for UdpSender { impl UdpSender { fn new(inner: UdpSocket, make_fut: MakeFut) -> Self { Self { - inner, + socket: inner, fut: None, make_fut, } @@ -82,7 +82,7 @@ impl UdpSender { impl super::UdpSender for UdpSender where - MakeFut: Fn() -> Fut + Send + Sync + 'static, + MakeFut: Fn(&UdpSocket) -> Fut + Send + Sync + 'static, Fut: Future> + Send + Sync + 'static, { fn poll_send( @@ -93,7 +93,7 @@ where let mut this = self.project(); loop { if this.fut.is_none() { - this.fut.set(Some((this.make_fut)())); + this.fut.set(Some((this.make_fut)(&this.socket))); } // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, @@ -108,9 +108,8 @@ where // If .writable() fails, propagate the error result?; - let socket = &this.inner; - let result = socket.io.try_io(Interest::WRITABLE, || { - socket.inner.send((&socket.io).into(), transmit) + let result = this.socket.io.try_io(Interest::WRITABLE, || { + this.socket.inner.send((&this.socket.io).into(), transmit) }); match result { @@ -126,21 +125,19 @@ where } fn max_transmit_segments(&self) -> usize { - self.inner.inner.max_gso_segments() + self.socket.inner.max_gso_segments() } fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { - let socket = &self.inner; - socket.io.try_io(Interest::WRITABLE, || { - socket.inner.send((&socket.io).into(), transmit) + self.socket.io.try_io(Interest::WRITABLE, || { + self.socket.inner.send((&self.socket.io).into(), transmit) }) } } impl AsyncUdpSocket for UdpSocket { fn create_sender(&self) -> Pin> { - let socket = self.clone(); - Box::pin(UdpSender::new(self.clone(), move || { + Box::pin(UdpSender::new(self.clone(), |socket: &UdpSocket| { let socket = socket.clone(); async move { socket.io.writable().await } })) From bc9c918bb13ffeaf75975cb50ceb7cd434add9d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 May 2025 14:05:56 +0200 Subject: [PATCH 11/16] Implement a generic `UdpSenderHelper` and use it in `async_io` --- quinn/src/runtime.rs | 83 +++++++++++++++++++++++++++++++++++ quinn/src/runtime/async_io.rs | 46 +++++++++---------- quinn/src/runtime/tokio.rs | 83 +++-------------------------------- 3 files changed, 114 insertions(+), 98 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 507c1555f7..b1d936b698 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -111,6 +111,89 @@ pub trait UdpSender: Send + Sync + Debug + 'static { fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>; } +pin_project_lite::pin_project! { + struct UdpSenderHelper { + socket: Socket, + make_fut: MakeFut, + #[pin] + fut: Option, + } +} + +impl Debug for UdpSenderHelper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("UdpSender") + } +} + +impl UdpSenderHelper { + fn new(inner: Socket, make_fut: MakeFut) -> Self { + Self { + socket: inner, + make_fut, + fut: None, + } + } +} + +impl super::UdpSender for UdpSenderHelper +where + Socket: UdpSenderHelperSocket, + MakeFut: Fn(&Socket) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + fn poll_send( + self: Pin<&mut Self>, + transmit: &udp::Transmit, + cx: &mut Context, + ) -> Poll> { + let mut this = self.project(); + loop { + if this.fut.is_none() { + this.fut.set(Some((this.make_fut)(&this.socket))); + } + // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely + // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, + // and if we didn't store it then we wouldn't be able to keep it alive between + // `poll_writable` calls. + let result = ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx)); + + // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for + // a new `Future` to be created on the next call. + this.fut.set(None); + + // If .writable() fails, propagate the error + result?; + + let result = this.socket.try_send(transmit); + + match result { + // We thought the socket was writable, but it wasn't, then retry so that either another + // `writable().await` call determines that the socket is indeed not writable and + // registers us for a wakeup, or the send succeeds if this really was just a + // transient failure. + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + // In all other cases, either propagate the error or we're Ok + _ => return Poll::Ready(result), + } + } + } + + fn max_transmit_segments(&self) -> usize { + self.socket.max_transmit_segments() + } + + fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { + self.socket.try_send(transmit) + } +} + +trait UdpSenderHelperSocket: Send + Sync + 'static { + fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>; + + fn max_transmit_segments(&self) -> usize; +} + /// Automatically select an appropriate runtime from those enabled at compile time /// /// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context, diff --git a/quinn/src/runtime/async_io.rs b/quinn/src/runtime/async_io.rs index 34df24d76f..5a7a2f5fe0 100644 --- a/quinn/src/runtime/async_io.rs +++ b/quinn/src/runtime/async_io.rs @@ -9,7 +9,7 @@ use std::{ use async_io::{Async, Timer}; -use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper}; +use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper}; #[cfg(feature = "smol")] // Due to MSRV, we must specify `self::` where there's crate/module ambiguity @@ -35,8 +35,8 @@ mod smol { fn wrap_udp_socket( &self, sock: std::net::UdpSocket, - ) -> io::Result> { - Ok(Arc::new(UdpSocket::new(sock)?)) + ) -> io::Result> { + Ok(Box::new(UdpSocket::new(sock)?)) } } } @@ -65,8 +65,8 @@ mod async_std { fn wrap_udp_socket( &self, sock: std::net::UdpSocket, - ) -> io::Result> { - Ok(Arc::new(UdpSocket::new(sock)?)) + ) -> io::Result> { + Ok(Box::new(UdpSocket::new(sock)?)) } } } @@ -81,35 +81,41 @@ impl AsyncTimer for Timer { } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct UdpSocket { - io: Async, - inner: udp::UdpSocketState, + io: Arc>, + inner: Arc, } impl UdpSocket { fn new(sock: std::net::UdpSocket) -> io::Result { Ok(Self { - inner: udp::UdpSocketState::new((&sock).into())?, - io: Async::new_nonblocking(sock)?, + inner: Arc::new(udp::UdpSocketState::new((&sock).into())?), + io: Arc::new(Async::new_nonblocking(sock)?), }) } } -impl AsyncUdpSocket for UdpSocket { - fn create_io_poller(self: Arc) -> Pin> { - Box::pin(UdpPollHelper::new(move || { - let socket = self.clone(); - async move { socket.io.writable().await } - })) +impl super::UdpSenderHelperSocket for UdpSocket { + fn max_transmit_segments(&self) -> usize { + self.inner.max_gso_segments() } fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> { self.inner.send((&self.io).into(), transmit) } +} + +impl AsyncUdpSocket for UdpSocket { + fn create_sender(&self) -> Pin> { + Box::pin(UdpSenderHelper::new(self.clone(), |socket: &UdpSocket| { + let socket = socket.clone(); + async move { socket.io.writable().await } + })) + } fn poll_recv( - &self, + &mut self, cx: &mut Context, bufs: &mut [io::IoSliceMut<'_>], meta: &mut [udp::RecvMeta], @@ -123,17 +129,13 @@ impl AsyncUdpSocket for UdpSocket { } fn local_addr(&self) -> io::Result { - self.io.as_ref().local_addr() + self.io.as_ref().as_ref().local_addr() } fn may_fragment(&self) -> bool { self.inner.may_fragment() } - fn max_transmit_segments(&self) -> usize { - self.inner.max_gso_segments() - } - fn max_receive_segments(&self) -> usize { self.inner.gro_segments() } diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index f3e61a1869..2d66dfd5b7 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -13,7 +13,7 @@ use tokio::{ time::{sleep_until, Sleep}, }; -use super::{AsyncTimer, AsyncUdpSocket, Runtime}; +use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper, UdpSenderHelperSocket}; /// A Quinn runtime for Tokio #[derive(Debug)] @@ -55,89 +55,21 @@ struct UdpSocket { inner: Arc, } -pin_project_lite::pin_project! { - struct UdpSender { - socket: UdpSocket, - make_fut: MakeFut, - #[pin] - fut: Option, - } -} - -impl Debug for UdpSender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("UdpSender") - } -} - -impl UdpSender { - fn new(inner: UdpSocket, make_fut: MakeFut) -> Self { - Self { - socket: inner, - fut: None, - make_fut, - } - } -} - -impl super::UdpSender for UdpSender -where - MakeFut: Fn(&UdpSocket) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + Sync + 'static, -{ - fn poll_send( - self: Pin<&mut Self>, - transmit: &udp::Transmit, - cx: &mut Context, - ) -> Poll> { - let mut this = self.project(); - loop { - if this.fut.is_none() { - this.fut.set(Some((this.make_fut)(&this.socket))); - } - // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely - // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, - // and if we didn't store it then we wouldn't be able to keep it alive between - // `poll_writable` calls. - let result = ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx)); - - // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for - // a new `Future` to be created on the next call. - this.fut.set(None); - - // If .writable() fails, propagate the error - result?; - - let result = this.socket.io.try_io(Interest::WRITABLE, || { - this.socket.inner.send((&this.socket.io).into(), transmit) - }); - - match result { - // We thought the socket was writable, but it wasn't, then retry so that either another - // `writable().await` call determines that the socket is indeed not writable and - // registers us for a wakeup, or the send succeeds if this really was just a - // transient failure. - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, - // In all other cases, either propagate the error or we're Ok - _ => return Poll::Ready(result), - } - } - } - +impl UdpSenderHelperSocket for UdpSocket { fn max_transmit_segments(&self) -> usize { - self.socket.inner.max_gso_segments() + self.inner.max_gso_segments() } - fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> { - self.socket.io.try_io(Interest::WRITABLE, || { - self.socket.inner.send((&self.socket.io).into(), transmit) + fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> { + self.io.try_io(Interest::WRITABLE, || { + self.inner.send((&self.io).into(), transmit) }) } } impl AsyncUdpSocket for UdpSocket { fn create_sender(&self) -> Pin> { - Box::pin(UdpSender::new(self.clone(), |socket: &UdpSocket| { + Box::pin(UdpSenderHelper::new(self.clone(), |socket: &UdpSocket| { let socket = socket.clone(); async move { socket.io.writable().await } })) @@ -151,7 +83,6 @@ impl AsyncUdpSocket for UdpSocket { ) -> Poll> { loop { ready!(self.io.poll_recv_ready(cx))?; - // TODO(matheus23) I think this should actually propagate errors that aren't `WouldBlock` if let Ok(res) = self.io.try_io(Interest::READABLE, || { self.inner.recv((&self.io).into(), bufs, meta) }) { From ec09a398bc234b94bc120684eb2d0372ff7af270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 23 May 2025 14:09:12 +0200 Subject: [PATCH 12/16] Expose `UdpSenderHelper` from `runtime` --- quinn/src/lib.rs | 5 ++++- quinn/src/runtime.rs | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 4e5166d7f0..b9d1c89181 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -92,7 +92,10 @@ pub use crate::runtime::AsyncStdRuntime; pub use crate::runtime::SmolRuntime; #[cfg(feature = "runtime-tokio")] pub use crate::runtime::TokioRuntime; -pub use crate::runtime::{default_runtime, AsyncTimer, AsyncUdpSocket, Runtime, UdpSender}; +pub use crate::runtime::{ + default_runtime, AsyncTimer, AsyncUdpSocket, Runtime, UdpSender, UdpSenderHelper, + UdpSenderHelperSocket, +}; pub use crate::send_stream::{SendStream, StoppedError, WriteError}; #[cfg(test)] diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index b1d936b698..71241b351e 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -112,7 +112,7 @@ pub trait UdpSender: Send + Sync + Debug + 'static { } pin_project_lite::pin_project! { - struct UdpSenderHelper { + pub struct UdpSenderHelper { socket: Socket, make_fut: MakeFut, #[pin] @@ -127,7 +127,7 @@ impl Debug for UdpSenderHelper { } impl UdpSenderHelper { - fn new(inner: Socket, make_fut: MakeFut) -> Self { + pub fn new(inner: Socket, make_fut: MakeFut) -> Self { Self { socket: inner, make_fut, @@ -188,7 +188,7 @@ where } } -trait UdpSenderHelperSocket: Send + Sync + 'static { +pub trait UdpSenderHelperSocket: Send + Sync + 'static { fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>; fn max_transmit_segments(&self) -> usize; From 5759445f7c87710fb8efa52c4133f9a7af63df28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 3 Jun 2025 10:07:26 +0200 Subject: [PATCH 13/16] Write documentation --- quinn/src/runtime.rs | 94 ++++++++++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 71241b351e..df4f5489f7 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -39,12 +39,13 @@ pub trait AsyncTimer: Send + Debug + 'static { /// Abstract implementation of a UDP socket for runtime independence pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { - /// Create a [`UdpPoller`] that can register a single task for write-readiness notifications + /// Create a [`UdpSender`] that can register a single task for write-readiness notifications + /// and send a transmit, if ready. /// /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time, /// i.e. allow at most one caller to wait for an event. This method allows any number of - /// interested tasks to construct their own [`UdpPoller`] object. They can all then wait for the - /// same event and be notified concurrently, because each [`UdpPoller`] can store a separate + /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the + /// same event and be notified concurrently, because each [`UdpSender`] can store a separate /// [`Waker`]. /// /// [`Waker`]: std::task::Waker @@ -75,59 +76,84 @@ pub trait AsyncUdpSocket: Send + Sync + Debug + 'static { } } -/// An object polled to detect when an associated [`AsyncUdpSocket`] is writable +/// An object for asynchronously writing to an associated [`AsyncUdpSocket`]. /// -/// Any number of `UdpPoller`s may exist for a single [`AsyncUdpSocket`]. Each `UdpPoller` is -/// responsible for notifying at most one task when that socket becomes writable. +/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is +/// responsible for notifying at most one task for send readiness. pub trait UdpSender: Send + Sync + Debug + 'static { - /// Check whether the associated socket is likely to be writable + /// Send a UDP datagram, or register to be woken if sending may succeed in the future. /// - /// Must be called after [`AsyncUdpSocket::try_send`] returns [`io::ErrorKind::WouldBlock`] to - /// register the task associated with `cx` to be woken when a send should be attempted - /// again. Unlike in [`Future::poll`], a [`UdpPoller`] may be reused indefinitely no matter how - /// many times `poll_writable` returns [`Poll::Ready`]. + /// Usually implementations of this will poll the socket for writability before trying to + /// write to them, and retry both if writing fails. /// - /// // TODO(matheus23): Fix weird documentation merge + /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's + /// important to poll the underlying socket in a way that doesn't overwrite wakers. /// - /// Send UDP datagrams from `transmits`, or return `WouldBlock` and clear the underlying - /// socket's readiness, or return an I/O error - /// - /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called - /// to register the calling task to be woken when a send should be attempted again. + /// A single [`UdpSender`] will be re-used, even if `poll_send` returns `Poll::Ready` once, + /// unlike [`Future::poll`], so calling it again after readiness should not panic. fn poll_send( self: Pin<&mut Self>, transmit: &Transmit, cx: &mut Context, ) -> Poll>; - /// Maximum number of datagrams that a [`Transmit`] may encode + /// Maximum number of datagrams that a [`Transmit`] may encode. fn max_transmit_segments(&self) -> usize { 1 } - /// TODO(matheus23): Docs - /// Last ditch/best effort of sending a transmit. - /// Used by the endpoint for resets / close frames when dropped, etc. + /// Try to send a UDP datagram, if the socket happens to be write-ready. + /// + /// This may fail with [`io::ErrorKind::WouldBlock`], if the socket is currently full. + /// + /// The quinn endpoint uses this function when sending + /// + /// - A version negotiation response due to an unknown version + /// - A `CLOSE` due to a malformed or unwanted connection attempt + /// - A stateless reset due to an unrecognized connection + /// - A `Retry` packet due to a connection attempt when `use_retry` is set + /// + /// If sending in these cases fails, a well-behaved peer will re-try. Thus it's fine + /// if we drop datagrams sometimes with this function. fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>; } pin_project_lite::pin_project! { - pub struct UdpSenderHelper { + /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type. + /// + /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`. + /// + /// Also serves as a trick, since `WritableFut` doesn't need to be a named future, + /// it can be an anonymous async block, as long as `MakeWritableFn` produces that + /// anonymous async block type. + /// + /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be + /// used in its dyn-compatible form as a `Pin>`. + pub struct UdpSenderHelper { socket: Socket, - make_fut: MakeFut, + make_fut: MakeWritableFn, #[pin] - fut: Option, + fut: Option, } } -impl Debug for UdpSenderHelper { +impl Debug + for UdpSenderHelper +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("UdpSender") } } -impl UdpSenderHelper { - pub fn new(inner: Socket, make_fut: MakeFut) -> Self { +impl + UdpSenderHelper +{ + /// Create a [`UdpSender`] from a socket and a function that takes + /// the socket's reference and produces a future that resolves to + /// `std::io::Result<()>`, once the socket is write-ready. + /// + /// See also the bounds on this struct's [`UdpSender`] implementation. + pub fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self { Self { socket: inner, make_fut, @@ -136,11 +162,12 @@ impl UdpSenderHelper { } } -impl super::UdpSender for UdpSenderHelper +impl super::UdpSender + for UdpSenderHelper where Socket: UdpSenderHelperSocket, - MakeFut: Fn(&Socket) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + Sync + 'static, + MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static, + WritableFut: Future> + Send + Sync + 'static, { fn poll_send( self: Pin<&mut Self>, @@ -188,9 +215,16 @@ where } } +/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers. +/// +/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s. pub trait UdpSenderHelperSocket: Send + Sync + 'static { + /// Try to send a transmit, if the socket happens to be write-ready. + /// + /// Supposed to work identically to [`UdpSender::try_send`], see also its documentation. fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>; + /// See [`UdpSender::max_transmit_segments`]. fn max_transmit_segments(&self) -> usize; } From 0a94d18616dac10c36aab9736a3f63a2322e8705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 3 Jun 2025 10:09:36 +0200 Subject: [PATCH 14/16] More renames --- quinn/src/runtime.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index df4f5489f7..d65aa9e815 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -129,11 +129,11 @@ pin_project_lite::pin_project! { /// /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be /// used in its dyn-compatible form as a `Pin>`. - pub struct UdpSenderHelper { + pub struct UdpSenderHelper { socket: Socket, - make_fut: MakeWritableFn, + make_writable_fut_fn: MakeWritableFutFn, #[pin] - fut: Option, + writable_fut: Option, } } @@ -156,8 +156,8 @@ impl pub fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self { Self { socket: inner, - make_fut, - fut: None, + make_writable_fut_fn: make_fut, + writable_fut: None, } } } @@ -176,18 +176,19 @@ where ) -> Poll> { let mut this = self.project(); loop { - if this.fut.is_none() { - this.fut.set(Some((this.make_fut)(&this.socket))); + if this.writable_fut.is_none() { + this.writable_fut + .set(Some((this.make_writable_fut_fn)(&this.socket))); } // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely - // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`, + // obtain an `&mut Fut` after storing it in `self.writable_fut` when `self` is already behind `Pin`, // and if we didn't store it then we wouldn't be able to keep it alive between // `poll_writable` calls. - let result = ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx)); + let result = ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx)); // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for // a new `Future` to be created on the next call. - this.fut.set(None); + this.writable_fut.set(None); // If .writable() fails, propagate the error result?; From 0165c63f85e9f4c3e21d61d842f2e55e6c466089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 3 Jun 2025 10:10:11 +0200 Subject: [PATCH 15/16] More renames --- quinn/src/runtime.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index d65aa9e815..4b69c5a4d1 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -181,9 +181,9 @@ where .set(Some((this.make_writable_fut_fn)(&this.socket))); } // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely - // obtain an `&mut Fut` after storing it in `self.writable_fut` when `self` is already behind `Pin`, + // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`, // and if we didn't store it then we wouldn't be able to keep it alive between - // `poll_writable` calls. + // `poll_send` calls. let result = ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx)); // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for From f3df6ecd29e860a4d70cf18f2d882e847db200b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 3 Jun 2025 14:22:30 +0200 Subject: [PATCH 16/16] Split up run-on sentence --- quinn/src/runtime.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 4b69c5a4d1..11004ae442 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -148,9 +148,10 @@ impl Debug impl UdpSenderHelper { - /// Create a [`UdpSender`] from a socket and a function that takes - /// the socket's reference and produces a future that resolves to - /// `std::io::Result<()>`, once the socket is write-ready. + /// Create helper that implements [`UdpSender`] from a socket. + /// + /// Additionally you need to provide what is essentially an async function + /// that resolves once the socket is write-ready. /// /// See also the bounds on this struct's [`UdpSender`] implementation. pub fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {