Skip to content

Commit 03bf4a7

Browse files
committed
Add run_until_cancelled_owned to CancellationToken
1 parent 5c8cd33 commit 03bf4a7

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

Diff for: tokio-util/src/sync/cancellation_token.rs

+49
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,55 @@ impl CancellationToken {
287287
}
288288
.await
289289
}
290+
291+
/// Runs a future to completion and returns its result wrapped inside of an `Option`
292+
/// unless the `CancellationToken` is cancelled. In that case the function returns
293+
/// `None` and the future gets dropped.
294+
///
295+
/// The function takes self by value.
296+
///
297+
/// # Cancel safety
298+
///
299+
/// This method is only cancel safe if `fut` is cancel safe.
300+
pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
301+
where
302+
F: Future,
303+
{
304+
pin_project! {
305+
/// A Future that is resolved once the corresponding [`CancellationToken`]
306+
/// is cancelled or a given Future gets resolved. It is biased towards the
307+
/// Future completion.
308+
#[must_use = "futures do nothing unless polled"]
309+
struct RunUntilCancelledFuture<F: Future> {
310+
#[pin]
311+
cancellation: WaitForCancellationFutureOwned,
312+
#[pin]
313+
future: F,
314+
}
315+
}
316+
317+
impl<F: Future> Future for RunUntilCancelledFuture<F> {
318+
type Output = Option<F::Output>;
319+
320+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
321+
let this = self.project();
322+
if let Poll::Ready(res) = this.future.poll(cx) {
323+
Poll::Ready(Some(res))
324+
} else if this.cancellation.poll(cx).is_ready() {
325+
Poll::Ready(None)
326+
} else {
327+
Poll::Pending
328+
}
329+
}
330+
}
331+
332+
RunUntilCancelledFuture {
333+
cancellation: self.cancelled_owned(),
334+
future: fut,
335+
}
336+
.await
337+
}
338+
290339
}
291340

292341
// ===== impl WaitForCancellationFuture =====

Diff for: tokio-util/tests/sync_cancellation_token.rs

+48
Original file line numberDiff line numberDiff line change
@@ -493,3 +493,51 @@ fn run_until_cancelled_test() {
493493
);
494494
}
495495
}
496+
497+
#[test]
498+
fn run_until_cancelled_owned_test() {
499+
let (waker, _) = new_count_waker();
500+
501+
{
502+
let token = CancellationToken::new();
503+
let to_cancel = token.clone();
504+
505+
let fut = token.run_until_cancelled_owned(std::future::pending::<()>());
506+
pin!(fut);
507+
508+
assert_eq!(
509+
Poll::Pending,
510+
fut.as_mut().poll(&mut Context::from_waker(&waker))
511+
);
512+
513+
to_cancel.cancel();
514+
515+
assert_eq!(
516+
Poll::Ready(None),
517+
fut.as_mut().poll(&mut Context::from_waker(&waker))
518+
);
519+
}
520+
521+
{
522+
let (tx, rx) = oneshot::channel::<()>();
523+
524+
let token = CancellationToken::new();
525+
let fut = token.run_until_cancelled_owned(async move {
526+
rx.await.unwrap();
527+
42
528+
});
529+
pin!(fut);
530+
531+
assert_eq!(
532+
Poll::Pending,
533+
fut.as_mut().poll(&mut Context::from_waker(&waker))
534+
);
535+
536+
tx.send(()).unwrap();
537+
538+
assert_eq!(
539+
Poll::Ready(Some(42)),
540+
fut.as_mut().poll(&mut Context::from_waker(&waker))
541+
);
542+
}
543+
}

0 commit comments

Comments
 (0)