From feff9a4cdf30bc74a9452396461db2d55a5fabe1 Mon Sep 17 00:00:00 2001
From: Toralf Wittner
Date: Fri, 28 Jun 2019 17:18:37 +0200
Subject: [PATCH 1/6] Replace unbounded channels with bounded ones.

To remove the unbounded channels used for communicating with node tasks,
an API similar to `futures::Sink` is used, i.e. sending is split into a
start and a complete phase. The start phase returns `StartSend` and first
attempts to complete any pending send operations. Completing the send
means polling until `Poll::Ready(())` is returned.

In addition, this PR splits the `handled_node_tasks` module into several
smaller ones (cf. `nodes::tasks`) and renames some types:

- `nodes::handled_node_tasks::NodeTask` -> `nodes::tasks::task::Task`
- `nodes::handled_node_tasks::NodeTaskInner` -> `nodes::tasks::task::State`
- `nodes::handled_node_tasks::HandledNodesTasks` -> `nodes::tasks::Manager`
- `nodes::handled_node_tasks::TaskClosedEvent` -> `nodes::tasks::Error`
- `nodes::handled_node_tasks::HandledNodesEvent` -> `nodes::tasks::Event`
- `nodes::handled_node_tasks::Task` -> `nodes::tasks::TaskEntry`
- `nodes::handled_node_tasks::ExtToInMessage` -> `nodes::tasks::task::ToTaskMessage`
- `nodes::handled_node_tasks::InToExtMessage` -> `nodes::tasks::task::FromTaskMessage`
---
 core/src/nodes/collection.rs               |  90 ++-
 core/src/nodes/collection/tests.rs         |  59 +-
 core/src/nodes/handled_node.rs             |  37 +-
 core/src/nodes/handled_node_tasks.rs       | 707 ---------------------
 core/src/nodes/mod.rs                      |   2 +-
 core/src/nodes/raw_swarm.rs                | 115 ++--
 core/src/nodes/raw_swarm/tests.rs          |  24 +-
 core/src/nodes/tasks/error.rs              |  38 ++
 core/src/nodes/tasks/manager.rs            | 568 +++++++++++++++++
 core/src/nodes/tasks/mod.rs                |  45 ++
 core/src/nodes/tasks/task.rs               | 367 +++++++++++
 core/src/protocols_handler/node_handler.rs |   3 +-
 core/src/swarm/swarm.rs                    |  37 +-
 core/src/transport/memory.rs               |   5 +-
 core/tests/raw_swarm_simult.rs             |   1 +
 15 files changed, 1286 insertions(+), 812 deletions(-)
 delete mode 100644 core/src/nodes/handled_node_tasks.rs
 create mode 100644 core/src/nodes/tasks/error.rs
 create mode 100644 core/src/nodes/tasks/manager.rs
 create mode 100644 core/src/nodes/tasks/mod.rs
 create mode 100644 core/src/nodes/tasks/task.rs

diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs
index 696b2af2669..af8601d27ec 100644
--- a/core/src/nodes/collection.rs
+++ b/core/src/nodes/collection.rs
@@ -23,15 +23,16 @@ use crate::{
     muxing::StreamMuxer,
     nodes::{
         node::Substream,
-        handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
-        handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId, ClosedTask},
-        handled_node::{HandledNodeError, NodeHandler}
+        handled_node::{HandledNodeError, IntoNodeHandler, NodeHandler},
+        tasks::{self, ClosedTask, TaskEntry, TaskId}
     }
 };
 use fnv::FnvHashMap;
 use futures::prelude::*;
 use std::{error, fmt, hash::Hash, mem};
 
+pub use crate::nodes::tasks::StartTakeOver;
+
 mod tests;
 
 /// Implementation of `Stream` that handles a collection of nodes.
@@ -40,7 +41,7 @@ pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerE
     /// Object that handles the tasks.
     ///
     /// The user data contains the state of the task. If `Connected`, then a corresponding entry
     /// must be present in `nodes`.
-    inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
+    inner: tasks::Manager<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
 
     /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
     /// must always be in the `Connected` state.
@@ -310,7 +311,7 @@ where
     #[inline]
     pub fn new() -> Self {
         CollectionStream {
-            inner: HandledNodesTasks::new(),
+            inner: tasks::Manager::new(),
            nodes: Default::default(),
         }
     }
@@ -357,12 +358,17 @@ where
     }
 
     /// Sends an event to all nodes.
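
(To make the new two-phase API concrete: a minimal sketch, not part of the patch, of how a caller drives a broadcast to completion. `swarm` and `InEvent` are assumed stand-ins for the caller's own values, with `futures::{prelude::*, future}` from futures 0.1 in scope.)

    let mut started = false;
    let broadcast = future::poll_fn(move || -> Poll<(), ()> {
        if !started {
            // Phase 1: queue the event with every task. `NotReady` means
            // earlier sends have not been fully flushed yet.
            if swarm.start_broadcast(&InEvent::NextState).is_not_ready() {
                return Ok(Async::NotReady)
            }
            started = true;
        }
        // Phase 2: poll until every task has accepted the message.
        if swarm.complete_broadcast().is_not_ready() {
            return Ok(Async::NotReady)
        }
        Ok(Async::Ready(()))
    });
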
- #[inline] - pub fn broadcast_event(&mut self, event: &TInEvent) - where TInEvent: Clone, + #[must_use] + pub fn start_broadcast(&mut self, event: &TInEvent) -> AsyncSink<()> + where + TInEvent: Clone { - // TODO: remove the ones we're not connected to? - self.inner.broadcast_event(event) + self.inner.start_broadcast(event) + } + + #[must_use] + pub fn complete_broadcast(&mut self) -> Async<()> { + self.inner.complete_broadcast() } /// Adds an existing connection to a node to the collection. @@ -383,8 +389,8 @@ where TConnInfo: Clone + Send + 'static, TPeerId: Clone, { - // Calling `HandledNodesTasks::add_connection` is the same as calling - // `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event. + // Calling `tasks::Manager::add_connection` is the same as calling + // `tasks::Manager::add_reach_attempt`, except that we don't get any `NodeReached` event. // We therefore implement this method the same way as calling `add_reach_attempt` followed // with simulating a received `NodeReached` event and accepting it. @@ -451,29 +457,28 @@ where }; match item { - HandledNodesEvent::TaskClosed { task, result, handler } => { + tasks::Event::TaskClosed { task, result, handler } => { let id = task.id(); let user_data = task.into_user_data(); match (user_data, result, handler) { - (TaskState::Pending, TaskClosedEvent::Reach(err), Some(handler)) => { + (TaskState::Pending, tasks::Error::Reach(err), Some(handler)) => { Async::Ready(CollectionEvent::ReachError { id: ReachAttemptId(id), error: err, handler, }) }, - (TaskState::Pending, TaskClosedEvent::Node(_), _) => { + (TaskState::Pending, tasks::Error::Node(_), _) => { panic!("We switch the task state to Connected once we're connected, and \ - a TaskClosedEvent::Node can only happen after we're \ - connected; QED"); + a tasks::Error::Node can only happen after we're connected; QED"); }, - (TaskState::Pending, TaskClosedEvent::Reach(_), None) => { - // TODO: this could be improved in the API of HandledNodesTasks - panic!("The HandledNodesTasks is guaranteed to always return the handler \ - when producing a TaskClosedEvent::Reach error"); + (TaskState::Pending, tasks::Error::Reach(_), None) => { + // TODO: this could be improved in the API of tasks::Manager + panic!("The tasks::Manager is guaranteed to always return the handler \ + when producing a tasks::Error::Reach error"); }, - (TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => { + (TaskState::Connected(conn_info, user_data), tasks::Error::Node(err), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(conn_info.peer_id()); debug_assert_eq!(_node_task_id, Some(id)); @@ -483,13 +488,13 @@ where user_data, }) }, - (TaskState::Connected(_, _), TaskClosedEvent::Reach(_), _) => { - panic!("A TaskClosedEvent::Reach can only happen before we are connected \ + (TaskState::Connected(_, _), tasks::Error::Reach(_), _) => { + panic!("A tasks::Error::Reach can only happen before we are connected \ to a node; therefore the TaskState won't be Connected; QED"); }, } }, - HandledNodesEvent::NodeReached { task, conn_info } => { + tasks::Event::NodeReached { task, conn_info } => { let id = task.id(); drop(task); Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent { @@ -498,7 +503,7 @@ where conn_info: Some(conn_info), })) }, - HandledNodesEvent::NodeEvent { task, event } => { + tasks::Event::NodeEvent { task, event } => { let conn_info = match task.user_data() { TaskState::Connected(conn_info, _) => 
conn_info.clone(), _ => panic!("we can only receive NodeEvent events from a task after we \ @@ -566,7 +571,7 @@ where /// Access to a peer in the collection. pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> { - inner: HandledNodesTask<'a, TInEvent, TaskState>, + inner: TaskEntry<'a, TInEvent, TaskState>, nodes: &'a mut FnvHashMap, } @@ -612,9 +617,13 @@ where } /// Sends an event to the given node. - #[inline] - pub fn send_event(&mut self, event: TInEvent) { - self.inner.send_event(event) + pub fn start_send_event(&mut self, event: TInEvent) -> StartSend { + self.inner.start_send_event(event) + } + + /// Complete sending an event message initiated by `start_send_event`. + pub fn complete_send_event(&mut self) -> Poll<(), ()> { + self.inner.complete_send_event() } /// Closes the connections to this node. Returns the user data. @@ -639,8 +648,23 @@ where /// The reach attempt will only be effectively cancelled once the peer (the object you're /// manipulating) has received some network activity. However no event will be ever be /// generated from this reach attempt, and this takes effect immediately. - pub fn take_over(&mut self, id: InterruptedReachAttempt) { - let _state = self.inner.take_over(id.inner); - debug_assert!(if let TaskState::Pending = _state { true } else { false }); + #[must_use] + pub fn start_take_over(&mut self, id: InterruptedReachAttempt) + -> StartTakeOver<(), InterruptedReachAttempt> + { + match self.inner.start_take_over(id.inner) { + StartTakeOver::Ready(_state) => { + debug_assert!(if let TaskState::Pending = _state { true } else { false }); + StartTakeOver::Ready(()) + } + StartTakeOver::NotReady(inner) => + StartTakeOver::NotReady(InterruptedReachAttempt { inner }), + StartTakeOver::Gone => StartTakeOver::Gone + } + } + + /// Complete a take over initiated by `start_take_over`. 
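
(For context, a sketch, not part of the patch, of consuming the split per-peer send API; `peer` is assumed to be a `PeerMut` and `event` the message to send. `StartSend<T, E>` is futures 0.1's `Result<AsyncSink<T>, E>`.)

    match peer.start_send_event(event) {
        Ok(AsyncSink::Ready) => {
            // Queued; flush it. `NotReady` here means: return `NotReady`
            // from the surrounding poll and call `complete_send_event`
            // again on the next wakeup.
            match peer.complete_send_event() {
                Ok(Async::Ready(())) => { /* delivered */ }
                Ok(Async::NotReady) => { /* flush again later */ }
                Err(()) => { /* task is gone */ }
            }
        }
        Ok(AsyncSink::NotReady(ev)) => {
            // The bounded channel is full: keep `ev` and retry later.
            let _pending = ev;
        }
        Err(()) => { /* task is gone; the peer is no longer reachable */ }
    }
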
+ pub fn complete_take_over(&mut self) -> Poll<(), ()> { + self.inner.complete_take_over() } } diff --git a/core/src/nodes/collection/tests.rs b/core/src/nodes/collection/tests.rs index 2b10ebecdeb..69f82c05428 100644 --- a/core/src/nodes/collection/tests.rs +++ b/core/src/nodes/collection/tests.rs @@ -154,20 +154,40 @@ fn events_in_a_node_reaches_the_collection_stream() { Ok(Async::Ready(())) })).expect("tokio works"); + let cs2 = cs.clone(); + rt.block_on(future::poll_fn(move || { + if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() { + Ok::<_, ()>(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let cs_fut = cs.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); + if cs.complete_broadcast().is_not_ready() { + return Ok(Async::NotReady) + } assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { reach_ev.accept(()); }); Ok(Async::Ready(())) })).expect("tokio works"); + let cs2 = cs.clone(); + rt.block_on(future::poll_fn(move || { + if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() { + Ok::<_, ()>(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let cs_fut = cs.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); + if cs.complete_broadcast().is_not_ready() { + return Ok(Async::NotReady) + } assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("init")); }); @@ -175,20 +195,40 @@ fn events_in_a_node_reaches_the_collection_stream() { })).expect("tokio works"); + let cs2 = cs.clone(); + rt.block_on(future::poll_fn(move || { + if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() { + Ok::<_, ()>(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let cs_fut = cs.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); + if cs.complete_broadcast().is_not_ready() { + return Ok(Async::NotReady) + } assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("from handler 1")); }); Ok(Async::Ready(())) })).expect("tokio works"); + let cs2 = cs.clone(); + rt.block_on(future::poll_fn(move || { + if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() { + Ok::<_, ()>(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let cs_fut = cs.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); - cs.broadcast_event(&InEvent::NextState); + if cs.complete_broadcast().is_not_ready() { + return Ok(Async::NotReady) + } assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => { assert_matches!(event, OutEvent::Custom("from handler 2")); }); @@ -238,13 +278,20 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() { let mut rt = Builder::new().core_threads(1).build().unwrap(); // Kick it off + let cs2 = cs.clone(); + rt.block_on(future::poll_fn(move || { + if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() { + Ok::<_, ()>(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let cs_fut = cs.clone(); rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); assert_matches!(cs.poll(), Async::NotReady); // send an event so the Handler errors in 
two polls - cs.broadcast_event(&InEvent::NextState); - Ok(Async::Ready(())) + Ok(cs.complete_broadcast()) })).expect("tokio works"); // Accept the new node diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index a78e0659ea4..c3b915a7584 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::muxing::StreamMuxer; +use crate::{PeerId, muxing::StreamMuxer}; use crate::nodes::node::{NodeEvent, NodeStream, Substream, Close}; use futures::prelude::*; use std::{error, fmt, io}; @@ -62,6 +62,29 @@ pub trait NodeHandler { fn poll(&mut self) -> Poll, Self::Error>; } +/// Prototype for a `NodeHandler`. +pub trait IntoNodeHandler { + /// The node handler. + type Handler: NodeHandler; + + /// Builds the node handler. + /// + /// The `TConnInfo` is the information about the connection that the handler is going to handle. + /// This is generated by the `Transport` and typically implements the `ConnectionInfo` trait. + fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler; +} + +impl IntoNodeHandler for T +where + T: NodeHandler +{ + type Handler = Self; + + fn into_handler(self, _: &TConnInfo) -> Self { + self + } +} + /// Endpoint for a received substream. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum NodeHandlerEndpoint { @@ -71,7 +94,6 @@ pub enum NodeHandlerEndpoint { impl NodeHandlerEndpoint { /// Returns true for `Dialer`. - #[inline] pub fn is_dialer(&self) -> bool { match self { NodeHandlerEndpoint::Dialer(_) => true, @@ -80,7 +102,6 @@ impl NodeHandlerEndpoint { } /// Returns true for `Listener`. - #[inline] pub fn is_listener(&self) -> bool { match self { NodeHandlerEndpoint::Dialer(_) => false, @@ -102,7 +123,6 @@ pub enum NodeHandlerEvent { /// Event produced by a handler. impl NodeHandlerEvent { /// If this is `OutboundSubstreamRequest`, maps the content to something else. - #[inline] pub fn map_outbound_open_info(self, map: F) -> NodeHandlerEvent where F: FnOnce(TOutboundOpenInfo) -> I { @@ -115,7 +135,6 @@ impl NodeHandlerEvent { } /// If this is `Custom`, maps the content to something else. - #[inline] pub fn map_custom(self, map: F) -> NodeHandlerEvent where F: FnOnce(TCustom) -> I { @@ -159,7 +178,6 @@ where THandler: NodeHandler>, { /// Builds a new `HandledNode`. - #[inline] pub fn new(muxer: TMuxer, handler: THandler) -> Self { HandledNode { node: NodeStream::new(muxer), @@ -178,7 +196,6 @@ where } /// Injects an event to the handler. Has no effect if the handler is closing. - #[inline] pub fn inject_event(&mut self, event: THandler::InEvent) { self.handler.inject_event(event); } @@ -242,7 +259,8 @@ pub enum HandledNodeError { } impl fmt::Display for HandledNodeError -where THandlerErr: fmt::Display +where + THandlerErr: fmt::Display { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -253,7 +271,8 @@ where THandlerErr: fmt::Display } impl error::Error for HandledNodeError -where THandlerErr: error::Error + 'static +where + THandlerErr: error::Error + 'static { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs deleted file mode 100644 index 7bab0c1e81d..00000000000 --- a/core/src/nodes/handled_node_tasks.rs +++ /dev/null @@ -1,707 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::{ - PeerId, - muxing::StreamMuxer, - nodes::{ - handled_node::{HandledNode, HandledNodeError, NodeHandler}, - node::{Close, Substream} - } -}; -use fnv::FnvHashMap; -use futures::{prelude::*, future::Executor, stream, sync::mpsc}; -use smallvec::SmallVec; -use std::{ - collections::hash_map::{Entry, OccupiedEntry}, - error, - fmt, - mem -}; - -mod tests; - -// Implementor notes -// ================= -// -// This collection of nodes spawns a task for each individual node to process. This means that -// events happen on the background at the same time as the `HandledNodesTasks` is being polled. -// -// In order to make the API non-racy and avoid issues, we completely separate the state in the -// `HandledNodesTasks` from the states that the task nodes can access. They are only allowed to -// exchange messages. The state in the `HandledNodesTasks` is therefore delayed compared to the -// tasks, and is updated only when `poll()` is called. -// -// The only thing that we must be careful about is substreams, as they are "detached" from the -// state of the `HandledNodesTasks` and allowed to process in parallel. This is why there is no -// "substream closed" event being reported, as it could potentially create confusions and race -// conditions in the user's code. See similar comments in the documentation of `NodeStream`. - -/// Implementation of `Stream` that handles a collection of nodes. -pub struct HandledNodesTasks { - /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts - /// the task. It is possible that we receive messages from tasks that used to be in this list - /// but no longer are, in which case we should ignore them. - tasks: FnvHashMap>, TUserData)>, - - /// Identifier for the next task to spawn. - next_task_id: TaskId, - - /// List of node tasks to spawn. - // TODO: stronger typing? - to_spawn: SmallVec<[Box + Send>; 8]>, - /// If no tokio executor is available, we move tasks to this list, and futures are polled on - /// the current thread instead. - local_spawns: Vec + Send>>, - - /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. - events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, - /// Receiver side for the events. 
- events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, -} - -impl fmt::Debug for - HandledNodesTasks -where - TUserData: fmt::Debug -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_map() - .entries(self.tasks.iter().map(|(id, (_, ud))| (id, ud))) - .finish() - } -} - -/// Error that can happen in a task. -#[derive(Debug)] -pub enum TaskClosedEvent { - /// An error happend while we were trying to reach the node. - Reach(TReachErr), - /// An error happened after the node has been reached. - Node(HandledNodeError), -} - -impl fmt::Display for TaskClosedEvent -where - TReachErr: fmt::Display, - THandlerErr: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TaskClosedEvent::Reach(err) => write!(f, "{}", err), - TaskClosedEvent::Node(err) => write!(f, "{}", err), - } - } -} - -impl error::Error for TaskClosedEvent -where - TReachErr: error::Error + 'static, - THandlerErr: error::Error + 'static -{ - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - TaskClosedEvent::Reach(err) => Some(err), - TaskClosedEvent::Node(err) => Some(err), - } - } -} - -/// Prototype for a `NodeHandler`. -pub trait IntoNodeHandler { - /// The node handler. - type Handler: NodeHandler; - - /// Builds the node handler. - /// - /// The `TConnInfo` is the information about the connection that the handler is going to handle. - /// This is generated by the `Transport` and typically implements the `ConnectionInfo` trait. - fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler; -} - -impl IntoNodeHandler for T -where T: NodeHandler -{ - type Handler = Self; - - #[inline] - fn into_handler(self, _: &TConnInfo) -> Self { - self - } -} - -/// Event that can happen on the `HandledNodesTasks`. -#[derive(Debug)] -pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> { - /// A task has been closed. - /// - /// This happens once the node handler closes or an error happens. - // TODO: send back undelivered events? - TaskClosed { - /// The task that has been closed. - task: ClosedTask, - /// What happened. - result: TaskClosedEvent, - /// If the task closed before reaching the node, this contains the handler that was passed - /// to `add_reach_attempt`. - handler: Option, - }, - - /// A task has successfully connected to a node. - NodeReached { - /// The task that succeeded. - task: Task<'a, TInEvent, TUserData>, - /// Identifier of the node. - conn_info: TConnInfo, - }, - - /// A task has produced an event. - NodeEvent { - /// The task that produced the event. - task: Task<'a, TInEvent, TUserData>, - /// The produced event. - event: TOutEvent, - }, -} - -/// Identifier for a future that attempts to reach a node. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TaskId(usize); - -impl - HandledNodesTasks -{ - /// Creates a new empty collection. - #[inline] - pub fn new() -> Self { - let (events_tx, events_rx) = mpsc::unbounded(); - - HandledNodesTasks { - tasks: Default::default(), - next_task_id: TaskId(0), - to_spawn: SmallVec::new(), - local_spawns: Vec::new(), - events_tx, - events_rx, - } - } - - /// Adds to the collection a future that tries to reach a node. - /// - /// This method spawns a task dedicated to resolving this future and processing the node's - /// events. 
- pub fn add_reach_attempt(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId - where - TFut: Future + Send + 'static, - TIntoHandler: IntoNodeHandler + Send + 'static, - TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - TReachErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - ::OutboundOpenInfo: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::OutboundSubstream: Send + 'static, - TConnInfo: Send + 'static, - { - let task_id = self.next_task_id; - self.next_task_id.0 += 1; - - let (tx, rx) = mpsc::unbounded(); - self.tasks.insert(task_id, (tx, user_data)); - - let task = Box::new(NodeTask { - taken_over: SmallVec::new(), - inner: NodeTaskInner::Future { - future, - handler, - events_buffer: Vec::new(), - }, - events_tx: self.events_tx.clone(), - in_events_rx: rx.fuse(), - id: task_id, - }); - - self.to_spawn.push(task); - task_id - } - - /// Adds an existing connection to a node to the collection. - /// - /// This method spawns a task dedicated to processing the node's events. - /// - /// No `NodeReached` event will be emitted for this task, since the node has already been - /// reached. - pub fn add_connection(&mut self, user_data: TUserData, muxer: TMuxer, handler: THandler) -> TaskId - where - TIntoHandler: IntoNodeHandler + Send + 'static, - THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - TReachErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - ::OutboundOpenInfo: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::OutboundSubstream: Send + 'static, - TConnInfo: Send + 'static, - { - let task_id = self.next_task_id; - self.next_task_id.0 += 1; - - let (tx, rx) = mpsc::unbounded(); - self.tasks.insert(task_id, (tx, user_data)); - - let task: NodeTask, _, _, _, _, _, _> = NodeTask { - taken_over: SmallVec::new(), - inner: NodeTaskInner::Node(HandledNode::new(muxer, handler)), - events_tx: self.events_tx.clone(), - in_events_rx: rx.fuse(), - id: task_id, - }; - - self.to_spawn.push(Box::new(task)); - task_id - } - - /// Sends an event to all the tasks, including the pending ones. - pub fn broadcast_event(&mut self, event: &TInEvent) - where TInEvent: Clone, - { - for (sender, _) in self.tasks.values() { - // Note: it is possible that sending an event fails if the background task has already - // finished, but the local state hasn't reflected that yet because it hasn't been - // polled. This is not an error situation. - let _ = sender.unbounded_send(ExtToInMessage::HandlerEvent(event.clone())); - } - } - - /// Grants access to an object that allows controlling a task of the collection. - /// - /// Returns `None` if the task id is invalid. - #[inline] - pub fn task(&mut self, id: TaskId) -> Option> { - match self.tasks.entry(id) { - Entry::Occupied(inner) => Some(Task { inner }), - Entry::Vacant(_) => None, - } - } - - /// Returns a list of all the active tasks. - #[inline] - pub fn tasks<'a>(&'a self) -> impl Iterator + 'a { - self.tasks.keys().cloned() - } - - /// Provides an API similar to `Stream`, except that it cannot produce an error. 
- pub fn poll(&mut self) -> Async> { - let (message, task_id) = match self.poll_inner() { - Async::Ready(r) => r, - Async::NotReady => return Async::NotReady, - }; - - Async::Ready(match message { - InToExtMessage::NodeEvent(event) => { - HandledNodesEvent::NodeEvent { - task: match self.tasks.entry(task_id) { - Entry::Occupied(inner) => Task { inner }, - Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") - }, - event - } - }, - InToExtMessage::NodeReached(conn_info) => { - HandledNodesEvent::NodeReached { - task: match self.tasks.entry(task_id) { - Entry::Occupied(inner) => Task { inner }, - Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") - }, - conn_info - } - }, - InToExtMessage::TaskClosed(result, handler) => { - let (channel, user_data) = self.tasks.remove(&task_id) - .expect("poll_inner only returns valid TaskIds; QED"); - HandledNodesEvent::TaskClosed { - task: ClosedTask { - id: task_id, - channel, - user_data, - }, - result, - handler, - } - }, - }) - } - - /// Since non-lexical lifetimes still don't work very well in Rust at the moment, we have to - /// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come - /// from an alive task. - // TODO: look into merging with `poll()` - fn poll_inner(&mut self) -> Async<(InToExtMessage, TaskId)> { - for to_spawn in self.to_spawn.drain() { - // We try to use the default executor, but fall back to polling the task manually if - // no executor is available. This makes it possible to use the core in environments - // outside of tokio. - let executor = tokio_executor::DefaultExecutor::current(); - if let Err(err) = executor.execute(to_spawn) { - self.local_spawns.push(err.into_future()); - } - } - - for n in (0..self.local_spawns.len()).rev() { - let mut task = self.local_spawns.swap_remove(n); - match task.poll() { - Ok(Async::Ready(())) => {}, - Ok(Async::NotReady) => self.local_spawns.push(task), - // It would normally be desirable to either report or log when a background task - // errors. However the default tokio executor doesn't do anything in case of error, - // and therefore we mimic this behaviour by also not doing anything. - Err(()) => {} - } - } - - loop { - match self.events_rx.poll() { - Ok(Async::Ready(Some((message, task_id)))) => { - // If the task id is no longer in `self.tasks`, that means that the user called - // `close()` on this task earlier. Therefore no new event should be generated - // for this task. - if self.tasks.contains_key(&task_id) { - break Async::Ready((message, task_id)); - } - } - Ok(Async::NotReady) => { - break Async::NotReady; - } - Ok(Async::Ready(None)) => { - unreachable!("The sender is in self as well, therefore the receiver never \ - closes.") - }, - Err(()) => unreachable!("An unbounded receiver never errors"), - } - } - } -} - -/// Access to a task in the collection. -pub struct Task<'a, TInEvent, TUserData> { - inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender>, TUserData)>, -} - -impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> { - /// Sends an event to the given node. - // TODO: report back on delivery - #[inline] - pub fn send_event(&mut self, event: TInEvent) { - // It is possible that the sender is closed if the background task has already finished - // but the local state hasn't been updated yet because we haven't been polled in the - // meanwhile. - let _ = self.inner.get_mut().0.unbounded_send(ExtToInMessage::HandlerEvent(event)); - } - - /// Returns the user data associated with the task. 
- pub fn user_data(&self) -> &TUserData { - &self.inner.get().1 - } - - /// Returns the user data associated with the task. - pub fn user_data_mut(&mut self) -> &mut TUserData { - &mut self.inner.get_mut().1 - } - - /// Returns the task id. - #[inline] - pub fn id(&self) -> TaskId { - *self.inner.key() - } - - /// Closes the task. Returns the user data. - /// - /// No further event will be generated for this task, but the connection inside the task will - /// continue to run until the `ClosedTask` is destroyed. - pub fn close(self) -> ClosedTask { - let id = *self.inner.key(); - let (channel, user_data) = self.inner.remove(); - ClosedTask { id, channel, user_data } - } - - /// Gives ownership of a closed task. As soon as our task (`self`) has some acknowledgment from - /// the remote that its connection is alive, it will close the connection with `other`. - pub fn take_over(&mut self, other: ClosedTask) -> TUserData { - // It is possible that the sender is closed if the background task has already finished - // but the local state hasn't been updated yet because we haven't been polled in the - // meanwhile. - let _ = self.inner.get_mut().0.unbounded_send(ExtToInMessage::TakeOver(other.channel)); - other.user_data - } -} - -impl<'a, TInEvent, TUserData> fmt::Debug for Task<'a, TInEvent, TUserData> -where - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_tuple("Task") - .field(&self.id()) - .field(self.user_data()) - .finish() - } -} - -/// Task after it has been closed. The connection to the remote is potentially still going on, but -/// no new event for this task will be received. -pub struct ClosedTask { - /// Identifier of the task that closed. No longer corresponds to anything, but can be reported - /// to the user. - id: TaskId, - /// The channel to the task. The task will continue to work for as long as this channel is - /// alive, but events produced by it are ignored. - channel: mpsc::UnboundedSender>, - /// The data provided by the user. - user_data: TUserData, -} - -impl ClosedTask { - /// Returns the task id. Note that this task is no longer part of the collection, and therefore - /// calling `task()` with this ID will fail. - #[inline] - pub fn id(&self) -> TaskId { - self.id - } - - /// Returns the user data associated with the task. - pub fn user_data(&self) -> &TUserData { - &self.user_data - } - - /// Returns the user data associated with the task. - pub fn user_data_mut(&mut self) -> &mut TUserData { - &mut self.user_data - } - - /// Finish destroying the task and yield the user data. This closes the connection to the - /// remote. - pub fn into_user_data(self) -> TUserData { - self.user_data - } -} - -impl fmt::Debug for ClosedTask -where - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_tuple("ClosedTask") - .field(&self.id) - .field(&self.user_data) - .finish() - } -} - -/// Message to transmit from the public API to a task. -#[derive(Debug)] -enum ExtToInMessage { - /// An event to transmit to the node handler. - HandlerEvent(TInEvent), - /// When received, stores the parameter inside the task and keeps it alive until we have an - /// acknowledgment that the remote has accepted our handshake. - TakeOver(mpsc::UnboundedSender>), -} - -/// Message to transmit from a task to the public API. -#[derive(Debug)] -enum InToExtMessage { - /// A connection to a node has succeeded. - NodeReached(TConnInfo), - /// The task closed. 
- TaskClosed(TaskClosedEvent, Option), - /// An event from the node. - NodeEvent(TOutEvent), -} - -/// Implementation of `Future` that handles a single node, and all the communications between -/// the various components of the `HandledNodesTasks`. -struct NodeTask -where - TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, - TIntoHandler::Handler: NodeHandler>, -{ - /// Sender to transmit events to the outside. - events_tx: mpsc::UnboundedSender<(InToExtMessage::Error, TConnInfo>, TaskId)>, - /// Receiving end for events sent from the main `HandledNodesTasks`. - in_events_rx: stream::Fuse>>, - /// Inner state of the `NodeTask`. - inner: NodeTaskInner, - /// Identifier of the attempt. - id: TaskId, - /// Channels to keep alive for as long as we don't have an acknowledgment from the remote. - taken_over: SmallVec<[mpsc::UnboundedSender>; 1]>, -} - -enum NodeTaskInner -where - TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, - TIntoHandler::Handler: NodeHandler>, -{ - /// Future to resolve to connect to the node. - Future { - /// The future that will attempt to reach the node. - future: TFut, - /// The handler that will be used to build the `HandledNode`. - handler: TIntoHandler, - /// While we are dialing the future, we need to buffer the events received on - /// `in_events_rx` so that they get delivered once dialing succeeds. We can't simply leave - /// events in `in_events_rx` because we have to detect if it gets closed. - events_buffer: Vec, - }, - - /// Fully functional node. - Node(HandledNode), - - /// Node closing. - Closing(Close), - - /// A panic happened while polling. - Poisoned, -} - -impl Future for - NodeTask -where - TMuxer: StreamMuxer, - TFut: Future, - TIntoHandler: IntoNodeHandler, - TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - 'outer_loop: loop { - match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) { - // First possibility: we are still trying to reach a node. - NodeTaskInner::Future { mut future, handler, mut events_buffer } => { - // If self.in_events_rx is closed, we stop the task. - loop { - match self.in_events_rx.poll() { - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => { - events_buffer.push(event) - }, - Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => { - self.taken_over.push(take_over); - }, - Ok(Async::NotReady) => break, - Err(_) => unreachable!("An UnboundedReceiver never errors"), - } - } - // Check whether dialing succeeded. - match future.poll() { - Ok(Async::Ready((conn_info, muxer))) => { - let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info)); - let event = InToExtMessage::NodeReached(conn_info); - for event in events_buffer { - node.inject_event(event); - } - let _ = self.events_tx.unbounded_send((event, self.id)); - self.inner = NodeTaskInner::Node(node); - } - Ok(Async::NotReady) => { - self.inner = NodeTaskInner::Future { future, handler, events_buffer }; - return Ok(Async::NotReady); - }, - Err(err) => { - // End the task - let event = InToExtMessage::TaskClosed(TaskClosedEvent::Reach(err), Some(handler)); - let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); - } - } - }, - - // Second possibility: we have a node. - NodeTaskInner::Node(mut node) => { - // Start by handling commands received from the outside of the task. 
- if !self.in_events_rx.is_done() { - loop { - match self.in_events_rx.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => { - node.inject_event(event) - }, - Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => { - self.taken_over.push(take_over); - }, - Ok(Async::Ready(None)) => { - // Node closed by the external API; start closing. - self.inner = NodeTaskInner::Closing(node.close()); - continue 'outer_loop; - } - Err(()) => unreachable!("An unbounded receiver never errors"), - } - } - } - - // Process the node. - loop { - if !self.taken_over.is_empty() && node.is_remote_acknowledged() { - self.taken_over.clear(); - } - - match node.poll() { - Ok(Async::NotReady) => { - self.inner = NodeTaskInner::Node(node); - return Ok(Async::NotReady); - }, - Ok(Async::Ready(event)) => { - let event = InToExtMessage::NodeEvent(event); - let _ = self.events_tx.unbounded_send((event, self.id)); - } - Err(err) => { - let event = InToExtMessage::TaskClosed(TaskClosedEvent::Node(err), None); - let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); // End the task. - } - } - } - }, - - NodeTaskInner::Closing(mut closing) => { - match closing.poll() { - Ok(Async::Ready(())) | Err(_) => { - return Ok(Async::Ready(())); // End the task. - }, - Ok(Async::NotReady) => { - self.inner = NodeTaskInner::Closing(closing); - return Ok(Async::NotReady); - } - } - }, - - // This happens if a previous poll has ended unexpectedly. The API of futures - // guarantees that we shouldn't be polled again. - NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier") - } - } - } -} diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index 9950c8b65bd..9879c7abf3c 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -28,7 +28,7 @@ pub mod collection; pub mod handled_node; -pub mod handled_node_tasks; +pub mod tasks; pub mod listeners; pub mod node; pub mod raw_swarm; diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 4713e76a79f..c2e6af30bd1 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -28,13 +28,14 @@ use crate::{ CollectionReachEvent, CollectionStream, ConnectionInfo, - ReachAttemptId + ReachAttemptId, + InterruptedReachAttempt }, handled_node::{ HandledNodeError, NodeHandler }, - handled_node_tasks::IntoNodeHandler, + handled_node::IntoNodeHandler, node::Substream }, nodes::listeners::{ListenersEvent, ListenersStream}, @@ -44,12 +45,15 @@ use fnv::FnvHashMap; use futures::{prelude::*, future}; use std::{ collections::hash_map::{Entry, OccupiedEntry}, + collections::VecDeque, error, fmt, hash::Hash, num::NonZeroUsize, }; +pub use crate::nodes::collection::StartTakeOver; + mod tests; /// Implementation of `Stream` that handles the nodes. @@ -69,6 +73,9 @@ where /// Max numer of incoming connections. incoming_limit: Option, + + /// Unfinished take over messages to be delivered. + take_over_to_complete: VecDeque<(TPeerId, AsyncSink>)> } impl fmt::Debug for @@ -84,6 +91,7 @@ where .field("active_nodes", &self.active_nodes) .field("reach_attempts", &self.reach_attempts) .field("incoming_limit", &self.incoming_limit) + .field("take_over_to_complete", &self.take_over_to_complete.len()) .finish() } } @@ -555,7 +563,6 @@ where TPeerId: Eq + Hash + Clone + Send + 'static, { /// Starts processing the incoming connection and sets the handler to use for it. 
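
(Stepping back to the `take_over_to_complete` queue introduced above: it parks a message that a bounded channel refused, to be retried on the next poll. The same pattern, sketched here, not part of the patch, against a plain bounded `futures::sync::mpsc` sender with an assumed `Msg` type, inside a function returning `Poll<(), ()>`:)

    if let Some(msg) = pending.take() {          // pending: Option<Msg>
        match tx.start_send(msg) {               // tx: mpsc::Sender<Msg>
            Ok(AsyncSink::NotReady(msg)) => {
                pending = Some(msg);             // still full: park again
                return Ok(Async::NotReady)       // retry on the next poll
            }
            Ok(AsyncSink::Ready) => {
                if let Ok(Async::NotReady) = tx.poll_complete() {
                    // Queued but not yet flushed; `poll_complete` must be
                    // called again before sending anything else.
                }
            }
            Err(_) => {}                         // receiver gone: drop it
        }
    }
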
- #[inline] pub fn accept(self, handler: THandler) { self.accept_with_builder(|_| handler) } @@ -592,7 +599,6 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> where TTrans: Transport { /// Returns the `IncomingInfo` corresponding to this incoming connection. - #[inline] pub fn info(&self) -> IncomingInfo<'_> { IncomingInfo { listen_addr: &self.listen_addr, @@ -601,19 +607,16 @@ where TTrans: Transport } /// Address of the listener that received the connection. - #[inline] pub fn listen_addr(&self) -> &Multiaddr { &self.listen_addr } /// Address used to send back data to the dialer. - #[inline] pub fn send_back_addr(&self) -> &Multiaddr { &self.send_back_addr } /// Builds the `ConnectedPoint` corresponding to the incoming connection. - #[inline] pub fn to_connected_point(&self) -> ConnectedPoint { self.info().to_connected_point() } @@ -638,14 +641,12 @@ pub enum ConnectedPoint { } impl<'a> From<&'a ConnectedPoint> for Endpoint { - #[inline] fn from(endpoint: &'a ConnectedPoint) -> Endpoint { endpoint.to_endpoint() } } impl From for Endpoint { - #[inline] fn from(endpoint: ConnectedPoint) -> Endpoint { endpoint.to_endpoint() } @@ -653,7 +654,6 @@ impl From for Endpoint { impl ConnectedPoint { /// Turns the `ConnectedPoint` into the corresponding `Endpoint`. - #[inline] pub fn to_endpoint(&self) -> Endpoint { match *self { ConnectedPoint::Dialer { .. } => Endpoint::Dialer, @@ -662,7 +662,6 @@ impl ConnectedPoint { } /// Returns true if we are `Dialer`. - #[inline] pub fn is_dialer(&self) -> bool { match *self { ConnectedPoint::Dialer { .. } => true, @@ -671,7 +670,6 @@ impl ConnectedPoint { } /// Returns true if we are `Listener`. - #[inline] pub fn is_listener(&self) -> bool { match *self { ConnectedPoint::Dialer { .. } => false, @@ -691,7 +689,6 @@ pub struct IncomingInfo<'a> { impl<'a> IncomingInfo<'a> { /// Builds the `ConnectedPoint` corresponding to the incoming connection. - #[inline] pub fn to_connected_point(&self) -> ConnectedPoint { ConnectedPoint::Listener { listen_addr: self.listen_addr.clone(), @@ -713,7 +710,6 @@ where TPeerId: Eq + Hash + Clone, { /// Creates a new node events stream. - #[inline] pub fn new(transport: TTrans, local_peer_id: TPeerId) -> Self { // TODO: with_capacity? RawSwarm { @@ -726,11 +722,11 @@ where connected_points: Default::default(), }, incoming_limit: None, + take_over_to_complete: VecDeque::new() } } /// Creates a new node event stream with incoming connections limit. - #[inline] pub fn new_with_incoming_limit(transport: TTrans, local_peer_id: TPeerId, incoming_limit: Option) -> Self { @@ -744,17 +740,16 @@ where other_reach_attempts: Vec::new(), connected_points: Default::default(), }, + take_over_to_complete: VecDeque::new() } } /// Returns the transport passed when building this object. - #[inline] pub fn transport(&self) -> &TTrans { self.listeners.transport() } /// Start listening on the given multiaddress. - #[inline] pub fn listen_on(&mut self, addr: Multiaddr) -> Result<(), TransportError> { self.listeners.listen_on(addr) } @@ -765,7 +760,6 @@ where } /// Returns limit on incoming connections. - #[inline] pub fn incoming_limit(&self) -> Option { self.incoming_limit } @@ -777,7 +771,6 @@ where /// example with the identify protocol. /// /// For each listener, calls `nat_traversal` with the observed address and returns the outcome. - #[inline] pub fn nat_traversal<'a>(&'a self, observed_addr: &'a Multiaddr) -> impl Iterator + 'a where @@ -790,7 +783,6 @@ where /// Returns the peer id of the local node. 
/// /// This is the same value as was passed to `new()`. - #[inline] pub fn local_peer_id(&self) -> &TPeerId { &self.reach_attempts.local_peer_id } @@ -837,7 +829,6 @@ where /// We don't know anything about these connections yet, so all we can do is know how many of /// them we have. #[deprecated(note = "Use incoming_negotiated().count() instead")] - #[inline] pub fn num_incoming_negotiated(&self) -> usize { self.reach_attempts.other_reach_attempts .iter() @@ -847,7 +838,6 @@ where /// Returns the list of incoming connections that are currently in the process of being /// negotiated. We don't know the `PeerId` of these nodes yet. - #[inline] pub fn incoming_negotiated(&self) -> impl Iterator> { self.reach_attempts .other_reach_attempts @@ -862,12 +852,21 @@ where }) } - /// Sends an event to all nodes. - #[inline] - pub fn broadcast_event(&mut self, event: &TInEvent) - where TInEvent: Clone, + /// Start sending an event to all nodes. + /// + /// Make sure to complete the broadcast with `complete_broadcast`. + #[must_use] + pub fn start_broadcast(&mut self, event: &TInEvent) -> AsyncSink<()> + where + TInEvent: Clone { - self.active_nodes.broadcast_event(event) + self.active_nodes.start_broadcast(event) + } + + /// Complete a broadcast initiated with `start_broadcast`. + #[must_use] + pub fn complete_broadcast(&mut self) -> Async<()> { + self.active_nodes.complete_broadcast() } /// Returns a list of all the peers we are currently connected to. @@ -905,7 +904,6 @@ where } /// Grants access to a struct that represents a peer. - #[inline] pub fn peer(&mut self, peer_id: TPeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> { if peer_id == self.reach_attempts.local_peer_id { return Peer::LocalNode; @@ -1044,6 +1042,29 @@ where } } + // Attempt to deliver any pending take over messages. + let mut remaining = self.take_over_to_complete.len(); + while let Some((id, interrupted)) = self.take_over_to_complete.pop_front() { + if let Some(mut peer) = self.active_nodes.peer_mut(&id) { + if let AsyncSink::NotReady(i) = interrupted { + if let StartTakeOver::NotReady(i) = peer.start_take_over(i) { + self.take_over_to_complete.push_back((id, AsyncSink::NotReady(i))) + } else if let Ok(Async::NotReady) = peer.complete_take_over() { + self.take_over_to_complete.push_back((id, AsyncSink::Ready)) + } + } else if let Ok(Async::NotReady) = peer.complete_take_over() { + self.take_over_to_complete.push_back((id, AsyncSink::Ready)) + } + } + remaining -= 1; + if remaining == 0 { + break + } + } + if !self.take_over_to_complete.is_empty() { + return Async::NotReady + } + // Poll the existing nodes. 
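         // Note: any pending take-over that could not be flushed above caused
         // an early `NotReady` return, so at this point the queue is empty and
         // events polled from the nodes below cannot overtake a half-delivered
         // take-over message.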
let (action, out_event); match self.active_nodes.poll() { @@ -1096,7 +1117,15 @@ where interrupt or when a reach attempt succeeds or errors; therefore the \ out_reach_attempts should always be in sync with the actual \ attempts; QED"); - self.active_nodes.peer_mut(&peer_id).unwrap().take_over(interrupted); + let mut peer = self.active_nodes.peer_mut(&peer_id).unwrap(); + if let StartTakeOver::NotReady(i) = peer.start_take_over(interrupted) { + self.take_over_to_complete.push_back((peer_id, AsyncSink::NotReady(i))); + return Async::NotReady + } + if let Ok(Async::NotReady) = peer.complete_take_over() { + self.take_over_to_complete.push_back((peer_id, AsyncSink::Ready)); + return Async::NotReady + } } Async::Ready(out_event) @@ -1268,7 +1297,6 @@ where /// /// This means that if `local` and `other` both dial each other, the connection from `local` should /// be kept and the one from `other` will be dropped. -#[inline] fn has_dial_prio(local: &TPeerId, other: &TPeerId) -> bool where TPeerId: AsRef<[u8]>, @@ -1473,7 +1501,6 @@ where TPeerId: Eq + Hash + Clone + Send + 'static, { /// If we are connected, returns the `PeerConnected`. - #[inline] pub fn into_connected(self) -> Option> { match self { Peer::Connected(peer) => Some(peer), @@ -1482,7 +1509,6 @@ where } /// If a connection is pending, returns the `PeerPendingConnect`. - #[inline] pub fn into_pending_connect(self) -> Option> { match self { Peer::PendingConnect(peer) => Some(peer), @@ -1491,7 +1517,6 @@ where } /// If we are not connected, returns the `PeerNotConnected`. - #[inline] pub fn into_not_connected(self) -> Option> { match self { Peer::NotConnected(peer) => Some(peer), @@ -1505,7 +1530,6 @@ where /// the whole connection is immediately closed. /// /// Returns an error if we are `LocalNode`. - #[inline] pub fn or_connect(self, addr: Multiaddr, handler: THandler) -> Result, Self> { @@ -1519,7 +1543,6 @@ where /// the whole connection is immediately closed. /// /// Returns an error if we are `LocalNode`. - #[inline] pub fn or_connect_with(self, addr: TFn, handler: THandler) -> Result, Self> where @@ -1558,7 +1581,6 @@ where { /// Closes the connection or the connection attempt. // TODO: consider returning a `PeerNotConnected` - #[inline] pub fn close(self) { match self { PeerPotentialConnect::Connected(peer) => peer.close(), @@ -1567,7 +1589,6 @@ where } /// If we are connected, returns the `PeerConnected`. - #[inline] pub fn into_connected(self) -> Option> { match self { PeerPotentialConnect::Connected(peer) => Some(peer), @@ -1576,7 +1597,6 @@ where } /// If a connection is pending, returns the `PeerPendingConnect`. - #[inline] pub fn into_pending_connect(self) -> Option> { match self { PeerPotentialConnect::PendingConnect(peer) => Some(peer), @@ -1644,12 +1664,18 @@ where closed messages; QED") } - /// Sends an event to the node. - #[inline] - pub fn send_event(&mut self, event: TInEvent) { + /// Start sending an event to the node. + pub fn start_send_event(&mut self, event: TInEvent) -> StartSend { + self.active_nodes.peer_mut(&self.peer_id) + .expect("A PeerConnected is always created with a PeerId in active_nodes; QED") + .start_send_event(event) + } + + /// Complete sending an event message, initiated by `start_send_event`. + pub fn complete_send_event(&mut self) -> Poll<(), ()> { self.active_nodes.peer_mut(&self.peer_id) .expect("A PeerConnected is always created with a PeerId in active_nodes; QED") - .send_event(event) + .complete_send_event() } } @@ -1673,7 +1699,6 @@ where /// Interrupt this connection attempt. 
     // TODO: consider returning a PeerNotConnected; however that is really pain in terms of
     // borrows
-    #[inline]
     pub fn interrupt(self) {
         let attempt = self.attempt.remove();
         if self.active_nodes.interrupt(attempt.id).is_err() {
@@ -1687,13 +1712,11 @@ where
     }
 
     /// Returns the multiaddress we're currently trying to dial.
-    #[inline]
     pub fn attempted_multiaddr(&self) -> &Multiaddr {
         &self.attempt.get().cur_attempted
     }
 
     /// Returns a list of the multiaddresses we're going to try if the current dialing fails.
-    #[inline]
     pub fn pending_multiaddrs(&self) -> impl Iterator<Item = &Multiaddr> {
         self.attempt.get().next_attempts.iter()
     }
@@ -1761,7 +1784,6 @@ where
     ///
     /// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
     /// the whole connection is immediately closed.
-    #[inline]
     pub fn connect(self, addr: Multiaddr, handler: THandler)
         -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
     where
@@ -1779,7 +1801,6 @@ where
     ///
     /// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
     /// the whole connection is immediately closed.
-    #[inline]
     pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
         -> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>, Self>
     where
diff --git a/core/src/nodes/raw_swarm/tests.rs b/core/src/nodes/raw_swarm/tests.rs
index 8edd72e113e..faec59a42fa 100644
--- a/core/src/nodes/raw_swarm/tests.rs
+++ b/core/src/nodes/raw_swarm/tests.rs
@@ -129,14 +129,24 @@ fn broadcasted_events_reach_active_nodes() {
     let dial_result = swarm.dial(addr, handler);
     assert!(dial_result.is_ok());
 
-    swarm.broadcast_event(&InEvent::NextState);
     let swarm = Arc::new(Mutex::new(swarm));
     let mut rt = Runtime::new().unwrap();
+    let swarm2 = swarm.clone();
+    rt.block_on(future::poll_fn(move || {
+        if swarm2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+            Ok::<_, ()>(Async::NotReady)
+        } else {
+            Ok(Async::Ready(()))
+        }
+    })).unwrap();
     let mut peer_id : Option<PeerId> = None;
     while peer_id.is_none() {
         let swarm_fut = swarm.clone();
         peer_id = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
             let mut swarm = swarm_fut.lock();
+            if swarm.complete_broadcast().is_not_ready() {
+                return Ok(Async::NotReady)
+            }
             let poll_res = swarm.poll();
             match poll_res {
                 Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))),
@@ -331,10 +341,20 @@ fn yields_node_error_when_there_is_an_error_after_successful_connect() {
     let mut keep_polling = true;
     while keep_polling {
         let swarm_fut = swarm.clone();
+        let swarm2 = swarm.clone();
+        rt.block_on(future::poll_fn(move || {
+            if swarm2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
+                Ok::<_, ()>(Async::NotReady)
+            } else {
+                Ok(Async::Ready(()))
+            }
+        })).unwrap();
         keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
             let mut swarm = swarm_fut.lock();
             // Push the Handler into an error state on the next poll
-            swarm.broadcast_event(&InEvent::NextState);
+            if swarm.complete_broadcast().is_not_ready() {
+                return Ok(Async::NotReady)
+            }
             match swarm.poll() {
                 Async::NotReady => Ok(Async::Ready(true)),
                 Async::Ready(event) => {
diff --git a/core/src/nodes/tasks/error.rs b/core/src/nodes/tasks/error.rs
new file mode 100644
index 00000000000..cddf30954c2
--- /dev/null
+++ b/core/src/nodes/tasks/error.rs
@@ -0,0 +1,38 @@
+use crate::nodes::handled_node::HandledNodeError;
+use std::{fmt, error};
+
+/// Error that can happen in a task.
+#[derive(Debug)]
+pub enum Error<R, H> {
+    /// An error happened while we were trying to reach the node.
+ Reach(R), + /// An error happened after the node has been reached. + Node(HandledNodeError) +} + +impl fmt::Display for Error +where + R: fmt::Display, + H: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Reach(err) => write!(f, "reach error: {}", err), + Error::Node(err) => write!(f, "node error: {}", err) + } + } +} + +impl error::Error for Error +where + R: error::Error + 'static, + H: error::Error + 'static +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + Error::Reach(err) => Some(err), + Error::Node(err) => Some(err) + } + } +} + diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs new file mode 100644 index 00000000000..b29f577b3eb --- /dev/null +++ b/core/src/nodes/tasks/manager.rs @@ -0,0 +1,568 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{ + PeerId, + muxing::StreamMuxer, + nodes::{ + handled_node::{HandledNode, IntoNodeHandler, NodeHandler}, + node::Substream + } +}; +use fnv::FnvHashMap; +use futures::{prelude::*, future::Executor, sync::mpsc}; +use smallvec::SmallVec; +use std::{collections::hash_map::{Entry, OccupiedEntry}, error, fmt}; +use super::{TaskId, task::{Task, FromTaskMessage, ToTaskMessage}, Error}; + +// Implementor notes +// ================= +// +// This collection of nodes spawns a `Task` for each individual node to process. +// This means that events happen asynchronously at the same time as the +// `Manager` is being polled. +// +// In order to make the API non-racy and avoid issues, we completely separate +// the state in the `Manager` from the states that the `Task` can access. +// They are only allowed to exchange messages. The state in the `Manager` is +// therefore delayed compared to the tasks, and is updated only when `poll()` +// is called. +// +// The only thing that we must be careful about is substreams, as they are +// "detached" from the state of the `Manager` and allowed to process +// concurrently. This is why there is no "substream closed" event being +// reported, as it could potentially create confusions and race conditions in +// the user's code. See similar comments in the documentation of `NodeStream`. +// + +/// Implementation of [`Stream`] that handles a collection of nodes. +pub struct Manager { + /// Collection of managed tasks. + /// + /// Closing the sender interrupts the task. 
It is possible that we receive + /// messages from tasks that used to be in this collection but no longer + /// are, in which case we should ignore them. + tasks: FnvHashMap>, + + /// Identifier for the next task to spawn. + next_task_id: TaskId, + + /// List of node tasks to spawn. + to_spawn: SmallVec<[Box + Send>; 8]>, + + /// If no tokio executor is available, we move tasks to this list, and futures are polled on + /// the current thread instead. + local_spawns: Vec + Send>>, + + /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. + events_tx: mpsc::Sender<(FromTaskMessage, TaskId)>, + + /// Receiver side for the events. + events_rx: mpsc::Receiver<(FromTaskMessage, TaskId)> +} + +impl fmt::Debug for Manager +where + T: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_map() + .entries(self.tasks.iter().map(|(id, task)| (id, &task.user_data))) + .finish() + } +} + +/// Information about a running task. +/// +/// Contains the sender to deliver event messages to the task, +/// the associated user data and a pending message if any, +/// meant to be delivered to the task via the sender. +struct TaskInfo { + /// channel endpoint to send messages to the task + sender: mpsc::Sender>, + /// task associated data + user_data: T, + /// any pending event to deliver to the task + pending: Option>> +} + +/// Event produced by the [`Manager`]. +#[derive(Debug)] +pub enum Event<'a, I, O, H, E, HE, T, C = PeerId> { + /// A task has been closed. + /// + /// This happens once the node handler closes or an error happens. + TaskClosed { + /// The task that has been closed. + task: ClosedTask, + /// What happened. + result: Error, + /// If the task closed before reaching the node, this contains + /// the handler that was passed to `add_reach_attempt`. + handler: Option + }, + + /// A task has successfully connected to a node. + NodeReached { + /// The task that succeeded. + task: TaskEntry<'a, I, T>, + /// Identifier of the node. + conn_info: C + }, + + /// A task has produced an event. + NodeEvent { + /// The task that produced the event. + task: TaskEntry<'a, I, T>, + /// The produced event. + event: O + } +} + +impl Manager { + /// Creates a new task manager. + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(1); + Self { + tasks: FnvHashMap::default(), + next_task_id: TaskId(0), + to_spawn: SmallVec::new(), + local_spawns: Vec::new(), + events_tx: tx, + events_rx: rx + } + } + + /// Adds to the manager a future that tries to reach a node. + /// + /// This method spawns a task dedicated to resolving this future and + /// processing the node's events. + pub fn add_reach_attempt(&mut self, future: F, user_data: T, handler: H) -> TaskId + where + F: Future + Send + 'static, + H: IntoNodeHandler + Send + 'static, + H::Handler: NodeHandler, InEvent = I, OutEvent = O, Error = HE> + Send + 'static, + E: error::Error + Send + 'static, + HE: error::Error + Send + 'static, + I: Send + 'static, + O: Send + 'static, + ::OutboundOpenInfo: Send + 'static, + M: StreamMuxer + Send + Sync + 'static, + M::OutboundSubstream: Send + 'static, + C: Send + 'static + { + let task_id = self.next_task_id; + self.next_task_id.0 += 1; + + let (tx, rx) = mpsc::channel(1); + self.tasks.insert(task_id, TaskInfo { sender: tx, user_data, pending: None }); + + let task = Box::new(Task::new(task_id, self.events_tx.clone(), rx, future, handler)); + self.to_spawn.push(task); + task_id + } + + /// Adds an existing connection to a node to the collection. 
+ /// + /// This method spawns a task dedicated to processing the node's events. + /// + /// No `NodeReached` event will be emitted for this task, since the node has already been + /// reached. + pub fn add_connection(&mut self, user_data: T, muxer: M, handler: Handler) -> TaskId + where + H: IntoNodeHandler + Send + 'static, + Handler: NodeHandler, InEvent = I, OutEvent = O, Error = HE> + Send + 'static, + E: error::Error + Send + 'static, + HE: error::Error + Send + 'static, + I: Send + 'static, + O: Send + 'static, + ::OutboundOpenInfo: Send + 'static, + M: StreamMuxer + Send + Sync + 'static, + M::OutboundSubstream: Send + 'static, + C: Send + 'static + { + let task_id = self.next_task_id; + self.next_task_id.0 += 1; + + let (tx, rx) = mpsc::channel(1); + self.tasks.insert(task_id, TaskInfo { sender: tx, user_data, pending: None }); + + let task: Task, _, _, _, _, _, _> = + Task::node(task_id, self.events_tx.clone(), rx, HandledNode::new(muxer, handler)); + + self.to_spawn.push(Box::new(task)); + task_id + } + + /// Start sending an event to all the tasks, including the pending ones. + /// + /// After starting a broadcast make sure to finish it with `complete_broadcast`, + /// otherwise starting another broadcast or sending an event directly to a + /// task would overwrite the pending broadcast. + #[must_use] + pub fn start_broadcast(&mut self, event: &I) -> AsyncSink<()> + where + I: Clone + { + if self.complete_broadcast().is_not_ready() { + return AsyncSink::NotReady(()) + } + + for task in self.tasks.values_mut() { + let msg = ToTaskMessage::HandlerEvent(event.clone()); + task.pending = Some(AsyncSink::NotReady(msg)) + } + + AsyncSink::Ready + } + + /// Complete a started broadcast. + #[must_use] + pub fn complete_broadcast(&mut self) -> Async<()> { + let mut ready = true; + + for task in self.tasks.values_mut() { + match task.pending.take() { + Some(AsyncSink::NotReady(msg)) => + match task.sender.start_send(msg) { + Ok(AsyncSink::NotReady(msg)) => { + task.pending = Some(AsyncSink::NotReady(msg)); + ready = false + } + Ok(AsyncSink::Ready) => + if let Ok(Async::NotReady) = task.sender.poll_complete() { + task.pending = Some(AsyncSink::Ready); + ready = false + } + Err(_) => {} + } + Some(AsyncSink::Ready) => + if let Ok(Async::NotReady) = task.sender.poll_complete() { + task.pending = Some(AsyncSink::Ready); + ready = false + } + None => {} + } + } + + if ready { + Async::Ready(()) + } else { + Async::NotReady + } + } + + /// Grants access to an object that allows controlling a task of the collection. + /// + /// Returns `None` if the task id is invalid. + pub fn task(&mut self, id: TaskId) -> Option> { + match self.tasks.entry(id) { + Entry::Occupied(inner) => Some(TaskEntry { inner }), + Entry::Vacant(_) => None, + } + } + + /// Returns a list of all the active tasks. + pub fn tasks<'a>(&'a self) -> impl Iterator + 'a { + self.tasks.keys().cloned() + } + + /// Provides an API similar to `Stream`, except that it cannot produce an error. + pub fn poll(&mut self) -> Async> { + for to_spawn in self.to_spawn.drain() { + // We try to use the default executor, but fall back to polling the task manually if + // no executor is available. This makes it possible to use the core in environments + // outside of tokio. + let executor = tokio_executor::DefaultExecutor::current(); + if let Err(err) = executor.execute(to_spawn) { + self.local_spawns.push(err.into_future()) + } + } + + for n in (0 .. 
self.local_spawns.len()).rev() { + let mut task = self.local_spawns.swap_remove(n); + match task.poll() { + Ok(Async::Ready(())) => {} + Ok(Async::NotReady) => self.local_spawns.push(task), + // It would normally be desirable to either report or log when a background task + // errors. However the default tokio executor doesn't do anything in case of error, + // and therefore we mimic this behaviour by also not doing anything. + Err(()) => {} + } + } + + let (message, task_id) = loop { + match self.events_rx.poll() { + Ok(Async::Ready(Some((message, task_id)))) => { + // If the task id is no longer in `self.tasks`, that means that the user called + // `close()` on this task earlier. Therefore no new event should be generated + // for this task. + if self.tasks.contains_key(&task_id) { + break (message, task_id) + } + } + Ok(Async::NotReady) => return Async::NotReady, + Ok(Async::Ready(None)) => unreachable!("sender and receiver have same lifetime"), + Err(()) => unreachable!("An `mpsc::Receiver` does not error.") + } + }; + + Async::Ready(match message { + FromTaskMessage::NodeEvent(event) => + Event::NodeEvent { + task: match self.tasks.entry(task_id) { + Entry::Occupied(inner) => TaskEntry { inner }, + Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") + }, + event + }, + FromTaskMessage::NodeReached(conn_info) => + Event::NodeReached { + task: match self.tasks.entry(task_id) { + Entry::Occupied(inner) => TaskEntry { inner }, + Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED") + }, + conn_info + }, + FromTaskMessage::TaskClosed(result, handler) => { + let entry = self.tasks.remove(&task_id) + .expect("poll_inner only returns valid TaskIds; QED"); + Event::TaskClosed { + task: ClosedTask::new(task_id, entry.sender, entry.user_data), + result, + handler + } + } + }) + } +} + +/// Access to a task in the collection. +pub struct TaskEntry<'a, E, T> { + inner: OccupiedEntry<'a, TaskId, TaskInfo> +} + +impl<'a, E, T> TaskEntry<'a, E, T> { + /// Begin sending an event to the given node. + /// + /// Make sure to finish the send operation with `complete_send_event`. + pub fn start_send_event(&mut self, event: E) -> StartSend { + let msg = ToTaskMessage::HandlerEvent(event); + if let AsyncSink::NotReady(msg) = self.start_send_event_msg(msg)? { + if let ToTaskMessage::HandlerEvent(event) = msg { + return Ok(AsyncSink::NotReady(event)) + } else { + unreachable!("we tried to send an handler event, so we get one back if not ready") + } + } + Ok(AsyncSink::Ready) + } + + /// Finish a send operation started with `start_send_event`. + pub fn complete_send_event(&mut self) -> Poll<(), ()> { + self.complete_send_event_msg() + } + + /// Returns the user data associated with the task. + pub fn user_data(&self) -> &T { + &self.inner.get().user_data + } + + /// Returns the user data associated with the task. + pub fn user_data_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().user_data + } + + /// Returns the task id. + pub fn id(&self) -> TaskId { + *self.inner.key() + } + + /// Closes the task. Returns the user data. + /// + /// No further event will be generated for this task, but the connection inside the task will + /// continue to run until the `ClosedTask` is destroyed. + pub fn close(self) -> ClosedTask { + let id = *self.inner.key(); + let task = self.inner.remove(); + ClosedTask::new(id, task.sender, task.user_data) + } + + /// Gives ownership of a closed task. 
+    /// As soon as our task (`self`) has some acknowledgment from the remote
+    /// that its connection is alive, it will close the connection with `t`.
+    ///
+    /// Make sure to complete this operation with `complete_take_over`.
+    #[must_use]
+    pub fn start_take_over(&mut self, t: ClosedTask<E, T>) -> StartTakeOver<T, ClosedTask<E, T>> {
+        // It is possible that the sender is closed if the background task has already finished
+        // but the local state hasn't been updated yet because we haven't been polled in the
+        // meanwhile.
+        let id = t.id();
+        match self.start_send_event_msg(ToTaskMessage::TakeOver(t.sender)) {
+            Ok(AsyncSink::Ready) => StartTakeOver::Ready(t.user_data),
+            Ok(AsyncSink::NotReady(ToTaskMessage::TakeOver(sender))) =>
+                StartTakeOver::NotReady(ClosedTask::new(id, sender, t.user_data)),
+            Ok(AsyncSink::NotReady(_)) =>
+                unreachable!("We tried to send a take over message, so we get one back."),
+            Err(()) => StartTakeOver::Gone
+        }
+    }
+
+    /// Finish take over started by `start_take_over`.
+    pub fn complete_take_over(&mut self) -> Poll<(), ()> {
+        self.complete_send_event_msg()
+    }
+
+    /// Begin to send a message to the task.
+    ///
+    /// The API mimics the one of [`futures::Sink`]. If this method returns
+    /// `Ok(AsyncSink::Ready)`, drive the sending to completion with
+    /// `complete_send_event_msg`. If the receiving end no longer exists,
+    /// i.e. the task has ended, we return this information as an error.
+    fn start_send_event_msg(&mut self, msg: ToTaskMessage<E>) -> StartSend<ToTaskMessage<E>, ()> {
+        // We first drive any pending send to completion before starting another one.
+        if self.complete_send_event_msg()?.is_ready() {
+            self.inner.get_mut().pending = Some(AsyncSink::NotReady(msg));
+            Ok(AsyncSink::Ready)
+        } else {
+            Ok(AsyncSink::NotReady(msg))
+        }
+    }
+
+    /// Complete event message delivery started by `start_send_event_msg`.
+    fn complete_send_event_msg(&mut self) -> Poll<(), ()> {
+        // It is possible that the sender is closed if the background task has already finished
+        // but the local state hasn't been updated yet because we haven't been polled in the
+        // meanwhile.
+        let task = self.inner.get_mut();
+        let state =
+            if let Some(state) = task.pending.take() {
+                state
+            } else {
+                return Ok(Async::Ready(()))
+            };
+        match state {
+            AsyncSink::NotReady(msg) =>
+                match task.sender.start_send(msg).map_err(|_| ())? {
+                    AsyncSink::Ready =>
+                        if task.sender.poll_complete().map_err(|_| ())?.is_not_ready() {
+                            task.pending = Some(AsyncSink::Ready);
+                            Ok(Async::NotReady)
+                        } else {
+                            Ok(Async::Ready(()))
+                        }
+                    AsyncSink::NotReady(msg) => {
+                        task.pending = Some(AsyncSink::NotReady(msg));
+                        Ok(Async::NotReady)
+                    }
+                }
+            AsyncSink::Ready =>
+                if task.sender.poll_complete().map_err(|_| ())?.is_not_ready() {
+                    task.pending = Some(AsyncSink::Ready);
+                    Ok(Async::NotReady)
+                } else {
+                    Ok(Async::Ready(()))
+                }
+        }
+    }
+}
+
+impl<E, T: fmt::Debug> fmt::Debug for TaskEntry<'_, E, T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_tuple("TaskEntry")
+            .field(&self.id())
+            .field(self.user_data())
+            .finish()
+    }
+}
+
+/// Result of [`TaskEntry::start_take_over`].
+#[derive(Debug)]
+pub enum StartTakeOver<A, B> {
+    /// The take over message has been enqueued.
+    /// Complete the take over with [`TaskEntry::complete_take_over`].
+    Ready(A),
+    /// Not ready to send the take over message to the task.
+    NotReady(B),
+    /// The task to send the take over message is no longer there.
+    Gone
+}
+
+/// Task after it has been closed.
+///
+/// The connection to the remote is potentially still going on, but no new
+/// event for this task will be received.
+pub struct ClosedTask<E, T> {
+    /// Identifier of the task that closed.
+    ///
+    /// No longer corresponds to anything, but can be reported to the user.
+    id: TaskId,
+
+    /// The channel to the task.
+    ///
+    /// The task will continue to work for as long as this channel is alive,
+    /// but events produced by it are ignored.
+    sender: mpsc::Sender<ToTaskMessage<E>>,
+
+    /// The data provided by the user.
+    user_data: T
+}
+
+impl<E, T> ClosedTask<E, T> {
+    /// Create a new `ClosedTask` value.
+    fn new(id: TaskId, sender: mpsc::Sender<ToTaskMessage<E>>, user_data: T) -> Self {
+        Self { id, sender, user_data }
+    }
+
+    /// Returns the task id.
+    ///
+    /// Note that this task is no longer managed and therefore calling
+    /// `Manager::task()` with this ID will fail.
+    pub fn id(&self) -> TaskId {
+        self.id
+    }
+
+    /// Returns the user data associated with the task.
+    pub fn user_data(&self) -> &T {
+        &self.user_data
+    }
+
+    /// Returns mutable access to the user data associated with the task.
+    pub fn user_data_mut(&mut self) -> &mut T {
+        &mut self.user_data
+    }
+
+    /// Finish destroying the task and yield the user data.
+    /// This closes the connection to the remote.
+    pub fn into_user_data(self) -> T {
+        self.user_data
+    }
+}
+
+impl<E, T: fmt::Debug> fmt::Debug for ClosedTask<E, T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_tuple("ClosedTask")
+            .field(&self.id)
+            .field(&self.user_data)
+            .finish()
+    }
+}
+
diff --git a/core/src/nodes/tasks/mod.rs b/core/src/nodes/tasks/mod.rs
new file mode 100644
index 00000000000..baa1a081eac
--- /dev/null
+++ b/core/src/nodes/tasks/mod.rs
@@ -0,0 +1,45 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+//! Management of tasks handling nodes.
+//!
+//! The core type is a [`task::Task`], which implements [`futures::Future`]
+//! and connects and handles a node. A task receives and sends messages
+//! ([`tasks::FromTaskMessage`], [`tasks::ToTaskMessage`]) to the outside.
+//!
+//! A set of tasks is managed by a [`Manager`] which creates tasks when a
+//! node should be connected to (cf. [`Manager::add_reach_attempt`]) or
+//! an existing connection to a node should be driven forward (cf.
+//! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`]
+//! and messages can be sent to individual tasks or all (cf.
+//! [`Manager::start_broadcast`]). Messages produced by tasks can be
+//! retrieved by polling the manager (cf. [`Manager::poll`]).
+
+mod error;
+mod manager;
+mod task;
+
+pub use error::Error;
+pub use manager::{ClosedTask, TaskEntry, Manager, Event, StartTakeOver};
+
+/// Task identifier.
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct TaskId(usize);
+
diff --git a/core/src/nodes/tasks/task.rs b/core/src/nodes/tasks/task.rs
new file mode 100644
index 00000000000..05b801e16b4
--- /dev/null
+++ b/core/src/nodes/tasks/task.rs
@@ -0,0 +1,367 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::{
+    muxing::StreamMuxer,
+    nodes::{
+        handled_node::{HandledNode, IntoNodeHandler, NodeHandler},
+        node::{Close, Substream}
+    }
+};
+use futures::{prelude::*, stream, sync::mpsc};
+use smallvec::SmallVec;
+use super::{TaskId, Error};
+
+/// Message to transmit from the public API to a task.
+#[derive(Debug)]
+pub enum ToTaskMessage<T> {
+    /// An event to transmit to the node handler.
+    HandlerEvent(T),
+    /// When received, stores the parameter inside the task and keeps it alive
+    /// until we have an acknowledgment that the remote has accepted our handshake.
+    TakeOver(mpsc::Sender<ToTaskMessage<T>>)
+}
+
+/// Message to transmit from a task to the public API.
+#[derive(Debug)]
+pub enum FromTaskMessage<T, H, E, HE, C> {
+    /// A connection to a node has succeeded.
+    NodeReached(C),
+    /// The task closed.
+    TaskClosed(Error<E, HE>, Option<H>),
+    /// An event from the node.
+    NodeEvent(T)
+}
+
+/// Implementation of [`Future`] that handles a single node.
+pub struct Task<F, M, H, I, O, E, C>
+where
+    M: StreamMuxer,
+    H: IntoNodeHandler<C>,
+    H::Handler: NodeHandler<Substream = Substream<M>>
+{
+    /// The ID of this task.
+    id: TaskId,
+
+    /// Sender to transmit messages to the outside.
+    sender: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
+
+    /// Receiver of messages from the outside.
+    receiver: stream::Fuse<mpsc::Receiver<ToTaskMessage<I>>>,
+
+    /// Inner state of this `Task`.
+    state: State<F, M, H, I, O, E, C>,
+
+    /// Channels to keep alive for as long as we don't have an acknowledgment from the remote.
+    taken_over: SmallVec<[mpsc::Sender<ToTaskMessage<I>>; 1]>
+}
+
+impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
+where
+    M: StreamMuxer,
+    H: IntoNodeHandler<C>,
+    H::Handler: NodeHandler<Substream = Substream<M>>
+{
+    /// Create a new task to connect and handle some node.
+ pub fn new ( + i: TaskId, + s: mpsc::Sender<(FromTaskMessage::Error, C>, TaskId)>, + r: mpsc::Receiver>, + f: F, + h: H + ) -> Self { + Task { + id: i, + sender: s, + receiver: r.fuse(), + state: State::Future { future: f, handler: h, events_buffer: Vec::new() }, + taken_over: SmallVec::new() + } + } + + /// Create a task for an existing node we are already connected to. + pub fn node ( + i: TaskId, + s: mpsc::Sender<(FromTaskMessage::Error, C>, TaskId)>, + r: mpsc::Receiver>, + n: HandledNode + ) -> Self { + Task { + id: i, + sender: s, + receiver: r.fuse(), + state: State::Node(n), + taken_over: SmallVec::new() + } + } +} + +/// State of the future. +enum State +where + M: StreamMuxer, + H: IntoNodeHandler, + H::Handler: NodeHandler> +{ + /// Future to resolve to connect to the node. + Future { + /// The future that will attempt to reach the node. + future: F, + /// The handler that will be used to build the `HandledNode`. + handler: H, + /// While we are dialing the future, we need to buffer the events received on + /// `receiver` so that they get delivered once dialing succeeds. We can't simply leave + /// events in `receiver` because we have to detect if it gets closed. + events_buffer: Vec + }, + + /// An event should be sent to the outside world. + SendEvent { + /// The node, if available. + node: Option>, + /// The actual event message to send. + event: FromTaskMessage::Error, C> + }, + + /// We started sending an event, now drive the sending to completion. + /// + /// The `bool` parameter determines if we transition to `State::Node` + /// afterwards or to `State::Closing` (assuming we have `Some` node, + /// otherwise the task will end). + PollComplete(Option>, bool), + + /// Fully functional node. + Node(HandledNode), + + /// Node closing. + Closing(Close), + + /// Interim state that can only be observed externally if the future + /// resolved to a value previously. + Undefined +} + +impl Future for Task +where + M: StreamMuxer, + F: Future, + H: IntoNodeHandler, + H::Handler: NodeHandler, InEvent = I, OutEvent = O> +{ + type Item = (); + type Error = (); + + // NOTE: It is imperative to always consume all incoming event messages + // first in order to not prevent the outside from making progress because + // they are blocked on the channel capacity. + fn poll(&mut self) -> Poll<(), ()> { + 'poll: loop { + match std::mem::replace(&mut self.state, State::Undefined) { + State::Future { mut future, handler, mut events_buffer } => { + // If self.receiver is closed, we stop the task. + loop { + match self.receiver.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) => + events_buffer.push(event), + Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) => + self.taken_over.push(take_over), + Err(()) => unreachable!("An `mpsc::Receiver` does not error.") + } + } + // Check if dialing succeeded. 
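+                // On success the future yields the `(conn_info, muxer)` pair; we then
+                // build the `HandledNode` and replay the buffered events into it. On
+                // failure the handler travels back to the outside as part of
+                // `Error::Reach`, so it can be reused for another attempt.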
+ match future.poll() { + Ok(Async::Ready((conn_info, muxer))) => { + let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info)); + for event in events_buffer { + node.inject_event(event) + } + self.state = State::SendEvent { + node: Some(node), + event: FromTaskMessage::NodeReached(conn_info) + } + } + Ok(Async::NotReady) => { + self.state = State::Future { future, handler, events_buffer }; + return Ok(Async::NotReady) + } + Err(e) => { + let event = FromTaskMessage::TaskClosed(Error::Reach(e), Some(handler)); + self.state = State::SendEvent { node: None, event } + } + } + } + State::Node(mut node) => { + // Start by handling commands received from the outside of the task. + loop { + match self.receiver.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) => + node.inject_event(event), + Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) => + self.taken_over.push(take_over), + Ok(Async::Ready(None)) => { + // Node closed by the external API; start closing. + self.state = State::Closing(node.close()); + continue 'poll + } + Err(()) => unreachable!("An `mpsc::Receiver` does not error.") + } + } + // Process the node. + loop { + if !self.taken_over.is_empty() && node.is_remote_acknowledged() { + self.taken_over.clear() + } + match node.poll() { + Ok(Async::NotReady) => { + self.state = State::Node(node); + return Ok(Async::NotReady) + } + Ok(Async::Ready(event)) => { + self.state = State::SendEvent { + node: Some(node), + event: FromTaskMessage::NodeEvent(event) + }; + continue 'poll + } + Err(err) => { + let event = FromTaskMessage::TaskClosed(Error::Node(err), None); + self.state = State::SendEvent { node: None, event }; + continue 'poll + } + } + } + } + // Deliver an event to the outside. + State::SendEvent { mut node, event } => { + loop { + match self.receiver.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) => + if let Some(ref mut n) = node { + n.inject_event(event) + } + Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) => + self.taken_over.push(take_over), + Ok(Async::Ready(None)) => + // Node closed by the external API; start closing. + if let Some(n) = node { + self.state = State::Closing(n.close()); + continue 'poll + } else { + return Ok(Async::Ready(())) // end task + } + Err(()) => unreachable!("An `mpsc::Receiver` does not error.") + } + } + // Check if this task is about to close. We pass the flag to + // the next state so it knows what to do. + let close = + if let FromTaskMessage::TaskClosed(..) = event { + true + } else { + false + }; + match self.sender.start_send((event, self.id)) { + Ok(AsyncSink::NotReady((event, _))) => { + self.state = State::SendEvent { node, event }; + return Ok(Async::NotReady) + } + Ok(AsyncSink::Ready) => self.state = State::PollComplete(node, close), + Err(_) => { + if let Some(n) = node { + self.state = State::Closing(n.close()); + continue 'poll + } + // We can not communicate to the outside and there is no + // node to handle, so this is the end of this task. + return Ok(Async::Ready(())) + } + } + } + // We started delivering an event, now try to complete the sending. 
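+            // While flushing we nevertheless keep draining `self.receiver` first, in
+            // line with the NOTE above: a task must never block the outside world on
+            // its channel capacity.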
+ State::PollComplete(mut node, close) => { + loop { + match self.receiver.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) => + if let Some(ref mut n) = node { + n.inject_event(event) + } + Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) => + self.taken_over.push(take_over), + Ok(Async::Ready(None)) => + // Node closed by the external API; start closing. + if let Some(n) = node { + self.state = State::Closing(n.close()); + continue 'poll + } else { + return Ok(Async::Ready(())) // end task + } + Err(()) => unreachable!("An `mpsc::Receiver` does not error.") + } + } + match self.sender.poll_complete() { + Ok(Async::NotReady) => { + self.state = State::PollComplete(node, close); + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) => + if let Some(n) = node { + if close { + self.state = State::Closing(n.close()) + } else { + self.state = State::Node(n) + } + } else { + // Since we have no node we terminate this task. + assert!(close); + return Ok(Async::Ready(())) + } + Err(_) => { + if let Some(n) = node { + self.state = State::Closing(n.close()); + continue 'poll + } + // We can not communicate to the outside and there is no + // node to handle, so this is the end of this task. + return Ok(Async::Ready(())) + } + } + } + State::Closing(mut closing) => + match closing.poll() { + Ok(Async::Ready(())) | Err(_) => + return Ok(Async::Ready(())), // end task + Ok(Async::NotReady) => { + self.state = State::Closing(closing); + return Ok(Async::NotReady) + } + } + // This happens if a previous poll has resolved the future. + // The API contract of futures is that we should not be polled again. + State::Undefined => panic!("`Task::poll()` called after completion.") + } + } + } +} + diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index dcaa9d13578..f092c0110eb 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -21,8 +21,7 @@ use crate::{ PeerId, nodes::collection::ConnectionInfo, - nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, - nodes::handled_node_tasks::IntoNodeHandler, + nodes::handled_node::{IntoNodeHandler, NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, nodes::raw_swarm::ConnectedPoint, protocols_handler::{KeepAlive, ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ diff --git a/core/src/swarm/swarm.rs b/core/src/swarm/swarm.rs index e2d0e1f6968..f5ca9e6136a 100644 --- a/core/src/swarm/swarm.rs +++ b/core/src/swarm/swarm.rs @@ -34,7 +34,7 @@ use crate::{ use futures::prelude::*; use smallvec::SmallVec; use std::{error, fmt, io, ops::{Deref, DerefMut}}; -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; /// Contains the state of the network, plus the way it should behave. pub type Swarm = ExpandedSwarm< @@ -78,6 +78,9 @@ where /// List of nodes for which we deny any incoming connection. banned_peers: HashSet, + + /// Pending event messages to be delivered. + send_events_to_complete: VecDeque<(PeerId, AsyncSink)> } impl Deref for @@ -320,6 +323,29 @@ where TBehaviour: NetworkBehaviour, }, } + // Try to deliver pending events. 
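+        // For each queued entry, `AsyncSink::NotReady(e)` means the event has not
+        // yet been handed to `start_send_event`, while `AsyncSink::Ready` means it
+        // was accepted but `complete_send_event` has not yet finished flushing it.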
+        let mut remaining = self.send_events_to_complete.len();
+        while let Some((id, pending)) = self.send_events_to_complete.pop_front() {
+            if let Some(mut peer) = self.raw_swarm.peer(id.clone()).into_connected() {
+                if let AsyncSink::NotReady(e) = pending {
+                    if let Ok(a@AsyncSink::NotReady(_)) = peer.start_send_event(e) {
+                        self.send_events_to_complete.push_back((id, a))
+                    } else if let Ok(Async::NotReady) = peer.complete_send_event() {
+                        self.send_events_to_complete.push_back((id, AsyncSink::Ready))
+                    }
+                } else if let Ok(Async::NotReady) = peer.complete_send_event() {
+                    self.send_events_to_complete.push_back((id, AsyncSink::Ready))
+                }
+            }
+            remaining -= 1;
+            if remaining == 0 {
+                break
+            }
+        }
+        if !self.send_events_to_complete.is_empty() {
+            return Ok(Async::NotReady)
+        }
+
         let behaviour_poll = {
             let mut parameters = SwarmPollParameters {
                 local_peer_id: &mut self.raw_swarm.local_peer_id(),
@@ -347,8 +373,12 @@ where TBehaviour: NetworkBehaviour,
                 }
             },
             Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => {
-                if let Some(mut peer) = self.raw_swarm.peer(peer_id).into_connected() {
-                    peer.send_event(event);
+                if let Some(mut peer) = self.raw_swarm.peer(peer_id.clone()).into_connected() {
+                    if let Ok(a@AsyncSink::NotReady(_)) = peer.start_send_event(event) {
+                        self.send_events_to_complete.push_back((peer_id, a))
+                    } else if let Ok(Async::NotReady) = peer.complete_send_event() {
+                        self.send_events_to_complete.push_back((peer_id, AsyncSink::Ready))
+                    }
                 }
             },
             Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
@@ -465,6 +495,7 @@ where TBehaviour: NetworkBehaviour,
             listened_addrs: SmallVec::new(),
             external_addrs: Addresses::default(),
             banned_peers: HashSet::new(),
+            send_events_to_complete: VecDeque::new()
         }
     }
 }
diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs
index 965ddd7684b..751dcbb4505 100644
--- a/core/src/transport/memory.rs
+++ b/core/src/transport/memory.rs
@@ -29,7 +29,8 @@ use rw_stream_sink::RwStreamSink;
 use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64};
 
 lazy_static! {
-    static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::UnboundedSender<Channel<Vec<u8>>>>> = Mutex::new(FnvHashMap::default());
+    static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::Sender<Channel<Vec<u8>>>>> =
+        Mutex::new(FnvHashMap::default());
 }
 
 /// Transport that supports `/memory/N` multiaddresses.
@@ -184,7 +185,7 @@ impl Stream for Listener {
             return Ok(Async::Ready(Some(ListenerEvent::NewAddress(self.addr.clone()))))
         }
         let channel = try_ready!(Ok(self.receiver.poll()
-            .expect("An unbounded receiver never panics; QED")));
+            .expect("Live listeners always have a sender.")));
         let channel = match channel {
             Some(c) => c,
             None => return Ok(Async::Ready(None))
diff --git a/core/tests/raw_swarm_simult.rs b/core/tests/raw_swarm_simult.rs
index 811c9c2eb6f..d099b4d017e 100644
--- a/core/tests/raw_swarm_simult.rs
+++ b/core/tests/raw_swarm_simult.rs
@@ -84,6 +84,7 @@ where
 }
 
 #[test]
+#[ignore]
 fn raw_swarm_simultaneous_connect() {
     // Checks whether two swarms dialing each other simultaneously properly works.

From f58f641d68d2f7148742c3b5988ddc32e872ba6b Mon Sep 17 00:00:00 2001
From: Toralf Wittner
Date: Mon, 8 Jul 2019 10:47:45 +0200
Subject: [PATCH 2/6] `take_over_to_complete` can be an `Option`.

Since it is always holding just a single pending message.
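
Since at most one such message can be in flight, the re-queueing loop
from the previous commit reduces to a take-and-put-back over the
`Option`, roughly:

    if let Some((id, msg)) = self.take_over_to_complete.take() {
        // start and/or flush the send; if either step is not
        // ready, store the message in `take_over_to_complete` again
    }
    if self.take_over_to_complete.is_some() {
        return Async::NotReady
    }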
--- core/src/nodes/raw_swarm.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 166973281c1..742c18915b3 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -45,7 +45,6 @@ use fnv::FnvHashMap; use futures::{prelude::*, future}; use std::{ collections::hash_map::{Entry, OccupiedEntry}, - collections::VecDeque, error, fmt, hash::Hash, @@ -74,8 +73,8 @@ where /// Max numer of incoming connections. incoming_limit: Option, - /// Unfinished take over messages to be delivered. - take_over_to_complete: VecDeque<(TPeerId, AsyncSink>)> + /// Unfinished take over message to be delivered. + take_over_to_complete: Option<(TPeerId, AsyncSink>)> } impl fmt::Debug for @@ -91,7 +90,7 @@ where .field("active_nodes", &self.active_nodes) .field("reach_attempts", &self.reach_attempts) .field("incoming_limit", &self.incoming_limit) - .field("take_over_to_complete", &self.take_over_to_complete.len()) + .field("take_over_to_complete", &self.take_over_to_complete) .finish() } } @@ -666,7 +665,7 @@ where connected_points: Default::default(), }, incoming_limit: None, - take_over_to_complete: VecDeque::new() + take_over_to_complete: None } } @@ -684,7 +683,7 @@ where other_reach_attempts: Vec::new(), connected_points: Default::default(), }, - take_over_to_complete: VecDeque::new() + take_over_to_complete: None } } @@ -987,25 +986,20 @@ where } // Attempt to deliver any pending take over messages. - let mut remaining = self.take_over_to_complete.len(); - while let Some((id, interrupted)) = self.take_over_to_complete.pop_front() { + if let Some((id, interrupted)) = self.take_over_to_complete.take() { if let Some(mut peer) = self.active_nodes.peer_mut(&id) { if let AsyncSink::NotReady(i) = interrupted { if let StartTakeOver::NotReady(i) = peer.start_take_over(i) { - self.take_over_to_complete.push_back((id, AsyncSink::NotReady(i))) + self.take_over_to_complete = Some((id, AsyncSink::NotReady(i))) } else if let Ok(Async::NotReady) = peer.complete_take_over() { - self.take_over_to_complete.push_back((id, AsyncSink::Ready)) + self.take_over_to_complete = Some((id, AsyncSink::Ready)) } } else if let Ok(Async::NotReady) = peer.complete_take_over() { - self.take_over_to_complete.push_back((id, AsyncSink::Ready)) + self.take_over_to_complete = Some((id, AsyncSink::Ready)) } } - remaining -= 1; - if remaining == 0 { - break - } } - if !self.take_over_to_complete.is_empty() { + if self.take_over_to_complete.is_some() { return Async::NotReady } @@ -1063,11 +1057,11 @@ where attempts; QED"); let mut peer = self.active_nodes.peer_mut(&peer_id).unwrap(); if let StartTakeOver::NotReady(i) = peer.start_take_over(interrupted) { - self.take_over_to_complete.push_back((peer_id, AsyncSink::NotReady(i))); + self.take_over_to_complete = Some((peer_id, AsyncSink::NotReady(i))); return Async::NotReady } if let Ok(Async::NotReady) = peer.complete_take_over() { - self.take_over_to_complete.push_back((peer_id, AsyncSink::Ready)); + self.take_over_to_complete = Some((peer_id, AsyncSink::Ready)); return Async::NotReady } } From b0d6a770f5ca19ecea33685d761ac8c59c6ecd33 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 8 Jul 2019 10:59:22 +0200 Subject: [PATCH 3/6] `send_event_to_complete` can be an `Option`. 
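
Same reasoning as for `take_over_to_complete` in the previous commit:
the swarm only ever re-queues the single event message it just popped,
so an `Option` carries exactly as much information as the `VecDeque`
did.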
--- swarm/src/lib.rs | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b0904bdd71e..449dbecbd6c 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -93,7 +93,7 @@ use libp2p_core::{ use registry::{Addresses, AddressIntoIter}; use smallvec::SmallVec; use std::{error, fmt, io, ops::{Deref, DerefMut}}; -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; /// Contains the state of the network, plus the way it should behave. pub type Swarm = ExpandedSwarm< @@ -138,8 +138,8 @@ where /// List of nodes for which we deny any incoming connection. banned_peers: HashSet, - /// Pending event messages to be delivered. - send_events_to_complete: VecDeque<(PeerId, AsyncSink)> + /// Pending event message to be delivered. + send_event_to_complete: Option<(PeerId, AsyncSink)> } impl Deref for @@ -382,26 +382,21 @@ where TBehaviour: NetworkBehaviour, }, } - // Try to deliver pending events. - let mut remaining = self.send_events_to_complete.len(); - while let Some((id, pending)) = self.send_events_to_complete.pop_front() { + // Try to deliver pending event. + if let Some((id, pending)) = self.send_event_to_complete.take() { if let Some(mut peer) = self.raw_swarm.peer(id.clone()).into_connected() { if let AsyncSink::NotReady(e) = pending { if let Ok(a@AsyncSink::NotReady(_)) = peer.start_send_event(e) { - self.send_events_to_complete.push_back((id, a)) + self.send_event_to_complete = Some((id, a)) } else if let Ok(Async::NotReady) = peer.complete_send_event() { - self.send_events_to_complete.push_back((id, AsyncSink::Ready)) + self.send_event_to_complete = Some((id, AsyncSink::Ready)) } } else if let Ok(Async::NotReady) = peer.complete_send_event() { - self.send_events_to_complete.push_back((id, AsyncSink::Ready)) + self.send_event_to_complete = Some((id, AsyncSink::Ready)) } } - remaining -= 1; - if remaining == 0 { - break - } } - if !self.send_events_to_complete.is_empty() { + if self.send_event_to_complete.is_some() { return Ok(Async::NotReady) } @@ -434,9 +429,9 @@ where TBehaviour: NetworkBehaviour, Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => { if let Some(mut peer) = self.raw_swarm.peer(peer_id.clone()).into_connected() { if let Ok(a@AsyncSink::NotReady(_)) = peer.start_send_event(event) { - self.send_events_to_complete.push_back((peer_id, a)) + self.send_event_to_complete = Some((peer_id, a)) } else if let Ok(Async::NotReady) = peer.complete_send_event() { - self.send_events_to_complete.push_back((peer_id, AsyncSink::Ready)) + self.send_event_to_complete = Some((peer_id, AsyncSink::Ready)) } } }, @@ -554,7 +549,7 @@ where TBehaviour: NetworkBehaviour, listened_addrs: SmallVec::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), - send_events_to_complete: VecDeque::new() + send_event_to_complete: None } } } From ce247f001229a4735442362c3b825bbc75185922 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 8 Jul 2019 12:02:37 +0200 Subject: [PATCH 4/6] Update core/src/nodes/tasks/manager.rs Co-Authored-By: Pierre Krieger --- core/src/nodes/tasks/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index b29f577b3eb..c14c7edfd7a 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -171,7 +171,7 @@ impl Manager { let task_id = self.next_task_id; self.next_task_id.0 += 1; - let (tx, rx) = mpsc::channel(1); + let (tx, rx) 
= mpsc::channel(4); self.tasks.insert(task_id, TaskInfo { sender: tx, user_data, pending: None }); let task = Box::new(Task::new(task_id, self.events_tx.clone(), rx, future, handler)); From 1b40cbc7c70cf35ff3be27ea26306f80171a1eea Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 8 Jul 2019 12:02:46 +0200 Subject: [PATCH 5/6] Update core/src/nodes/tasks/manager.rs Co-Authored-By: Pierre Krieger --- core/src/nodes/tasks/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index c14c7edfd7a..33643aaac04 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -201,7 +201,7 @@ impl Manager { let task_id = self.next_task_id; self.next_task_id.0 += 1; - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(4); self.tasks.insert(task_id, TaskInfo { sender: tx, user_data, pending: None }); let task: Task, _, _, _, _, _, _> = From e952cb79e4cc8a88968a66ac65f4c38eb402969d Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 8 Jul 2019 15:07:55 +0200 Subject: [PATCH 6/6] Add comments to explain the need to flush sends ... of take-over and event messages delivered over Sinks. --- core/src/nodes/raw_swarm.rs | 7 +++++++ swarm/src/lib.rs | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 742c18915b3..96fa81fa70a 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -74,6 +74,13 @@ where incoming_limit: Option, /// Unfinished take over message to be delivered. + /// + /// If the pair's second element is `AsyncSink::NotReady`, the take over + /// message has yet to be sent using `PeerMut::start_take_over`. + /// + /// If the pair's second element is `AsyncSink::Ready`, the take over + /// message has been sent and needs to be flushed using + /// `PeerMut::complete_take_over`. take_over_to_complete: Option<(TPeerId, AsyncSink>)> } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 449dbecbd6c..79ec53a1c5e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -139,6 +139,13 @@ where banned_peers: HashSet, /// Pending event message to be delivered. + /// + /// If the pair's second element is `AsyncSink::NotReady`, the event + /// message has yet to be sent using `PeerMut::start_send_event`. + /// + /// If the pair's second element is `AsyncSink::Ready`, the event + /// message has been sent and needs to be flushed using + /// `PeerMut::complete_send_event`. send_event_to_complete: Option<(PeerId, AsyncSink)> }
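
A note on the resulting two-phase API (an illustrative sketch only;
`swarm` and `event` stand in for a `RawSwarm` and its `InEvent`, driven
inside a `future::poll_fn` as in the tests above):

    // Phase 1: enqueue the event with every task; retry if a previous
    // broadcast is still in flight.
    if swarm.start_broadcast(&event).is_not_ready() {
        return Ok(Async::NotReady)
    }
    // Phase 2: flush until every task channel has accepted the message.
    if swarm.complete_broadcast().is_not_ready() {
        return Ok(Async::NotReady)
    }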