Skip to content

Commit

Permalink
sync: add Receiver::poll_recv(..) method
Browse files Browse the repository at this point in the history
this commit adds a `rx.poll_recv(&mut cx)` to the public interface
of `tokio::sync::oneshot::Receiver<T>`.

this method has the following signature:

```rust
// tokio/src/sync/oneshot.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        // ...
    }
}
```

this is similar to the `tokio::sync::mpsc::Receiver::poll_recv` and
`tokio::sync::mpsc::UnboundedReceiver::poll_recv` methods, which have the
following signature:

```rust
// tokio/src/sync/mpsc/bounded.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // ...
    }
}
```

see: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv

in particular, note the `&mut self` receiver of these methods, as
opposed to the `Pin<&mut Self>` receiver in `Future::poll(..)`. today, a
oneshot receiver must be pinned in order to be polled via
`Future::poll(..)`.

`tokio::sync::oneshot::Receiver::try_recv(..)` has an important but
subtle difference from `poll_recv(..)`, alluded to in its documentation:

> If a pending value exists in the channel, it is returned. If no value
> has been sent, the current task will not be registered for future
> notification.
>
> This function is useful to call from outside the context of an
> asynchronous task.

see hyperium/http-body#100 for an example use-case for this.

if we *are* in the context of an asynchronous task, we may wish to poll
on the receiver-end of the channel and register for future notification,
indicating that we should be awoken later when a value is ready or when
conditions yielding a spurious failure have passed.

providing a means to poll a `&mut Receiver<T>` avoids the performance
impact of boxing the receiver as an erased `dyn Future` trait object, or
of using an `tokio::sync::mpsc::Receiver<T>`, or the ergonomic wrinkles
of needing to rely on pin projection in asynchronous types that compose
on top of oneshot channels.

---

* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv
* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.poll_recv
* https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
* https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html#method.try_recv
* https://github.com/hyperium/http-body/pull/100/files#r1399818104
* hyperium/http-body#100
  • Loading branch information
cratelyn committed Dec 31, 2024
1 parent b3ff911 commit fbedeab
Showing 1 changed file with 57 additions and 22 deletions.
79 changes: 57 additions & 22 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,62 @@ impl<T> Receiver<T> {
}
}

/// Polls the channel for a value.
///
/// If a pending value exists in the channel, it is returned. If no value
/// has been sent, the current task will be registered for future
/// notification.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
/// failure has been resolved. Note that receiving such a wakeup does not
/// guarantee that the next call will succeed — it could fail with another
/// spurious failure.
///
/// To attempt to receive a value from outside of the context of an
/// asynchronous task, or to avoid spurious failures, see the `try_recv`
/// method.
///
/// # Return
///
/// * `Poll::Pending` if no value has been sent yet but the channel is not
/// closed, or if a spurious failure happens.
/// * `Poll::Ready(Ok(T))` is a value is pending in the channel.
/// * `Err(RecvError::Closed)` if the sender has dropped without sending a
/// value.
///
/// # Panics
///
/// This function panics if it is called after a value has been received.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// If `inner` is `None`, then `poll()` has already completed.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _res_span = self.resource_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_span = self.async_op_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_poll_span = self.async_op_poll_span.clone().entered();

let ret = if let Some(inner) = self.inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1096,28 +1152,7 @@ impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If `inner` is `None`, then `poll()` has already completed.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _res_span = self.resource_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_span = self.async_op_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_poll_span = self.async_op_poll_span.clone().entered();

let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
self.poll_recv(cx)
}
}

Expand Down

0 comments on commit fbedeab

Please sign in to comment.