Skip to content

Commit d59b8a3

Browse files
committed
Implement FuturesUnordered::iter_mut
1 parent 2f85d73 commit d59b8a3

File tree

3 files changed

+111
-26
lines changed

3 files changed

+111
-26
lines changed

src/stream/futures_unordered.rs

+43-24
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! An unbounded set of futures.
2+
13
use std::cell::UnsafeCell;
24
use std::fmt::{self, Debug};
35
use std::iter::FromIterator;
@@ -9,7 +11,7 @@ use std::sync::atomic::{AtomicPtr, AtomicBool};
911
use std::sync::{Arc, Weak};
1012
use std::usize;
1113

12-
use {task, Stream, Future, Poll, Async, IntoFuture};
14+
use {task, Stream, Future, Poll, Async};
1315
use executor::{Notify, UnsafeNotify, NotifyHandle};
1416
use task_impl::{self, AtomicTask};
1517

@@ -51,29 +53,6 @@ pub struct FuturesUnordered<F> {
5153
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
5254
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}
5355

54-
/// Converts a list of futures into a `Stream` of results from the futures.
55-
///
56-
/// This function will take an list of futures (e.g. a vector, an iterator,
57-
/// etc), and return a stream. The stream will yield items as they become
58-
/// available on the futures internally, in the order that they become
59-
/// available. This function is similar to `buffer_unordered` in that it may
60-
/// return items in a different order than in the list specified.
61-
///
62-
/// Note that the returned set can also be used to dynamically push more
63-
/// futures into the set as they become available.
64-
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
65-
where I: IntoIterator,
66-
I::Item: IntoFuture
67-
{
68-
let mut set = FuturesUnordered::new();
69-
70-
for future in futures {
71-
set.push(future.into_future());
72-
}
73-
74-
return set
75-
}
76-
7756
// FuturesUnordered is implemented using two linked lists. One which links all
7857
// futures managed by a `FuturesUnordered` and one that tracks futures that have
7958
// been scheduled for polling. The first linked list is not thread safe and is
@@ -207,6 +186,15 @@ impl<T> FuturesUnordered<T> {
207186
self.inner.enqueue(ptr);
208187
}
209188

189+
/// Returns an iterator that allows modifying each future in the set.
190+
pub fn iter_mut(&mut self) -> IterMut<T> {
191+
IterMut {
192+
node: self.head_all,
193+
len: self.len,
194+
_marker: PhantomData
195+
}
196+
}
197+
210198
fn release_node(&mut self, node: Arc<Node<T>>) {
211199
// The future is done, try to reset the queued flag. This will prevent
212200
// `notify` from doing any work in the future
@@ -440,6 +428,37 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
440428
}
441429
}
442430

431+
#[derive(Debug)]
432+
/// Mutable iterator over all futures in the unordered set.
433+
pub struct IterMut<'a, F: 'a> {
434+
node: *const Node<F>,
435+
len: usize,
436+
_marker: PhantomData<&'a mut FuturesUnordered<F>>
437+
}
438+
439+
impl<'a, F> Iterator for IterMut<'a, F> {
440+
type Item = &'a mut F;
441+
442+
fn next(&mut self) -> Option<&'a mut F> {
443+
if self.node.is_null() {
444+
return None;
445+
}
446+
unsafe {
447+
let future = (*(*self.node).future.get()).as_mut().unwrap();
448+
let next = *(*self.node).next_all.get();
449+
self.node = next;
450+
self.len -= 1;
451+
return Some(future);
452+
}
453+
}
454+
455+
fn size_hint(&self) -> (usize, Option<usize>) {
456+
(self.len, Some(self.len))
457+
}
458+
}
459+
460+
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
461+
443462
impl<T> Inner<T> {
444463
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
445464
fn enqueue(&self, node: *const Node<T>) {

src/stream/mod.rs

+26-2
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ if_std! {
103103
mod wait;
104104
mod channel;
105105
mod split;
106-
mod futures_unordered;
106+
pub mod futures_unordered;
107107
mod futures_ordered;
108108
pub use self::buffered::Buffered;
109109
pub use self::buffer_unordered::BufferUnordered;
@@ -112,7 +112,7 @@ if_std! {
112112
pub use self::collect::Collect;
113113
pub use self::wait::Wait;
114114
pub use self::split::{SplitStream, SplitSink};
115-
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
115+
pub use self::futures_unordered::FuturesUnordered;
116116
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
117117

118118
#[doc(hidden)]
@@ -1102,3 +1102,27 @@ impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
11021102
(**self).poll()
11031103
}
11041104
}
1105+
1106+
/// Converts a list of futures into a `Stream` of results from the futures.
1107+
///
1108+
/// This function will take an list of futures (e.g. a vector, an iterator,
1109+
/// etc), and return a stream. The stream will yield items as they become
1110+
/// available on the futures internally, in the order that they become
1111+
/// available. This function is similar to `buffer_unordered` in that it may
1112+
/// return items in a different order than in the list specified.
1113+
///
1114+
/// Note that the returned set can also be used to dynamically push more
1115+
/// futures into the set as they become available.
1116+
#[cfg(feature = "use_std")]
1117+
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
1118+
where I: IntoIterator,
1119+
I::Item: IntoFuture
1120+
{
1121+
let mut set = FuturesUnordered::new();
1122+
1123+
for future in futures {
1124+
set.push(future.into_future());
1125+
}
1126+
1127+
return set
1128+
}

tests/futures_unordered.rs

+42
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,45 @@ fn finished_future_ok() {
8383
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8484
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8585
}
86+
87+
#[test]
88+
fn iter_mut_cancel() {
89+
let (a_tx, a_rx) = oneshot::channel::<u32>();
90+
let (b_tx, b_rx) = oneshot::channel::<u32>();
91+
let (c_tx, c_rx) = oneshot::channel::<u32>();
92+
93+
let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
94+
95+
for rx in stream.iter_mut() {
96+
rx.close();
97+
}
98+
99+
assert!(a_tx.is_canceled());
100+
assert!(b_tx.is_canceled());
101+
assert!(c_tx.is_canceled());
102+
103+
let mut spawn = futures::executor::spawn(stream);
104+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
105+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
106+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
107+
assert_eq!(None, spawn.wait_stream());
108+
}
109+
110+
#[test]
111+
fn iter_mut_len() {
112+
let mut stream = futures_unordered(vec![
113+
futures::future::empty::<(),()>(),
114+
futures::future::empty::<(),()>(),
115+
futures::future::empty::<(),()>()
116+
]);
117+
118+
let mut iter_mut = stream.iter_mut();
119+
assert_eq!(iter_mut.len(), 3);
120+
assert!(iter_mut.next().is_some());
121+
assert_eq!(iter_mut.len(), 2);
122+
assert!(iter_mut.next().is_some());
123+
assert_eq!(iter_mut.len(), 1);
124+
assert!(iter_mut.next().is_some());
125+
assert_eq!(iter_mut.len(), 0);
126+
assert!(iter_mut.next().is_none());
127+
}

0 commit comments

Comments
 (0)