From 15120cb36e4623445ad7204adfc1da9cb1816246 Mon Sep 17 00:00:00 2001 From: CollinValley Date: Sun, 22 Mar 2020 16:39:50 -0700 Subject: [PATCH 1/2] Filter Map: Add to project --- src/par_stream/filter_map.rs | 78 ++++++++++++++++++++++++++++++++++++ src/par_stream/mod.rs | 15 +++++++ 2 files changed, 93 insertions(+) create mode 100644 src/par_stream/filter_map.rs diff --git a/src/par_stream/filter_map.rs b/src/par_stream/filter_map.rs new file mode 100644 index 0000000..a101d1c --- /dev/null +++ b/src/par_stream/filter_map.rs @@ -0,0 +1,78 @@ +// use async_std::prelude::*; +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ParallelStream; + +pin_project_lite::pin_project! { + #[derive(Debug)] + pub struct FilterMap { + #[pin] + receiver: Receiver, + limit: Option, + } +} + +impl FilterMap { + /// Create a new instance of `FilterMap`. + pub fn new(mut stream: S, mut f: F) -> Self + where + S: ParallelStream, + F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, + { + let (sender, receiver) = sync::channel(1); + let limit = stream.get_limit(); + task::spawn(async move { + while let Some(item) = stream.next().await { + let sender = sender.clone(); + task::spawn(async move { + if let Some(res) = f(item).await { + sender.send(res).await; + } + }); + } + }); + FilterMap { receiver, limit } + } +} + +impl ParallelStream for FilterMap { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + this.receiver.poll_next(cx) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + let s = async_std::stream::from_iter(vec![1,2,1,2,1,2]); + let mut output : Vec = vec![]; + let mut stream = crate::from_stream(s) + .filter_map(|n| async move { + if n%2 == 0 { + Some(n) + } else { + None + } + }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![2usize; 3]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..7b4e2d0 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,11 +5,13 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use filter_map::FilterMap; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +mod filter_map; mod for_each; mod map; mod next; @@ -40,6 +42,19 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Map::new(self, f) } + /// Applies `f` to each item of this stream in parallel, where `f` returns + /// an Future>. If the future yields a None the item is + /// dropped, if the future yields a Some(T), T is added to the new stream of + /// results + fn filter_map(self, f: F) -> FilterMap + where + F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static, + T: Send + 'static, + Fut: Future> + Send, + { + FilterMap::new(self, f) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn next(&mut self) -> NextFuture<'_, Self> { From 2de532582920e4f1d9a38e86d7f141315efbee50 Mon Sep 17 00:00:00 2001 From: CollinValley Date: Mon, 23 Mar 2020 10:13:26 -0700 Subject: [PATCH 2/2] Filter Map: Cargo Fmt Simple fixup with Cargo Fmt, so the tests pass. --- src/par_stream/filter_map.rs | 19 +++++++++---------- tests/test.rs | 1 + 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/par_stream/filter_map.rs b/src/par_stream/filter_map.rs index a101d1c..4df0c5d 100644 --- a/src/par_stream/filter_map.rs +++ b/src/par_stream/filter_map.rs @@ -61,16 +61,15 @@ impl ParallelStream for FilterMap { #[async_std::test] async fn smoke() { - let s = async_std::stream::from_iter(vec![1,2,1,2,1,2]); - let mut output : Vec = vec![]; - let mut stream = crate::from_stream(s) - .filter_map(|n| async move { - if n%2 == 0 { - Some(n) - } else { - None - } - }); + let s = async_std::stream::from_iter(vec![1, 2, 1, 2, 1, 2]); + let mut output: Vec = vec![]; + let mut stream = crate::from_stream(s).filter_map(|n| async move { + if n % 2 == 0 { + Some(n) + } else { + None + } + }); while let Some(n) = stream.next().await { output.push(n); } diff --git a/tests/test.rs b/tests/test.rs index e69de29..8b13789 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -0,0 +1 @@ +