-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Replace unbounded channels with bounded ones. #1191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
To remove the unbounded channels used for communicating with node tasks an API similar to `futures::Sink` is used, i.e. sending is split into a start and complete phase. The start phase returns `StartSend` and first attempts to complete any pending send operations. Completing the send means polling until `Poll::Ready(())` is returned. In addition this PR has split the `handled_node_tasks` module into several smaller ones (cf. `nodes::tasks`) and renamed some types: - `nodes::handled_node_tasks::NodeTask` -> `nodes::tasks::task::Task` - `nodes::handled_node_tasks::NodeTaskInner` -> `nodes::tasks::task::State` - `nodes::handled_node_tasks::NodeTasks` -> `nodes::tasks::Manager` - `nodes::handled_node_tasks::TaskClosedEvent` -> `nodes::tasks::Error` - `nodes::handled_node_tasks::HandledNodesEvent` -> `nodes::tasks::Event` - `nodes::handled_node_tasks::Task` -> `nodes::tasks::TaskEntry` - `nodes::handled_node_tasks::ExtToInMessage` -> `nodes::tasks::task::ToTaskMessage` - `nodes::handled_node_tasks::InToExtMessage` -> `nodes::tasks::task::FromTaskMessage`
So #973 was only about events from the handler towards the main task, and not the other way around. In your PR you end up replacing the unbounded channel with a bounded one, but then you other unbounded containers ( |
Since it is always holding just a single pending message.
Could you please point out to me where in #973 this is stated because I can not find it?
I take this as a cryptic hint that |
The question is: what happens if a background task is stuck (for example in an infinite loop), and the channel towards that task is full, and the user continues sending events? With this PR, this will fill |
Assuming the consumer ( So this means that whereas unbounded channels would consume all available memory if items are not consumed, with this change the programme would instead not make progress. This is why I added the following note to
|
Oh I see, you're blocking the swarm. That looks more reasonable. Let's go with that. The other thing I don't like is that when the "pending item" is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't spot any logic mistake, but I feel like the code is complicated enough that I wouldn't be able to spot mistakes unfortunately.
Co-Authored-By: Pierre Krieger <[email protected]>
Co-Authored-By: Pierre Krieger <[email protected]>
Is this not standard usage with sinks? Fwiw https://docs.rs/futures/0.1.28/futures/enum.AsyncSink.html documents
|
Instead of having a field |
But it does. The API contract of
And as I quoted above, |
Imagine this piece of code: /// If this method returns `Some`, you have to call `bar` with the value.
#[must_use]
fn foo() -> Option<Bar> {
...
} That does not imply that Calling |
But it seems that the property is indeed explicitly tied to the value Just my 2 superficial cents on this aspect - I don't really have much insight into the details and larger context of this PR (yet). |
of take-over and event messages delivered over Sinks.
I have added some documentation to the struct fields to make this explicit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really don't understand the argument and I personally find that a very bad practice, but I also don't want to block this PR on that.
The only argument is that pub struct RawSwarm {
// ...
/// A take over message needs to be sent.
take_over_to_send: Option<(TPeerId, InterruptedReachAttempt<TInEvent, (TConnInfo, ConnectedPoint), ()>)>,
/// A take over message has been sent and needs to be flushed.
take_over_to_flush: Option<TPeerId>
}
// ...
pub fn poll(&mut self) -> Async<RawSwarmEvent /* ... */> {
// ...
// Attempt to send a take over message.
if let Some((id, interrupted)) = self.take_over_to_send.take() {
if let Some(mut peer) = self.active_nodes.peer_mut(&id) {
if let StartTakeOver::NotReady(i) = peer.start_take_over(interrupted) {
self.take_over_to_send = Some((id, i));
return Async::NotReady
}
self.take_over_to_flush = Some(id)
}
}
// Attempt to flush a take over message.
if let Some(id) = self.take_over_to_flush.take() {
if let Some(mut peer) = self.active_nodes.peer_mut(&id) {
if let Ok(Async::NotReady) = peer.complete_take_over() {
self.take_over_to_flush = Some(id);
return Async::NotReady
}
}
}
// ...
} over the current implementation, right? |
Let's merge this? 🙌 |
No objections. |
Context: #973
To remove the remaining unbounded channels, which are used for communicating with node tasks, an API similar to
futures::Sink
is used, i.e. sending is split into a start and complete phase. The start phase returnsStartSend
and first attempts to complete any pending send operations. Completing the send means polling untilPoll::Ready(())
is returned.In addition this PR has split the
handled_node_tasks
module into several smaller ones (cf.nodes::tasks
) and renamed some types:nodes::handled_node_tasks::NodeTask
->nodes::tasks::task::Task
nodes::handled_node_tasks::NodeTaskInner
->nodes::tasks::task::State
nodes::handled_node_tasks::NodeTasks
->nodes::tasks::Manager
nodes::handled_node_tasks::TaskClosedEvent
->nodes::tasks::Error
nodes::handled_node_tasks::HandledNodesEvent
->nodes::tasks::Event
nodes::handled_node_tasks::Task
->nodes::tasks::TaskEntry
nodes::handled_node_tasks::ExtToInMessage
->nodes::tasks::task::ToTaskMessage
nodes::handled_node_tasks::InToExtMessage
->nodes::tasks::task::FromTaskMessage
To review this PR it is probably best to use the diff against
swarm.rs
,raw_swarm.rs
andcollection.rs
and reviewnodes::tasks
directly (keeping in mind that its contents used to formhandled_nodes_tasks
).