diff --git a/Cargo.lock b/Cargo.lock index 85cb63ed083..f662e0c9fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1580,11 +1580,11 @@ dependencies = [ [[package]] name = "futures-bounded" version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e2774cc104e198ef3d3e1ff4ab40f86fa3245d6cb6a3a46174f21463cee173" dependencies = [ - "futures", "futures-timer", "futures-util", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 23fd4774b55..556919dcae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ members = [ "interop-tests", "misc/allow-block-list", "misc/connection-limits", - "misc/futures-bounded", "misc/keygen", "misc/memory-connection-limits", "misc/metrics", @@ -73,7 +72,7 @@ rust-version = "1.73.0" [workspace.dependencies] asynchronous-codec = { version = "0.7.0" } -futures-bounded = { version = "0.2.3", path = "misc/futures-bounded" } +futures-bounded = { version = "0.2.3" } libp2p = { version = "0.53.2", path = "libp2p" } libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" } diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md deleted file mode 100644 index 72b0b4f457d..00000000000 --- a/misc/futures-bounded/CHANGELOG.md +++ /dev/null @@ -1,23 +0,0 @@ -## 0.2.3 - -- Introduce `FuturesTupleSet`, holding tuples of a `Future` together with an arbitrary piece of data. - See [PR 4841](https://github.com/libp2p/rust-libp2p/pull/4841). - -## 0.2.2 - -- Fix an issue where `{Futures,Stream}Map` returns `Poll::Pending` despite being ready after an item has been replaced as part of `try_push`. - See [PR 4865](https://github.com/libp2p/rust-libp2p/pull/4865). - -## 0.2.1 - -- Add `.len()` getter to `FuturesMap`, `FuturesSet`, `StreamMap` and `StreamSet`. - See [PR 4745](https://github.com/libp2p/rust-libp2p/pull/4745). - -## 0.2.0 - -- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. - See [PR 4616](https://github.com/libp2p/rust-libp2p/pull/4616). - -## 0.1.0 - -Initial release. diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml deleted file mode 100644 index 1d4340df74f..00000000000 --- a/misc/futures-bounded/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "futures-bounded" -version = "0.2.3" -edition = "2021" -rust-version.workspace = true -license = "MIT" -repository = "https://github.com/libp2p/rust-libp2p" -keywords = ["futures", "async", "backpressure"] -categories = ["data-structures", "asynchronous"] -description = "Utilities for bounding futures in size and time." -publish = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -futures-util = { version = "0.3.30" } -futures-timer = "3.0.3" - -[dev-dependencies] -tokio = { version = "1.36.0", features = ["macros", "rt", "sync"] } -futures = "0.3.30" - -[lints] -workspace = true diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs deleted file mode 100644 index fba3543f67b..00000000000 --- a/misc/futures-bounded/src/futures_map.rs +++ /dev/null @@ -1,319 +0,0 @@ -use std::future::Future; -use std::hash::Hash; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; -use std::{future, mem}; - -use futures_timer::Delay; -use futures_util::future::BoxFuture; -use futures_util::stream::FuturesUnordered; -use futures_util::{FutureExt, StreamExt}; - -use crate::{PushError, Timeout}; - -/// Represents a map of [`Future`]s. -/// -/// Each future must finish within the specified time and the map never outgrows its capacity. -pub struct FuturesMap { - timeout: Duration, - capacity: usize, - inner: FuturesUnordered>>>, - empty_waker: Option, - full_waker: Option, -} - -impl FuturesMap { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - timeout, - capacity, - inner: Default::default(), - empty_waker: None, - full_waker: None, - } - } -} - -impl FuturesMap -where - ID: Clone + Hash + Eq + Send + Unpin + 'static, - O: 'static, -{ - /// Push a future into the map. - /// - /// This method inserts the given future with defined `future_id` to the set. - /// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity], - /// that contains the passed future. In that case, the future is not inserted to the map. - /// If a future with the given `future_id` already exists, then the old future will be replaced by a new one. - /// In that case, the returned error [PushError::Replaced] contains the old future. - pub fn try_push(&mut self, future_id: ID, future: F) -> Result<(), PushError>> - where - F: Future + Send + 'static, - { - if self.inner.len() >= self.capacity { - return Err(PushError::BeyondCapacity(future.boxed())); - } - - if let Some(waker) = self.empty_waker.take() { - waker.wake(); - } - - let old = self.remove(future_id.clone()); - self.inner.push(TaggedFuture { - tag: future_id, - inner: TimeoutFuture { - inner: future.boxed(), - timeout: Delay::new(self.timeout), - cancelled: false, - }, - }); - match old { - None => Ok(()), - Some(old) => Err(PushError::Replaced(old)), - } - } - - pub fn remove(&mut self, id: ID) -> Option> { - let tagged = self.inner.iter_mut().find(|s| s.tag == id)?; - - let inner = mem::replace(&mut tagged.inner.inner, future::pending().boxed()); - tagged.inner.cancelled = true; - - Some(inner) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if self.inner.len() < self.capacity { - return Poll::Ready(()); - } - - self.full_waker = Some(cx.waker().clone()); - - Poll::Pending - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result)> { - loop { - let maybe_result = futures_util::ready!(self.inner.poll_next_unpin(cx)); - - match maybe_result { - None => { - self.empty_waker = Some(cx.waker().clone()); - return Poll::Pending; - } - Some((id, Ok(output))) => return Poll::Ready((id, Ok(output))), - Some((id, Err(TimeoutError::Timeout))) => { - return Poll::Ready((id, Err(Timeout::new(self.timeout)))) - } - Some((_, Err(TimeoutError::Cancelled))) => continue, - } - } - } -} - -struct TimeoutFuture { - inner: F, - timeout: Delay, - - cancelled: bool, -} - -impl Future for TimeoutFuture -where - F: Future + Unpin, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.cancelled { - return Poll::Ready(Err(TimeoutError::Cancelled)); - } - - if self.timeout.poll_unpin(cx).is_ready() { - return Poll::Ready(Err(TimeoutError::Timeout)); - } - - self.inner.poll_unpin(cx).map(Ok) - } -} - -enum TimeoutError { - Timeout, - Cancelled, -} - -struct TaggedFuture { - tag: T, - inner: F, -} - -impl Future for TaggedFuture -where - T: Clone + Unpin, - F: Future + Unpin, -{ - type Output = (T, F::Output); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let output = futures_util::ready!(self.inner.poll_unpin(cx)); - - Poll::Ready((self.tag.clone(), output)) - } -} - -#[cfg(test)] -mod tests { - use futures::channel::oneshot; - use futures_util::task::noop_waker_ref; - use std::future::{pending, poll_fn, ready}; - use std::pin::Pin; - use std::time::Instant; - - use super::*; - - #[test] - fn cannot_push_more_than_capacity_tasks() { - let mut futures = FuturesMap::new(Duration::from_secs(10), 1); - - assert!(futures.try_push("ID_1", ready(())).is_ok()); - matches!( - futures.try_push("ID_2", ready(())), - Err(PushError::BeyondCapacity(_)) - ); - } - - #[test] - fn cannot_push_the_same_id_few_times() { - let mut futures = FuturesMap::new(Duration::from_secs(10), 5); - - assert!(futures.try_push("ID", ready(())).is_ok()); - matches!( - futures.try_push("ID", ready(())), - Err(PushError::Replaced(_)) - ); - } - - #[tokio::test] - async fn futures_timeout() { - let mut futures = FuturesMap::new(Duration::from_millis(100), 1); - - let _ = futures.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - let (_, result) = poll_fn(|cx| futures.poll_unpin(cx)).await; - - assert!(result.is_err()) - } - - #[test] - fn resources_of_removed_future_are_cleaned_up() { - let mut futures = FuturesMap::new(Duration::from_millis(100), 1); - - let _ = futures.try_push("ID", pending::<()>()); - futures.remove("ID"); - - let poll = futures.poll_unpin(&mut Context::from_waker(noop_waker_ref())); - assert!(poll.is_pending()); - - assert_eq!(futures.len(), 0); - } - - #[tokio::test] - async fn replaced_pending_future_is_polled() { - let mut streams = FuturesMap::new(Duration::from_millis(100), 3); - - let (_tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - let _ = streams.try_push("ID1", rx1); - let _ = streams.try_push("ID2", rx2); - - let _ = tx2.send(2); - let (id, res) = poll_fn(|cx| streams.poll_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 2); - - let (new_tx1, new_rx1) = oneshot::channel(); - let replaced = streams.try_push("ID1", new_rx1); - assert!(matches!(replaced.unwrap_err(), PushError::Replaced(_))); - - let _ = new_tx1.send(4); - let (id, res) = poll_fn(|cx| streams.poll_unpin(cx)).await; - - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 4); - } - - // Each future causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. - // We stop after NUM_FUTURES tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES. - #[tokio::test] - async fn backpressure() { - const DELAY: Duration = Duration::from_millis(100); - const NUM_FUTURES: u32 = 10; - - let start = Instant::now(); - Task::new(DELAY, NUM_FUTURES, 1).await; - let duration = start.elapsed(); - - assert!(duration >= DELAY * NUM_FUTURES); - } - - struct Task { - future: Duration, - num_futures: usize, - num_processed: usize, - inner: FuturesMap, - } - - impl Task { - fn new(future: Duration, num_futures: u32, capacity: usize) -> Self { - Self { - future, - num_futures: num_futures as usize, - num_processed: 0, - inner: FuturesMap::new(Duration::from_secs(60), capacity), - } - } - } - - impl Future for Task { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while this.num_processed < this.num_futures { - if let Poll::Ready((_, result)) = this.inner.poll_unpin(cx) { - if result.is_err() { - panic!("Timeout is great than future delay") - } - - this.num_processed += 1; - continue; - } - - if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { - // We push the constant future's ID to prove that user can use the same ID - // if the future was finished - let maybe_future = this.inner.try_push(1u8, Delay::new(this.future)); - assert!(maybe_future.is_ok(), "we polled for readiness"); - - continue; - } - - return Poll::Pending; - } - - Poll::Ready(()) - } - } -} diff --git a/misc/futures-bounded/src/futures_set.rs b/misc/futures-bounded/src/futures_set.rs deleted file mode 100644 index af7cedfcc85..00000000000 --- a/misc/futures-bounded/src/futures_set.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::future::Future; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use futures_util::future::BoxFuture; - -use crate::{FuturesMap, PushError, Timeout}; - -/// Represents a list of [Future]s. -/// -/// Each future must finish within the specified time and the list never outgrows its capacity. -pub struct FuturesSet { - id: u32, - inner: FuturesMap, -} - -impl FuturesSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: FuturesMap::new(timeout, capacity), - } - } -} - -impl FuturesSet -where - O: 'static, -{ - /// Push a future into the list. - /// - /// This method adds the given future to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future. - /// In that case, the future is not added to the set. - pub fn try_push(&mut self, future: F) -> Result<(), BoxFuture> - where - F: Future + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, future) { - Ok(()) => Ok(()), - Err(PushError::BeyondCapacity(w)) => Err(w), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> { - let (_, res) = ready!(self.inner.poll_unpin(cx)); - - Poll::Ready(res) - } -} diff --git a/misc/futures-bounded/src/futures_tuple_set.rs b/misc/futures-bounded/src/futures_tuple_set.rs deleted file mode 100644 index e19b236aaf8..00000000000 --- a/misc/futures-bounded/src/futures_tuple_set.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use futures_util::future::BoxFuture; - -use crate::{FuturesMap, PushError, Timeout}; - -/// Represents a list of tuples of a [Future] and an associated piece of data. -/// -/// Each future must finish within the specified time and the list never outgrows its capacity. -pub struct FuturesTupleSet { - id: u32, - inner: FuturesMap, - data: HashMap, -} - -impl FuturesTupleSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: FuturesMap::new(timeout, capacity), - data: HashMap::new(), - } - } -} - -impl FuturesTupleSet -where - O: 'static, -{ - /// Push a future into the list. - /// - /// This method adds the given future to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future. - /// In that case, the future is not added to the set. - pub fn try_push(&mut self, future: F, data: D) -> Result<(), (BoxFuture, D)> - where - F: Future + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, future) { - Ok(()) => {} - Err(PushError::BeyondCapacity(w)) => return Err((w, data)), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - self.data.insert(self.id, data); - - Ok(()) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result, D)> { - let (id, res) = ready!(self.inner.poll_unpin(cx)); - let data = self.data.remove(&id).expect("must have data for future"); - - Poll::Ready((res, data)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::poll_fn; - use futures_util::FutureExt; - use std::future::ready; - - #[test] - fn tracks_associated_data_of_future() { - let mut set = FuturesTupleSet::new(Duration::from_secs(10), 10); - - let _ = set.try_push(ready(1), 1); - let _ = set.try_push(ready(2), 2); - - let (res1, data1) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); - let (res2, data2) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); - - assert_eq!(res1.unwrap(), data1); - assert_eq!(res2.unwrap(), data2); - } -} diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs deleted file mode 100644 index da8483a595f..00000000000 --- a/misc/futures-bounded/src/lib.rs +++ /dev/null @@ -1,46 +0,0 @@ -mod futures_map; -mod futures_set; -mod futures_tuple_set; -mod stream_map; -mod stream_set; - -pub use futures_map::FuturesMap; -pub use futures_set::FuturesSet; -pub use futures_tuple_set::FuturesTupleSet; -pub use stream_map::StreamMap; -pub use stream_set::StreamSet; - -use std::fmt; -use std::fmt::Formatter; -use std::time::Duration; - -/// A future failed to complete within the given timeout. -#[derive(Debug)] -pub struct Timeout { - limit: Duration, -} - -impl Timeout { - fn new(duration: Duration) -> Self { - Self { limit: duration } - } -} - -impl fmt::Display for Timeout { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "future failed to complete within {:?}", self.limit) - } -} - -/// Error of a future pushing -#[derive(PartialEq, Debug)] -pub enum PushError { - /// The length of the set is equal to the capacity - BeyondCapacity(T), - /// The map already contained an item with this key. - /// - /// The old item is returned. - Replaced(T), -} - -impl std::error::Error for Timeout {} diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs deleted file mode 100644 index 8464f432d02..00000000000 --- a/misc/futures-bounded/src/stream_map.rs +++ /dev/null @@ -1,362 +0,0 @@ -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; - -use futures_timer::Delay; -use futures_util::stream::{BoxStream, SelectAll}; -use futures_util::{stream, FutureExt, Stream, StreamExt}; - -use crate::{PushError, Timeout}; - -/// Represents a map of [`Stream`]s. -/// -/// Each stream must finish within the specified time and the map never outgrows its capacity. -pub struct StreamMap { - timeout: Duration, - capacity: usize, - inner: SelectAll>>>, - empty_waker: Option, - full_waker: Option, -} - -impl StreamMap -where - ID: Clone + Unpin, -{ - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - timeout, - capacity, - inner: Default::default(), - empty_waker: None, - full_waker: None, - } - } -} - -impl StreamMap -where - ID: Clone + PartialEq + Send + Unpin + 'static, - O: Send + 'static, -{ - /// Push a stream into the map. - pub fn try_push(&mut self, id: ID, stream: F) -> Result<(), PushError>> - where - F: Stream + Send + 'static, - { - if self.inner.len() >= self.capacity { - return Err(PushError::BeyondCapacity(stream.boxed())); - } - - if let Some(waker) = self.empty_waker.take() { - waker.wake(); - } - - let old = self.remove(id.clone()); - self.inner.push(TaggedStream::new( - id, - TimeoutStream { - inner: stream.boxed(), - timeout: Delay::new(self.timeout), - }, - )); - - match old { - None => Ok(()), - Some(old) => Err(PushError::Replaced(old)), - } - } - - pub fn remove(&mut self, id: ID) -> Option> { - let tagged = self.inner.iter_mut().find(|s| s.key == id)?; - - let inner = mem::replace(&mut tagged.inner.inner, stream::pending().boxed()); - tagged.exhausted = true; // Setting this will emit `None` on the next poll and ensure `SelectAll` cleans up the resources. - - Some(inner) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if self.inner.len() < self.capacity { - return Poll::Ready(()); - } - - self.full_waker = Some(cx.waker().clone()); - - Poll::Pending - } - - pub fn poll_next_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<(ID, Option>)> { - match futures_util::ready!(self.inner.poll_next_unpin(cx)) { - None => { - self.empty_waker = Some(cx.waker().clone()); - Poll::Pending - } - Some((id, Some(Ok(output)))) => Poll::Ready((id, Some(Ok(output)))), - Some((id, Some(Err(())))) => { - self.remove(id.clone()); // Remove stream, otherwise we keep reporting the timeout. - - Poll::Ready((id, Some(Err(Timeout::new(self.timeout))))) - } - Some((id, None)) => Poll::Ready((id, None)), - } - } -} - -struct TimeoutStream { - inner: S, - timeout: Delay, -} - -impl Stream for TimeoutStream -where - F: Stream + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.timeout.poll_unpin(cx).is_ready() { - return Poll::Ready(Some(Err(()))); - } - - self.inner.poll_next_unpin(cx).map(|a| a.map(Ok)) - } -} - -struct TaggedStream { - key: K, - inner: S, - - exhausted: bool, -} - -impl TaggedStream { - fn new(key: K, inner: S) -> Self { - Self { - key, - inner, - exhausted: false, - } - } -} - -impl Stream for TaggedStream -where - K: Clone + Unpin, - S: Stream + Unpin, -{ - type Item = (K, Option); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.exhausted { - return Poll::Ready(None); - } - - match futures_util::ready!(self.inner.poll_next_unpin(cx)) { - Some(item) => Poll::Ready(Some((self.key.clone(), Some(item)))), - None => { - self.exhausted = true; - - Poll::Ready(Some((self.key.clone(), None))) - } - } - } -} - -#[cfg(test)] -mod tests { - use futures::channel::mpsc; - use futures_util::stream::{once, pending}; - use futures_util::SinkExt; - use std::future::{poll_fn, ready, Future}; - use std::pin::Pin; - use std::time::Instant; - - use super::*; - - #[test] - fn cannot_push_more_than_capacity_tasks() { - let mut streams = StreamMap::new(Duration::from_secs(10), 1); - - assert!(streams.try_push("ID_1", once(ready(()))).is_ok()); - matches!( - streams.try_push("ID_2", once(ready(()))), - Err(PushError::BeyondCapacity(_)) - ); - } - - #[test] - fn cannot_push_the_same_id_few_times() { - let mut streams = StreamMap::new(Duration::from_secs(10), 5); - - assert!(streams.try_push("ID", once(ready(()))).is_ok()); - matches!( - streams.try_push("ID", once(ready(()))), - Err(PushError::Replaced(_)) - ); - } - - #[tokio::test] - async fn streams_timeout() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - let (_, result) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - assert!(result.unwrap().is_err()) - } - - #[tokio::test] - async fn timed_out_stream_gets_removed() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - let poll = streams.poll_next_unpin(&mut Context::from_waker( - futures_util::task::noop_waker_ref(), - )); - assert!(poll.is_pending()) - } - - #[test] - fn removing_stream() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", stream::once(ready(()))); - - { - let cancelled_stream = streams.remove("ID"); - assert!(cancelled_stream.is_some()); - } - - let poll = streams.poll_next_unpin(&mut Context::from_waker( - futures_util::task::noop_waker_ref(), - )); - - assert!(poll.is_pending()); - assert_eq!( - streams.len(), - 0, - "resources of cancelled streams are cleaned up properly" - ); - } - - #[tokio::test] - async fn replaced_stream_is_still_registered() { - let mut streams = StreamMap::new(Duration::from_millis(100), 3); - - let (mut tx1, rx1) = mpsc::channel(5); - let (mut tx2, rx2) = mpsc::channel(5); - - let _ = streams.try_push("ID1", rx1); - let _ = streams.try_push("ID2", rx2); - - let _ = tx2.send(2).await; - let _ = tx1.send(1).await; - let _ = tx2.send(3).await; - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 1); - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 2); - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 3); - - let (mut new_tx1, new_rx1) = mpsc::channel(5); - let replaced = streams.try_push("ID1", new_rx1); - assert!(matches!(replaced.unwrap_err(), PushError::Replaced(_))); - - let _ = new_tx1.send(4).await; - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 4); - } - - // Each stream emits 1 item with delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. - // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_STREAMS. - #[tokio::test] - async fn backpressure() { - const DELAY: Duration = Duration::from_millis(100); - const NUM_STREAMS: u32 = 10; - - let start = Instant::now(); - Task::new(DELAY, NUM_STREAMS, 1).await; - let duration = start.elapsed(); - - assert!(duration >= DELAY * NUM_STREAMS); - } - - struct Task { - item_delay: Duration, - num_streams: usize, - num_processed: usize, - inner: StreamMap, - } - - impl Task { - fn new(item_delay: Duration, num_streams: u32, capacity: usize) -> Self { - Self { - item_delay, - num_streams: num_streams as usize, - num_processed: 0, - inner: StreamMap::new(Duration::from_secs(60), capacity), - } - } - } - - impl Future for Task { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while this.num_processed < this.num_streams { - match this.inner.poll_next_unpin(cx) { - Poll::Ready((_, Some(result))) => { - if result.is_err() { - panic!("Timeout is great than item delay") - } - - this.num_processed += 1; - continue; - } - Poll::Ready((_, None)) => { - continue; - } - _ => {} - } - - if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { - // We push the constant ID to prove that user can use the same ID if the stream was finished - let maybe_future = this.inner.try_push(1u8, once(Delay::new(this.item_delay))); - assert!(maybe_future.is_ok(), "we polled for readiness"); - - continue; - } - - return Poll::Pending; - } - - Poll::Ready(()) - } - } -} diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs deleted file mode 100644 index bb32835065f..00000000000 --- a/misc/futures-bounded/src/stream_set.rs +++ /dev/null @@ -1,64 +0,0 @@ -use futures_util::stream::BoxStream; -use futures_util::Stream; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use crate::{PushError, StreamMap, Timeout}; - -/// Represents a set of [Stream]s. -/// -/// Each stream must finish within the specified time and the list never outgrows its capacity. -pub struct StreamSet { - id: u32, - inner: StreamMap, -} - -impl StreamSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: StreamMap::new(timeout, capacity), - } - } -} - -impl StreamSet -where - O: Send + 'static, -{ - /// Push a stream into the list. - /// - /// This method adds the given stream to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed stream. - /// In that case, the stream is not added to the set. - pub fn try_push(&mut self, stream: F) -> Result<(), BoxStream> - where - F: Stream + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, stream) { - Ok(()) => Ok(()), - Err(PushError::BeyondCapacity(w)) => Err(w), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll>> { - let (_, res) = ready!(self.inner.poll_next_unpin(cx)); - - Poll::Ready(res) - } -}