From cc9742080d8238d35a1169c8a0eb8f22f1483a31 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 4 Apr 2025 18:44:57 +0300 Subject: [PATCH 1/8] Use workspace deps in a few places --- content-discovery/Cargo.toml | 6 ++++++ .../iroh-mainline-content-discovery-cli/Cargo.toml | 2 +- .../iroh-mainline-content-discovery/Cargo.toml | 10 +++++----- content-discovery/iroh-mainline-tracker/Cargo.toml | 12 ++++++------ 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index 6fc1258..cd7e7b8 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -31,3 +31,9 @@ iroh-base = "0.34" iroh-blobs = { version = "0.34", features = ["rpc"] } # explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255 tokio = { version = "1.44.1" } +mainline = { version = "2.0.0", default-features = false } +pkarr = { version = "2.0.3", default-features = false } +postcard = { version = "1", default-features = false } +quinn = { package = "iroh-quinn", version = "0.13", default-features = false } +anyhow = { version = "1", default-features = false } +flume = { version = "0.11" } diff --git a/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml b/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml index a5cf3c6..e8770dc 100644 --- a/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml @@ -12,7 +12,7 @@ iroh = { workspace = true } iroh-blobs = { workspace = true } iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery" } mainline = { version = "2.0.0" } -anyhow = { version = "1", features = ["backtrace"] } +anyhow = { workspace = true, features = ["backtrace"] } futures = { version = "0.3.25" } clap = { version = "4", features = ["derive"] } tempfile = { version = "3.4" } diff --git a/content-discovery/iroh-mainline-content-discovery/Cargo.toml b/content-discovery/iroh-mainline-content-discovery/Cargo.toml index bbebca6..cedf3e7 100644 --- a/content-discovery/iroh-mainline-content-discovery/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery/Cargo.toml @@ -22,16 +22,16 @@ hex = "0.4.3" # Optional features for the client functionality tracing = { version = "0.1", optional = true } -quinn = { package = "iroh-quinn", version = "0.13", optional = true } -mainline = { version = "2.0.0", optional = true, features = ["async"] } -anyhow = { version = "1", features = ["backtrace"], optional = true } -postcard = { version = "1", default-features = false, features = ["alloc", "use-std"], optional = true } +quinn = { workspace = true, optional = true } +mainline = { workspace = true, optional = true, features = ["async"] } +anyhow = { workspace = true, features = ["backtrace"], optional = true } +postcard = { workspace = true, features = ["alloc", "use-std"], optional = true } futures = { version = "0.3.25", optional = true } rcgen = { version = "0.13.1", optional = true } rustls = { version = "0.23", default-features = false, features = ["ring"], optional = true } genawaiter = { version = "0.99.1", features = ["futures03"], optional = true } tokio = { workspace = true, optional = true } -flume = "0.11.0" +flume = { workspace = true } # dependencies for the tls utils der = { version = "0.7", features = ["alloc", "derive"], optional = true } diff --git a/content-discovery/iroh-mainline-tracker/Cargo.toml b/content-discovery/iroh-mainline-tracker/Cargo.toml index 4f3920a..81ae8f2 100644 --- a/content-discovery/iroh-mainline-tracker/Cargo.toml +++ b/content-discovery/iroh-mainline-tracker/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = { version = "1", features = ["backtrace"] } +anyhow = { workspace = true, features = ["backtrace"] } # needs to keep updated with the dep of iroh-blobs bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false } bytes = "1" @@ -20,9 +20,9 @@ hex = "0.4.3" humantime = "2.1.0" iroh = { workspace = true } iroh-blobs = { workspace = true } -mainline = { version = "2.0.0", features = ["async"] } -pkarr = { version = "2.0.3", features = ["async"] } -postcard = { version = "1", default-features = false, features = ["alloc", "use-std"] } +mainline = { workspace = true, features = ["async"] } +pkarr = { workspace = true, features = ["async"] } +postcard = { workspace = true, features = ["alloc", "use-std"] } rand = "0.8" rcgen = "0.12.0" redb = "1.5.0" @@ -38,10 +38,10 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } ttl_cache = "0.5.1" url = "2.5.0" -flume = "0.11.0" +flume = { workspace = true } genawaiter = { version = "0.99.1", features = ["futures03"] } iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery", features = ["client"] } -quinn = { package = "iroh-quinn", version = "0.13" } +quinn = { workspace = true } clap = { version = "4", features = ["derive"], optional = true } serde-big-array = "0.5.1" From b04b682f5d40c2ed05cb34190618aaac99e12f69 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 4 Apr 2025 18:49:52 +0300 Subject: [PATCH 2/8] more workspace deps --- content-discovery/Cargo.toml | 4 ++++ .../iroh-mainline-content-discovery/Cargo.toml | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index cd7e7b8..4e36b16 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -37,3 +37,7 @@ postcard = { version = "1", default-features = false } quinn = { package = "iroh-quinn", version = "0.13", default-features = false } anyhow = { version = "1", default-features = false } flume = { version = "0.11" } +futures = { version = "0.3.25" } +rcgen = { version = "0.13.1" } +rustls = { version = "0.23", default-features = false, features = ["ring"] } +genawaiter = { version = "0.99.1", features = ["futures03"] } \ No newline at end of file diff --git a/content-discovery/iroh-mainline-content-discovery/Cargo.toml b/content-discovery/iroh-mainline-content-discovery/Cargo.toml index cedf3e7..45dca64 100644 --- a/content-discovery/iroh-mainline-content-discovery/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery/Cargo.toml @@ -26,9 +26,9 @@ quinn = { workspace = true, optional = true } mainline = { workspace = true, optional = true, features = ["async"] } anyhow = { workspace = true, features = ["backtrace"], optional = true } postcard = { workspace = true, features = ["alloc", "use-std"], optional = true } -futures = { version = "0.3.25", optional = true } -rcgen = { version = "0.13.1", optional = true } -rustls = { version = "0.23", default-features = false, features = ["ring"], optional = true } +futures = { workspace = true, optional = true } +rcgen = { workspace = true, optional = true } +rustls = { workspace = true, features = ["ring"], optional = true } genawaiter = { version = "0.99.1", features = ["futures03"], optional = true } tokio = { workspace = true, optional = true } flume = { workspace = true } From f8a43cedcb9df4b6254e3d5da1b420721a7582d6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 4 Apr 2025 19:07:47 +0300 Subject: [PATCH 3/8] Update to latest pkarr and mainline --- content-discovery/Cargo.toml | 4 ++-- .../Cargo.toml | 2 +- .../src/client.rs | 20 +++++++++---------- .../iroh-mainline-tracker/Cargo.toml | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index 4e36b16..d26aad2 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -31,8 +31,8 @@ iroh-base = "0.34" iroh-blobs = { version = "0.34", features = ["rpc"] } # explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255 tokio = { version = "1.44.1" } -mainline = { version = "2.0.0", default-features = false } -pkarr = { version = "2.0.3", default-features = false } +mainline = { version = "5.4.0", default-features = false } +pkarr = { version = "3.7.0", default-features = false } postcard = { version = "1", default-features = false } quinn = { package = "iroh-quinn", version = "0.13", default-features = false } anyhow = { version = "1", default-features = false } diff --git a/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml b/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml index e8770dc..7023133 100644 --- a/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery-cli/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT OR Apache-2.0" iroh = { workspace = true } iroh-blobs = { workspace = true } iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery" } -mainline = { version = "2.0.0" } +mainline = { workspace = true } anyhow = { workspace = true, features = ["backtrace"] } futures = { version = "0.3.25" } clap = { version = "4", features = ["derive"] } diff --git a/content-discovery/iroh-mainline-content-discovery/src/client.rs b/content-discovery/iroh-mainline-content-discovery/src/client.rs index 57faed9..41103fc 100644 --- a/content-discovery/iroh-mainline-content-discovery/src/client.rs +++ b/content-discovery/iroh-mainline-content-discovery/src/client.rs @@ -84,8 +84,8 @@ pub fn to_infohash(haf: HashAndFormat) -> mainline::Id { } fn unique_tracker_addrs( - response: flume::IntoIter>, -) -> impl Stream { + response: impl IntoIterator>, +) -> impl Stream { Gen::new(|co| async move { let mut found = HashSet::new(); for response in response { @@ -160,19 +160,19 @@ pub fn query_trackers( /// Query the mainline DHT for trackers for the given content, then query each tracker for peers. pub fn query_dht( endpoint: impl QuinnConnectionProvider, - dht: mainline::dht::Dht, + dht: mainline::Dht, args: Query, query_parallelism: usize, ) -> impl Stream> { // let dht = dht.as_async(); let info_hash = to_infohash(args.content); - let response = dht.get_peers(info_hash).unwrap(); + let response = dht.get_peers(info_hash); let unique_tracker_addrs = unique_tracker_addrs(response); unique_tracker_addrs .map(move |addr| { let endpoint = endpoint.clone(); async move { - let hosts = match query_socket_one(endpoint, addr, args).await { + let hosts = match query_socket_one(endpoint, addr.into(), args).await { Ok(hosts) => hosts.into_iter().map(anyhow::Ok).collect(), Err(cause) => vec![Err(cause)], }; @@ -188,14 +188,14 @@ pub fn query_dht( /// Note that this should only be called from a publicly reachable node, where port is the port /// on which the tracker protocol is reachable. pub fn announce_dht( - dht: mainline::dht::Dht, + dht: mainline::Dht, content: BTreeSet, port: u16, announce_parallelism: usize, ) -> impl Stream< Item = ( HashAndFormat, - mainline::Result, + std::result::Result, ), > { let dht = dht.as_async(); @@ -447,7 +447,7 @@ impl UdpDiscovery { /// Query the mainline DHT for trackers for the given content, then query each tracker for peers. pub async fn query_dht( &self, - dht: mainline::dht::Dht, + dht: mainline::Dht, args: Query, ) -> anyhow::Result> { let results = self.query(args).await?.into_stream().boxed(); @@ -457,10 +457,10 @@ impl UdpDiscovery { // delay before querying the DHT tokio::time::sleep(Duration::from_millis(50)).await; let info_hash = to_infohash(args.content); - let mut addrs = dht.get_peers(info_hash).unwrap(); + let mut addrs = dht.get_peers(info_hash); while let Some(addrs) = addrs.next().await { for addr in addrs { - this.add_tracker(addr).await.ok(); + this.add_tracker(addr.into()).await.ok(); } } future::pending::().await diff --git a/content-discovery/iroh-mainline-tracker/Cargo.toml b/content-discovery/iroh-mainline-tracker/Cargo.toml index 81ae8f2..9c327f2 100644 --- a/content-discovery/iroh-mainline-tracker/Cargo.toml +++ b/content-discovery/iroh-mainline-tracker/Cargo.toml @@ -21,7 +21,7 @@ humantime = "2.1.0" iroh = { workspace = true } iroh-blobs = { workspace = true } mainline = { workspace = true, features = ["async"] } -pkarr = { workspace = true, features = ["async"] } +pkarr = { workspace = true } postcard = { workspace = true, features = ["alloc", "use-std"] } rand = "0.8" rcgen = "0.12.0" From e2f4b670958d911896d18b0add53a844ecede301 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 4 Apr 2025 19:23:59 +0300 Subject: [PATCH 4/8] remove flume dep for iroh-mainline-content-discovery --- content-discovery/Cargo.toml | 1 + .../src/main.rs | 4 +- .../Cargo.toml | 2 +- .../src/client.rs | 45 +++++++++---------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index d26aad2..55710ee 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -31,6 +31,7 @@ iroh-base = "0.34" iroh-blobs = { version = "0.34", features = ["rpc"] } # explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255 tokio = { version = "1.44.1" } +tokio-stream = { version = "0.1.17" } mainline = { version = "5.4.0", default-features = false } pkarr = { version = "3.7.0", default-features = false } postcard = { version = "1", default-features = false } diff --git a/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs b/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs index 165fd06..15e0102 100644 --- a/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs +++ b/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs @@ -106,8 +106,8 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> { verified: args.verified, }, }; - let res = discovery.query(q).await?; - for sa in res { + let mut res = discovery.query(q).await?; + while let Some(sa) = res.recv().await { if sa.verify().is_ok() { println!("{}: {:?}", sa.announce.host, sa.announce.kind); } else { diff --git a/content-discovery/iroh-mainline-content-discovery/Cargo.toml b/content-discovery/iroh-mainline-content-discovery/Cargo.toml index 45dca64..97d12da 100644 --- a/content-discovery/iroh-mainline-content-discovery/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery/Cargo.toml @@ -31,7 +31,7 @@ rcgen = { workspace = true, optional = true } rustls = { workspace = true, features = ["ring"], optional = true } genawaiter = { version = "0.99.1", features = ["futures03"], optional = true } tokio = { workspace = true, optional = true } -flume = { workspace = true } +tokio-stream = { workspace = true } # dependencies for the tls utils der = { version = "0.7", features = ["alloc", "derive"], optional = true } diff --git a/content-discovery/iroh-mainline-content-discovery/src/client.rs b/content-discovery/iroh-mainline-content-discovery/src/client.rs index 41103fc..4554b72 100644 --- a/content-discovery/iroh-mainline-content-discovery/src/client.rs +++ b/content-discovery/iroh-mainline-content-discovery/src/client.rs @@ -19,6 +19,7 @@ use iroh::{ Endpoint, NodeId, }; use iroh_blobs::HashAndFormat; +use tokio::sync::mpsc; use crate::{ protocol::{ @@ -423,13 +424,13 @@ where #[derive(Debug, Clone)] pub struct UdpDiscovery { - tx: flume::Sender, + tx: mpsc::Sender, } impl UdpDiscovery { /// Create a new UDP discovery service. pub async fn new(socket: SocketAddr) -> anyhow::Result { - let (tx, rx) = flume::unbounded(); + let (tx, rx) = mpsc::channel(1024); let socket = tokio::net::UdpSocket::bind(socket).await?; let _task = tokio::spawn( UdpActor { @@ -450,7 +451,7 @@ impl UdpDiscovery { dht: mainline::Dht, args: Query, ) -> anyhow::Result> { - let results = self.query(args).await?.into_stream().boxed(); + let results = tokio_stream::wrappers::ReceiverStream::new(self.query(args).await?).boxed(); let dht = dht.as_async(); let this = self.clone(); let find_new_trackers = async move { @@ -474,7 +475,7 @@ impl UdpDiscovery { pub async fn add_tracker(&self, tracker: SocketAddr) -> anyhow::Result<()> { Ok(self .tx - .send_async(UdpActorMessage::AddTracker { tracker }) + .send(UdpActorMessage::AddTracker { tracker }) .await?) } @@ -482,22 +483,20 @@ impl UdpDiscovery { pub async fn remove_tracker(&self, tracker: SocketAddr) -> anyhow::Result<()> { Ok(self .tx - .send_async(UdpActorMessage::RemoveTracker { tracker }) + .send(UdpActorMessage::RemoveTracker { tracker }) .await?) } - pub async fn query(&self, query: Query) -> anyhow::Result> { + pub async fn query(&self, query: Query) -> anyhow::Result> { let (tx, rx) = oneshot::channel(); - self.tx - .send_async(UdpActorMessage::Query { query, tx }) - .await?; + self.tx.send(UdpActorMessage::Query { query, tx }).await?; Ok(rx.await?) } pub async fn announce_once(&self, announce: SignedAnnounce) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); self.tx - .send_async(UdpActorMessage::AnnounceOnce { announce, tx }) + .send(UdpActorMessage::AnnounceOnce { announce, tx }) .await?; rx.await?; Ok(()) @@ -514,7 +513,7 @@ impl UdpDiscovery { }) .into(); self.tx - .send_async(UdpActorMessage::StoreAnnounceTask { announce, task }) + .send(UdpActorMessage::StoreAnnounceTask { announce, task }) .await?; Ok(()) } @@ -522,9 +521,9 @@ impl UdpDiscovery { struct UdpActor { socket: tokio::net::UdpSocket, - rx: flume::Receiver, + rx: mpsc::Receiver, trackers: BTreeSet, - listeners: BTreeMap>>, + listeners: BTreeMap>>, announces: BTreeMap<(HashAndFormat, AnnounceKind), AbortingJoinHandle<()>>, } @@ -548,7 +547,7 @@ impl Drop for AbortingJoinHandle { enum UdpActorMessage { Query { query: Query, - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, AddTracker { tracker: SocketAddr, @@ -572,11 +571,11 @@ impl UdpActor { let mut buf = [0u8; MAX_MSG_SIZE]; loop { tokio::select! { - msg = self.rx.recv_async() => { + msg = self.rx.recv() => { tracing::trace!("got msg {:?}", msg); match msg { - Ok(UdpActorMessage::Query { query, tx }) => { - let (announce_tx, announce_rx) = flume::bounded(1024); + Some(UdpActorMessage::Query { query, tx }) => { + let (announce_tx, announce_rx) = mpsc::channel(1024); self.listeners.entry(query).or_default().push(announce_tx); let msg = Request::Query(query); let msg = postcard::to_slice(&msg, &mut buf).unwrap(); @@ -586,7 +585,7 @@ impl UdpActor { } tx.send(announce_rx).ok(); } - Ok(UdpActorMessage::AddTracker { tracker }) => { + Some(UdpActorMessage::AddTracker { tracker }) => { if self.trackers.insert(tracker) { for query in self.listeners.keys() { let msg = Request::Query(*query); @@ -595,21 +594,21 @@ impl UdpActor { } } } - Ok(UdpActorMessage::RemoveTracker { tracker }) => { + Some(UdpActorMessage::RemoveTracker { tracker }) => { self.trackers.remove(&tracker); } - Ok(UdpActorMessage::StoreAnnounceTask { announce, task }) => { + Some(UdpActorMessage::StoreAnnounceTask { announce, task }) => { let key = (announce.announce.content, announce.announce.kind); self.announces.insert(key, task); } - Ok(UdpActorMessage::AnnounceOnce { announce, tx }) => { + Some(UdpActorMessage::AnnounceOnce { announce, tx }) => { let msg = postcard::to_slice(&Request::Announce(announce), &mut buf).unwrap(); for tracker in &self.trackers { self.socket.send_to(msg, tracker).await.ok(); } tx.send(()).ok(); } - Err(flume::RecvError::Disconnected) => break, + None => break, } }, res = self.socket.recv_from(&mut buf) => { @@ -642,7 +641,7 @@ impl UdpActor { } let mut to_remove = Vec::new(); for (i, sender) in senders.iter().enumerate() { - if sender.send_async(sa).await.is_err() { + if sender.send(sa).await.is_err() { to_remove.push(i); } } From 7344f8ac4d7a75051edfa58cb0d5c6296fd40010 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 7 Apr 2025 11:34:03 +0300 Subject: [PATCH 5/8] Use a tokio task to directly perform io on. This also removes the flume dep TODO: move this on a dedicated thread "donated" to the tokio runtime. --- content-discovery/Cargo.toml | 1 - .../iroh-mainline-tracker/Cargo.toml | 1 - .../iroh-mainline-tracker/src/tracker.rs | 97 ++++++++++++------- .../iroh-mainline-tracker/src/tracker/util.rs | 88 ++--------------- 4 files changed, 71 insertions(+), 116 deletions(-) diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index 55710ee..9861ba3 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -37,7 +37,6 @@ pkarr = { version = "3.7.0", default-features = false } postcard = { version = "1", default-features = false } quinn = { package = "iroh-quinn", version = "0.13", default-features = false } anyhow = { version = "1", default-features = false } -flume = { version = "0.11" } futures = { version = "0.3.25" } rcgen = { version = "0.13.1" } rustls = { version = "0.23", default-features = false, features = ["ring"] } diff --git a/content-discovery/iroh-mainline-tracker/Cargo.toml b/content-discovery/iroh-mainline-tracker/Cargo.toml index 9c327f2..2519baa 100644 --- a/content-discovery/iroh-mainline-tracker/Cargo.toml +++ b/content-discovery/iroh-mainline-tracker/Cargo.toml @@ -38,7 +38,6 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } ttl_cache = "0.5.1" url = "2.5.0" -flume = { workspace = true } genawaiter = { version = "0.99.1", features = ["futures03"] } iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery", features = ["client"] } quinn = { workspace = true } diff --git a/content-discovery/iroh-mainline-tracker/src/tracker.rs b/content-discovery/iroh-mainline-tracker/src/tracker.rs index 6b559d2..b7eee59 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker.rs @@ -5,6 +5,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; use bao_tree::ChunkNum; use iroh::{Endpoint, NodeId}; use iroh_blobs::{ @@ -24,14 +25,15 @@ use rand::Rng; use redb::{ReadableTable, RedbValue}; use serde::{Deserialize, Serialize}; use serde_big_array::BigArray; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::task::AbortOnDropHandle; mod tables; mod util; use self::{ tables::{ReadOnlyTables, ReadableTables, Tables}, - util::PeekableFlumeReceiver, + util::PeekableReceiver, }; use crate::{ io::{log_connection_attempt, log_probe_attempt}, @@ -51,7 +53,9 @@ pub struct Tracker(Arc); /// The inner state of the tracker server. Options are immutable and don't need to be locked. #[derive(Debug)] struct Inner { - actor: flume::Sender, + actor: mpsc::Sender, + /// A permit to send a single message to the actor, for drop. + drop_permit: Option>, /// The options for the tracker server. options: Options, /// Tasks that announce to the DHT. There is one task per content item, just to inform the DHT @@ -66,20 +70,22 @@ struct Inner { /// To spawn non-send futures. local_pool: tokio_util::task::LocalPoolHandle, /// The handle to the actor thread. - handle: Option>, + handle: Option>, } impl Drop for Inner { fn drop(&mut self) { - self.actor.send(ActorMessage::Stop).ok(); + if let Some(drop_permit) = self.drop_permit.take() { + drop_permit.send(ActorMessage::Stop); + } if let Some(handle) = self.handle.take() { - handle.join().ok(); + tokio::spawn(handle); } } } struct Actor { - rx: flume::Receiver, + rx: Option>, state: State, options: Options, } @@ -174,9 +180,9 @@ struct AnnounceResponse { } impl Actor { - fn run(mut self, db: redb::Database) -> anyhow::Result<()> { - let mut msgs = PeekableFlumeReceiver::new(self.rx.clone()); - while let Some(msg) = msgs.peek() { + async fn run(mut self, db: redb::Database) -> anyhow::Result<()> { + let mut msgs = PeekableReceiver::new(self.rx.take().context("receiver gone")?); + while let Some(msg) = msgs.peek().await { if let ActorMessage::Stop = msg { break; } @@ -184,12 +190,24 @@ impl Actor { MessageCategory::ReadWrite => { tracing::debug!("write transaction"); let txn = db.begin_write()?; + let mut n = 0; + let t0 = Instant::now(); let mut tables = Tables::new(&txn)?; - for msg in msgs.batch_iter(1000, Duration::from_secs(1)) { + loop { + let Some(msg) = msgs.recv().await else { + break; + }; if let Err(msg) = self.handle_readwrite(msg, &mut tables)? { msgs.push_back(msg).expect("just recv'd"); break; } + if n > 1000 { + break; + } + if t0.elapsed() > Duration::from_secs(1) { + break; + } + n += 1; } drop(tables); tracing::debug!("write transaction end"); @@ -199,11 +217,23 @@ impl Actor { tracing::debug!("read transaction"); let txn = db.begin_read()?; let tables = ReadOnlyTables::new(&txn)?; - for msg in msgs.batch_iter(1000, Duration::from_secs(10)) { + let mut n = 0; + let t0 = Instant::now(); + loop { + let Some(msg) = msgs.recv().await else { + break; + }; if let Err(msg) = self.handle_readonly(msg, &tables)? { msgs.push_back(msg).expect("just recv'd"); break; } + if n > 1000 { + break; + } + if t0.elapsed() > Duration::from_secs(1) { + break; + } + n += 1; } tracing::debug!("read transaction end"); } @@ -747,9 +777,9 @@ impl Tracker { let db = redb::Database::create(&options.announce_data_path)?; let dht = Arc::new(mainline::Dht::client()?.as_async()); let tpc = tokio_util::task::LocalPoolHandle::new(1); - let (tx, rx) = flume::unbounded(); + let (tx, rx) = mpsc::channel(1024); let actor = Actor { - rx, + rx: Some(rx), state: State::default(), options: options.clone(), }; @@ -759,14 +789,14 @@ impl Tracker { actor.get_distinct_content(&tables)? }; txn.commit()?; - let handle = std::thread::spawn(move || { - if let Err(cause) = actor.run(db) { + let handle = AbortOnDropHandle::new(tokio::spawn(async move { + if let Err(cause) = actor.run(db).await { tracing::error!("error in actor: {}", cause); } - }); - tx.send(ActorMessage::Dump).ok(); + })); let res = Self(Arc::new(Inner { - actor: tx, + actor: tx.clone(), + drop_permit: Some(tx.try_reserve_owned().context("unable to reserve permit")?), options, announce_tasks: Default::default(), probe_tasks: Default::default(), @@ -853,7 +883,7 @@ impl Tracker { let (tx, rx) = oneshot::channel(); self.0 .actor - .send_async(ActorMessage::GetDistinctContent { tx }) + .send(ActorMessage::GetDistinctContent { tx }) .await?; rx.await? } @@ -1007,7 +1037,7 @@ impl Tracker { let (tx, rx) = oneshot::channel(); self.0 .actor - .send_async(ActorMessage::GetSize { hash, tx }) + .send(ActorMessage::GetSize { hash, tx }) .await?; rx.await? } @@ -1016,7 +1046,7 @@ impl Tracker { let (tx, rx) = oneshot::channel(); self.0 .actor - .send_async(ActorMessage::GetSizes { hash, tx }) + .send(ActorMessage::GetSizes { hash, tx }) .await?; rx.await? } @@ -1025,10 +1055,10 @@ impl Tracker { &self, hash: Hash, size: u64, - ) -> std::result::Result<(), flume::SendError> { + ) -> std::result::Result<(), mpsc::error::SendError> { self.0 .actor - .send_async(ActorMessage::SetSize { hash, size }) + .send(ActorMessage::SetSize { hash, size }) .await } @@ -1036,10 +1066,10 @@ impl Tracker { &self, hash: Hash, sizes: (HashSeq, Arc<[u64]>), - ) -> std::result::Result<(), flume::SendError> { + ) -> std::result::Result<(), mpsc::error::SendError> { self.0 .actor - .send_async(ActorMessage::SetSizes { hash, sizes }) + .send(ActorMessage::SetSizes { hash, sizes }) .await } @@ -1162,7 +1192,7 @@ impl Tracker { let (tx, rx) = oneshot::channel(); self.0 .actor - .send_async(ActorMessage::Announce { announce, tx }) + .send(ActorMessage::Announce { announce, tx }) .await?; let AnnounceResponse { new_content, @@ -1181,10 +1211,7 @@ impl Tracker { async fn handle_query(&self, query: Query) -> anyhow::Result { let (tx, rx) = oneshot::channel(); - self.0 - .actor - .send_async(ActorMessage::Query { query, tx }) - .await?; + self.0.actor.send(ActorMessage::Query { query, tx }).await?; rx.await? } @@ -1198,7 +1225,7 @@ impl Tracker { let (tx, rx) = oneshot::channel(); self.0 .actor - .send_async(ActorMessage::GetContentForNode { node, tx }) + .send(ActorMessage::GetContentForNode { node, tx }) .await?; rx.await? } @@ -1209,10 +1236,10 @@ impl Tracker { node: NodeId, results: Vec<(HashAndFormat, AnnounceKind, anyhow::Result)>, now: AbsoluteTime, - ) -> std::result::Result<(), flume::SendError> { + ) -> std::result::Result<(), mpsc::error::SendError> { self.0 .actor - .send_async(ActorMessage::StoreProbeResult { node, results, now }) + .send(ActorMessage::StoreProbeResult { node, results, now }) .await } @@ -1262,7 +1289,7 @@ impl Tracker { async fn gc(&self) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); - self.0.actor.send_async(ActorMessage::Gc { tx }).await?; + self.0.actor.send(ActorMessage::Gc { tx }).await?; rx.await? } } diff --git a/content-discovery/iroh-mainline-tracker/src/tracker/util.rs b/content-discovery/iroh-mainline-tracker/src/tracker/util.rs index 67e25f5..7f20bb8 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker/util.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker/util.rs @@ -1,15 +1,15 @@ -use std::time::{Duration, Instant}; +use tokio::sync::mpsc; /// A wrapper for a flume receiver that allows peeking at the next message. #[derive(Debug)] -pub(super) struct PeekableFlumeReceiver { +pub(super) struct PeekableReceiver { msg: Option, - recv: flume::Receiver, + recv: mpsc::Receiver, } #[allow(dead_code)] -impl PeekableFlumeReceiver { - pub fn new(recv: flume::Receiver) -> Self { +impl PeekableReceiver { + pub fn new(recv: mpsc::Receiver) -> Self { Self { msg: None, recv } } @@ -17,9 +17,9 @@ impl PeekableFlumeReceiver { /// /// Will block if there are no messages. /// Returns None only if there are no more messages (sender is dropped). - pub fn peek(&mut self) -> Option<&T> { + pub async fn peek(&mut self) -> Option<&T> { if self.msg.is_none() { - self.msg = self.recv.recv().ok(); + self.msg = self.recv.recv().await; } self.msg.as_ref() } @@ -28,46 +28,11 @@ impl PeekableFlumeReceiver { /// /// Will block if there are no messages. /// Returns None only if there are no more messages (sender is dropped). - pub fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { if let Some(msg) = self.msg.take() { return Some(msg); } - self.recv.recv().ok() - } - - /// Try to peek at the next message. - /// - /// Will not block. - /// Returns None if reading would block, or if there are no more messages (sender is dropped). - pub fn try_peek(&mut self) -> Option<&T> { - if self.msg.is_none() { - self.msg = self.recv.try_recv().ok(); - } - self.msg.as_ref() - } - - /// Try to receive the next message. - /// - /// Will not block. - /// Returns None if reading would block, or if there are no more messages (sender is dropped). - pub fn try_recv(&mut self) -> Option { - if let Some(msg) = self.msg.take() { - return Some(msg); - } - self.recv.try_recv().ok() - } - - pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Option { - if let Some(msg) = self.msg.take() { - return Some(msg); - } - self.recv.recv_timeout(timeout).ok() - } - - /// Create an iterator that pulls messages from the receiver for at most - /// `count` messages or `max_duration` time. - pub fn batch_iter(&mut self, count: usize, max_duration: Duration) -> BatchIter { - BatchIter::new(self, count, max_duration) + self.recv.recv().await } /// Push back a message. This will only work if there is room for it. @@ -81,38 +46,3 @@ impl PeekableFlumeReceiver { } } } - -pub(super) struct BatchIter<'a, T> { - recv: &'a mut PeekableFlumeReceiver, - start: Instant, - remaining: usize, - max_duration: Duration, -} - -impl<'a, T> BatchIter<'a, T> { - fn new(recv: &'a mut PeekableFlumeReceiver, count: usize, max_duration: Duration) -> Self { - Self { - recv, - start: Instant::now(), - remaining: count, - max_duration, - } - } -} - -impl Iterator for BatchIter<'_, T> { - type Item = T; - - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } - let elapsed = self.start.elapsed(); - if elapsed >= self.max_duration { - return None; - } - let remaining_time = self.max_duration - elapsed; - self.remaining -= 1; - self.recv.recv_timeout(remaining_time) - } -} From 3fb55c5fe3f955dca2f48c1d81018a204b2aafd4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 7 Apr 2025 14:43:28 +0300 Subject: [PATCH 6/8] Use a std thread donated to the tokio runtime also fix endpoint setup so we have discovery --- .../iroh-mainline-tracker/src/main.rs | 14 ++--- .../iroh-mainline-tracker/src/tracker.rs | 59 +++++++++++-------- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/content-discovery/iroh-mainline-tracker/src/main.rs b/content-discovery/iroh-mainline-tracker/src/main.rs index 0828573..97129d7 100644 --- a/content-discovery/iroh-mainline-tracker/src/main.rs +++ b/content-discovery/iroh-mainline-tracker/src/main.rs @@ -10,7 +10,7 @@ use std::{ }; use clap::Parser; -use iroh::{discovery::pkarr::dht::DhtDiscovery, Endpoint, NodeId}; +use iroh::{Endpoint, NodeId}; use iroh_blobs::util::fs::load_secret_key; use iroh_mainline_content_discovery::{protocol::ALPN, tls_utils}; use iroh_mainline_tracker::{ @@ -62,16 +62,11 @@ async fn await_relay_region(endpoint: &Endpoint) -> anyhow::Result<()> { async fn create_endpoint( key: iroh::SecretKey, ipv4_addr: SocketAddrV4, - publish: bool, ) -> anyhow::Result { - let mainline_discovery = if publish { - DhtDiscovery::builder().secret_key(key.clone()).build()? - } else { - DhtDiscovery::default() - }; iroh::Endpoint::builder() .secret_key(key) - .discovery(Box::new(mainline_discovery)) + .discovery_dht() + .discovery_n0() .alpns(vec![ALPN.to_vec()]) .bind_addr_v4(ipv4_addr) .bind() @@ -133,8 +128,9 @@ async fn server(args: Args) -> anyhow::Result<()> { let quinn_endpoint = quinn::Endpoint::server(server_config, quinn_bind_addr)?; // set the quinn port to the actual port we bound to so the DHT will announce it correctly options.quinn_port = quinn_endpoint.local_addr()?.port(); - let iroh_endpoint = create_endpoint(key.clone(), options.iroh_ipv4_addr, true).await?; + let iroh_endpoint = create_endpoint(key.clone(), options.iroh_ipv4_addr).await?; let db = Tracker::new(options, iroh_endpoint.clone())?; + db.dump().await?; await_relay_region(&iroh_endpoint).await?; let addr = iroh_endpoint.node_addr().await?; log!("listening on {:?}", addr); diff --git a/content-discovery/iroh-mainline-tracker/src/tracker.rs b/content-discovery/iroh-mainline-tracker/src/tracker.rs index b7eee59..d3cec65 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker.rs @@ -26,7 +26,7 @@ use redb::{ReadableTable, RedbValue}; use serde::{Deserialize, Serialize}; use serde_big_array::BigArray; use tokio::sync::{mpsc, oneshot}; -use tokio_util::task::AbortOnDropHandle; +use tracing::{debug, error, info, trace}; mod tables; mod util; @@ -70,7 +70,7 @@ struct Inner { /// To spawn non-send futures. local_pool: tokio_util::task::LocalPoolHandle, /// The handle to the actor thread. - handle: Option>, + handle: Option>, } impl Drop for Inner { @@ -79,7 +79,9 @@ impl Drop for Inner { drop_permit.send(ActorMessage::Stop); } if let Some(handle) = self.handle.take() { - tokio::spawn(handle); + if let Err(_) = handle.join() { + error!("error joining actor thread"); + } } } } @@ -789,11 +791,14 @@ impl Tracker { actor.get_distinct_content(&tables)? }; txn.commit()?; - let handle = AbortOnDropHandle::new(tokio::spawn(async move { - if let Err(cause) = actor.run(db).await { - tracing::error!("error in actor: {}", cause); - } - })); + let rt = tokio::runtime::Handle::current(); + let handle = std::thread::spawn(move|| { + rt.block_on(async move { + if let Err(cause) = actor.run(db).await { + tracing::error!("error in actor: {}", cause); + } + }); + }); let res = Self(Arc::new(Inner { actor: tx.clone(), drop_permit: Some(tx.try_reserve_owned().context("unable to reserve permit")?), @@ -890,17 +895,17 @@ impl Tracker { pub async fn iroh_accept_loop(self, endpoint: Endpoint) -> std::io::Result<()> { while let Some(incoming) = endpoint.accept().await { - tracing::info!("got incoming"); + info!("got incoming"); let connecting = incoming.accept()?; - tracing::info!("got connecting"); + info!("got connecting"); let tracker = self.clone(); tokio::spawn(async move { let Ok((remote_node_id, alpn, conn)) = iroh_accept_conn(connecting).await else { - tracing::error!("error accepting connection"); + error!("error accepting connection"); return; }; // if we were supporting multiple protocols, we'd need to check the ALPN here. - tracing::info!( + info!( "got connection from {} {:?}", remote_node_id, std::str::from_utf8(&alpn) @@ -917,9 +922,9 @@ impl Tracker { let local_addr = endpoint.local_addr()?; println!("quinn listening on {local_addr:?}"); while let Some(incoming) = endpoint.accept().await { - tracing::info!("got incoming"); + info!("got incoming"); let connecting = incoming.accept()?; - tracing::info!("got connecting"); + info!("got connecting"); let tracker = self.clone(); tokio::spawn(async move { let Ok((remote_node_id, alpn, conn)) = accept_conn(connecting).await else { @@ -927,9 +932,9 @@ impl Tracker { return; }; // if we were supporting multiple protocols, we'd need to check the ALPN here. - tracing::info!("got connection from {} {}", remote_node_id, alpn); + info!("got connection from {} {}", remote_node_id, alpn); if let Err(cause) = tracker.handle_quinn_connection(conn).await { - tracing::error!("error handling connection: {}", cause); + error!("error handling connection: {}", cause); } }); } @@ -944,7 +949,7 @@ impl Tracker { let data = &buf[..len]; let res = self.handle_udp_packet(data, &socket, addr).await; if let Err(cause) = res { - tracing::error!("error handling UDP packet: {}", cause); + error!("error handling UDP packet: {}", cause); } } } @@ -955,21 +960,22 @@ impl Tracker { socket: &tokio::net::UdpSocket, addr: std::net::SocketAddr, ) -> anyhow::Result<()> { - tracing::trace!("got UDP packet from {}, {} bytes", addr, data.len()); + trace!("got UDP packet from {}, {} bytes", addr, data.len()); let request = postcard::from_bytes::(data)?; match request { Request::Announce(announce) => { - tracing::debug!("got announce: {:?}", announce); + debug!("got announce: {:?}", announce); self.handle_announce(announce).await?; } Request::Query(query) => { let mut buf = [0u8; 1200]; - tracing::debug!("handle query: {:?}", query); + debug!("handle query: {:?}", query); let response = self.handle_query(query).await?; let response = Response::QueryResponse(response); - let response = postcard::to_slice(&response, &mut buf)?; - socket.send_to(response, addr).await?; + let response_bytes = postcard::to_slice(&response, &mut buf)?; + trace!("sending response, {:?} {} bytes", response, response_bytes.len()); + socket.send_to(response_bytes, addr).await?; } } Ok(()) @@ -1262,7 +1268,7 @@ impl Tracker { let connection = match res { Ok(connection) => connection, Err(cause) => { - tracing::error!("error dialing host {}: {}", host, cause); + error!("error dialing host {}: {}", host, cause); return Err(cause); } }; @@ -1280,7 +1286,7 @@ impl Tracker { &res, )?; if let Err(cause) = &res { - tracing::debug!("error probing host {}: {}", host, cause); + debug!("error probing host {}: {}", host, cause); } results.push((content, announce_kind, res)); } @@ -1292,6 +1298,11 @@ impl Tracker { self.0.actor.send(ActorMessage::Gc { tx }).await?; rx.await? } + + pub async fn dump(&self) -> anyhow::Result<()> { + self.0.actor.send(ActorMessage::Dump).await?; + Ok(()) + } } /// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol. From e06a6121f1276f4792d512a1895d2be3ccff4f32 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 7 Apr 2025 14:57:05 +0300 Subject: [PATCH 7/8] fmt --- content-discovery/iroh-mainline-tracker/src/tracker.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/content-discovery/iroh-mainline-tracker/src/tracker.rs b/content-discovery/iroh-mainline-tracker/src/tracker.rs index d3cec65..c3f31a6 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker.rs @@ -792,7 +792,7 @@ impl Tracker { }; txn.commit()?; let rt = tokio::runtime::Handle::current(); - let handle = std::thread::spawn(move|| { + let handle = std::thread::spawn(move || { rt.block_on(async move { if let Err(cause) = actor.run(db).await { tracing::error!("error in actor: {}", cause); @@ -974,7 +974,11 @@ impl Tracker { let response = self.handle_query(query).await?; let response = Response::QueryResponse(response); let response_bytes = postcard::to_slice(&response, &mut buf)?; - trace!("sending response, {:?} {} bytes", response, response_bytes.len()); + trace!( + "sending response, {:?} {} bytes", + response, + response_bytes.len() + ); socket.send_to(response_bytes, addr).await?; } } From 36e3f5c8e9489e4ca85991dce35b07b06398aac5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 7 Apr 2025 15:01:57 +0300 Subject: [PATCH 8/8] clippy --- content-discovery/iroh-mainline-tracker/src/tracker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/content-discovery/iroh-mainline-tracker/src/tracker.rs b/content-discovery/iroh-mainline-tracker/src/tracker.rs index c3f31a6..8cf99c2 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker.rs @@ -79,7 +79,7 @@ impl Drop for Inner { drop_permit.send(ActorMessage::Stop); } if let Some(handle) = self.handle.take() { - if let Err(_) = handle.join() { + if handle.join().is_err() { error!("error joining actor thread"); } }