Skip to content

Commit 1ad7f30

Browse files
authored
Join stream (#3)
* join stream works! Signed-off-by: Yoshua Wuyts <[email protected]> * join_stream done Signed-off-by: Yoshua Wuyts <[email protected]> * clean up join_stream Signed-off-by: Yoshua Wuyts <[email protected]> * remove phantomdata Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent f1514fa commit 1ad7f30

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

src/join_stream.rs

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
use futures_core::Stream;
5+
6+
/// A stream joining two or more streams.
7+
///
8+
/// This stream is returned by `join!`.
9+
#[derive(Debug)]
10+
pub struct JoinStream<L, R> {
11+
left: L,
12+
right: R,
13+
}
14+
15+
impl<L, R> Unpin for JoinStream<L, R> {}
16+
17+
impl<L, R> JoinStream<L, R> {
18+
#[doc(hidden)]
19+
pub fn new(left: L, right: R) -> Self {
20+
Self { left, right }
21+
}
22+
}
23+
24+
impl<L, R, T> Stream for JoinStream<L, R>
25+
where
26+
L: Stream<Item = T> + Unpin,
27+
R: Stream<Item = T> + Unpin,
28+
{
29+
type Item = T;
30+
31+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32+
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
33+
// The first stream made progress. The JoinStream needs to be polled
34+
// again to check the progress of the second stream.
35+
cx.waker().wake_by_ref();
36+
Poll::Ready(Some(item))
37+
} else {
38+
Pin::new(&mut self.right).poll_next(cx)
39+
}
40+
}
41+
}
42+
43+
/// Combines multiple streams into a single stream of all their outputs.
44+
///
45+
/// This macro is only usable inside of async functions, closures, and blocks.
46+
///
47+
/// # Examples
48+
///
49+
/// ```
50+
/// # futures::executor::block_on(async {
51+
/// use async_macros::join_stream as join;
52+
/// use futures::stream::{self, StreamExt};
53+
/// use futures::future::ready;
54+
///
55+
/// let a = stream::once(ready(1u8));
56+
/// let b = stream::once(ready(2u8));
57+
/// let c = stream::once(ready(3u8));
58+
///
59+
/// let mut s = join!(a, b, c);
60+
///
61+
/// assert_eq!(s.next().await, Some(1u8));
62+
/// assert_eq!(s.next().await, Some(2u8));
63+
/// assert_eq!(s.next().await, Some(3u8));
64+
/// assert_eq!(s.next().await, None);
65+
/// # });
66+
/// ```
67+
#[macro_export]
68+
macro_rules! join_stream {
69+
($stream1:ident, $stream2:ident, $($stream:ident),* $(,)?) => {{
70+
let joined = $crate::JoinStream::new($stream1, $stream2);
71+
$(
72+
let joined = $crate::JoinStream::new(joined, $stream);
73+
)*
74+
joined
75+
}};
76+
}

src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//! # Examples
44
//!
55
//! ```
6-
//! #![feature(async_await)]
76
//! # futures::executor::block_on(async {
87
//! use async_macros::join;
98
//! use futures::future;
@@ -21,13 +20,15 @@
2120
#![cfg_attr(test, deny(warnings))]
2221

2322
mod join;
23+
mod join_stream;
2424
mod maybe_done;
2525
mod poll_fn;
2626
mod ready;
2727
mod select;
2828
mod try_join;
2929
mod try_select;
3030

31+
pub use join_stream::JoinStream;
3132
pub use maybe_done::MaybeDone;
3233

3334
/// Helper re-exports for use in macros.

0 commit comments

Comments
 (0)