diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index ecffdc7e41..4fe744cbec 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -20,7 +20,7 @@ mod stream; pub use self::stream::{ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, - StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach, Unzip, Zip, }; #[cfg(feature = "std")] @@ -38,7 +38,7 @@ pub use self::stream::Forward; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; +pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent}; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] @@ -49,7 +49,7 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext, + TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; @@ -60,7 +60,7 @@ pub use self::try_stream::IntoAsyncRead; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +pub use self::try_stream::{TryBufferUnordered, TryBuffered}; // Primitive streams diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 8571afe4e6..bd74f0fc9b 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -15,7 +15,7 @@ use futures_core::stream::TryStream; #[cfg(feature = "alloc")] use futures_core::stream::{BoxStream, LocalBoxStream}; use futures_core::{ - future::Future, + future::{Future, TryFuture}, stream::{FusedStream, Stream}, task::{Context, Poll}, }; @@ -149,6 +149,14 @@ mod then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::then::Then; +mod try_for_each; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_for_each::TryForEach; + +mod try_fold; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_fold::TryFold; + mod zip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::zip::Zip; @@ -197,6 +205,12 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::split::{SplitStream, SplitSink, ReuniteError}; + + #[cfg(feature = "alloc")] + mod try_for_each_concurrent; + #[cfg(feature = "alloc")] + #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 + pub use self::try_for_each_concurrent::TryForEachConcurrent; } #[cfg(feature = "std")] @@ -934,6 +948,143 @@ pub trait StreamExt: Stream { assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) } + /// Attempt to execute an accumulating asynchronous computation over a + /// stream, collecting all the values into one final result. + /// + /// This combinator will accumulate all values returned by this stream + /// according to the closure provided. The initial state is also provided to + /// this method and then is returned again by each execution of the closure. + /// Once the entire stream has been exhausted the returned future will + /// resolve to this value. + /// + /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but + /// will exit early if an error is encountered in the provided closure. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(vec![1, 2]); + /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok::(acc + x) }); + /// assert_eq!(sum.await, Ok(3)); + /// + /// let number_stream_with_err = stream::iter(vec![Ok::(1), Err(2), Ok(1)]); + /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x?) }); + /// assert_eq!(sum.await, Err(2)); + /// # }) + /// ``` + fn try_fold(self, init: T, f: F) -> TryFold + where + F: FnMut(T, Self::Item) -> Fut, + Fut: TryFuture, + Self: Sized, + { + assert_future::, _>(TryFold::new(self, f, init)) + } + + /// Attempts to run this stream to completion, executing the provided + /// asynchronous closure for each element on the stream. + /// + /// The provided closure will be called for each item this stream produces, + /// yielding a future. That future will then be executed to completion + /// before moving on to the next item. + /// + /// The returned value is a [`Future`](futures_core::future::Future) where + /// the [`Output`](futures_core::future::Future::Output) type is + /// `Result<(), Fut::Error>`. If any of the intermediate futures returns + /// an error, this future will return immediately with an error. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// + /// let mut x = 0i32; + /// + /// { + /// let fut = stream::repeat(1).try_for_each(|item| { + /// x += item; + /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) + /// }); + /// assert_eq!(fut.await, Err(())); + /// } + /// + /// assert_eq!(x, 3); + /// # }) + /// ``` + fn try_for_each(self, f: F) -> TryForEach + where + F: FnMut(Self::Item) -> Fut, + Fut: TryFuture, + Self: Sized, + { + assert_future::, _>(TryForEach::new(self, f)) + } + + /// Attempts to run this stream to completion, executing the provided asynchronous + /// closure for each element on the stream concurrently as elements become + /// available, exiting as soon as an error occurs. + /// + /// This is similar to + /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), + /// but will resolve to an error immediately if the provided closure returns + /// an error. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::oneshot; + /// use futures::stream::{self, StreamExt}; + /// + /// let (tx1, rx1) = oneshot::channel(); + /// let (tx2, rx2) = oneshot::channel(); + /// let (_tx3, rx3) = oneshot::channel(); + /// + /// let stream = stream::iter(vec![rx1, rx2, rx3]); + /// let fut = stream.try_for_each_concurrent( + /// /* limit */ 2, + /// |rx| async move { + /// let res: Result<(), oneshot::Canceled> = rx.await; + /// res + /// } + /// ); + /// + /// tx1.send(()).unwrap(); + /// // Drop the second sender so that `rx2` resolves to `Canceled`. + /// drop(tx2); + /// + /// // The final result is an error because the second future + /// // resulted in an error. + /// assert_eq!(Err(oneshot::Canceled), fut.await); + /// # }) + /// ``` + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + fn try_for_each_concurrent( + self, + limit: impl Into>, + f: F, + ) -> TryForEachConcurrent + where + F: FnMut(Self::Item) -> Fut, + Fut: Future>, + Self: Sized, + { + assert_future::, _>(TryForEachConcurrent::new( + self, + limit.into(), + f, + )) + } + /// Creates a new stream of at most `n` items of the underlying stream. /// /// Once `n` items have been yielded from this stream then it will always diff --git a/futures-util/src/stream/try_stream/try_fold.rs b/futures-util/src/stream/stream/try_fold.rs similarity index 78% rename from futures-util/src/stream/try_stream/try_fold.rs rename to futures-util/src/stream/stream/try_fold.rs index 1d41e4bc2b..7fa9834753 100644 --- a/futures-util/src/stream/try_stream/try_fold.rs +++ b/futures-util/src/stream/stream/try_fold.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::ready; -use futures_core::stream::TryStream; +use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -35,9 +35,9 @@ where } impl TryFold -where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture, +where St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: TryFuture, { pub(super) fn new(stream: St, f: F, t: T) -> Self { Self { @@ -50,9 +50,9 @@ where St: TryStream, } impl FusedFuture for TryFold - where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture, + where St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: TryFuture, { fn is_terminated(&self) -> bool { self.accum.is_none() && self.future.is_none() @@ -60,11 +60,11 @@ impl FusedFuture for TryFold } impl Future for TryFold - where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture, + where St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: TryFuture, { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -80,11 +80,10 @@ impl Future for TryFold } } else if this.accum.is_some() { // we're waiting on a new item from the stream - let res = ready!(this.stream.as_mut().try_poll_next(cx)); + let res = ready!(this.stream.as_mut().poll_next(cx)); let a = this.accum.take().unwrap(); match res { - Some(Ok(item)) => this.future.set(Some((this.f)(a, item))), - Some(Err(e)) => break Err(e), + Some(item) => this.future.set(Some((this.f)(a, item))), None => break Ok(a), } } else { diff --git a/futures-util/src/stream/try_stream/try_for_each.rs b/futures-util/src/stream/stream/try_for_each.rs similarity index 76% rename from futures-util/src/stream/try_stream/try_for_each.rs rename to futures-util/src/stream/stream/try_for_each.rs index 0a814ae86c..3d7a1f8e38 100644 --- a/futures-util/src/stream/try_stream/try_for_each.rs +++ b/futures-util/src/stream/stream/try_for_each.rs @@ -2,12 +2,12 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::ready; -use futures_core::stream::TryStream; +use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { - /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. + /// Future for the [`try_for_each`](super::StreamExt::try_for_each) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEach { #[pin] @@ -32,9 +32,9 @@ where } impl TryForEach -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture, +where St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: TryFuture, { pub(super) fn new(stream: St, f: F) -> Self { Self { @@ -46,11 +46,11 @@ where St: TryStream, } impl Future for TryForEach - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture, + where St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: TryFuture, { - type Output = Result<(), St::Error>; + type Output = Result<(), Fut::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -59,7 +59,7 @@ impl Future for TryForEach ready!(fut.try_poll(cx))?; this.future.set(None); } else { - match ready!(this.stream.as_mut().try_poll_next(cx)?) { + match ready!(this.stream.as_mut().poll_next(cx)) { Some(e) => this.future.set(Some((this.f)(e))), None => break, } diff --git a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs b/futures-util/src/stream/stream/try_for_each_concurrent.rs similarity index 75% rename from futures-util/src/stream/try_stream/try_for_each_concurrent.rs rename to futures-util/src/stream/stream/try_for_each_concurrent.rs index d2f4b0fed2..cf67da59af 100644 --- a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs +++ b/futures-util/src/stream/stream/try_for_each_concurrent.rs @@ -4,7 +4,7 @@ use core::mem; use core::pin::Pin; use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; -use futures_core::stream::TryStream; +use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -36,20 +36,20 @@ where } } -impl FusedFuture for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +impl FusedFuture for TryForEachConcurrent + where St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() } } -impl TryForEachConcurrent -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +impl TryForEachConcurrent +where St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, { pub(super) fn new(stream: St, limit: Option, f: F) -> Self { Self { @@ -62,12 +62,12 @@ where St: TryStream, } } -impl Future for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +impl Future for TryForEachConcurrent + where St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, { - type Output = Result<(), St::Error>; + type Output = Result<(), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -77,12 +77,12 @@ impl Future for TryForEachConcurrent // Check if we've already created a number of futures greater than `limit` if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { let poll_res = match this.stream.as_mut().as_pin_mut() { - Some(stream) => stream.try_poll_next(cx), + Some(stream) => stream.poll_next(cx), None => Poll::Ready(None), }; let elem = match poll_res { - Poll::Ready(Some(Ok(elem))) => { + Poll::Ready(Some(elem)) => { made_progress_this_iter = true; Some(elem) }, @@ -91,13 +91,6 @@ impl Future for TryForEachConcurrent None } Poll::Pending => None, - Poll::Ready(Some(Err(e))) => { - // Empty the stream and futures so that we know - // the future has completed. - this.stream.set(None); - drop(mem::replace(this.futures, FuturesUnordered::new())); - return Poll::Ready(Err(e)); - } }; if let Some(elem) = elem { diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index b7353d908a..d67126063f 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -69,10 +69,6 @@ mod try_next; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_next::TryNext; -mod try_for_each; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::try_for_each::TryForEach; - mod try_filter; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_filter::TryFilter; @@ -93,10 +89,6 @@ mod try_concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_concat::TryConcat; -mod try_fold; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::try_fold::TryFold; - mod try_unfold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_unfold::{try_unfold, TryUnfold}; @@ -121,12 +113,6 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffered::TryBuffered; - - #[cfg(feature = "alloc")] - mod try_for_each_concurrent; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::try_for_each_concurrent::TryForEachConcurrent; } #[cfg(feature = "io")] @@ -372,48 +358,6 @@ pub trait TryStreamExt: TryStream { assert_future::, Self::Error>, _>(TryNext::new(self)) } - /// Attempts to run this stream to completion, executing the provided - /// asynchronous closure for each element on the stream. - /// - /// The provided closure will be called for each item this stream produces, - /// yielding a future. That future will then be executed to completion - /// before moving on to the next item. - /// - /// The returned value is a [`Future`](futures_core::future::Future) where the - /// [`Output`](futures_core::future::Future::Output) type is - /// `Result<(), Self::Error>`. If any of the intermediate - /// futures or the stream returns an error, this future will return - /// immediately with an error. - /// - /// # Examples - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future; - /// use futures::stream::{self, TryStreamExt}; - /// - /// let mut x = 0i32; - /// - /// { - /// let fut = stream::repeat(Ok(1)).try_for_each(|item| { - /// x += item; - /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) - /// }); - /// assert_eq!(fut.await, Err(())); - /// } - /// - /// assert_eq!(x, 3); - /// # }) - /// ``` - fn try_for_each(self, f: F) -> TryForEach - where - F: FnMut(Self::Ok) -> Fut, - Fut: TryFuture, - Self: Sized, - { - assert_future::, _>(TryForEach::new(self, f)) - } - /// Skip elements on this stream while the provided asynchronous predicate /// resolves to `true`. /// @@ -474,66 +418,6 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(TryTakeWhile::new(self, f)) } - /// Attempts to run this stream to completion, executing the provided asynchronous - /// closure for each element on the stream concurrently as elements become - /// available, exiting as soon as an error occurs. - /// - /// This is similar to - /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), - /// but will resolve to an error immediately if the underlying stream or the provided - /// closure return an error. - /// - /// This method is only available when the `std` or `alloc` feature of this - /// library is activated, and it is activated by default. - /// - /// # Examples - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::channel::oneshot; - /// use futures::stream::{self, StreamExt, TryStreamExt}; - /// - /// let (tx1, rx1) = oneshot::channel(); - /// let (tx2, rx2) = oneshot::channel(); - /// let (_tx3, rx3) = oneshot::channel(); - /// - /// let stream = stream::iter(vec![rx1, rx2, rx3]); - /// let fut = stream.map(Ok).try_for_each_concurrent( - /// /* limit */ 2, - /// |rx| async move { - /// let res: Result<(), oneshot::Canceled> = rx.await; - /// res - /// } - /// ); - /// - /// tx1.send(()).unwrap(); - /// // Drop the second sender so that `rx2` resolves to `Canceled`. - /// drop(tx2); - /// - /// // The final result is an error because the second future - /// // resulted in an error. - /// assert_eq!(Err(oneshot::Canceled), fut.await); - /// # }) - /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - #[cfg(feature = "alloc")] - fn try_for_each_concurrent( - self, - limit: impl Into>, - f: F, - ) -> TryForEachConcurrent - where - F: FnMut(Self::Ok) -> Fut, - Fut: Future>, - Self: Sized, - { - assert_future::, _>(TryForEachConcurrent::new( - self, - limit.into(), - f, - )) - } - /// Attempt to transform a stream into a collection, /// returning a future representing the result of that computation. /// @@ -700,43 +584,6 @@ pub trait TryStreamExt: TryStream { ) } - /// Attempt to execute an accumulating asynchronous computation over a - /// stream, collecting all the values into one final result. - /// - /// This combinator will accumulate all values returned by this stream - /// according to the closure provided. The initial state is also provided to - /// this method and then is returned again by each execution of the closure. - /// Once the entire stream has been exhausted the returned future will - /// resolve to this value. - /// - /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will - /// exit early if an error is encountered in either the stream or the - /// provided closure. - /// - /// # Examples - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::stream::{self, TryStreamExt}; - /// - /// let number_stream = stream::iter(vec![Ok::(1), Ok(2)]); - /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) }); - /// assert_eq!(sum.await, Ok(3)); - /// - /// let number_stream_with_err = stream::iter(vec![Ok::(1), Err(2), Ok(1)]); - /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) }); - /// assert_eq!(sum.await, Err(2)); - /// # }) - /// ``` - fn try_fold(self, init: T, f: F) -> TryFold - where - F: FnMut(T, Self::Ok) -> Fut, - Fut: TryFuture, - Self: Sized, - { - assert_future::, _>(TryFold::new(self, f, init)) - } - /// Attempt to concatenate all items of a stream into a single /// extendable destination, returning a future representing the end result. ///