Skip to content

Commit b17f2d8

Browse files
barzamincramertj
authored andcommitted
implement concat() on fallible streams as TryStreamExt::try_concat()
1 parent 5d87cce commit b17f2d8

File tree

2 files changed

+107
-0
lines changed

2 files changed

+107
-0
lines changed

futures-util/src/try_stream/mod.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub use self::try_for_each::TryForEach;
3131
mod try_filter_map;
3232
pub use self::try_filter_map::TryFilterMap;
3333

34+
mod try_concat;
35+
pub use self::try_concat::TryConcat;
36+
3437
mod try_fold;
3538
pub use self::try_fold::TryFold;
3639

@@ -438,6 +441,47 @@ pub trait TryStreamExt: TryStream {
438441
TryFold::new(self, f, init)
439442
}
440443

444+
/// Attempt to concatenate all items of a stream into a single
445+
/// extendable destination, returning a future representing the end result.
446+
///
447+
/// This combinator will extend the first item with the contents of all
448+
/// the subsequent successful results of the stream. If the stream is empty,
449+
/// the default value will be returned.
450+
///
451+
/// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
452+
///
453+
/// This method is similar to [`concat`](super::StreamExt::concat), but will
454+
/// exit early if an error is encountered in the stream.
455+
///
456+
/// # Examples
457+
///
458+
/// ```
459+
/// use futures::channel::mpsc;
460+
/// use futures::executor::block_on;
461+
/// use futures::stream::TryStreamExt;
462+
/// use std::thread;
463+
///
464+
/// let (mut tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
465+
///
466+
/// thread::spawn(move || {
467+
/// for i in (0..3).rev() {
468+
/// let n = i * 3;
469+
/// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
470+
/// }
471+
/// });
472+
///
473+
/// let result = block_on(rx.try_concat());
474+
///
475+
/// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
476+
/// ```
477+
fn try_concat(self) -> TryConcat<Self>
478+
where Self: Sized,
479+
Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> +
480+
IntoIterator + Default,
481+
{
482+
TryConcat::new(self)
483+
}
484+
441485
/// Attempt to execute several futures from a stream concurrently.
442486
///
443487
/// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use core::default::Default;
2+
use core::marker::Unpin;
3+
use core::pin::Pin;
4+
use futures_core::future::Future;
5+
use futures_core::stream::TryStream;
6+
use futures_core::task::{LocalWaker, Poll};
7+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
8+
9+
/// A stream combinator which attempts to concatenate the results of a stream into the
10+
/// first yielded item.
11+
///
12+
/// This structure is produced by the `TryStream::try_concat` method.
13+
#[derive(Debug)]
14+
#[must_use = "streams do nothing unless polled"]
15+
pub struct TryConcat<St: TryStream> {
16+
stream: St,
17+
accum: Option<St::Ok>,
18+
}
19+
20+
impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
21+
22+
impl<St> TryConcat<St>
23+
where
24+
St: TryStream,
25+
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
26+
{
27+
unsafe_pinned!(stream: St);
28+
unsafe_unpinned!(accum: Option<St::Ok>);
29+
30+
pub(super) fn new(stream: St) -> TryConcat<St> {
31+
TryConcat {
32+
stream,
33+
accum: None,
34+
}
35+
}
36+
}
37+
38+
impl<St> Future for TryConcat<St>
39+
where
40+
St: TryStream,
41+
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
42+
{
43+
type Output = Result<St::Ok, St::Error>;
44+
45+
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
46+
loop {
47+
match ready!(self.stream().try_poll_next(lw)) {
48+
Some(Ok(x)) => {
49+
let accum = self.accum();
50+
if let Some(a) = accum {
51+
a.extend(x)
52+
} else {
53+
*accum = Some(x)
54+
}
55+
},
56+
Some(Err(e)) => return Poll::Ready(Err(e)),
57+
None => {
58+
return Poll::Ready(Ok(self.accum().take().unwrap_or_default()))
59+
}
60+
}
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)