Skip to content

Commit

Permalink
sync, coop: apply cooperative scheduling to
Browse files Browse the repository at this point in the history
`sync::watch::Receiver::changed`
  • Loading branch information
tglane committed Sep 14, 2024
1 parent a249654 commit a7f1952
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
41 changes: 40 additions & 1 deletion tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ cfg_rt! {
}

cfg_coop! {
use pin_project_lite::pin_project;
use std::cell::Cell;
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);
Expand Down Expand Up @@ -240,6 +243,42 @@ cfg_coop! {
self.0.is_none()
}
}

pin_project! {
/// A future type that calls `poll_proceed` before polling the inner future to check if the
/// inner future has exceeded its budget. If the inner future resolves, this will
/// automatically call `RestoreOnPending::made_progress` before resolving this future with
/// the result of the inner one. If polling the inner future is pending, polling this future
/// type will also return a `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct BudgetConstraintFuture<F: Future> {
#[pin]
pub(crate) fut: F,
}
}

impl<F: Future> Future for BudgetConstraintFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let coop = ready!(poll_proceed(cx));
let me = self.project();
if let Poll::Ready(ret) = me.fut.poll(cx) {
coop.made_progress();
Poll::Ready(ret)
} else {
Poll::Pending
}
}
}

/// Run a future with a budget constraint for cooperative scheduling.
/// If the future exceeds its budget while being polled, control is yielded back to the
/// runtime.
#[inline]
pub(crate) async fn budget_constraint<F: Future>(fut: F) -> F::Output {
BudgetConstraintFuture { fut }.await
}
}

#[cfg(all(test, not(loom)))]
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
changed_impl(&self.shared, &mut self.version).await
crate::runtime::coop::budget_constraint(changed_impl(&self.shared, &mut self.version)).await
}

/// Waits for a value that satisfies the provided condition.
Expand Down

0 comments on commit a7f1952

Please sign in to comment.