Skip to content

Move try_fold, try_for_each, and try_for_each_concurrent to StreamExt #2342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand All @@ -38,7 +38,7 @@ pub use self::stream::Forward;

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent};

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
Expand All @@ -49,7 +49,7 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};
mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext,
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
};

Expand All @@ -60,7 +60,7 @@ pub use self::try_stream::IntoAsyncRead;

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
pub use self::try_stream::{TryBufferUnordered, TryBuffered};

// Primitive streams

Expand Down
153 changes: 152 additions & 1 deletion futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures_core::stream::TryStream;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
use futures_core::{
future::Future,
future::{Future, TryFuture},
stream::{FusedStream, Stream},
task::{Context, Poll},
};
Expand Down Expand Up @@ -149,6 +149,14 @@ mod then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::then::Then;

mod try_for_each;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_for_each::TryForEach;

mod try_fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_fold::TryFold;

mod zip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::zip::Zip;
Expand Down Expand Up @@ -197,6 +205,12 @@ cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::split::{SplitStream, SplitSink, ReuniteError};

#[cfg(feature = "alloc")]
mod try_for_each_concurrent;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_for_each_concurrent::TryForEachConcurrent;
}

#[cfg(feature = "std")]
Expand Down Expand Up @@ -934,6 +948,143 @@ pub trait StreamExt: Stream {
assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
}

/// Attempt to execute an accumulating asynchronous computation over a
/// stream, collecting all the values into one final result.
///
/// This combinator will accumulate all values returned by this stream
/// according to the closure provided. The initial state is also provided to
/// this method and then is returned again by each execution of the closure.
/// Once the entire stream has been exhausted the returned future will
/// resolve to this value.
///
/// This method is similar to [`fold`](crate::stream::StreamExt::fold), but
/// will exit early if an error is encountered in the provided closure.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let number_stream = stream::iter(vec![1, 2]);
/// let sum = number_stream.try_fold(0, |acc, x| async move { Ok::<i32, i32>(acc + x) });
/// assert_eq!(sum.await, Ok(3));
///
/// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
/// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x?) });
/// assert_eq!(sum.await, Err(2));
/// # })
/// ```
fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
where
F: FnMut(T, Self::Item) -> Fut,
Fut: TryFuture<Ok = T>,
Self: Sized,
{
assert_future::<Result<T, Fut::Error>, _>(TryFold::new(self, f, init))
}

/// Attempts to run this stream to completion, executing the provided
/// asynchronous closure for each element on the stream.
///
/// The provided closure will be called for each item this stream produces,
/// yielding a future. That future will then be executed to completion
/// before moving on to the next item.
///
/// The returned value is a [`Future`](futures_core::future::Future) where
/// the [`Output`](futures_core::future::Future::Output) type is
/// `Result<(), Fut::Error>`. If any of the intermediate futures returns
/// an error, this future will return immediately with an error.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, StreamExt};
///
/// let mut x = 0i32;
///
/// {
/// let fut = stream::repeat(1).try_for_each(|item| {
/// x += item;
/// future::ready(if x == 3 { Err(()) } else { Ok(()) })
/// });
/// assert_eq!(fut.await, Err(()));
/// }
///
/// assert_eq!(x, 3);
/// # })
/// ```
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: TryFuture<Ok = ()>,
Self: Sized,
{
assert_future::<Result<(), Fut::Error>, _>(TryForEach::new(self, f))
}

/// Attempts to run this stream to completion, executing the provided asynchronous
/// closure for each element on the stream concurrently as elements become
/// available, exiting as soon as an error occurs.
///
/// This is similar to
/// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
/// but will resolve to an error immediately if the provided closure returns
/// an error.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::stream::{self, StreamExt};
///
/// let (tx1, rx1) = oneshot::channel();
/// let (tx2, rx2) = oneshot::channel();
/// let (_tx3, rx3) = oneshot::channel();
///
/// let stream = stream::iter(vec![rx1, rx2, rx3]);
/// let fut = stream.try_for_each_concurrent(
/// /* limit */ 2,
/// |rx| async move {
/// let res: Result<(), oneshot::Canceled> = rx.await;
/// res
/// }
/// );
///
/// tx1.send(()).unwrap();
/// // Drop the second sender so that `rx2` resolves to `Canceled`.
/// drop(tx2);
///
/// // The final result is an error because the second future
/// // resulted in an error.
/// assert_eq!(Err(oneshot::Canceled), fut.await);
/// # })
/// ```
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn try_for_each_concurrent<Fut, F, E>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> TryForEachConcurrent<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Result<(), E>>,
Self: Sized,
{
assert_future::<Result<(), E>, _>(TryForEachConcurrent::new(
self,
limit.into(),
f,
))
}

/// Creates a new stream of at most `n` items of the underlying stream.
///
/// Once `n` items have been yielded from this stream then it will always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

Expand Down Expand Up @@ -35,9 +35,9 @@ where
}

impl<St, Fut, T, F> TryFold<St, Fut, T, F>
where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: TryFuture<Ok = T>,
{
pub(super) fn new(stream: St, f: F, t: T) -> Self {
Self {
Expand All @@ -50,21 +50,21 @@ where St: TryStream,
}

impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F>
where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: TryFuture<Ok = T>,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.future.is_none()
}
}

impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: TryFuture<Ok = T>,
{
type Output = Result<T, St::Error>;
type Output = Result<T, Fut::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Expand All @@ -80,11 +80,10 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
}
} else if this.accum.is_some() {
// we're waiting on a new item from the stream
let res = ready!(this.stream.as_mut().try_poll_next(cx));
let res = ready!(this.stream.as_mut().poll_next(cx));
let a = this.accum.take().unwrap();
match res {
Some(Ok(item)) => this.future.set(Some((this.f)(a, item))),
Some(Err(e)) => break Err(e),
Some(item) => this.future.set(Some((this.f)(a, item))),
None => break Ok(a),
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
/// Future for the [`try_for_each`](super::StreamExt::try_for_each) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEach<St, Fut, F> {
#[pin]
Expand All @@ -32,9 +32,9 @@ where
}

impl<St, Fut, F> TryForEach<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: TryFuture<Ok = ()>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self {
Expand All @@ -46,11 +46,11 @@ where St: TryStream,
}

impl<St, Fut, F> Future for TryForEach<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: TryFuture<Ok = ()>,
{
type Output = Result<(), St::Error>;
type Output = Result<(), Fut::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Expand All @@ -59,7 +59,7 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
ready!(fut.try_poll(cx))?;
this.future.set(None);
} else {
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(e) => this.future.set(Some((this.f)(e))),
None => break,
}
Expand Down
Loading