diff --git a/content-discovery/Cargo.toml b/content-discovery/Cargo.toml index 7c8230a..6673e63 100644 --- a/content-discovery/Cargo.toml +++ b/content-discovery/Cargo.toml @@ -27,6 +27,6 @@ missing_debug_implementations = "warn" unused-async = "warn" [workspace.dependencies] -iroh = "0.31" -iroh-base = "0.31" -iroh-blobs = { version = "0.31", features = ["rpc"] } +iroh = { version ="0.32", features = ["discovery-pkarr-dht"] } +iroh-base = "0.32" +iroh-blobs = { version = "0.32", features = ["rpc"] } diff --git a/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs b/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs index b47dd16..165fd06 100644 --- a/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs +++ b/content-discovery/iroh-mainline-content-discovery-cli/src/main.rs @@ -66,7 +66,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> { let connection = iroh_endpoint .connect(tracker, iroh_mainline_content_discovery::protocol::ALPN) .await?; - iroh_mainline_content_discovery::announce(connection, signed_announce).await?; + iroh_mainline_content_discovery::announce_iroh(connection, signed_announce).await?; } } if !args.quic_tracker.is_empty() { @@ -82,7 +82,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> { for tracker in args.quic_tracker { println!("announcing via quic to {:?}: {}", tracker, content); let connection = quinn_endpoint.connect(tracker, "localhost")?.await?; - iroh_mainline_content_discovery::announce(connection, signed_announce).await?; + iroh_mainline_content_discovery::announce_quinn(connection, signed_announce).await?; } } diff --git a/content-discovery/iroh-mainline-content-discovery/Cargo.toml b/content-discovery/iroh-mainline-content-discovery/Cargo.toml index 45559f8..7e073fd 100644 --- a/content-discovery/iroh-mainline-content-discovery/Cargo.toml +++ b/content-discovery/iroh-mainline-content-discovery/Cargo.toml @@ -22,7 +22,7 @@ hex = "0.4.3" # Optional features for the client functionality tracing = { version = "0.1", optional = true } -iroh-quinn = { version = "0.12", optional = true } +iroh-quinn = { version = "0.13", optional = true } mainline = { version = "2.0.0", optional = true, features = ["async"] } anyhow = { version = "1", features = ["backtrace"], optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std"], optional = true } diff --git a/content-discovery/iroh-mainline-content-discovery/src/client.rs b/content-discovery/iroh-mainline-content-discovery/src/client.rs index 89ec3ae..0adcf48 100644 --- a/content-discovery/iroh-mainline-content-discovery/src/client.rs +++ b/content-discovery/iroh-mainline-content-discovery/src/client.rs @@ -32,7 +32,30 @@ use crate::protocol::{ /// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol. /// `content` is the content to announce. /// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it. -pub async fn announce( +pub async fn announce_quinn( + connection: iroh_quinn::Connection, + signed_announce: SignedAnnounce, +) -> anyhow::Result<()> { + let (mut send, mut recv) = connection.open_bi().await?; + tracing::debug!("opened bi stream"); + let request = Request::Announce(signed_announce); + let request = postcard::to_stdvec(&request)?; + tracing::debug!("sending announce"); + send.write_all(&request).await?; + send.finish()?; + let _response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?; + Ok(()) +} + +/// Announce to a tracker. +/// +/// You can only announce content you yourself claim to have, to avoid spamming other nodes. +/// +/// `endpoint` is the iroh endpoint to use for announcing. +/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol. +/// `content` is the content to announce. +/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it. +pub async fn announce_iroh( connection: iroh::endpoint::Connection, signed_announce: SignedAnnounce, ) -> anyhow::Result<()> { @@ -80,7 +103,7 @@ async fn query_socket_one( args: Query, ) -> anyhow::Result> { let connection = endpoint.connect(addr).await?; - let result = query(connection, args).await?; + let result = query_quinn(connection, args).await?; Ok(result.hosts) } @@ -90,7 +113,7 @@ async fn query_iroh_one( args: Query, ) -> anyhow::Result> { let connection = endpoint.connect(node_id, ALPN).await?; - let result = query(connection, args).await?; + let result = query_iroh(connection, args).await?; Ok(result.hosts) } @@ -185,9 +208,29 @@ pub fn announce_dht( } /// Assume an existing connection to a tracker and query it for peers for some content. -pub async fn query( +pub async fn query_iroh( connection: iroh::endpoint::Connection, args: Query, +) -> anyhow::Result { + tracing::info!("connected to {:?}", connection.remote_node_id()?); + let (mut send, mut recv) = connection.open_bi().await?; + tracing::info!("opened bi stream"); + let request = Request::Query(args); + let request = postcard::to_stdvec(&request)?; + tracing::info!("sending query"); + send.write_all(&request).await?; + send.finish()?; + let response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?; + let response = postcard::from_bytes::(&response)?; + Ok(match response { + Response::QueryResponse(response) => response, + }) +} + +/// Assume an existing connection to a tracker and query it for peers for some content. +pub async fn query_quinn( + connection: iroh_quinn::Connection, + args: Query, ) -> anyhow::Result { tracing::info!("connected to {:?}", connection.remote_address()); let (mut send, mut recv) = connection.open_bi().await?; @@ -283,14 +326,23 @@ pub async fn connect( tracker: &TrackerId, local_ipv4_addr: SocketAddrV4, local_ipv6_addr: SocketAddrV6, -) -> anyhow::Result { +) -> anyhow::Result { match tracker { - TrackerId::Quinn(tracker) => connect_socket(*tracker, local_ipv4_addr.into()).await, - TrackerId::Iroh(tracker) => connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await, + TrackerId::Quinn(tracker) => Ok(Connection::Quinn( + connect_socket(*tracker, local_ipv4_addr.into()).await?, + )), + TrackerId::Iroh(tracker) => Ok(Connection::Iroh( + connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await?, + )), TrackerId::Udp(_) => anyhow::bail!("can not connect to udp tracker"), } } +pub enum Connection { + Iroh(iroh::endpoint::Connection), + Quinn(iroh_quinn::Connection), +} + /// Create a iroh endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol. async fn connect_iroh( tracker: NodeId, @@ -307,13 +359,13 @@ async fn connect_iroh( Ok(connection) } -/// Create a quinn endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol. +/// Create a quinn endpoint and connect to a tracker using the [crate] protocol. async fn connect_socket( tracker: SocketAddr, local_addr: SocketAddr, -) -> anyhow::Result { +) -> anyhow::Result { let endpoint = create_quinn_client(local_addr, vec![ALPN.to_vec()], false)?; - tracing::info!("trying to connect to tracker at {:?}", tracker); + tracing::info!("trying t?o )connect to tracker at {:?}", tracker); let connection = endpoint.connect(tracker, "localhost")?.await?; Ok(connection) } diff --git a/content-discovery/iroh-mainline-tracker/Cargo.toml b/content-discovery/iroh-mainline-tracker/Cargo.toml index 5e4f840..365931a 100644 --- a/content-discovery/iroh-mainline-tracker/Cargo.toml +++ b/content-discovery/iroh-mainline-tracker/Cargo.toml @@ -23,11 +23,12 @@ iroh-blobs = { workspace = true } mainline = { version = "2.0.0", features = ["async"] } pkarr = { version = "1.0.1", features = ["async"] } postcard = { version = "1", default-features = false, features = ["alloc", "use-std"] } -iroh-quinn = "0.12" +iroh-quinn = "0.13" rand = "0.8" rcgen = "0.12.0" redb = "1.5.0" rustls = "0.21" +rustls-pki-types = "1.11" serde = { version = "1", features = ["derive"] } serde_json = "1.0.107" tempfile = "3.4" diff --git a/content-discovery/iroh-mainline-tracker/src/io.rs b/content-discovery/iroh-mainline-tracker/src/io.rs index 36b0fbc..0fac89a 100644 --- a/content-discovery/iroh-mainline-tracker/src/io.rs +++ b/content-discovery/iroh-mainline-tracker/src/io.rs @@ -91,7 +91,7 @@ pub fn log_connection_attempt( path: &Option, host: &NodeId, t0: Instant, - outcome: &anyhow::Result, + outcome: &anyhow::Result, ) -> anyhow::Result<()> { if let Some(path) = path { let now = SystemTime::now() diff --git a/content-discovery/iroh-mainline-tracker/src/iroh_blobs_util.rs b/content-discovery/iroh-mainline-tracker/src/iroh_blobs_util.rs index 771991d..f75b42e 100644 --- a/content-discovery/iroh-mainline-tracker/src/iroh_blobs_util.rs +++ b/content-discovery/iroh-mainline-tracker/src/iroh_blobs_util.rs @@ -19,7 +19,7 @@ use rand::Rng; /// This is just reading the size header and then immediately closing the connection. /// It can be used to check if a peer has any data at all. pub async fn unverified_size( - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, ) -> anyhow::Result<(u64, Stats)> { let request = iroh_blobs::protocol::GetRequest::new( @@ -42,7 +42,7 @@ pub async fn unverified_size( /// This asks for the last chunk of the blob and validates the response. /// Note that this does not validate that the peer has all the data. pub async fn verified_size( - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, ) -> anyhow::Result<(u64, Stats)> { tracing::debug!("Getting verified size of {}", hash.to_hex()); @@ -81,7 +81,7 @@ pub async fn verified_size( } pub async fn get_hash_seq_and_sizes( - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, max_size: u64, ) -> anyhow::Result<(HashSeq, Arc<[u64]>)> { @@ -135,7 +135,7 @@ pub async fn get_hash_seq_and_sizes( /// Probe for a single chunk of a blob. pub async fn chunk_probe( - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, chunk: ChunkNum, ) -> anyhow::Result { diff --git a/content-discovery/iroh-mainline-tracker/src/main.rs b/content-discovery/iroh-mainline-tracker/src/main.rs index ecb70cb..8ace4f4 100644 --- a/content-discovery/iroh-mainline-tracker/src/main.rs +++ b/content-discovery/iroh-mainline-tracker/src/main.rs @@ -10,7 +10,7 @@ use std::{ }; use clap::Parser; -use iroh::{discovery::pkarr::dht::DhtDiscovery, endpoint::get_remote_node_id, Endpoint, NodeId}; +use iroh::{discovery::pkarr::dht::DhtDiscovery, Endpoint, NodeId}; use iroh_blobs::util::fs::load_secret_key; use iroh_mainline_content_discovery::protocol::ALPN; use iroh_mainline_tracker::{ @@ -24,8 +24,6 @@ use iroh_mainline_tracker::{ use crate::args::Args; -use iroh_mainline_tracker::tracker::get_alpn; - static VERBOSE: AtomicBool = AtomicBool::new(false); fn set_verbose(verbose: bool) { @@ -82,11 +80,11 @@ async fn create_endpoint( /// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol. pub async fn accept_conn( - mut conn: iroh_quinn::Connecting, -) -> anyhow::Result<(NodeId, String, iroh_quinn::Connection)> { - let alpn = get_alpn(&mut conn).await?; + mut conn: iroh::endpoint::Connecting, +) -> anyhow::Result<(NodeId, String, iroh::endpoint::Connection)> { + let alpn = String::from_utf8(conn.alpn().await?)?; let conn = conn.await?; - let peer_id = get_remote_node_id(&conn)?; + let peer_id = conn.remote_node_id()?; Ok((peer_id, alpn, conn)) } diff --git a/content-discovery/iroh-mainline-tracker/src/tracker.rs b/content-discovery/iroh-mainline-tracker/src/tracker.rs index 4098ed5..bc2ad7f 100644 --- a/content-discovery/iroh-mainline-tracker/src/tracker.rs +++ b/content-discovery/iroh-mainline-tracker/src/tracker.rs @@ -6,7 +6,7 @@ use std::{ }; use bao_tree::ChunkNum; -use iroh::{endpoint::get_remote_node_id, Endpoint, NodeId}; +use iroh::{Endpoint, NodeId}; use iroh_blobs::{ get::{fsm::EndBlobNext, Stats}, hashseq::HashSeq, @@ -876,7 +876,7 @@ impl Tracker { remote_node_id, std::str::from_utf8(&alpn) ); - if let Err(cause) = tracker.handle_connection(conn).await { + if let Err(cause) = tracker.handle_iroh_connection(conn).await { tracing::error!("error handling connection: {}", cause); } }); @@ -886,7 +886,7 @@ impl Tracker { pub async fn quinn_accept_loop(self, endpoint: iroh_quinn::Endpoint) -> std::io::Result<()> { let local_addr = endpoint.local_addr()?; - println!("quinn listening on {}", local_addr); + println!("quinn listening on {local_addr:?}"); while let Some(incoming) = endpoint.accept().await { tracing::info!("got incoming"); let connecting = incoming.accept()?; @@ -899,7 +899,7 @@ impl Tracker { }; // if we were supporting multiple protocols, we'd need to check the ALPN here. tracing::info!("got connection from {} {}", remote_node_id, alpn); - if let Err(cause) = tracker.handle_connection(conn).await { + if let Err(cause) = tracker.handle_quinn_connection(conn).await { tracing::error!("error handling connection: {}", cause); } }); @@ -947,7 +947,7 @@ impl Tracker { } /// Handle a single incoming connection on the tracker ALPN. - pub async fn handle_connection( + pub async fn handle_quinn_connection( &self, connection: iroh_quinn::Connection, ) -> anyhow::Result<()> { @@ -975,6 +975,35 @@ impl Tracker { Ok(()) } + /// Handle a single incoming connection on the tracker ALPN. + pub async fn handle_iroh_connection( + &self, + connection: iroh::endpoint::Connection, + ) -> anyhow::Result<()> { + tracing::debug!("calling accept_bi"); + let (mut send, mut recv) = connection.accept_bi().await?; + tracing::debug!("got bi stream"); + let request = recv.read_to_end(REQUEST_SIZE_LIMIT).await?; + let request = postcard::from_bytes::(&request)?; + match request { + Request::Announce(announce) => { + tracing::debug!("got announce: {:?}", announce); + self.handle_announce(announce).await?; + send.finish()?; + } + + Request::Query(query) => { + tracing::debug!("handle query: {:?}", query); + let response = self.handle_query(query).await?; + let response = Response::QueryResponse(response); + let response = postcard::to_stdvec(&response)?; + send.write_all(&response).await?; + send.finish()?; + } + } + Ok(()) + } + async fn get_size(&self, hash: Hash) -> anyhow::Result> { let (tx, rx) = oneshot::channel(); self.0 @@ -1017,7 +1046,7 @@ impl Tracker { async fn get_or_insert_size( &self, - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, ) -> anyhow::Result { let size_opt = self.get_size(*hash).await?; @@ -1034,7 +1063,7 @@ impl Tracker { async fn get_or_insert_sizes( &self, - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, hash: &Hash, ) -> anyhow::Result<(HashSeq, Arc<[u64]>)> { let sizes = self.get_sizes(*hash).await?; @@ -1052,7 +1081,7 @@ impl Tracker { async fn probe( &self, - connection: &iroh_quinn::Connection, + connection: &iroh::endpoint::Connection, host: &NodeId, content: &HashAndFormat, probe_kind: ProbeKind, @@ -1245,8 +1274,8 @@ async fn accept_conn( ) -> anyhow::Result<(NodeId, String, iroh_quinn::Connection)> { let alpn = get_alpn(&mut conn).await?; let conn = conn.await?; - let peer_id = get_remote_node_id(&conn)?; - Ok((peer_id, alpn, conn)) + let node_id = get_remote_node_id(&conn)?; + Ok((node_id, alpn, conn)) } /// Extract the ALPN protocol from the peer's TLS certificate. @@ -1261,12 +1290,32 @@ pub async fn get_alpn(connecting: &mut iroh_quinn::Connecting) -> anyhow::Result } } +pub fn get_remote_node_id(connection: &iroh_quinn::Connection) -> anyhow::Result { + let data = connection.peer_identity(); + match data { + None => anyhow::bail!("no peer certificate found"), + Some(data) => match data.downcast::>() { + Ok(certs) => { + if certs.len() != 1 { + anyhow::bail!( + "expected a single peer certificate, but {} found", + certs.len() + ); + } + let cert = tls::certificate::parse(&certs[0])?; + Ok(cert.peer_id()) + } + Err(_) => anyhow::bail!("invalid peer certificate"), + }, + } +} + /// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol. async fn iroh_accept_conn( mut conn: iroh::endpoint::Connecting, -) -> anyhow::Result<(NodeId, Vec, iroh_quinn::Connection)> { +) -> anyhow::Result<(NodeId, Vec, iroh::endpoint::Connection)> { let alpn = conn.alpn().await?; let conn = conn.await?; - let peer_id = get_remote_node_id(&conn)?; - Ok((peer_id, alpn, conn)) + let node_id = conn.remote_node_id()?; + Ok((node_id, alpn, conn)) } diff --git a/content-discovery/tls/Cargo.toml b/content-discovery/tls/Cargo.toml index 08a958c..6790d65 100644 --- a/content-discovery/tls/Cargo.toml +++ b/content-discovery/tls/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT OR Apache-2.0" iroh-base = { workspace = true } der = { version = "0.7", features = ["alloc", "derive"] } derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] } -quinn = { package = "iroh-quinn", version = "0.12.0" } +quinn = { package = "iroh-quinn", version = "0.13.0" } rand = "0.8.5" rcgen = "0.13" ring = "0.17" diff --git a/h3-iroh/Cargo.toml b/h3-iroh/Cargo.toml index aac64fe..62da30e 100644 --- a/h3-iroh/Cargo.toml +++ b/h3-iroh/Cargo.toml @@ -15,8 +15,8 @@ http-body = { version = "1", optional = true } http-body-util = { version = "0.1", optional = true } hyper = { version = "1.5", optional = true } hyper-util = { version = "0.1", optional = true } -iroh = "0.31" -iroh-base = { version = "0.31", features = ["ticket"] } +iroh = "0.32" +iroh-base = { version = "0.32", features = ["ticket"] } tokio = { version = "1", features = ["io-util"], default-features = false} tokio-util = "0.7" tower = { version = "0.5", optional = true } diff --git a/h3-iroh/examples/server.rs b/h3-iroh/examples/server.rs index a4c0811..8f17015 100644 --- a/h3-iroh/examples/server.rs +++ b/h3-iroh/examples/server.rs @@ -12,7 +12,7 @@ use h3::error::ErrorLevel; use h3::quic::BidiStream; use h3::server::RequestStream; use http::{Request, StatusCode}; -use iroh::endpoint::{self, Incoming}; +use iroh::endpoint::Incoming; use iroh_base::ticket::NodeTicket; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -79,7 +79,7 @@ async fn main() -> Result<()> { async fn handle_connection(incoming: Incoming, root: Arc>) -> Result<()> { let conn = incoming.accept()?.await?; - let remote_node_id = endpoint::get_remote_node_id(&conn)?; + let remote_node_id = conn.remote_node_id()?; let span = Span::current(); span.record("remote_node_id", remote_node_id.fmt_short()); info!("new connection"); diff --git a/h3-iroh/src/lib.rs b/h3-iroh/src/lib.rs index fabaf64..3ff9689 100644 --- a/h3-iroh/src/lib.rs +++ b/h3-iroh/src/lib.rs @@ -492,7 +492,7 @@ impl quic::RecvStream for RecvStream { .as_ref() .unwrap() .id() - .0 + .index() .try_into() .expect("invalid stream id") } @@ -639,7 +639,7 @@ where .as_ref() .unwrap() .id() - .0 + .index() .try_into() .expect("invalid stream id") } diff --git a/iroh-dag-sync/Cargo.toml b/iroh-dag-sync/Cargo.toml index 3ea32f3..b73ef82 100644 --- a/iroh-dag-sync/Cargo.toml +++ b/iroh-dag-sync/Cargo.toml @@ -4,10 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] -iroh-blobs = "0.31" -iroh-gossip = "0.31" -iroh = "0.31" -iroh-base = { version ="0.31", features = ["ticket"] } +iroh-blobs = "0.32" +iroh-gossip = "0.32" +iroh = "0.32" +iroh-base = { version ="0.32", features = ["ticket"] } iroh-car = "0.5.0" redb = "2.1.1" clap = { version = "4.5.7", features = ["derive"] } @@ -21,7 +21,6 @@ iroh-io = "0.6" tokio-util = { version = "0.7.11", features = ["rt"] } bao-tree = "0.13.0" genawaiter = "0.99.1" -iroh-quinn = "0.12" bytes = "1.6.0" hex = "0.4.3" ron = "0.8.1" diff --git a/iroh-pkarr-naming-system/Cargo.toml b/iroh-pkarr-naming-system/Cargo.toml index edb4b3e..0dd4a55 100644 --- a/iroh-pkarr-naming-system/Cargo.toml +++ b/iroh-pkarr-naming-system/Cargo.toml @@ -8,8 +8,8 @@ edition = "2021" [dependencies] anyhow = "1.0.79" derive_more = "0.99.17" -iroh = "0.31" -iroh-blobs = "0.31" +iroh = "0.32" +iroh-blobs = "0.32" pkarr = { version = "2.3.1", features = ["async", "dht"] } tokio = "1.35.1" tokio-util = "0.7.12" diff --git a/iroh-s3-bao-store/Cargo.toml b/iroh-s3-bao-store/Cargo.toml index 62248a0..9ba3981 100644 --- a/iroh-s3-bao-store/Cargo.toml +++ b/iroh-s3-bao-store/Cargo.toml @@ -21,8 +21,8 @@ flume = "0.11.0" futures-lite = "2.3" hex = "0.4.3" indicatif = "0.17.7" -iroh = "0.31" -iroh-blobs = "0.31" +iroh = "0.32" +iroh-blobs = "0.32" iroh-io = { version = "0.6", features = ["x-http"] } num_cpus = "1.16.0" rand = "0.8.5"