@@ -15,7 +15,7 @@ use futures_core::stream::TryStream;
15
15
#[ cfg( feature = "alloc" ) ]
16
16
use futures_core:: stream:: { BoxStream , LocalBoxStream } ;
17
17
use futures_core:: {
18
- future:: Future ,
18
+ future:: { Future , TryFuture } ,
19
19
stream:: { FusedStream , Stream } ,
20
20
task:: { Context , Poll } ,
21
21
} ;
@@ -149,6 +149,14 @@ mod then;
149
149
#[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
150
150
pub use self :: then:: Then ;
151
151
152
+ mod try_for_each;
153
+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
154
+ pub use self :: try_for_each:: TryForEach ;
155
+
156
+ mod try_fold;
157
+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
158
+ pub use self :: try_fold:: TryFold ;
159
+
152
160
mod zip;
153
161
#[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
154
162
pub use self :: zip:: Zip ;
@@ -197,6 +205,12 @@ cfg_target_has_atomic! {
197
205
#[ cfg( feature = "alloc" ) ]
198
206
#[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
199
207
pub use self :: split:: { SplitStream , SplitSink , ReuniteError } ;
208
+
209
+ #[ cfg( feature = "alloc" ) ]
210
+ mod try_for_each_concurrent;
211
+ #[ cfg( feature = "alloc" ) ]
212
+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
213
+ pub use self :: try_for_each_concurrent:: TryForEachConcurrent ;
200
214
}
201
215
202
216
#[ cfg( feature = "std" ) ]
@@ -934,6 +948,143 @@ pub trait StreamExt: Stream {
934
948
assert_future :: < ( ) , _ > ( ForEachConcurrent :: new ( self , limit. into ( ) , f) )
935
949
}
936
950
951
+ /// Attempt to execute an accumulating asynchronous computation over a
952
+ /// stream, collecting all the values into one final result.
953
+ ///
954
+ /// This combinator will accumulate all values returned by this stream
955
+ /// according to the closure provided. The initial state is also provided to
956
+ /// this method and then is returned again by each execution of the closure.
957
+ /// Once the entire stream has been exhausted the returned future will
958
+ /// resolve to this value.
959
+ ///
960
+ /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but
961
+ /// will exit early if an error is encountered in the provided closure.
962
+ ///
963
+ /// # Examples
964
+ ///
965
+ /// ```
966
+ /// # futures::executor::block_on(async {
967
+ /// use futures::stream::{self, StreamExt};
968
+ ///
969
+ /// let number_stream = stream::iter(vec![1, 2]);
970
+ /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok::<i32, i32>(acc + x) });
971
+ /// assert_eq!(sum.await, Ok(3));
972
+ ///
973
+ /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
974
+ /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x?) });
975
+ /// assert_eq!(sum.await, Err(2));
976
+ /// # })
977
+ /// ```
978
+ fn try_fold < T , Fut , F > ( self , init : T , f : F ) -> TryFold < Self , Fut , T , F >
979
+ where
980
+ F : FnMut ( T , Self :: Item ) -> Fut ,
981
+ Fut : TryFuture < Ok = T > ,
982
+ Self : Sized ,
983
+ {
984
+ assert_future :: < Result < T , Fut :: Error > , _ > ( TryFold :: new ( self , f, init) )
985
+ }
986
+
987
+ /// Attempts to run this stream to completion, executing the provided
988
+ /// asynchronous closure for each element on the stream.
989
+ ///
990
+ /// The provided closure will be called for each item this stream produces,
991
+ /// yielding a future. That future will then be executed to completion
992
+ /// before moving on to the next item.
993
+ ///
994
+ /// The returned value is a [`Future`](futures_core::future::Future) where
995
+ /// the [`Output`](futures_core::future::Future::Output) type is
996
+ /// `Result<(), Fut::Error>`. If any of the intermediate futures returns
997
+ /// an error, this future will return immediately with an error.
998
+ ///
999
+ /// # Examples
1000
+ ///
1001
+ /// ```
1002
+ /// # futures::executor::block_on(async {
1003
+ /// use futures::future;
1004
+ /// use futures::stream::{self, StreamExt};
1005
+ ///
1006
+ /// let mut x = 0i32;
1007
+ ///
1008
+ /// {
1009
+ /// let fut = stream::repeat(1).try_for_each(|item| {
1010
+ /// x += item;
1011
+ /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
1012
+ /// });
1013
+ /// assert_eq!(fut.await, Err(()));
1014
+ /// }
1015
+ ///
1016
+ /// assert_eq!(x, 3);
1017
+ /// # })
1018
+ /// ```
1019
+ fn try_for_each < Fut , F > ( self , f : F ) -> TryForEach < Self , Fut , F >
1020
+ where
1021
+ F : FnMut ( Self :: Item ) -> Fut ,
1022
+ Fut : TryFuture < Ok = ( ) > ,
1023
+ Self : Sized ,
1024
+ {
1025
+ assert_future :: < Result < ( ) , Fut :: Error > , _ > ( TryForEach :: new ( self , f) )
1026
+ }
1027
+
1028
+ /// Attempts to run this stream to completion, executing the provided asynchronous
1029
+ /// closure for each element on the stream concurrently as elements become
1030
+ /// available, exiting as soon as an error occurs.
1031
+ ///
1032
+ /// This is similar to
1033
+ /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
1034
+ /// but will resolve to an error immediately if the provided closure returns
1035
+ /// an error.
1036
+ ///
1037
+ /// This method is only available when the `std` or `alloc` feature of this
1038
+ /// library is activated, and it is activated by default.
1039
+ ///
1040
+ /// # Examples
1041
+ ///
1042
+ /// ```
1043
+ /// # futures::executor::block_on(async {
1044
+ /// use futures::channel::oneshot;
1045
+ /// use futures::stream::{self, StreamExt};
1046
+ ///
1047
+ /// let (tx1, rx1) = oneshot::channel();
1048
+ /// let (tx2, rx2) = oneshot::channel();
1049
+ /// let (_tx3, rx3) = oneshot::channel();
1050
+ ///
1051
+ /// let stream = stream::iter(vec![rx1, rx2, rx3]);
1052
+ /// let fut = stream.try_for_each_concurrent(
1053
+ /// /* limit */ 2,
1054
+ /// |rx| async move {
1055
+ /// let res: Result<(), oneshot::Canceled> = rx.await;
1056
+ /// res
1057
+ /// }
1058
+ /// );
1059
+ ///
1060
+ /// tx1.send(()).unwrap();
1061
+ /// // Drop the second sender so that `rx2` resolves to `Canceled`.
1062
+ /// drop(tx2);
1063
+ ///
1064
+ /// // The final result is an error because the second future
1065
+ /// // resulted in an error.
1066
+ /// assert_eq!(Err(oneshot::Canceled), fut.await);
1067
+ /// # })
1068
+ /// ```
1069
+ #[ cfg_attr( feature = "cfg-target-has-atomic" , cfg( target_has_atomic = "ptr" ) ) ]
1070
+ #[ cfg( feature = "alloc" ) ]
1071
+ fn try_for_each_concurrent < Fut , F , E > (
1072
+ self ,
1073
+ limit : impl Into < Option < usize > > ,
1074
+ f : F ,
1075
+ ) -> TryForEachConcurrent < Self , Fut , F >
1076
+ where
1077
+ F : FnMut ( Self :: Item ) -> Fut ,
1078
+ Fut : Future < Output = Result < ( ) , E > > ,
1079
+ Self : Sized ,
1080
+ {
1081
+ assert_future :: < Result < ( ) , E > , _ > ( TryForEachConcurrent :: new (
1082
+ self ,
1083
+ limit. into ( ) ,
1084
+ f,
1085
+ ) )
1086
+ }
1087
+
937
1088
/// Creates a new stream of at most `n` items of the underlying stream.
938
1089
///
939
1090
/// Once `n` items have been yielded from this stream then it will always
0 commit comments