Replace unbounded channels with bounded ones. #1191

Merged · 10 commits · Jul 9, 2019
90 changes: 57 additions & 33 deletions core/src/nodes/collection.rs
@@ -23,15 +23,16 @@ use crate::{
muxing::StreamMuxer,
nodes::{
node::Substream,
-handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
-handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId, ClosedTask},
-handled_node::{HandledNodeError, NodeHandler}
+handled_node::{HandledNodeError, IntoNodeHandler, NodeHandler},
+tasks::{self, ClosedTask, TaskEntry, TaskId}
}
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{error, fmt, hash::Hash, mem};

+pub use crate::nodes::tasks::StartTakeOver;

mod tests;

/// Implementation of `Stream` that handles a collection of nodes.
@@ -40,7 +41,7 @@ pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerEr
///
/// The user data contains the state of the task. If `Connected`, then a corresponding entry
/// must be present in `nodes`.
-inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
+inner: tasks::Manager<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,

/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state.
@@ -310,7 +311,7 @@ where
#[inline]
pub fn new() -> Self {
CollectionStream {
-inner: HandledNodesTasks::new(),
+inner: tasks::Manager::new(),
nodes: Default::default(),
}
}
@@ -357,12 +358,17 @@
}

/// Sends an event to all nodes.
-#[inline]
-pub fn broadcast_event(&mut self, event: &TInEvent)
-where TInEvent: Clone,
+#[must_use]
+pub fn start_broadcast(&mut self, event: &TInEvent) -> AsyncSink<()>
+where
+TInEvent: Clone
{
// TODO: remove the ones we're not connected to?
-self.inner.broadcast_event(event)
+self.inner.start_broadcast(event)
}

+#[must_use]
+pub fn complete_broadcast(&mut self) -> Async<()> {
+self.inner.complete_broadcast()
+}
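
The old `broadcast_event` pushed into unbounded channels and could not fail; with bounded channels a send may have to wait, so broadcasting is split into a `start_broadcast`/`complete_broadcast` pair that follows the two-phase `Sink` protocol of futures 0.1. A minimal sketch of that underlying pattern on a bounded `futures::sync::mpsc` channel (this is the protocol the new methods mirror, not the actual `Manager` internals):

```rust
// Two-phase send on a bounded futures-0.1 channel; `start_broadcast` /
// `complete_broadcast` follow the same protocol. Sketch only, not the
// crate's internals.
extern crate futures; // futures = "0.1"
use futures::{future, Async, AsyncSink, Future, Sink, Stream};
use futures::sync::mpsc;

fn main() {
    // Bounded channel: buffer of 1, plus one guaranteed slot per sender.
    let (mut tx, mut rx) = mpsc::channel::<u32>(1);

    // `lazy` supplies the task context that `block_on(poll_fn(..))`
    // supplies in the tests below.
    future::lazy(move || {
        // Phase 1: try to enqueue. A full channel returns the item as
        // `AsyncSink::NotReady` instead of growing an unbounded buffer.
        match tx.start_send(42).unwrap() {
            AsyncSink::Ready => {}
            AsyncSink::NotReady(_item) => { /* full: retry on the next poll */ }
        }
        // Phase 2: flush. Only when this is `Ready` has the message
        // actually been handed off to the channel.
        assert!(tx.poll_complete().unwrap().is_ready());
        assert_eq!(rx.poll(), Ok(Async::Ready(Some(42))));
        Ok::<_, ()>(())
    }).wait().unwrap();
}
```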

/// Adds an existing connection to a node to the collection.
@@ -383,8 +389,8 @@
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
{
-// Calling `HandledNodesTasks::add_connection` is the same as calling
-// `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event.
+// Calling `tasks::Manager::add_connection` is the same as calling
+// `tasks::Manager::add_reach_attempt`, except that we don't get any `NodeReached` event.
// We therefore implement this method the same way as calling `add_reach_attempt` followed
// with simulating a received `NodeReached` event and accepting it.
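
The trick described in this comment can be condensed into a toy model: register the already-open connection exactly like a reach attempt, then fabricate the `NodeReached` event and accept it on the spot. All names below (`Manager`, `TaskState`, both methods) are illustrative stand-ins, not libp2p-core's real types:

```rust
// Toy model of `add_connection` as "reach attempt + synthetic accept".
// All names are illustrative, not libp2p-core's actual types.
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum TaskState { Pending, Connected(String) }

struct Manager { next_id: usize, tasks: HashMap<usize, TaskState> }

impl Manager {
    // Normal path: register a task that dials; it stays `Pending` until
    // a real `NodeReached` event arrives and is accepted.
    fn add_reach_attempt(&mut self) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.tasks.insert(id, TaskState::Pending);
        id
    }

    // `add_connection` path: same registration, but the connection is
    // already open, so we "receive" `NodeReached` and accept it at once.
    fn add_connection(&mut self, peer: &str) -> usize {
        let id = self.add_reach_attempt();
        self.tasks.insert(id, TaskState::Connected(peer.to_string()));
        id
    }
}

fn main() {
    let mut m = Manager { next_id: 0, tasks: HashMap::new() };
    let id = m.add_connection("peer-1");
    assert_eq!(m.tasks[&id], TaskState::Connected("peer-1".to_string()));
}
```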

@@ -451,29 +457,28 @@
};

match item {
-HandledNodesEvent::TaskClosed { task, result, handler } => {
+tasks::Event::TaskClosed { task, result, handler } => {
let id = task.id();
let user_data = task.into_user_data();

match (user_data, result, handler) {
-(TaskState::Pending, TaskClosedEvent::Reach(err), Some(handler)) => {
+(TaskState::Pending, tasks::Error::Reach(err), Some(handler)) => {
Async::Ready(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
handler,
})
},
-(TaskState::Pending, TaskClosedEvent::Node(_), _) => {
+(TaskState::Pending, tasks::Error::Node(_), _) => {
panic!("We switch the task state to Connected once we're connected, and \
-a TaskClosedEvent::Node can only happen after we're \
-connected; QED");
+a tasks::Error::Node can only happen after we're connected; QED");
},
-(TaskState::Pending, TaskClosedEvent::Reach(_), None) => {
-// TODO: this could be improved in the API of HandledNodesTasks
-panic!("The HandledNodesTasks is guaranteed to always return the handler \
-when producing a TaskClosedEvent::Reach error");
+(TaskState::Pending, tasks::Error::Reach(_), None) => {
+// TODO: this could be improved in the API of tasks::Manager
+panic!("The tasks::Manager is guaranteed to always return the handler \
+when producing a tasks::Error::Reach error");
},
-(TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => {
+(TaskState::Connected(conn_info, user_data), tasks::Error::Node(err), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(conn_info.peer_id());
debug_assert_eq!(_node_task_id, Some(id));
@@ -483,13 +488,13 @@ where
user_data,
})
},
-(TaskState::Connected(_, _), TaskClosedEvent::Reach(_), _) => {
-panic!("A TaskClosedEvent::Reach can only happen before we are connected \
+(TaskState::Connected(_, _), tasks::Error::Reach(_), _) => {
+panic!("A tasks::Error::Reach can only happen before we are connected \
to a node; therefore the TaskState won't be Connected; QED");
},
}
},
-HandledNodesEvent::NodeReached { task, conn_info } => {
+tasks::Event::NodeReached { task, conn_info } => {
let id = task.id();
drop(task);
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
Expand All @@ -498,7 +503,7 @@ where
conn_info: Some(conn_info),
}))
},
-HandledNodesEvent::NodeEvent { task, event } => {
+tasks::Event::NodeEvent { task, event } => {
let conn_info = match task.user_data() {
TaskState::Connected(conn_info, _) => conn_info.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
@@ -566,7 +571,7 @@ where

/// Access to a peer in the collection.
pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
-inner: HandledNodesTask<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
+inner: TaskEntry<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
}

@@ -612,9 +617,13 @@ where
}

/// Sends an event to the given node.
-#[inline]
-pub fn send_event(&mut self, event: TInEvent) {
-self.inner.send_event(event)
+pub fn start_send_event(&mut self, event: TInEvent) -> StartSend<TInEvent, ()> {
+self.inner.start_send_event(event)
}
+
+/// Completes sending an event message initiated by `start_send_event`.
+pub fn complete_send_event(&mut self) -> Poll<(), ()> {
+self.inner.complete_send_event()
}
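
Callers of the old `send_event` could fire and forget; against a bounded queue they must be ready for `start_send_event` to hand the event back, and must then drive `complete_send_event` until it resolves. A sketch of that retry loop, with a bounded channel standing in for the peer's event queue (illustrative; the real calls would go through `PeerMut`):

```rust
// Driving a split send to completion inside a task, mirroring how a
// caller would use `start_send_event` / `complete_send_event`.
extern crate futures; // futures = "0.1"
use futures::{future, Async, AsyncSink, Future, Poll, Sink};
use futures::sync::mpsc;

fn main() {
    let (mut tx, _rx) = mpsc::channel::<&'static str>(1);
    let mut pending = Some("event");

    future::poll_fn(move || -> Poll<(), ()> {
        // Phase 1: keep offering the event until the queue accepts it.
        if let Some(ev) = pending.take() {
            match tx.start_send(ev).map_err(|_| ())? {
                AsyncSink::Ready => {}
                AsyncSink::NotReady(ev) => {
                    pending = Some(ev); // keep it for the next wake-up
                    return Ok(Async::NotReady);
                }
            }
        }
        // Phase 2: flush; `NotReady` re-schedules us until delivery.
        tx.poll_complete().map_err(|_| ())
    }).wait().unwrap();
}
```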

/// Closes the connections to this node. Returns the user data.
@@ -639,8 +648,23 @@
/// The reach attempt will only be effectively cancelled once the peer (the object you're
/// manipulating) has received some network activity. However, no event will ever be
/// generated from this reach attempt, and this takes effect immediately.
-pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>) {
-let _state = self.inner.take_over(id.inner);
-debug_assert!(if let TaskState::Pending = _state { true } else { false });
+#[must_use]
+pub fn start_take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>)
+-> StartTakeOver<(), InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>>
+{
+match self.inner.start_take_over(id.inner) {
+StartTakeOver::Ready(_state) => {
+debug_assert!(if let TaskState::Pending = _state { true } else { false });
+StartTakeOver::Ready(())
+}
+StartTakeOver::NotReady(inner) =>
+StartTakeOver::NotReady(InterruptedReachAttempt { inner }),
+StartTakeOver::Gone => StartTakeOver::Gone
+}
+}
+
+/// Completes a take over initiated by `start_take_over`.
+pub fn complete_take_over(&mut self) -> Poll<(), ()> {
+self.inner.complete_take_over()
}
}
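
`take_over` used to succeed unconditionally; the interrupted attempt now travels through a bounded queue as well, so the caller has to handle three outcomes and gets the attempt back on `NotReady` rather than losing it. A sketch of the caller-side match (the enum shape mirrors `StartTakeOver`; the surrounding code is illustrative):

```rust
// The three outcomes a caller of `start_take_over` must handle. The
// enum mirrors `StartTakeOver` from `nodes::tasks`; the rest is a sketch.
#[allow(dead_code)]
enum StartTakeOver<A, B> {
    Ready(A),    // accepted: drive `complete_take_over` until `Ready` next
    NotReady(B), // queue full: the attempt is handed back for a later retry
    Gone,        // the background task has already closed
}

fn main() {
    // Pretend this came from `peer.start_take_over(attempt)`.
    let outcome: StartTakeOver<(), &'static str> = StartTakeOver::NotReady("attempt");

    match outcome {
        StartTakeOver::Ready(()) => {
            // Hand-off enqueued; poll `complete_take_over()` to flush it.
        }
        StartTakeOver::NotReady(attempt) => {
            // Nothing is lost: stash the attempt and retry on the next poll.
            println!("retry later with {:?}", attempt);
        }
        StartTakeOver::Gone => {
            // The task is gone; there is nothing to take over any more.
            println!("task gone");
        }
    }
}
```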
59 changes: 53 additions & 6 deletions core/src/nodes/collection/tests.rs
@@ -154,41 +154,81 @@ fn events_in_a_node_reaches_the_collection_stream() {
Ok(Async::Ready(()))
})).expect("tokio works");

+let cs2 = cs.clone();
+rt.block_on(future::poll_fn(move || {
+if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+Ok::<_, ()>(Async::NotReady)
+} else {
+Ok(Async::Ready(()))
+}
+})).unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
-cs.broadcast_event(&InEvent::NextState);
+if cs.complete_broadcast().is_not_ready() {
+return Ok(Async::NotReady)
+}
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept(());
});
Ok(Async::Ready(()))
})).expect("tokio works");

+let cs2 = cs.clone();
+rt.block_on(future::poll_fn(move || {
+if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+Ok::<_, ()>(Async::NotReady)
+} else {
+Ok(Async::Ready(()))
+}
+})).unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
-cs.broadcast_event(&InEvent::NextState);
+if cs.complete_broadcast().is_not_ready() {
+return Ok(Async::NotReady)
+}
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("init"));
});
Ok(Async::Ready(()))
})).expect("tokio works");


+let cs2 = cs.clone();
+rt.block_on(future::poll_fn(move || {
+if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+Ok::<_, ()>(Async::NotReady)
+} else {
+Ok(Async::Ready(()))
+}
+})).unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
-cs.broadcast_event(&InEvent::NextState);
+if cs.complete_broadcast().is_not_ready() {
+return Ok(Async::NotReady)
+}
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 1"));
});
Ok(Async::Ready(()))
})).expect("tokio works");

+let cs2 = cs.clone();
+rt.block_on(future::poll_fn(move || {
+if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+Ok::<_, ()>(Async::NotReady)
+} else {
+Ok(Async::Ready(()))
+}
+})).unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
-cs.broadcast_event(&InEvent::NextState);
+if cs.complete_broadcast().is_not_ready() {
+return Ok(Async::NotReady)
+}
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 2"));
});
@@ -238,13 +278,20 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() {
let mut rt = Builder::new().core_threads(1).build().unwrap();

// Kick it off
+let cs2 = cs.clone();
+rt.block_on(future::poll_fn(move || {
+if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+Ok::<_, ()>(Async::NotReady)
+} else {
+Ok(Async::Ready(()))
+}
+})).unwrap();
let cs_fut = cs.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock();
assert_matches!(cs.poll(), Async::NotReady);
// send an event so the Handler errors in two polls
-cs.broadcast_event(&InEvent::NextState);
-Ok(Async::Ready(()))
+Ok(cs.complete_broadcast())
})).expect("tokio works");

// Accept the new node