Skip to content

Commit a97a1ff

Browse files
authored
Merge pull request #264 from montekki/fs-stream-for-each
Adds for_each stream combinator
2 parents 0b57100 + 6da7efc commit a97a1ff

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

src/stream/stream/for_each.rs

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct ForEachFuture<S, F, T> {
11+
stream: S,
12+
f: F,
13+
__t: PhantomData<T>,
14+
}
15+
16+
impl<S, F, T> ForEachFuture<S, F, T> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(f: F);
19+
20+
pub(super) fn new(stream: S, f: F) -> Self {
21+
ForEachFuture {
22+
stream,
23+
f,
24+
__t: PhantomData,
25+
}
26+
}
27+
}
28+
29+
impl<S, F> Future for ForEachFuture<S, F, S::Item>
30+
where
31+
S: Stream + Sized,
32+
F: FnMut(S::Item),
33+
{
34+
type Output = ();
35+
36+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37+
loop {
38+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
39+
40+
match next {
41+
Some(v) => (self.as_mut().f())(v),
42+
None => return Poll::Ready(()),
43+
}
44+
}
45+
}
46+
}

src/stream/stream/mod.rs

+37
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod filter_map;
3030
mod find;
3131
mod find_map;
3232
mod fold;
33+
mod for_each;
3334
mod fuse;
3435
mod inspect;
3536
mod min_by;
@@ -49,6 +50,7 @@ use filter_map::FilterMap;
4950
use find::FindFuture;
5051
use find_map::FindMapFuture;
5152
use fold::FoldFuture;
53+
use for_each::ForEachFuture;
5254
use min_by::MinByFuture;
5355
use next::NextFuture;
5456
use nth::NthFuture;
@@ -750,6 +752,41 @@ extension_trait! {
750752
FoldFuture::new(self, init, f)
751753
}
752754

755+
#[doc = r#"
756+
Call a closure on each element of the stream.
757+
758+
# Examples
759+
760+
```
761+
# fn main() { async_std::task::block_on(async {
762+
#
763+
use async_std::prelude::*;
764+
use std::collections::VecDeque;
765+
use std::sync::mpsc::channel;
766+
767+
let (tx, rx) = channel();
768+
769+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
770+
let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
771+
772+
let v: Vec<_> = rx.iter().collect();
773+
774+
assert_eq!(v, vec![1, 2, 3]);
775+
#
776+
# }) }
777+
```
778+
"#]
779+
fn for_each<F>(
780+
self,
781+
f: F,
782+
) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>]
783+
where
784+
Self: Sized,
785+
F: FnMut(Self::Item),
786+
{
787+
ForEachFuture::new(self, f)
788+
}
789+
753790
#[doc = r#"
754791
Tests if any element of the stream matches a predicate.
755792

0 commit comments

Comments
 (0)