Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add Receiver::poll_recv(..) method #7059

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,62 @@ impl<T> Receiver<T> {
}
}

/// Polls the channel for a value.
///
/// If a pending value exists in the channel, it is returned. If no value
/// has been sent, the current task will be registered for future
/// notification.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
/// failure has been resolved. Note that receiving such a wakeup does not
/// guarantee that the next call will succeed — it could fail with another
/// spurious failure.
///
/// To attempt to receive a value from outside of the context of an
/// asynchronous task, or to avoid spurious failures, see the `try_recv`
/// method.
///
/// # Return
///
/// * `Poll::Pending` if no value has been sent yet but the channel is not
/// closed, or if a spurious failure happens.
/// * `Poll::Ready(Ok(T))` is a value is pending in the channel.
/// * `Poll::Ready(Err(RecvError))` if the sender has dropped without
/// sending a value.
///
/// # Panics
///
/// This function panics if it is called after a value has been received.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// If `inner` is `None`, then `poll()` has already completed.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _res_span = self.resource_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_span = self.async_op_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_poll_span = self.async_op_poll_span.clone().entered();

let ret = if let Some(inner) = self.inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1096,28 +1152,7 @@ impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If `inner` is `None`, then `poll()` has already completed.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _res_span = self.resource_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_span = self.async_op_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _ao_poll_span = self.async_op_poll_span.clone().entered();

let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
self.poll_recv(cx)
}
}

Expand Down
Loading