Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,13 @@ iroh-base = "0.34"
iroh-blobs = { version = "0.34", features = ["rpc"] }
# explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255
tokio = { version = "1.44.1" }
tokio-stream = { version = "0.1.17" }
mainline = { version = "5.4.0", default-features = false }
pkarr = { version = "3.7.0", default-features = false }
postcard = { version = "1", default-features = false }
quinn = { package = "iroh-quinn", version = "0.13", default-features = false }
anyhow = { version = "1", default-features = false }
futures = { version = "0.3.25" }
rcgen = { version = "0.13.1" }
rustls = { version = "0.23", default-features = false, features = ["ring"] }
genawaiter = { version = "0.99.1", features = ["futures03"] }
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ license = "MIT OR Apache-2.0"
iroh = { workspace = true }
iroh-blobs = { workspace = true }
iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery" }
mainline = { version = "2.0.0" }
anyhow = { version = "1", features = ["backtrace"] }
mainline = { workspace = true }
anyhow = { workspace = true, features = ["backtrace"] }
futures = { version = "0.3.25" }
clap = { version = "4", features = ["derive"] }
tempfile = { version = "3.4" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
verified: args.verified,
},
};
let res = discovery.query(q).await?;
for sa in res {
let mut res = discovery.query(q).await?;
while let Some(sa) = res.recv().await {
if sa.verify().is_ok() {
println!("{}: {:?}", sa.announce.host, sa.announce.kind);
} else {
Expand Down
16 changes: 8 additions & 8 deletions content-discovery/iroh-mainline-content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ hex = "0.4.3"

# Optional features for the client functionality
tracing = { version = "0.1", optional = true }
quinn = { package = "iroh-quinn", version = "0.13", optional = true }
mainline = { version = "2.0.0", optional = true, features = ["async"] }
anyhow = { version = "1", features = ["backtrace"], optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"], optional = true }
futures = { version = "0.3.25", optional = true }
rcgen = { version = "0.13.1", optional = true }
rustls = { version = "0.23", default-features = false, features = ["ring"], optional = true }
quinn = { workspace = true, optional = true }
mainline = { workspace = true, optional = true, features = ["async"] }
anyhow = { workspace = true, features = ["backtrace"], optional = true }
postcard = { workspace = true, features = ["alloc", "use-std"], optional = true }
futures = { workspace = true, optional = true }
rcgen = { workspace = true, optional = true }
rustls = { workspace = true, features = ["ring"], optional = true }
genawaiter = { version = "0.99.1", features = ["futures03"], optional = true }
tokio = { workspace = true, optional = true }
flume = "0.11.0"
tokio-stream = { workspace = true }

# dependencies for the tls utils
der = { version = "0.7", features = ["alloc", "derive"], optional = true }
Expand Down
65 changes: 32 additions & 33 deletions content-discovery/iroh-mainline-content-discovery/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use iroh::{
Endpoint, NodeId,
};
use iroh_blobs::HashAndFormat;
use tokio::sync::mpsc;

use crate::{
protocol::{
Expand Down Expand Up @@ -84,8 +85,8 @@ pub fn to_infohash(haf: HashAndFormat) -> mainline::Id {
}

fn unique_tracker_addrs(
response: flume::IntoIter<Vec<SocketAddr>>,
) -> impl Stream<Item = SocketAddr> {
response: impl IntoIterator<Item = Vec<SocketAddrV4>>,
) -> impl Stream<Item = SocketAddrV4> {
Gen::new(|co| async move {
let mut found = HashSet::new();
for response in response {
Expand Down Expand Up @@ -160,19 +161,19 @@ pub fn query_trackers(
/// Query the mainline DHT for trackers for the given content, then query each tracker for peers.
pub fn query_dht(
endpoint: impl QuinnConnectionProvider<SocketAddr>,
dht: mainline::dht::Dht,
dht: mainline::Dht,
args: Query,
query_parallelism: usize,
) -> impl Stream<Item = anyhow::Result<SignedAnnounce>> {
// let dht = dht.as_async();
let info_hash = to_infohash(args.content);
let response = dht.get_peers(info_hash).unwrap();
let response = dht.get_peers(info_hash);
let unique_tracker_addrs = unique_tracker_addrs(response);
unique_tracker_addrs
.map(move |addr| {
let endpoint = endpoint.clone();
async move {
let hosts = match query_socket_one(endpoint, addr, args).await {
let hosts = match query_socket_one(endpoint, addr.into(), args).await {
Ok(hosts) => hosts.into_iter().map(anyhow::Ok).collect(),
Err(cause) => vec![Err(cause)],
};
Expand All @@ -188,14 +189,14 @@ pub fn query_dht(
/// Note that this should only be called from a publicly reachable node, where port is the port
/// on which the tracker protocol is reachable.
pub fn announce_dht(
dht: mainline::dht::Dht,
dht: mainline::Dht,
content: BTreeSet<HashAndFormat>,
port: u16,
announce_parallelism: usize,
) -> impl Stream<
Item = (
HashAndFormat,
mainline::Result<mainline::Id, mainline::Error>,
std::result::Result<mainline::Id, mainline::errors::PutQueryError>,
),
> {
let dht = dht.as_async();
Expand Down Expand Up @@ -423,13 +424,13 @@ where

#[derive(Debug, Clone)]
pub struct UdpDiscovery {
tx: flume::Sender<UdpActorMessage>,
tx: mpsc::Sender<UdpActorMessage>,
}

impl UdpDiscovery {
/// Create a new UDP discovery service.
pub async fn new(socket: SocketAddr) -> anyhow::Result<Self> {
let (tx, rx) = flume::unbounded();
let (tx, rx) = mpsc::channel(1024);
let socket = tokio::net::UdpSocket::bind(socket).await?;
let _task = tokio::spawn(
UdpActor {
Expand All @@ -447,20 +448,20 @@ impl UdpDiscovery {
/// Query the mainline DHT for trackers for the given content, then query each tracker for peers.
pub async fn query_dht(
&self,
dht: mainline::dht::Dht,
dht: mainline::Dht,
args: Query,
) -> anyhow::Result<impl Stream<Item = SignedAnnounce>> {
let results = self.query(args).await?.into_stream().boxed();
let results = tokio_stream::wrappers::ReceiverStream::new(self.query(args).await?).boxed();
let dht = dht.as_async();
let this = self.clone();
let find_new_trackers = async move {
// delay before querying the DHT
tokio::time::sleep(Duration::from_millis(50)).await;
let info_hash = to_infohash(args.content);
let mut addrs = dht.get_peers(info_hash).unwrap();
let mut addrs = dht.get_peers(info_hash);
while let Some(addrs) = addrs.next().await {
for addr in addrs {
this.add_tracker(addr).await.ok();
this.add_tracker(addr.into()).await.ok();
}
}
future::pending::<SignedAnnounce>().await
Expand All @@ -474,30 +475,28 @@ impl UdpDiscovery {
pub async fn add_tracker(&self, tracker: SocketAddr) -> anyhow::Result<()> {
Ok(self
.tx
.send_async(UdpActorMessage::AddTracker { tracker })
.send(UdpActorMessage::AddTracker { tracker })
.await?)
}

/// Manually remove a tracker from the list of trackers to query.
pub async fn remove_tracker(&self, tracker: SocketAddr) -> anyhow::Result<()> {
Ok(self
.tx
.send_async(UdpActorMessage::RemoveTracker { tracker })
.send(UdpActorMessage::RemoveTracker { tracker })
.await?)
}

pub async fn query(&self, query: Query) -> anyhow::Result<flume::Receiver<SignedAnnounce>> {
pub async fn query(&self, query: Query) -> anyhow::Result<mpsc::Receiver<SignedAnnounce>> {
let (tx, rx) = oneshot::channel();
self.tx
.send_async(UdpActorMessage::Query { query, tx })
.await?;
self.tx.send(UdpActorMessage::Query { query, tx }).await?;
Ok(rx.await?)
}

pub async fn announce_once(&self, announce: SignedAnnounce) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send_async(UdpActorMessage::AnnounceOnce { announce, tx })
.send(UdpActorMessage::AnnounceOnce { announce, tx })
.await?;
rx.await?;
Ok(())
Expand All @@ -514,17 +513,17 @@ impl UdpDiscovery {
})
.into();
self.tx
.send_async(UdpActorMessage::StoreAnnounceTask { announce, task })
.send(UdpActorMessage::StoreAnnounceTask { announce, task })
.await?;
Ok(())
}
}

struct UdpActor {
socket: tokio::net::UdpSocket,
rx: flume::Receiver<UdpActorMessage>,
rx: mpsc::Receiver<UdpActorMessage>,
trackers: BTreeSet<SocketAddr>,
listeners: BTreeMap<Query, Vec<flume::Sender<SignedAnnounce>>>,
listeners: BTreeMap<Query, Vec<mpsc::Sender<SignedAnnounce>>>,
announces: BTreeMap<(HashAndFormat, AnnounceKind), AbortingJoinHandle<()>>,
}

Expand All @@ -548,7 +547,7 @@ impl<T> Drop for AbortingJoinHandle<T> {
enum UdpActorMessage {
Query {
query: Query,
tx: oneshot::Sender<flume::Receiver<SignedAnnounce>>,
tx: oneshot::Sender<mpsc::Receiver<SignedAnnounce>>,
},
AddTracker {
tracker: SocketAddr,
Expand All @@ -572,11 +571,11 @@ impl UdpActor {
let mut buf = [0u8; MAX_MSG_SIZE];
loop {
tokio::select! {
msg = self.rx.recv_async() => {
msg = self.rx.recv() => {
tracing::trace!("got msg {:?}", msg);
match msg {
Ok(UdpActorMessage::Query { query, tx }) => {
let (announce_tx, announce_rx) = flume::bounded(1024);
Some(UdpActorMessage::Query { query, tx }) => {
let (announce_tx, announce_rx) = mpsc::channel(1024);
self.listeners.entry(query).or_default().push(announce_tx);
let msg = Request::Query(query);
let msg = postcard::to_slice(&msg, &mut buf).unwrap();
Expand All @@ -586,7 +585,7 @@ impl UdpActor {
}
tx.send(announce_rx).ok();
}
Ok(UdpActorMessage::AddTracker { tracker }) => {
Some(UdpActorMessage::AddTracker { tracker }) => {
if self.trackers.insert(tracker) {
for query in self.listeners.keys() {
let msg = Request::Query(*query);
Expand All @@ -595,21 +594,21 @@ impl UdpActor {
}
}
}
Ok(UdpActorMessage::RemoveTracker { tracker }) => {
Some(UdpActorMessage::RemoveTracker { tracker }) => {
self.trackers.remove(&tracker);
}
Ok(UdpActorMessage::StoreAnnounceTask { announce, task }) => {
Some(UdpActorMessage::StoreAnnounceTask { announce, task }) => {
let key = (announce.announce.content, announce.announce.kind);
self.announces.insert(key, task);
}
Ok(UdpActorMessage::AnnounceOnce { announce, tx }) => {
Some(UdpActorMessage::AnnounceOnce { announce, tx }) => {
let msg = postcard::to_slice(&Request::Announce(announce), &mut buf).unwrap();
for tracker in &self.trackers {
self.socket.send_to(msg, tracker).await.ok();
}
tx.send(()).ok();
}
Err(flume::RecvError::Disconnected) => break,
None => break,
}
},
res = self.socket.recv_from(&mut buf) => {
Expand Down Expand Up @@ -642,7 +641,7 @@ impl UdpActor {
}
let mut to_remove = Vec::new();
for (i, sender) in senders.iter().enumerate() {
if sender.send_async(sa).await.is_err() {
if sender.send(sa).await.is_err() {
to_remove.push(i);
}
}
Expand Down
11 changes: 5 additions & 6 deletions content-discovery/iroh-mainline-tracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
anyhow = { workspace = true, features = ["backtrace"] }
# needs to keep updated with the dep of iroh-blobs
bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false }
bytes = "1"
Expand All @@ -20,9 +20,9 @@ hex = "0.4.3"
humantime = "2.1.0"
iroh = { workspace = true }
iroh-blobs = { workspace = true }
mainline = { version = "2.0.0", features = ["async"] }
pkarr = { version = "2.0.3", features = ["async"] }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"] }
mainline = { workspace = true, features = ["async"] }
pkarr = { workspace = true }
postcard = { workspace = true, features = ["alloc", "use-std"] }
rand = "0.8"
rcgen = "0.12.0"
redb = "1.5.0"
Expand All @@ -38,10 +38,9 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ttl_cache = "0.5.1"
url = "2.5.0"
flume = "0.11.0"
genawaiter = { version = "0.99.1", features = ["futures03"] }
iroh-mainline-content-discovery = { path = "../iroh-mainline-content-discovery", features = ["client"] }
quinn = { package = "iroh-quinn", version = "0.13" }
quinn = { workspace = true }

clap = { version = "4", features = ["derive"], optional = true }
serde-big-array = "0.5.1"
Expand Down
14 changes: 5 additions & 9 deletions content-discovery/iroh-mainline-tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use clap::Parser;
use iroh::{discovery::pkarr::dht::DhtDiscovery, Endpoint, NodeId};
use iroh::{Endpoint, NodeId};
use iroh_blobs::util::fs::load_secret_key;
use iroh_mainline_content_discovery::{protocol::ALPN, tls_utils};
use iroh_mainline_tracker::{
Expand Down Expand Up @@ -62,16 +62,11 @@ async fn await_relay_region(endpoint: &Endpoint) -> anyhow::Result<()> {
async fn create_endpoint(
key: iroh::SecretKey,
ipv4_addr: SocketAddrV4,
publish: bool,
) -> anyhow::Result<Endpoint> {
let mainline_discovery = if publish {
DhtDiscovery::builder().secret_key(key.clone()).build()?
} else {
DhtDiscovery::default()
};
iroh::Endpoint::builder()
.secret_key(key)
.discovery(Box::new(mainline_discovery))
.discovery_dht()
.discovery_n0()
.alpns(vec![ALPN.to_vec()])
.bind_addr_v4(ipv4_addr)
.bind()
Expand Down Expand Up @@ -133,8 +128,9 @@ async fn server(args: Args) -> anyhow::Result<()> {
let quinn_endpoint = quinn::Endpoint::server(server_config, quinn_bind_addr)?;
// set the quinn port to the actual port we bound to so the DHT will announce it correctly
options.quinn_port = quinn_endpoint.local_addr()?.port();
let iroh_endpoint = create_endpoint(key.clone(), options.iroh_ipv4_addr, true).await?;
let iroh_endpoint = create_endpoint(key.clone(), options.iroh_ipv4_addr).await?;
let db = Tracker::new(options, iroh_endpoint.clone())?;
db.dump().await?;
await_relay_region(&iroh_endpoint).await?;
let addr = iroh_endpoint.node_addr().await?;
log!("listening on {:?}", addr);
Expand Down
Loading