Skip to content

Commit 2f85d73

Browse files
authored
Merge pull request #619 from srijs/feat/futures-unordered-from-iterator
Implement FromIterator for FuturesUnordered/FuturesOrdered
2 parents 2b3f0f0 + 61132e6 commit 2f85d73

File tree

4 files changed

+54
-0
lines changed

4 files changed

+54
-0
lines changed

src/stream/futures_ordered.rs

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::cmp::{Eq, PartialEq, PartialOrd, Ord, Ordering};
22
use std::collections::BinaryHeap;
33
use std::fmt::{self, Debug};
4+
use std::iter::FromIterator;
45

56
use {Async, Future, IntoFuture, Poll, Stream};
67
use stream::FuturesUnordered;
@@ -198,3 +199,15 @@ impl<T: Debug> Debug for FuturesOrdered<T>
198199
write!(fmt, "FuturesOrdered {{ ... }}")
199200
}
200201
}
202+
203+
impl<F: Future> FromIterator<F> for FuturesOrdered<F> {
204+
fn from_iter<T>(iter: T) -> Self
205+
where T: IntoIterator<Item = F>
206+
{
207+
let mut new = FuturesOrdered::new();
208+
for future in iter.into_iter() {
209+
new.push(future);
210+
}
211+
new
212+
}
213+
}

src/stream/futures_unordered.rs

+13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::cell::UnsafeCell;
22
use std::fmt::{self, Debug};
3+
use std::iter::FromIterator;
34
use std::marker::PhantomData;
45
use std::mem;
56
use std::ptr;
@@ -427,6 +428,18 @@ impl<T> Drop for FuturesUnordered<T> {
427428
}
428429
}
429430

431+
impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
432+
fn from_iter<T>(iter: T) -> Self
433+
where T: IntoIterator<Item = F>
434+
{
435+
let mut new = FuturesUnordered::new();
436+
for future in iter.into_iter() {
437+
new.push(future);
438+
}
439+
new
440+
}
441+
}
442+
430443
impl<T> Inner<T> {
431444
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
432445
fn enqueue(&self, node: *const Node<T>) {

tests/futures_ordered.rs

+14
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,20 @@ fn works_2() {
4848
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
4949
}
5050

51+
#[test]
52+
fn from_iterator() {
53+
use futures::future::ok;
54+
use futures::stream::FuturesOrdered;
55+
56+
let stream = vec![
57+
ok::<u32, ()>(1),
58+
ok::<u32, ()>(2),
59+
ok::<u32, ()>(3)
60+
].into_iter().collect::<FuturesOrdered<_>>();
61+
assert_eq!(stream.len(), 3);
62+
assert_eq!(stream.collect().wait(), Ok(vec![1,2,3]));
63+
}
64+
5165
#[test]
5266
fn queue_never_unblocked() {
5367
let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();

tests/futures_unordered.rs

+14
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,20 @@ fn works_2() {
4646
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
4747
}
4848

49+
#[test]
50+
fn from_iterator() {
51+
use futures::future::ok;
52+
use futures::stream::FuturesUnordered;
53+
54+
let stream = vec![
55+
ok::<u32, ()>(1),
56+
ok::<u32, ()>(2),
57+
ok::<u32, ()>(3)
58+
].into_iter().collect::<FuturesUnordered<_>>();
59+
assert_eq!(stream.len(), 3);
60+
assert_eq!(stream.collect().wait(), Ok(vec![1,2,3]));
61+
}
62+
4963
#[test]
5064
fn finished_future_ok() {
5165
let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();

0 commit comments

Comments
 (0)