Skip to content

Commit 6aba796

Browse files
twittnertomaka
authored andcommitted
Replace unbounded channels with bounded ones. (#1191)
* Replace unbounded channels with bounded ones. To remove the unbounded channels used for communicating with node tasks an API similar to `futures::Sink` is used, i.e. sending is split into a start and complete phase. The start phase returns `StartSend` and first attempts to complete any pending send operations. Completing the send means polling until `Poll::Ready(())` is returned. In addition this PR has split the `handled_node_tasks` module into several smaller ones (cf. `nodes::tasks`) and renamed some types: - `nodes::handled_node_tasks::NodeTask` -> `nodes::tasks::task::Task` - `nodes::handled_node_tasks::NodeTaskInner` -> `nodes::tasks::task::State` - `nodes::handled_node_tasks::NodeTasks` -> `nodes::tasks::Manager` - `nodes::handled_node_tasks::TaskClosedEvent` -> `nodes::tasks::Error` - `nodes::handled_node_tasks::HandledNodesEvent` -> `nodes::tasks::Event` - `nodes::handled_node_tasks::Task` -> `nodes::tasks::TaskEntry` - `nodes::handled_node_tasks::ExtToInMessage` -> `nodes::tasks::task::ToTaskMessage` - `nodes::handled_node_tasks::InToExtMessage` -> `nodes::tasks::task::FromTaskMessage` * `take_over_to_complete` can be an `Option`. Since it is always holding just a single pending message. * `send_event_to_complete` can be an `Option`. * Update core/src/nodes/tasks/manager.rs Co-Authored-By: Pierre Krieger <[email protected]> * Update core/src/nodes/tasks/manager.rs Co-Authored-By: Pierre Krieger <[email protected]> * Add comments to explain the need to flush sends ... of take-over and event messages delivered over Sinks.
1 parent a0d278a commit 6aba796

File tree

14 files changed

+1287
-806
lines changed

14 files changed

+1287
-806
lines changed

core/src/nodes/collection.rs

+57-33
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ use crate::{
2323
muxing::StreamMuxer,
2424
nodes::{
2525
node::Substream,
26-
handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
27-
handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId, ClosedTask},
28-
handled_node::{HandledNodeError, NodeHandler}
26+
handled_node::{HandledNodeError, IntoNodeHandler, NodeHandler},
27+
tasks::{self, ClosedTask, TaskEntry, TaskId}
2928
}
3029
};
3130
use fnv::FnvHashMap;
3231
use futures::prelude::*;
3332
use std::{error, fmt, hash::Hash, mem};
3433

34+
pub use crate::nodes::tasks::StartTakeOver;
35+
3536
mod tests;
3637

3738
/// Implementation of `Stream` that handles a collection of nodes.
@@ -40,7 +41,7 @@ pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerEr
4041
///
4142
/// The user data contains the state of the task. If `Connected`, then a corresponding entry
4243
/// must be present in `nodes`.
43-
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
44+
inner: tasks::Manager<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
4445

4546
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
4647
/// must always be in the `Connected` state.
@@ -310,7 +311,7 @@ where
310311
#[inline]
311312
pub fn new() -> Self {
312313
CollectionStream {
313-
inner: HandledNodesTasks::new(),
314+
inner: tasks::Manager::new(),
314315
nodes: Default::default(),
315316
}
316317
}
@@ -357,12 +358,17 @@ where
357358
}
358359

359360
/// Sends an event to all nodes.
360-
#[inline]
361-
pub fn broadcast_event(&mut self, event: &TInEvent)
362-
where TInEvent: Clone,
361+
#[must_use]
362+
pub fn start_broadcast(&mut self, event: &TInEvent) -> AsyncSink<()>
363+
where
364+
TInEvent: Clone
363365
{
364-
// TODO: remove the ones we're not connected to?
365-
self.inner.broadcast_event(event)
366+
self.inner.start_broadcast(event)
367+
}
368+
369+
#[must_use]
370+
pub fn complete_broadcast(&mut self) -> Async<()> {
371+
self.inner.complete_broadcast()
366372
}
367373

368374
/// Adds an existing connection to a node to the collection.
@@ -383,8 +389,8 @@ where
383389
TConnInfo: Clone + Send + 'static,
384390
TPeerId: Clone,
385391
{
386-
// Calling `HandledNodesTasks::add_connection` is the same as calling
387-
// `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event.
392+
// Calling `tasks::Manager::add_connection` is the same as calling
393+
// `tasks::Manager::add_reach_attempt`, except that we don't get any `NodeReached` event.
388394
// We therefore implement this method the same way as calling `add_reach_attempt` followed
389395
// with simulating a received `NodeReached` event and accepting it.
390396

@@ -451,29 +457,28 @@ where
451457
};
452458

453459
match item {
454-
HandledNodesEvent::TaskClosed { task, result, handler } => {
460+
tasks::Event::TaskClosed { task, result, handler } => {
455461
let id = task.id();
456462
let user_data = task.into_user_data();
457463

458464
match (user_data, result, handler) {
459-
(TaskState::Pending, TaskClosedEvent::Reach(err), Some(handler)) => {
465+
(TaskState::Pending, tasks::Error::Reach(err), Some(handler)) => {
460466
Async::Ready(CollectionEvent::ReachError {
461467
id: ReachAttemptId(id),
462468
error: err,
463469
handler,
464470
})
465471
},
466-
(TaskState::Pending, TaskClosedEvent::Node(_), _) => {
472+
(TaskState::Pending, tasks::Error::Node(_), _) => {
467473
panic!("We switch the task state to Connected once we're connected, and \
468-
a TaskClosedEvent::Node can only happen after we're \
469-
connected; QED");
474+
a tasks::Error::Node can only happen after we're connected; QED");
470475
},
471-
(TaskState::Pending, TaskClosedEvent::Reach(_), None) => {
472-
// TODO: this could be improved in the API of HandledNodesTasks
473-
panic!("The HandledNodesTasks is guaranteed to always return the handler \
474-
when producing a TaskClosedEvent::Reach error");
476+
(TaskState::Pending, tasks::Error::Reach(_), None) => {
477+
// TODO: this could be improved in the API of tasks::Manager
478+
panic!("The tasks::Manager is guaranteed to always return the handler \
479+
when producing a tasks::Error::Reach error");
475480
},
476-
(TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => {
481+
(TaskState::Connected(conn_info, user_data), tasks::Error::Node(err), _handler) => {
477482
debug_assert!(_handler.is_none());
478483
let _node_task_id = self.nodes.remove(conn_info.peer_id());
479484
debug_assert_eq!(_node_task_id, Some(id));
@@ -483,13 +488,13 @@ where
483488
user_data,
484489
})
485490
},
486-
(TaskState::Connected(_, _), TaskClosedEvent::Reach(_), _) => {
487-
panic!("A TaskClosedEvent::Reach can only happen before we are connected \
491+
(TaskState::Connected(_, _), tasks::Error::Reach(_), _) => {
492+
panic!("A tasks::Error::Reach can only happen before we are connected \
488493
to a node; therefore the TaskState won't be Connected; QED");
489494
},
490495
}
491496
},
492-
HandledNodesEvent::NodeReached { task, conn_info } => {
497+
tasks::Event::NodeReached { task, conn_info } => {
493498
let id = task.id();
494499
drop(task);
495500
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
@@ -498,7 +503,7 @@ where
498503
conn_info: Some(conn_info),
499504
}))
500505
},
501-
HandledNodesEvent::NodeEvent { task, event } => {
506+
tasks::Event::NodeEvent { task, event } => {
502507
let conn_info = match task.user_data() {
503508
TaskState::Connected(conn_info, _) => conn_info.clone(),
504509
_ => panic!("we can only receive NodeEvent events from a task after we \
@@ -566,7 +571,7 @@ where
566571

567572
/// Access to a peer in the collection.
568573
pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
569-
inner: HandledNodesTask<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
574+
inner: TaskEntry<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
570575
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
571576
}
572577

@@ -612,9 +617,13 @@ where
612617
}
613618

614619
/// Sends an event to the given node.
615-
#[inline]
616-
pub fn send_event(&mut self, event: TInEvent) {
617-
self.inner.send_event(event)
620+
pub fn start_send_event(&mut self, event: TInEvent) -> StartSend<TInEvent, ()> {
621+
self.inner.start_send_event(event)
622+
}
623+
624+
/// Complete sending an event message initiated by `start_send_event`.
625+
pub fn complete_send_event(&mut self) -> Poll<(), ()> {
626+
self.inner.complete_send_event()
618627
}
619628

620629
/// Closes the connections to this node. Returns the user data.
@@ -639,8 +648,23 @@ where
639648
/// The reach attempt will only be effectively cancelled once the peer (the object you're
640649
/// manipulating) has received some network activity. However no event will be ever be
641650
/// generated from this reach attempt, and this takes effect immediately.
642-
pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>) {
643-
let _state = self.inner.take_over(id.inner);
644-
debug_assert!(if let TaskState::Pending = _state { true } else { false });
651+
#[must_use]
652+
pub fn start_take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>)
653+
-> StartTakeOver<(), InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>>
654+
{
655+
match self.inner.start_take_over(id.inner) {
656+
StartTakeOver::Ready(_state) => {
657+
debug_assert!(if let TaskState::Pending = _state { true } else { false });
658+
StartTakeOver::Ready(())
659+
}
660+
StartTakeOver::NotReady(inner) =>
661+
StartTakeOver::NotReady(InterruptedReachAttempt { inner }),
662+
StartTakeOver::Gone => StartTakeOver::Gone
663+
}
664+
}
665+
666+
/// Complete a take over initiated by `start_take_over`.
667+
pub fn complete_take_over(&mut self) -> Poll<(), ()> {
668+
self.inner.complete_take_over()
645669
}
646670
}

core/src/nodes/collection/tests.rs

+53-6
Original file line numberDiff line numberDiff line change
@@ -154,41 +154,81 @@ fn events_in_a_node_reaches_the_collection_stream() {
154154
Ok(Async::Ready(()))
155155
})).expect("tokio works");
156156

157+
let cs2 = cs.clone();
158+
rt.block_on(future::poll_fn(move || {
159+
if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
160+
Ok::<_, ()>(Async::NotReady)
161+
} else {
162+
Ok(Async::Ready(()))
163+
}
164+
})).unwrap();
157165
let cs_fut = cs.clone();
158166
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
159167
let mut cs = cs_fut.lock();
160-
cs.broadcast_event(&InEvent::NextState);
168+
if cs.complete_broadcast().is_not_ready() {
169+
return Ok(Async::NotReady)
170+
}
161171
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
162172
reach_ev.accept(());
163173
});
164174
Ok(Async::Ready(()))
165175
})).expect("tokio works");
166176

177+
let cs2 = cs.clone();
178+
rt.block_on(future::poll_fn(move || {
179+
if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
180+
Ok::<_, ()>(Async::NotReady)
181+
} else {
182+
Ok(Async::Ready(()))
183+
}
184+
})).unwrap();
167185
let cs_fut = cs.clone();
168186
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
169187
let mut cs = cs_fut.lock();
170-
cs.broadcast_event(&InEvent::NextState);
188+
if cs.complete_broadcast().is_not_ready() {
189+
return Ok(Async::NotReady)
190+
}
171191
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
172192
assert_matches!(event, OutEvent::Custom("init"));
173193
});
174194
Ok(Async::Ready(()))
175195
})).expect("tokio works");
176196

177197

198+
let cs2 = cs.clone();
199+
rt.block_on(future::poll_fn(move || {
200+
if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
201+
Ok::<_, ()>(Async::NotReady)
202+
} else {
203+
Ok(Async::Ready(()))
204+
}
205+
})).unwrap();
178206
let cs_fut = cs.clone();
179207
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
180208
let mut cs = cs_fut.lock();
181-
cs.broadcast_event(&InEvent::NextState);
209+
if cs.complete_broadcast().is_not_ready() {
210+
return Ok(Async::NotReady)
211+
}
182212
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
183213
assert_matches!(event, OutEvent::Custom("from handler 1"));
184214
});
185215
Ok(Async::Ready(()))
186216
})).expect("tokio works");
187217

218+
let cs2 = cs.clone();
219+
rt.block_on(future::poll_fn(move || {
220+
if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
221+
Ok::<_, ()>(Async::NotReady)
222+
} else {
223+
Ok(Async::Ready(()))
224+
}
225+
})).unwrap();
188226
let cs_fut = cs.clone();
189227
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
190228
let mut cs = cs_fut.lock();
191-
cs.broadcast_event(&InEvent::NextState);
229+
if cs.complete_broadcast().is_not_ready() {
230+
return Ok(Async::NotReady)
231+
}
192232
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
193233
assert_matches!(event, OutEvent::Custom("from handler 2"));
194234
});
@@ -238,13 +278,20 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() {
238278
let mut rt = Builder::new().core_threads(1).build().unwrap();
239279

240280
// Kick it off
281+
let cs2 = cs.clone();
282+
rt.block_on(future::poll_fn(move || {
283+
if cs2.lock().start_broadcast(&InEvent::NextState).is_not_ready() {
284+
Ok::<_, ()>(Async::NotReady)
285+
} else {
286+
Ok(Async::Ready(()))
287+
}
288+
})).unwrap();
241289
let cs_fut = cs.clone();
242290
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
243291
let mut cs = cs_fut.lock();
244292
assert_matches!(cs.poll(), Async::NotReady);
245293
// send an event so the Handler errors in two polls
246-
cs.broadcast_event(&InEvent::NextState);
247-
Ok(Async::Ready(()))
294+
Ok(cs.complete_broadcast())
248295
})).expect("tokio works");
249296

250297
// Accept the new node

0 commit comments

Comments
 (0)