Skip to content

Commit 36e93ad

Browse files
committed
Move try_fold, try_for_each, and try_for_each_concurrent to StreamExt
1 parent f71d3f7 commit 36e93ad

File tree

6 files changed

+204
-205
lines changed

6 files changed

+204
-205
lines changed

futures-util/src/stream/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod stream;
2020
pub use self::stream::{
2121
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
2222
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
23-
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
23+
StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach, Unzip, Zip,
2424
};
2525

2626
#[cfg(feature = "std")]
@@ -38,7 +38,7 @@ pub use self::stream::Forward;
3838

3939
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
4040
#[cfg(feature = "alloc")]
41-
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
41+
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent};
4242

4343
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
4444
#[cfg(feature = "sink")]
@@ -49,7 +49,7 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};
4949
mod try_stream;
5050
pub use self::try_stream::{
5151
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
52-
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
52+
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext,
5353
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
5454
};
5555

@@ -60,7 +60,7 @@ pub use self::try_stream::IntoAsyncRead;
6060

6161
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
6262
#[cfg(feature = "alloc")]
63-
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
63+
pub use self::try_stream::{TryBufferUnordered, TryBuffered};
6464

6565
// Primitive streams
6666

futures-util/src/stream/stream/mod.rs

+154-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures_core::stream::TryStream;
1515
#[cfg(feature = "alloc")]
1616
use futures_core::stream::{BoxStream, LocalBoxStream};
1717
use futures_core::{
18-
future::Future,
18+
future::{Future, TryFuture},
1919
stream::{FusedStream, Stream},
2020
task::{Context, Poll},
2121
};
@@ -149,6 +149,14 @@ mod then;
149149
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150150
pub use self::then::Then;
151151

152+
mod try_for_each;
153+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154+
pub use self::try_for_each::TryForEach;
155+
156+
mod try_fold;
157+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158+
pub use self::try_fold::TryFold;
159+
152160
mod zip;
153161
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154162
pub use self::zip::Zip;
@@ -197,6 +205,12 @@ cfg_target_has_atomic! {
197205
#[cfg(feature = "alloc")]
198206
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
199207
pub use self::split::{SplitStream, SplitSink, ReuniteError};
208+
209+
#[cfg(feature = "alloc")]
210+
mod try_for_each_concurrent;
211+
#[cfg(feature = "alloc")]
212+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
213+
pub use self::try_for_each_concurrent::TryForEachConcurrent;
200214
}
201215

202216
#[cfg(feature = "std")]
@@ -934,6 +948,145 @@ pub trait StreamExt: Stream {
934948
assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
935949
}
936950

951+
/// Attempt to execute an accumulating asynchronous computation over a
952+
/// stream, collecting all the values into one final result.
953+
///
954+
/// This combinator will accumulate all values returned by this stream
955+
/// according to the closure provided. The initial state is also provided to
956+
/// this method and then is returned again by each execution of the closure.
957+
/// Once the entire stream has been exhausted the returned future will
958+
/// resolve to this value.
959+
///
960+
/// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
961+
/// exit early if an error is encountered in either the stream or the
962+
/// provided closure.
963+
///
964+
/// # Examples
965+
///
966+
/// ```
967+
/// # futures::executor::block_on(async {
968+
/// use futures::stream::{self, StreamExt};
969+
///
970+
/// let number_stream = stream::iter(vec![1, 2]);
971+
/// let sum = number_stream.try_fold(0, |acc, x| async move { Ok::<i32, i32>(acc + x) });
972+
/// assert_eq!(sum.await, Ok(3));
973+
///
974+
/// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
975+
/// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x?) });
976+
/// assert_eq!(sum.await, Err(2));
977+
/// # })
978+
/// ```
979+
fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
980+
where
981+
F: FnMut(T, Self::Item) -> Fut,
982+
Fut: TryFuture<Ok = T>,
983+
Self: Sized,
984+
{
985+
assert_future::<Result<T, Fut::Error>, _>(TryFold::new(self, f, init))
986+
}
987+
988+
/// Attempts to run this stream to completion, executing the provided
989+
/// asynchronous closure for each element on the stream.
990+
///
991+
/// The provided closure will be called for each item this stream produces,
992+
/// yielding a future. That future will then be executed to completion
993+
/// before moving on to the next item.
994+
///
995+
/// The returned value is a [`Future`](futures_core::future::Future) where the
996+
/// [`Output`](futures_core::future::Future::Output) type is
997+
/// `Result<(), Self::Error>`. If any of the intermediate
998+
/// futures or the stream returns an error, this future will return
999+
/// immediately with an error.
1000+
///
1001+
/// # Examples
1002+
///
1003+
/// ```
1004+
/// # futures::executor::block_on(async {
1005+
/// use futures::future;
1006+
/// use futures::stream::{self, StreamExt};
1007+
///
1008+
/// let mut x = 0i32;
1009+
///
1010+
/// {
1011+
/// let fut = stream::repeat(1).try_for_each(|item| {
1012+
/// x += item;
1013+
/// future::ready(if x == 3 { Err(()) } else { Ok(()) })
1014+
/// });
1015+
/// assert_eq!(fut.await, Err(()));
1016+
/// }
1017+
///
1018+
/// assert_eq!(x, 3);
1019+
/// # })
1020+
/// ```
1021+
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
1022+
where
1023+
F: FnMut(Self::Item) -> Fut,
1024+
Fut: TryFuture<Ok = ()>,
1025+
Self: Sized,
1026+
{
1027+
assert_future::<Result<(), Fut::Error>, _>(TryForEach::new(self, f))
1028+
}
1029+
1030+
/// Attempts to run this stream to completion, executing the provided asynchronous
1031+
/// closure for each element on the stream concurrently as elements become
1032+
/// available, exiting as soon as an error occurs.
1033+
///
1034+
/// This is similar to
1035+
/// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
1036+
/// but will resolve to an error immediately if the underlying stream or the provided
1037+
/// closure return an error.
1038+
///
1039+
/// This method is only available when the `std` or `alloc` feature of this
1040+
/// library is activated, and it is activated by default.
1041+
///
1042+
/// # Examples
1043+
///
1044+
/// ```
1045+
/// # futures::executor::block_on(async {
1046+
/// use futures::channel::oneshot;
1047+
/// use futures::stream::{self, StreamExt};
1048+
///
1049+
/// let (tx1, rx1) = oneshot::channel();
1050+
/// let (tx2, rx2) = oneshot::channel();
1051+
/// let (_tx3, rx3) = oneshot::channel();
1052+
///
1053+
/// let stream = stream::iter(vec![rx1, rx2, rx3]);
1054+
/// let fut = stream.try_for_each_concurrent(
1055+
/// /* limit */ 2,
1056+
/// |rx| async move {
1057+
/// let res: Result<(), oneshot::Canceled> = rx.await;
1058+
/// res
1059+
/// }
1060+
/// );
1061+
///
1062+
/// tx1.send(()).unwrap();
1063+
/// // Drop the second sender so that `rx2` resolves to `Canceled`.
1064+
/// drop(tx2);
1065+
///
1066+
/// // The final result is an error because the second future
1067+
/// // resulted in an error.
1068+
/// assert_eq!(Err(oneshot::Canceled), fut.await);
1069+
/// # })
1070+
/// ```
1071+
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
1072+
#[cfg(feature = "alloc")]
1073+
fn try_for_each_concurrent<Fut, F, E>(
1074+
self,
1075+
limit: impl Into<Option<usize>>,
1076+
f: F,
1077+
) -> TryForEachConcurrent<Self, Fut, F>
1078+
where
1079+
F: FnMut(Self::Item) -> Fut,
1080+
Fut: Future<Output = Result<(), E>>,
1081+
Self: Sized,
1082+
{
1083+
assert_future::<Result<(), E>, _>(TryForEachConcurrent::new(
1084+
self,
1085+
limit.into(),
1086+
f,
1087+
))
1088+
}
1089+
9371090
/// Creates a new stream of at most `n` items of the underlying stream.
9381091
///
9391092
/// Once `n` items have been yielded from this stream then it will always

futures-util/src/stream/try_stream/try_fold.rs renamed to futures-util/src/stream/stream/try_fold.rs

+13-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::{FusedFuture, Future, TryFuture};
44
use futures_core::ready;
5-
use futures_core::stream::TryStream;
5+
use futures_core::stream::Stream;
66
use futures_core::task::{Context, Poll};
77
use pin_project_lite::pin_project;
88

@@ -35,9 +35,9 @@ where
3535
}
3636

3737
impl<St, Fut, T, F> TryFold<St, Fut, T, F>
38-
where St: TryStream,
39-
F: FnMut(T, St::Ok) -> Fut,
40-
Fut: TryFuture<Ok = T, Error = St::Error>,
38+
where St: Stream,
39+
F: FnMut(T, St::Item) -> Fut,
40+
Fut: TryFuture<Ok = T>,
4141
{
4242
pub(super) fn new(stream: St, f: F, t: T) -> Self {
4343
Self {
@@ -50,21 +50,21 @@ where St: TryStream,
5050
}
5151

5252
impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F>
53-
where St: TryStream,
54-
F: FnMut(T, St::Ok) -> Fut,
55-
Fut: TryFuture<Ok = T, Error = St::Error>,
53+
where St: Stream,
54+
F: FnMut(T, St::Item) -> Fut,
55+
Fut: TryFuture<Ok = T>,
5656
{
5757
fn is_terminated(&self) -> bool {
5858
self.accum.is_none() && self.future.is_none()
5959
}
6060
}
6161

6262
impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
63-
where St: TryStream,
64-
F: FnMut(T, St::Ok) -> Fut,
65-
Fut: TryFuture<Ok = T, Error = St::Error>,
63+
where St: Stream,
64+
F: FnMut(T, St::Item) -> Fut,
65+
Fut: TryFuture<Ok = T>,
6666
{
67-
type Output = Result<T, St::Error>;
67+
type Output = Result<T, Fut::Error>;
6868

6969
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
7070
let mut this = self.project();
@@ -80,11 +80,10 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
8080
}
8181
} else if this.accum.is_some() {
8282
// we're waiting on a new item from the stream
83-
let res = ready!(this.stream.as_mut().try_poll_next(cx));
83+
let res = ready!(this.stream.as_mut().poll_next(cx));
8484
let a = this.accum.take().unwrap();
8585
match res {
86-
Some(Ok(item)) => this.future.set(Some((this.f)(a, item))),
87-
Some(Err(e)) => break Err(e),
86+
Some(item) => this.future.set(Some((this.f)(a, item))),
8887
None => break Ok(a),
8988
}
9089
} else {

futures-util/src/stream/try_stream/try_for_each.rs renamed to futures-util/src/stream/stream/try_for_each.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::{Future, TryFuture};
44
use futures_core::ready;
5-
use futures_core::stream::TryStream;
5+
use futures_core::stream::Stream;
66
use futures_core::task::{Context, Poll};
77
use pin_project_lite::pin_project;
88

99
pin_project! {
10-
/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
10+
/// Future for the [`try_for_each`](super::StreamExt::try_for_each) method.
1111
#[must_use = "futures do nothing unless you `.await` or poll them"]
1212
pub struct TryForEach<St, Fut, F> {
1313
#[pin]
@@ -32,9 +32,9 @@ where
3232
}
3333

3434
impl<St, Fut, F> TryForEach<St, Fut, F>
35-
where St: TryStream,
36-
F: FnMut(St::Ok) -> Fut,
37-
Fut: TryFuture<Ok = (), Error = St::Error>,
35+
where St: Stream,
36+
F: FnMut(St::Item) -> Fut,
37+
Fut: TryFuture<Ok = ()>,
3838
{
3939
pub(super) fn new(stream: St, f: F) -> Self {
4040
Self {
@@ -46,11 +46,11 @@ where St: TryStream,
4646
}
4747

4848
impl<St, Fut, F> Future for TryForEach<St, Fut, F>
49-
where St: TryStream,
50-
F: FnMut(St::Ok) -> Fut,
51-
Fut: TryFuture<Ok = (), Error = St::Error>,
49+
where St: Stream,
50+
F: FnMut(St::Item) -> Fut,
51+
Fut: TryFuture<Ok = ()>,
5252
{
53-
type Output = Result<(), St::Error>;
53+
type Output = Result<(), Fut::Error>;
5454

5555
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5656
let mut this = self.project();
@@ -59,7 +59,7 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
5959
ready!(fut.try_poll(cx))?;
6060
this.future.set(None);
6161
} else {
62-
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
62+
match ready!(this.stream.as_mut().poll_next(cx)) {
6363
Some(e) => this.future.set(Some((this.f)(e))),
6464
None => break,
6565
}

0 commit comments

Comments
 (0)