diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 2b346eae81c..121127142ce 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -931,6 +931,62 @@ impl Receiver { } } + /// Polls the channel for a value. + /// + /// If a pending value exists in the channel, it is returned. If no value + /// has been sent, the current task will be registered for future + /// notification. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. + /// + /// To attempt to receive a value from outside of the context of an + /// asynchronous task, or to avoid spurious failures, see the `try_recv` + /// method. + /// + /// # Return + /// + /// * `Poll::Pending` if no value has been sent yet but the channel is not + /// closed, or if a spurious failure happens. + /// * `Poll::Ready(Ok(T))` is a value is pending in the channel. + /// * `Poll::Ready(Err(RecvError))` if the sender has dropped without + /// sending a value. + /// + /// # Panics + /// + /// This function panics if it is called after a value has been received. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + // If `inner` is `None`, then `poll()` has already completed. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.async_op_poll_span.clone().entered(); + + let ret = if let Some(inner) = self.inner.as_ref() { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let res = ready!(inner.poll_recv(cx))?; + + res + } else { + panic!("called after complete"); + }; + + self.inner = None; + Ready(Ok(ret)) + } + /// Attempts to receive a value. /// /// If a pending value exists in the channel, it is returned. If no value @@ -1096,28 +1152,7 @@ impl Future for Receiver { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // If `inner` is `None`, then `poll()` has already completed. - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _res_span = self.resource_span.clone().entered(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _ao_span = self.async_op_span.clone().entered(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let _ao_poll_span = self.async_op_poll_span.clone().entered(); - - let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { - #[cfg(all(tokio_unstable, feature = "tracing"))] - let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let res = ready!(inner.poll_recv(cx))?; - - res - } else { - panic!("called after complete"); - }; - - self.inner = None; - Ready(Ok(ret)) + self.poll_recv(cx) } }