From a24204324991a6d51cd8d8c4a2af06e339c7d60a Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 31 Dec 2024 00:00:00 +0000 Subject: [PATCH] sync: add `Receiver::poll_recv(..)` method this commit adds a `rx.poll_recv(&mut cx)` to the public interface of `tokio::sync::oneshot::Receiver`. this method has the following signature: ```rust // tokio/src/sync/oneshot.rs impl Receiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { // ... } } ``` this is similar to the `tokio::sync::mpsc::Receiver::poll_recv` and `tokio::sync::mpsc::UnboundedReceiver::poll_recv` methods, which have the following signature: ```rust // tokio/src/sync/mpsc/bounded.rs impl Receiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { // ... } } ``` see: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv in particular, note the `&mut self` receiver of these methods, as opposed to the `Pin<&mut Self>` receiver in `Future::poll(..)`. today, a oneshot receiver must be pinned in order to be polled via `Future::poll(..)`. `tokio::sync::oneshot::Receiver::try_recv(..)` has an important but subtle difference from `poll_recv(..)`, alluded to in its documentation: > If a pending value exists in the channel, it is returned. If no value > has been sent, the current task will not be registered for future > notification. > > This function is useful to call from outside the context of an > asynchronous task. see hyperium/http-body#100 for an example use-case for this. if we *are* in the context of an asynchronous task, we may wish to poll on the receiver-end of the channel and register for future notification, indicating that we should be awoken later when a value is ready or when conditions yielding a spurious failure have passed. providing a means to poll a `&mut Receiver` avoids the performance impact of boxing the receiver as an erased `dyn Future` trait object, or of using an `tokio::sync::mpsc::Receiver`, or the ergonomic wrinkles of needing to rely on pin projection in asynchronous types that compose on top of oneshot channels. --- * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.poll_recv * https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll * https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html#method.try_recv * https://github.com/hyperium/http-body/pull/100/files#r1399818104 * https://github.com/hyperium/http-body/pull/100 --- tokio/src/sync/oneshot.rs | 79 ++++++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 22 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 2b346eae81c..121127142ce 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -931,6 +931,62 @@ impl Receiver { } } + /// Polls the channel for a value. + /// + /// If a pending value exists in the channel, it is returned. If no value + /// has been sent, the current task will be registered for future + /// notification. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. + /// + /// To attempt to receive a value from outside of the context of an + /// asynchronous task, or to avoid spurious failures, see the `try_recv` + /// method. + /// + /// # Return + /// + /// * `Poll::Pending` if no value has been sent yet but the channel is not + /// closed, or if a spurious failure happens. + /// * `Poll::Ready(Ok(T))` is a value is pending in the channel. + /// * `Poll::Ready(Err(RecvError))` if the sender has dropped without + /// sending a value. + /// + /// # Panics + /// + /// This function panics if it is called after a value has been received. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + // If `inner` is `None`, then `poll()` has already completed. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.async_op_poll_span.clone().entered(); + + let ret = if let Some(inner) = self.inner.as_ref() { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let res = ready!(inner.poll_recv(cx))?; + + res + } else { + panic!("called after complete"); + }; + + self.inner = None; + Ready(Ok(ret)) + } + /// Attempts to receive a value. /// /// If a pending value exists in the channel, it is returned. If no value @@ -1096,28 +1152,7 @@ impl Future for Receiver { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // If `inner` is `None`, then `poll()` has already completed. - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _res_span = self.resource_span.clone().entered(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _ao_span = self.async_op_span.clone().entered(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _ao_poll_span = self.async_op_poll_span.clone().entered(); - - let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { - #[cfg(all(tokio_unstable, feature = "tracing"))] - let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let res = ready!(inner.poll_recv(cx))?; - - res - } else { - panic!("called after complete"); - }; - - self.inner = None; - Ready(Ok(ret)) + self.poll_recv(cx) } }