Skip to content

refactor!: use concrete type in the EventStream #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ unused-async = "warn"

[dependencies]
anyhow = { version = "1" }
async-channel = { version = "2.3.1", optional = true }
blake3 = { package = "iroh-blake3", version = "1.4.5"}
bytes = { version = "1.7", features = ["serde"] }
derive_more = { version = "1.0.0", features = ["add", "debug", "deref", "display", "from", "try_into", "into"] }
Expand All @@ -51,6 +50,7 @@ tokio-util = { version = "0.7.12", optional = true, features = ["codec", "rt"] }
tracing = "0.1"
data-encoding = { version = "2.6.0", optional = true }
hex = "0.4.3"
tokio-stream = { version = "0.1.17", optional = true }

# rpc dependencies (optional)
nested_enum_utils = { version = "0.1.0", optional = true }
Expand All @@ -76,9 +76,9 @@ net = [
"dep:iroh",
"dep:tokio",
"dep:tokio-util",
"dep:async-channel",
"dep:futures-util",
"dep:futures-concurrency"
"dep:futures-concurrency",
"dep:tokio-stream",
]
rpc = [
"dep:nested_enum_utils",
Expand All @@ -92,6 +92,7 @@ cli = [
]

examples = ["net", "dep:data-encoding"]
tokio-stream = ["dep:tokio-stream"]

[[example]]
name = "chat"
Expand Down
128 changes: 65 additions & 63 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use anyhow::{anyhow, Context as _, Result};
use bytes::BytesMut;
use futures_concurrency::stream::{stream_group, StreamGroup};
use futures_lite::{future::Boxed as BoxedFuture, stream::Stream, StreamExt};
use futures_util::TryFutureExt;
use iroh::{
endpoint::{get_remote_node_id, Connecting, Connection, DirectAddr},
protocol::ProtocolHandler,
Expand All @@ -24,6 +23,7 @@ use rand::rngs::StdRng;
use rand_core::SeedableRng;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinSet};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
use tracing::{debug, error, error_span, trace, warn, Instrument};

Expand Down Expand Up @@ -190,16 +190,24 @@ impl Gossip {
topic_id: TopicId,
bootstrap: Vec<NodeId>,
) -> Result<GossipTopic> {
let mut sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));
let mut sub = self
.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
.await?;
sub.joined().await?;
Ok(sub)
}

/// Join a gossip topic with the default options.
///
/// Note that this will not wait for any bootstrap node to be available. To ensure the topic is connected to at least one node, use [`GossipTopic::joined`] or [`Gossip::subscribe_and_join`]
pub fn subscribe(&self, topic_id: TopicId, bootstrap: Vec<NodeId>) -> Result<GossipTopic> {
let sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));
pub async fn subscribe(
&self,
topic_id: TopicId,
bootstrap: Vec<NodeId>,
) -> Result<GossipTopic> {
let sub = self
.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
.await?;

Ok(sub)
}
Expand All @@ -211,11 +219,17 @@ impl Gossip {
///
/// Messages will be queued until a first connection is available. If the internal channel becomes full,
/// the oldest messages will be dropped from the channel.
pub fn subscribe_with_opts(&self, topic_id: TopicId, opts: JoinOptions) -> GossipTopic {
let (command_tx, command_rx) = async_channel::bounded(TOPIC_COMMANDS_DEFAULT_CAP);
let command_rx: CommandStream = Box::pin(command_rx);
let event_rx = self.subscribe_with_stream(topic_id, opts, command_rx);
GossipTopic::new(command_tx, event_rx)
pub async fn subscribe_with_opts(
&self,
topic_id: TopicId,
opts: JoinOptions,
) -> Result<GossipTopic> {
let (command_tx, command_rx) = mpsc::channel(TOPIC_COMMANDS_DEFAULT_CAP);
let command_rx: CommandStream = Box::pin(ReceiverStream::new(command_rx));
let event_rx = self
.subscribe_with_stream(topic_id, opts, command_rx)
.await?;
Ok(GossipTopic::new(command_tx, event_rx))
}

/// Join a gossip topic with options and an externally-created update stream.
Expand All @@ -225,24 +239,26 @@ impl Gossip {
///
/// It returns a stream of events. If you want to wait for the topic to become active, wait for
/// the [`GossipEvent::Joined`] event.
pub fn subscribe_with_stream(
pub async fn subscribe_with_stream(
&self,
topic_id: TopicId,
options: JoinOptions,
updates: CommandStream,
) -> EventStream {
self.inner.subscribe_with_stream(topic_id, options, updates)
) -> Result<EventStream> {
self.inner
.subscribe_with_stream(topic_id, options, updates)
.await
}
}

impl Inner {
pub fn subscribe_with_stream(
pub async fn subscribe_with_stream(
&self,
topic_id: TopicId,
options: JoinOptions,
updates: CommandStream,
) -> EventStream {
let (event_tx, event_rx) = async_channel::bounded(options.subscription_capacity);
) -> Result<EventStream> {
let (event_tx, event_rx) = mpsc::channel(options.subscription_capacity);
let to_actor_tx = self.to_actor_tx.clone();
let receiver_id = ReceiverId(
self.next_receiver_id
Expand All @@ -253,33 +269,22 @@ impl Inner {
command_rx: updates,
event_tx,
};
// We spawn a task to send the subscribe action to the actor, because we want the send to
// succeed even if the returned stream is dropped right away without being polled, because
// it is legit to keep only the `updates` stream and drop the event stream. This situation
// is handled fine within the actor, but we have to make sure that the message reaches the
// actor.
let task = tokio::task::spawn(async move {
to_actor_tx
.send(ToActor::Join {
topic_id,
bootstrap: options.bootstrap,
channels,
})
.await
.map_err(|_| anyhow!("Gossip actor dropped"))
});
let stream = async move {
task.await
.map_err(|err| anyhow!("Task for sending to gossip actor failed: {err:?}"))??;
Ok(event_rx)
}
.try_flatten_stream();
EventStream {
inner: Box::pin(stream),

to_actor_tx
.send(ToActor::Join {
topic_id,
bootstrap: options.bootstrap,
channels,
})
.await
.map_err(|_| anyhow!("Gossip actor dropped"))?;

Ok(EventStream {
inner: ReceiverStream::new(event_rx),
to_actor_tx: self.to_actor_tx.clone(),
topic: topic_id,
receiver_id,
}
})
}

async fn send(&self, event: ToActor) -> anyhow::Result<()> {
Expand All @@ -298,11 +303,10 @@ impl Inner {
}

/// Stream of events for a topic.
#[derive(derive_more::Debug)]
#[derive(Debug)]
pub struct EventStream {
/// The actual stream polled to return [`Event`]s to the application.
#[debug("Stream")]
inner: Pin<Box<dyn Stream<Item = Result<Event>> + Send + 'static>>,
inner: ReceiverStream<Result<Event>>,

/// Channel to the actor task.
///
Expand All @@ -323,7 +327,7 @@ impl Stream for EventStream {
type Item = Result<Event>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next(cx)
Pin::new(&mut self.inner).poll_next(cx)
}
}

Expand Down Expand Up @@ -930,7 +934,7 @@ enum ConnOrigin {
struct SubscriberChannels {
/// Id for the receiver counter part of [`Self::event_tx`].
receiver_id: ReceiverId,
event_tx: async_channel::Sender<Result<Event>>,
event_tx: mpsc::Sender<Result<Event>>,
#[debug("CommandStream")]
command_rx: CommandStream,
}
Expand Down Expand Up @@ -1028,7 +1032,7 @@ struct EventSenders {
/// Channels to communicate [`Event`] to [`EventStream`]s.
///
/// This is indexed by receiver id. The boolean indicates a lagged channel ([`Event::Lagged`]).
senders: HashMap<ReceiverId, (async_channel::Sender<Result<Event>>, bool)>,
senders: HashMap<ReceiverId, (mpsc::Sender<Result<Event>>, bool)>,
}

/// Id for a gossip receiver.
Expand All @@ -1042,27 +1046,24 @@ impl EventSenders {
self.senders.is_empty()
}

fn push(&mut self, id: ReceiverId, sender: async_channel::Sender<Result<Event>>) {
fn push(&mut self, id: ReceiverId, sender: mpsc::Sender<Result<Event>>) {
self.senders.insert(id, (sender, false));
}

/// Send an event to all subscribers.
///
/// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full.
fn send(&mut self, event: &GossipEvent) {
let mut remove = Vec::new();
for (&id, (send, lagged)) in self.senders.iter_mut() {
self.senders.retain(|_id, (send, lagged)| {
// If the stream is disconnected, we don't need to send to it.
if send.is_closed() {
remove.push(id);
continue;
return false;
}

// Check if the send buffer is almost full, and send a lagged response if it is.
let cap = send.capacity().expect("we only use bounded channels");
let event = if send.len() >= cap - 1 {
let event = if send.capacity() <= 1 {
if *lagged {
continue;
return true;
}
*lagged = true;
Event::Lagged
Expand All @@ -1071,14 +1072,15 @@ impl EventSenders {
Event::Gossip(event.clone())
};

if let Err(async_channel::TrySendError::Closed(_)) = send.try_send(Ok(event)) {
remove.push(id);
match send.try_send(Ok(event)) {
Ok(()) => true,
Err(mpsc::error::TrySendError::Full(_)) => {
*lagged = true;
true
}
Err(mpsc::error::TrySendError::Closed(_)) => false,
}
}

for id in remove.into_iter() {
self.senders.remove(&id);
}
});
}

/// Removes a sender based on the corresponding receiver's id.
Expand Down Expand Up @@ -1676,7 +1678,7 @@ mod test {
let addr1 = NodeAddr::new(node_id1).with_relay_url(relay_url.clone());
ep2.add_node_addr(addr1)?;
let go2_task = async move {
let mut sub = go2.subscribe(topic, Vec::new())?;
let mut sub = go2.subscribe(topic, Vec::new()).await?;
sub.joined().await?;

rx.recv().await.expect("signal to unsubscribe");
Expand All @@ -1685,7 +1687,7 @@ mod test {

rx.recv().await.expect("signal to subscribe again");
tracing::info!("resubscribing");
let mut sub = go2.subscribe(topic, vec![node_id1])?;
let mut sub = go2.subscribe(topic, vec![node_id1]).await?;

sub.joined().await?;
tracing::info!("subscription successful!");
Expand All @@ -1700,7 +1702,7 @@ mod test {
let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url);
ep1.add_node_addr(addr2)?;

let mut sub = go1.subscribe(topic, vec![node_id2])?;
let mut sub = go1.subscribe(topic, vec![node_id2]).await?;
// wait for subscribed notification
sub.joined().await?;

Expand Down
Loading
Loading