From 0d2a094a8df8a18ec2f073fb9187297c0109195e Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 1 Dec 2023 17:33:26 +0100 Subject: [PATCH 1/5] refactor: Upgrade to hyper 1.0 This upgrades our direct use of hyper to version 1.0 of the crate. --- Cargo.lock | 154 +++++++++++++++++++++++++------ iroh-metrics/Cargo.toml | 6 +- iroh-metrics/src/metrics.rs | 5 +- iroh-metrics/src/service.rs | 65 +++++++------ iroh-net/Cargo.toml | 6 +- iroh-net/src/bin/derper.rs | 143 +++++++++++++--------------- iroh-net/src/derp/http.rs | 25 ++--- iroh-net/src/derp/http/client.rs | 113 ++++++++++------------- iroh-net/src/derp/http/server.rs | 134 ++++++++++++++------------- 9 files changed, 368 insertions(+), 283 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a416a8f5f7..c6396e66b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,7 +229,7 @@ version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb8867f378f33f78a811a8eb9bf108ad99430d7aad43315dd9319c827ef6247" dependencies = [ - "http", + "http 0.2.11", "log", "url", "wildmatch", @@ -252,9 +252,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.11", + "http-body 0.4.5", + "hyper 0.14.27", "itoa", "matchit", "memchr", @@ -278,8 +278,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", @@ -1700,7 +1700,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", "indexmap 1.9.3", "slab", "tokio", @@ -1708,6 +1708,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -1822,6 +1841,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1829,7 +1859,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1880,9 +1933,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.21", + "http 0.2.11", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -1894,6 +1947,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1901,8 +1974,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.11", + "hyper 0.14.27", "rustls", "tokio", "tokio-rustls", @@ -1914,12 +1987,32 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.27", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca339002caeb0d159cc6e023dff48e199f081e42fa039895c7c6f38b37f2e9d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.0.1", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1968,8 +2061,8 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", - "hyper", + "http 0.2.11", + "hyper 0.14.27", "log", "rand", "tokio", @@ -2171,7 +2264,7 @@ dependencies = [ "futures", "genawaiter", "hex", - "http-body", + "http-body 0.4.5", "iroh-base", "iroh-io", "iroh-test", @@ -2245,8 +2338,11 @@ dependencies = [ name = "iroh-metrics" version = "0.11.0" dependencies = [ + "anyhow", "erased_set", - "hyper", + "http-body-util", + "hyper 1.0.1", + "hyper-util", "once_cell", "prometheus-client", "struct_iterable", @@ -2277,8 +2373,10 @@ dependencies = [ "governor", "hex", "hostname", - "http", - "hyper", + "http 1.0.0", + "http-body-util", + "hyper 1.0.1", + "hyper-util", "igd", "iroh-base", "iroh-metrics", @@ -3892,10 +3990,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.21", + "http 0.2.11", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-rustls", "ipnet", "js-sys", @@ -5093,10 +5191,10 @@ dependencies = [ "axum", "base64 0.21.5", "bytes", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.21", + "http 0.2.11", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-timeout", "percent-encoding", "pin-project", diff --git a/iroh-metrics/Cargo.toml b/iroh-metrics/Cargo.toml index adb74f1b69..96bd04c7ab 100644 --- a/iroh-metrics/Cargo.toml +++ b/iroh-metrics/Cargo.toml @@ -18,9 +18,13 @@ workspace = true prometheus-client = { version = "0.22.0", optional = true } once_cell = "1.17.0" tracing = "0.1" -hyper = { version = "0.14.25", features = ["server", "client", "http1", "tcp"] } +hyper = { version = "1", features = ["server", "client", "http1"] } erased_set = "0.7" struct_iterable = "0.1" +http-body-util = "0.1.0" +tokio = "1" +anyhow = "1.0.75" +hyper-util = { version = "0.1.1", features = ["full"] } [dev-dependencies] tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs", "macros", "time", "test-util"] } diff --git a/iroh-metrics/src/metrics.rs b/iroh-metrics/src/metrics.rs index adac3a9cbe..b8d57e5d23 100644 --- a/iroh-metrics/src/metrics.rs +++ b/iroh-metrics/src/metrics.rs @@ -44,13 +44,12 @@ //! inc!(Metrics, things_added); //! ``` -#[cfg(feature = "metrics")] -use hyper::Error; +// TODO: move cfg to lib.rs #[cfg(feature = "metrics")] use std::net::SocketAddr; /// Start a server to serve the OpenMetrics endpoint. #[cfg(feature = "metrics")] -pub async fn start_metrics_server(addr: SocketAddr) -> Result<(), Error> { +pub async fn start_metrics_server(addr: SocketAddr) -> anyhow::Result<()> { crate::service::run(addr).await } diff --git a/iroh-metrics/src/service.rs b/iroh-metrics/src/service.rs index 8e3721b1e2..2fe01d2c74 100644 --- a/iroh-metrics/src/service.rs +++ b/iroh-metrics/src/service.rs @@ -1,43 +1,42 @@ -use std::{future::Future, io, net::SocketAddr, pin::Pin}; +use std::net::SocketAddr; -use hyper::{ - service::{make_service_fn, service_fn}, - Body, Error, Request, Response, Server, -}; +use anyhow::{anyhow, Result}; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use tokio::net::TcpListener; -use tracing::info; +use tracing::{error, info}; use crate::core::Core; /// Start a HTTP server to report metrics. -pub async fn run(metrics_addr: SocketAddr) -> Result<(), Error> { +pub async fn run(metrics_addr: SocketAddr) -> Result<()> { info!("Starting metrics server on {metrics_addr}"); - Server::bind(&metrics_addr) - .serve(make_service_fn(move |_conn| async move { - let handler = make_handler(); - Ok::<_, io::Error>(service_fn(handler)) - })) - .await + let listener = TcpListener::bind(metrics_addr).await?; + loop { + let (stream, _addr) = listener.accept().await?; + let io = hyper_util::rt::TokioIo::new(stream); + tokio::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service_fn(handler)) + .await + { + error!("Error serving metrics connection: {err:#}"); + } + }); + } } -/// This function returns an HTTP handler fn that will respond with the -/// OpenMetrics encoding of our metrics. -fn make_handler( -) -> impl Fn(Request) -> Pin>> + Send>> { - // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. - move |_req: Request| { - Box::pin(async move { - let core = Core::get() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "metrics disabled"))?; - core.encode() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - .map(|r| { - let body = Body::from(r); - Response::builder() - .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8") - .body(body) - .expect("Failed to build response") - }) - }) - } +/// HTTP handler that will respond with the OpenMetrics encoding of our metrics. +async fn handler( + _req: Request, +) -> Result>> { + let core = Core::get().ok_or_else(|| anyhow!("metrics disabled"))?; + core.encode().map_err(anyhow::Error::new).map(|r| { + let body = http_body_util::Full::new(hyper::body::Bytes::from(r)); + Response::builder() + .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8") + .body(body) + .expect("Failed to build response") + }) } diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index d7a302c167..55c1a32fd7 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -32,8 +32,8 @@ governor = "0.6.0" iroh-base = { version = "0.11.0", path = "../iroh-base" } hex = "0.4.3" hostname = "0.3.1" -http = "0.2.9" -hyper = { version = "0.14.25", features = ["server", "client", "http1", "tcp"] } +http = "1" +hyper = { version = "1", features = ["server", "client", "http1"] } igd = { version = "0.12.1", features = ["aio"] } libc = "0.2.139" num_enum = "0.7" @@ -82,6 +82,8 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr # metrics iroh-metrics = { version = "0.11.0", path = "../iroh-metrics", default-features = false } +http-body-util = "0.1.0" +hyper-util = "0.1.1" [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] netlink-packet-core = "0.7.0" diff --git a/iroh-net/src/bin/derper.rs b/iroh-net/src/bin/derper.rs index 6fd63207aa..5adac52d56 100644 --- a/iroh-net/src/bin/derper.rs +++ b/iroh-net/src/bin/derper.rs @@ -8,30 +8,25 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::Arc, - task::{Context, Poll}, }; use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; use futures::{Future, StreamExt}; use http::response::Builder as ResponseBuilder; -use hyper::{server::conn::Http, Body, Method, Request, Response, StatusCode}; +use hyper::body::Incoming; +use hyper::{Method, Request, Response, StatusCode}; use iroh_metrics::inc; -use iroh_net::{ - defaults::{DEFAULT_DERP_STUN_PORT, NA_DERP_HOSTNAME}, - derp::{ - self, - http::{ - MeshAddrs, ServerBuilder as DerpServerBuilder, TlsAcceptor, TlsConfig as DerpTlsConfig, - }, - }, - key::SecretKey, - stun, +use iroh_net::defaults::{DEFAULT_DERP_STUN_PORT, NA_DERP_HOSTNAME}; +use iroh_net::derp; +use iroh_net::derp::http::{ + MeshAddrs, ServerBuilder as DerpServerBuilder, TlsAcceptor, TlsConfig as DerpTlsConfig, }; -use serde_with::{serde_as, DisplayFromStr}; - +use iroh_net::key::SecretKey; +use iroh_net::stun; use reqwest::Url; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use tokio::net::{TcpListener, UdpSocket}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument}; @@ -39,6 +34,7 @@ use tracing_subscriber::{prelude::*, EnvFilter}; use metrics::StunMetrics; +type BytesBody = http_body_util::Full; type HyperError = Box; type HyperResult = std::result::Result; @@ -545,11 +541,10 @@ async fn serve_captive_portal_service(addr: SocketAddr) -> Result Result> for CaptivePortalService { - type Response = Response; +impl hyper::service::Service> for CaptivePortalService { + type Response = Response>; type Error = HyperError; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { match (req.method(), req.uri().path()) { // Captive Portal checker (&Method::GET, "/generate_204") => { @@ -605,16 +596,19 @@ impl hyper::service::Service> for CaptivePortalService { } fn derp_disabled_handler( - _r: Request, + _r: Request, response: ResponseBuilder, -) -> HyperResult> { +) -> HyperResult>> { Ok(response .status(StatusCode::NOT_FOUND) .body(DERP_DISABLED.into()) .unwrap()) } -fn root_handler(_r: Request, response: ResponseBuilder) -> HyperResult> { +fn root_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult>> { let response = response .status(StatusCode::OK) .header("Content-Type", "text/html; charset=utf-8") @@ -625,17 +619,23 @@ fn root_handler(_r: Request, response: ResponseBuilder) -> HyperResult, response: ResponseBuilder) -> HyperResult> { +fn probe_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult>> { let response = response .status(StatusCode::OK) .header("Access-Control-Allow-Origin", "*") - .body(Body::empty()) + .body(http_body_util::Full::new(hyper::body::Bytes::new())) .unwrap(); Ok(response) } -fn robots_handler(_r: Request, response: ResponseBuilder) -> HyperResult> { +fn robots_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult>> { Ok(response .status(StatusCode::OK) .body(ROBOTS_TXT.into()) @@ -643,10 +643,10 @@ fn robots_handler(_r: Request, response: ResponseBuilder) -> HyperResult, +fn serve_no_content_handler( + r: Request, mut response: ResponseBuilder, -) -> HyperResult> { +) -> HyperResult> { if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { if !challenge.is_empty() && challenge.len() < 64 @@ -664,7 +664,7 @@ fn serve_no_content_handler( Ok(response .status(StatusCode::NO_CONTENT) - .body(Body::empty()) + .body(http_body_util::Full::new(hyper::body::Bytes::new())) .unwrap()) } @@ -893,17 +893,21 @@ mod tests { use anyhow::Result; use bytes::Bytes; - use iroh_net::{ - derp::{http::ClientBuilder, ReceivedMessage}, - key::SecretKey, - }; + use http_body_util::BodyExt; + use iroh_net::derp::http::ClientBuilder; + use iroh_net::derp::ReceivedMessage; + use iroh_net::key::SecretKey; + + fn empty_body() -> http_body_util::Full { + http_body_util::Full::new(hyper::body::Bytes::new()) + } #[tokio::test] async fn test_serve_no_content_handler() { let challenge = "123az__."; let req = Request::builder() .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(Body::empty()) + .body(http_body_util::Full::new(hyper::body::Bytes::new())) .unwrap(); let res = serve_no_content_handler(req, Response::builder()).unwrap(); @@ -916,9 +920,12 @@ mod tests { .to_str() .unwrap(); assert_eq!(header, format!("response {challenge}")); - assert!(hyper::body::to_bytes(res.into_body()) + assert!(res + .into_body() + .collect() .await .unwrap() + .to_bytes() .is_empty()); } @@ -989,7 +996,7 @@ mod tests { let b_secret_key = SecretKey::generate(); let b_key = b_secret_key.public(); let (client_b, mut client_b_receiver) = ClientBuilder::new() - .server_url(derper_url) + .server_url(derper_url.clone()) .build(b_secret_key)?; client_b.connect().await?; @@ -1044,46 +1051,28 @@ mod tests { // get 200 home page response tracing::info!("send request for homepage"); - let req = hyper::Request::builder() - .method(hyper::Method::GET) - .uri(derper_str_url.clone()) - .body(Body::empty()) - .unwrap(); - - let client = hyper::Client::new(); - let res = client.request(req).await?; - assert_eq!(StatusCode::OK, res.status()); - tracing::info!("got OK"); - - assert!(!hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .is_empty()); + let res = reqwest::get(derper_str_url).await?; + assert!(res.status().is_success()); + let body = res.bytes().await?; + assert!(body.is_empty()); // test captive portal tracing::info!("test captive portal response"); + + let url = derper_url.join("/generate_204")?; let challenge = "123az__."; - let req = hyper::Request::builder() - .method(hyper::Method::GET) - .uri(format!("{derper_str_url}/generate_204")) + let client = reqwest::Client::new(); + let res = client + .get(url) .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(Body::empty()) - .unwrap(); - - let res = client.request(req).await?; - assert_eq!(StatusCode::NO_CONTENT, res.status()); + .send() + .await?; + assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); + let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); + assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); + let body = res.bytes().await?; + assert!(body.is_empty()); - let header = res - .headers() - .get(NO_CONTENT_RESPONSE_HEADER) - .unwrap() - .to_str() - .unwrap(); - assert_eq!(header, format!("response {challenge}")); - assert!(hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .is_empty()); tracing::info!("got successful captive portal response"); derper_task.abort(); diff --git a/iroh-net/src/derp/http.rs b/iroh-net/src/derp/http.rs index 1bd2452cb7..7e76ca3c4b 100644 --- a/iroh-net/src/derp/http.rs +++ b/iroh-net/src/derp/http.rs @@ -54,11 +54,7 @@ mod tests { #[tokio::test] async fn test_http_clients_and_server() -> Result<()> { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); + let _guard = iroh_test::logging::setup(); let server_key = SecretKey::generate(); let a_key = SecretKey::generate(); @@ -100,11 +96,17 @@ mod tests { // create clients let derp_addr: Url = format!("http://{addr}:{port}").parse().unwrap(); - let (a_key, mut a_recv, client_a_task, client_a) = - create_test_client(a_key, region.clone(), Some(derp_addr.clone())); + let (a_key, mut a_recv, client_a_task, client_a) = { + let span = info_span!("client-a"); + let _guard = span.enter(); + create_test_client(a_key, region.clone(), Some(derp_addr.clone())) + }; info!("created client {a_key:?}"); - let (b_key, mut b_recv, client_b_task, client_b) = - create_test_client(b_key, region, Some(derp_addr)); + let (b_key, mut b_recv, client_b_task, client_b) = { + let span = info_span!("client-b"); + let _guard = span.enter(); + create_test_client(b_key, region, Some(derp_addr)) + }; info!("created client {b_key:?}"); info!("ping a"); @@ -129,11 +131,12 @@ mod tests { assert_eq!(b_key, got_key); assert_eq!(msg, got_msg); - server.shutdown().await; client_a.close().await?; client_a_task.abort(); client_b.close().await?; client_b_task.abort(); + server.shutdown().await; + Ok(()) } @@ -191,7 +194,7 @@ mod tests { } } } - .instrument(info_span!("test.client.reader")), + .instrument(info_span!("test-client-reader")), ); (public_key, received_msg_r, client_reader_task, client) } diff --git a/iroh-net/src/derp/http/client.rs b/iroh-net/src/derp/http/client.rs index f90acd0a63..22c94c983c 100644 --- a/iroh-net/src/derp/http/client.rs +++ b/iroh-net/src/derp/http/client.rs @@ -8,8 +8,10 @@ use std::time::Duration; use anyhow::bail; use bytes::Bytes; use futures::future::BoxFuture; +use hyper::body::Incoming; +use hyper::header::UPGRADE; use hyper::upgrade::{Parts, Upgraded}; -use hyper::{header::UPGRADE, Body, Request}; +use hyper::Request; use iroh_metrics::inc; use rand::Rng; use rustls::client::Resumption; @@ -18,7 +20,7 @@ use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot}; use tokio::task::{JoinHandle, JoinSet}; use tokio::time::Instant; -use tracing::{debug, info, info_span, trace, warn, Instrument}; +use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use url::Url; use crate::derp::{ @@ -721,81 +723,35 @@ impl Actor { debug!(server_addr = ?tcp_stream.peer_addr(), %local_addr, "TCP stream connected"); - let req = Request::builder() - .uri("/derp") - .header(UPGRADE, super::HTTP_UPGRADE_PROTOCOL) - .body(Body::empty()) - .unwrap(); - - let res = if self.use_https(derp_node.as_deref()) { + let response = if self.use_https(derp_node.as_deref()) { debug!("Starting TLS handshake"); - let hostname = self .tls_servername(derp_node.as_deref()) - .ok_or_else(|| ClientError::InvalidUrl("no tls servername".into()))?; + .ok_or_else(|| ClientError::InvalidUrl("No tls servername".into()))?; let tls_stream = self.tls_connector.connect(hostname, tcp_stream).await?; debug!("tls_connector connect success"); - let (mut request_sender, connection) = hyper::client::conn::Builder::new() - .handshake(tls_stream) - .await - .map_err(ClientError::Hyper)?; - tokio::spawn( - async move { - // polling `connection` drives the HTTP exchange - // this will poll until we upgrade the connection, but not shutdown the underlying - // stream - debug!("waiting for connection"); - if let Err(err) = connection.await { - warn!("client connection error: {:?}", err); - } - debug!("connection done"); - } - .instrument(info_span!("http.conn")), - ); - debug!("sending upgrade request"); - request_sender - .send_request(req) - .await - .map_err(ClientError::Hyper)? + Self::start_upgrade(tls_stream).await? } else { debug!("Starting handshake"); - let (mut request_sender, connection) = hyper::client::conn::Builder::new() - .handshake(tcp_stream) - .await - .map_err(ClientError::Hyper)?; - tokio::spawn( - async move { - // polling `connection` drives the HTTP exchange - // this will poll until we upgrade the connection, but not shutdown the underlying - // stream - debug!("waiting for connection"); - if let Err(err) = connection.await { - warn!("client connection error: {:?}", err); - } - debug!("connection done"); - } - .instrument(info_span!("http.conn")), - ); - debug!("sending upgrade request"); - request_sender - .send_request(req) - .await - .map_err(ClientError::Hyper)? + Self::start_upgrade(tcp_stream).await? }; - if res.status() != hyper::StatusCode::SWITCHING_PROTOCOLS { - warn!("invalid status received: {:?}", res.status()); + if response.status() != hyper::StatusCode::SWITCHING_PROTOCOLS { + error!( + "expected status 101 SWITCHING_PROTOCOLS, got: {}", + response.status() + ); return Err(ClientError::UnexpectedStatusCode( hyper::StatusCode::SWITCHING_PROTOCOLS, - res.status(), + response.status(), )); } debug!("starting upgrade"); - let upgraded = match hyper::upgrade::on(res).await { + let upgraded = match hyper::upgrade::on(response).await { Ok(upgraded) => upgraded, Err(err) => { - warn!("upgrade failed: {:?}", err); + warn!("upgrade failed: {:#}", err); return Err(ClientError::Hyper(err)); } }; @@ -823,6 +779,35 @@ impl Actor { Ok((derp_client, receiver)) } + /// Sends the upgrade request to the derper. + async fn start_upgrade(io: T) -> Result, hyper::Error> + where + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, + { + let io = hyper_util::rt::TokioIo::new(io); + let (mut request_sender, connection) = hyper::client::conn::http1::Builder::new() + .handshake(io) + .await?; + tokio::spawn( + // This task drives the HTTP exchange, completes once connection is upgraded. + async move { + debug!("HTTP upgrade driver started"); + if let Err(err) = connection.with_upgrades().await { + error!("HTTP upgrade error: {err:#}"); + } + debug!("HTTP upgrade driver finished"); + } + .instrument(info_span!("http-driver")), + ); + debug!("Sending upgrade request"); + let req = Request::builder() + .uri("/derp") + .header(UPGRADE, super::HTTP_UPGRADE_PROTOCOL) + .body(http_body_util::Empty::::new()) + .unwrap(); + request_sender.send_request(req).await + } + async fn note_preferred(&mut self, is_preferred: bool) { let old = &mut self.is_preferred; if *old == is_preferred { @@ -1369,9 +1354,9 @@ fn downcast_upgrade( Box, Box, )> { - match upgraded.downcast::() { + match upgraded.downcast::>() { Ok(Parts { read_buf, io, .. }) => { - let (reader, writer) = tokio::io::split(io); + let (reader, writer) = tokio::io::split(io.into_inner()); // Prepend data to the reader to avoid data loss let reader = std::io::Cursor::new(read_buf).chain(reader); @@ -1379,9 +1364,9 @@ fn downcast_upgrade( } Err(upgraded) => { if let Ok(Parts { read_buf, io, .. }) = - upgraded.downcast::>() + upgraded.downcast::>>() { - let (reader, writer) = tokio::io::split(io); + let (reader, writer) = tokio::io::split(io.into_inner()); // Prepend data to the reader to avoid data loss let reader = std::io::Cursor::new(read_buf).chain(reader); diff --git a/iroh-net/src/derp/http/server.rs b/iroh-net/src/derp/http/server.rs index e4c168406a..ec02cfa5ec 100644 --- a/iroh-net/src/derp/http/server.rs +++ b/iroh-net/src/derp/http/server.rs @@ -1,26 +1,19 @@ -use std::{ - collections::HashMap, - net::SocketAddr, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; use anyhow::{bail, ensure, Context as _, Result}; use bytes::Bytes; use derive_more::Debug; use futures::future::{Future, FutureExt}; use http::response::Builder as ResponseBuilder; -use hyper::{ - header::{HeaderValue, UPGRADE}, - server::conn::Http, - upgrade::Upgraded, - Body, HeaderMap, Method, Request, Response, StatusCode, -}; -use tokio::{ - net::{TcpListener, TcpStream}, - task::JoinHandle, -}; +use hyper::body::Incoming; +use hyper::header::{HeaderValue, UPGRADE}; +use hyper::upgrade::Upgraded; +use hyper::{HeaderMap, Method, Request, Response, StatusCode}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::task::JoinHandle; use tokio_rustls_acme::AcmeAcceptor; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, warn, Instrument}; @@ -41,10 +34,29 @@ use crate::{ type HyperError = Box; type HyperResult = std::result::Result; +// type HyperFn = Box< +// dyn Fn( +// Request, +// ResponseBuilder, +// ) -> HyperResult>> +// + Send +// + Sync +// + 'static, +// >; +type HyperHandler = Box< + dyn Fn( + Request, + ResponseBuilder, + ) -> HyperResult>> + + Send + + Sync + + 'static, +>; +type Headers = Vec<(&'static str, &'static str)>; fn downcast_upgrade(upgraded: Upgraded) -> Result<(MaybeTlsStream, Bytes)> { - match upgraded.downcast::() { - Ok(parts) => Ok((parts.io, parts.read_buf)), + match upgraded.downcast::>() { + Ok(parts) => Ok((parts.io.into_inner(), parts.read_buf)), Err(_) => { bail!("could not downcast the upgraded connection to MaybeTlsStream") } @@ -179,15 +191,15 @@ pub struct ServerBuilder { /// Defaults to `GET` request at "/derp". derp_endpoint: &'static str, /// Use a custom derp response handler. Typically used when you want to disable any derp connections. - #[debug("{}", derp_override.as_ref().map_or("None", |_| "Some(Box Result> + Send + Sync + 'static>)"))] - derp_override: Option, + #[debug("{}", derp_override.as_ref().map_or("None", |_| "Some(Box, ResponseBuilder) -> Result> + Send + Sync + 'static>)"))] + derp_override: Option, /// Headers to use for HTTP or HTTPS messages. headers: Headers, /// 404 not found response /// /// When `None`, a default is provided. #[debug("{}", not_found_fn.as_ref().map_or("None", |_| "Some(Box Result> + Send + Sync + 'static>)"))] - not_found_fn: Option, + not_found_fn: Option, } impl ServerBuilder { @@ -239,21 +251,21 @@ impl ServerBuilder { mut self, method: Method, uri_path: &'static str, - handler: HyperFn, + handler: HyperHandler, ) -> Self { self.handlers.insert((method, uri_path), handler); self } /// Pass in a custom "404" handler. - pub fn not_found_handler(mut self, handler: HyperFn) -> Self { + pub fn not_found_handler(mut self, handler: HyperHandler) -> Self { self.not_found_fn = Some(handler); self } /// Handle the derp endpoint in a custom way. This is required if no [`SecretKey`] was provided /// to the builder. - pub fn derp_override(mut self, handler: HyperFn) -> Self { + pub fn derp_override(mut self, handler: HyperHandler) -> Self { self.derp_override = Some(handler); self } @@ -312,16 +324,19 @@ impl ServerBuilder { let h = self.headers.clone(); let not_found_fn = match self.not_found_fn { Some(f) => f, - None => Box::new(move |_req: Request, mut res: ResponseBuilder| { - for (k, v) in h.iter() { - res = res.header(*k, *v); - } - let r = res - .status(StatusCode::NOT_FOUND) - .body(b"Not Found"[..].into()) - .unwrap(); - HyperResult::Ok(r) - }), + None => Box::new( + move |_req: Request, mut res: ResponseBuilder| { + for (k, v) in h.iter() { + res = res.header(*k, *v); + } + let body = http_body_util::Full::new(hyper::body::Bytes::from("Not Found")); + let r = res + .status(http::status::StatusCode::NOT_FOUND) + .body(body) + .unwrap(); + HyperResult::Ok(r) + }, + ), }; let service = DerpService::new( @@ -422,19 +437,15 @@ impl ServerState { } } -impl

hyper::service::Service> for ClientConnHandler

+impl

hyper::service::Service> for ClientConnHandler

where P: PacketForwarder, { - type Response = Response; + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&self, mut req: Request) -> Self::Future { // TODO: soooo much cloning. See if there is an alternative let closure_conn_handler = self.clone(); let mut builder = Response::builder(); @@ -444,7 +455,9 @@ where async move { { - let mut res = builder.body(Body::empty()).unwrap(); + let mut res = builder + .body(http_body_util::Full::from(hyper::body::Bytes::new())) + .unwrap(); // Send a 400 to any request that doesn't have an `Upgrade` header. if !req.headers().contains_key(UPGRADE) { @@ -494,16 +507,12 @@ where } } -impl hyper::service::Service> for DerpService { - type Response = Response; +impl hyper::service::Service> for DerpService { + type Response = Response>; type Error = HyperError; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { // if the request hits the derp endpoint if req.method() == hyper::Method::GET && req.uri().path() == self.0.derp_endpoint { match &self.0.derp_handler { @@ -513,7 +522,7 @@ impl hyper::service::Service> for DerpService { return Box::pin(async move { res }); } DerpHandler::ConnHandler(handler) => { - let mut h = handler.clone(); + let h = handler.clone(); // otherwise handle the derp connection as normal return Box::pin(async move { h.call(req).await.map_err(Into::into) }); } @@ -535,17 +544,12 @@ impl hyper::service::Service> for DerpService { #[derive(Clone, Debug)] struct DerpService(Arc); -type HyperFn = Box< - dyn Fn(Request, ResponseBuilder) -> HyperResult> + Send + Sync + 'static, ->; -type Headers = Vec<(&'static str, &'static str)>; - #[derive(derive_more::Debug)] struct Inner { pub derp_handler: DerpHandler, pub derp_endpoint: &'static str, #[debug("Box Result> + Send + Sync + 'static>")] - pub not_found_fn: HyperFn, + pub not_found_fn: HyperHandler, pub handlers: Handlers, pub headers: Headers, } @@ -557,12 +561,13 @@ enum DerpHandler { ConnHandler(ClientConnHandler), /// Return some static response. Used when the http(s) should be running, but the derp portion /// of the server is disabled. + // TODO: Can we remove this debug override? Override( #[debug( "{}", - "Box Result> + Send + Sync + 'static>" + "Box, ResponseBuilder) -> Result> + Send + Sync + 'static>" )] - HyperFn, + HyperHandler, ), } @@ -591,7 +596,7 @@ impl DerpService { handlers: Handlers, derp_handler: DerpHandler, derp_endpoint: &'static str, - not_found_fn: HyperFn, + not_found_fn: HyperHandler, headers: Headers, ) -> Self { Self(Arc::new(Inner { @@ -656,8 +661,8 @@ impl DerpService { where I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync + 'static, { - Http::new() - .serve_connection(io, self) + hyper::server::conn::http1::Builder::new() + .serve_connection(hyper_util::rt::TokioIo::new(io), self) .with_upgrades() .await?; Ok(()) @@ -665,7 +670,7 @@ impl DerpService { } #[derive(Default)] -struct Handlers(HashMap<(Method, &'static str), HyperFn>); +struct Handlers(HashMap<(Method, &'static str), HyperHandler>); impl std::fmt::Debug for Handlers { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -678,7 +683,8 @@ impl std::fmt::Debug for Handlers { } impl std::ops::Deref for Handlers { - type Target = HashMap<(Method, &'static str), HyperFn>; + type Target = HashMap<(Method, &'static str), HyperHandler>; + fn deref(&self) -> &Self::Target { &self.0 } From d9466f2d9738c3e61eb3fb0b44a35c1f6e4d4212 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 4 Dec 2023 13:46:23 +0100 Subject: [PATCH 2/5] Tidy up iroh-metrics --- Cargo.lock | 26 +++----------------------- iroh-metrics/Cargo.toml | 14 +++++++------- iroh-metrics/src/service.rs | 15 +++++++++------ 3 files changed, 19 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c6396e66b5..00f4ba4969 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1708,25 +1708,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "h2" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 1.0.0", - "indexmap 2.1.0", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "half" version = "1.8.2" @@ -1933,7 +1914,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.21", + "h2", "http 0.2.11", "http-body 0.4.5", "httparse", @@ -1956,7 +1937,6 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.0", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -3990,7 +3970,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.21", + "h2", "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", @@ -5191,7 +5171,7 @@ dependencies = [ "axum", "base64 0.21.5", "bytes", - "h2 0.3.21", + "h2", "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", diff --git a/iroh-metrics/Cargo.toml b/iroh-metrics/Cargo.toml index 96bd04c7ab..3e8513a446 100644 --- a/iroh-metrics/Cargo.toml +++ b/iroh-metrics/Cargo.toml @@ -15,16 +15,16 @@ rust-version = "1.72" workspace = true [dependencies] -prometheus-client = { version = "0.22.0", optional = true } -once_cell = "1.17.0" -tracing = "0.1" -hyper = { version = "1", features = ["server", "client", "http1"] } +anyhow = "1.0.75" erased_set = "0.7" -struct_iterable = "0.1" http-body-util = "0.1.0" +hyper = { version = "1", features = ["server", "http1"] } +hyper-util = { version = "0.1.1", features = ["tokio"] } +once_cell = "1.17.0" +prometheus-client = { version = "0.22.0", optional = true } +struct_iterable = "0.1" tokio = "1" -anyhow = "1.0.75" -hyper-util = { version = "0.1.1", features = ["full"] } +tracing = "0.1" [dev-dependencies] tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs", "macros", "time", "test-util"] } diff --git a/iroh-metrics/src/service.rs b/iroh-metrics/src/service.rs index 2fe01d2c74..9f49731e03 100644 --- a/iroh-metrics/src/service.rs +++ b/iroh-metrics/src/service.rs @@ -4,11 +4,12 @@ use anyhow::{anyhow, Result}; use hyper::service::service_fn; use hyper::{Request, Response}; use tokio::net::TcpListener; - use tracing::{error, info}; use crate::core::Core; +type BytesBody = http_body_util::Full; + /// Start a HTTP server to report metrics. pub async fn run(metrics_addr: SocketAddr) -> Result<()> { info!("Starting metrics server on {metrics_addr}"); @@ -28,15 +29,17 @@ pub async fn run(metrics_addr: SocketAddr) -> Result<()> { } /// HTTP handler that will respond with the OpenMetrics encoding of our metrics. -async fn handler( - _req: Request, -) -> Result>> { +async fn handler(_req: Request) -> Result> { let core = Core::get().ok_or_else(|| anyhow!("metrics disabled"))?; core.encode().map_err(anyhow::Error::new).map(|r| { - let body = http_body_util::Full::new(hyper::body::Bytes::from(r)); Response::builder() .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8") - .body(body) + .body(body_full(r)) .expect("Failed to build response") }) } + +/// Creates a new [`BytesBody`] with given contents. +fn body_full(content: impl Into) -> BytesBody { + http_body_util::Full::new(content.into()) +} From 130bde565f46a63a4b45a19451f50cd687d34f34 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 4 Dec 2023 17:09:42 +0100 Subject: [PATCH 3/5] Lots of cleanup --- iroh-metrics/src/service.rs | 2 +- iroh-net/Cargo.toml | 6 +-- iroh-net/src/bin/derper.rs | 33 ++++++------ iroh-net/src/derp/http/client.rs | 4 +- iroh-net/src/derp/http/server.rs | 89 ++++++++++++++------------------ 5 files changed, 61 insertions(+), 73 deletions(-) diff --git a/iroh-metrics/src/service.rs b/iroh-metrics/src/service.rs index 9f49731e03..da5b4df3de 100644 --- a/iroh-metrics/src/service.rs +++ b/iroh-metrics/src/service.rs @@ -39,7 +39,7 @@ async fn handler(_req: Request) -> Result) -> BytesBody { http_body_util::Full::new(content.into()) } diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 55c1a32fd7..14312cf277 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -29,12 +29,14 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" futures = "0.3.25" governor = "0.6.0" -iroh-base = { version = "0.11.0", path = "../iroh-base" } hex = "0.4.3" hostname = "0.3.1" http = "1" +http-body-util = "0.1.0" hyper = { version = "1", features = ["server", "client", "http1"] } +hyper-util = "0.1.1" igd = { version = "0.12.1", features = ["aio"] } +iroh-base = { version = "0.11.0", path = "../iroh-base" } libc = "0.2.139" num_enum = "0.7" once_cell = "1.18.0" @@ -82,8 +84,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr # metrics iroh-metrics = { version = "0.11.0", path = "../iroh-metrics", default-features = false } -http-body-util = "0.1.0" -hyper-util = "0.1.1" [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] netlink-packet-core = "0.7.0" diff --git a/iroh-net/src/bin/derper.rs b/iroh-net/src/bin/derper.rs index 5adac52d56..fe79046d52 100644 --- a/iroh-net/src/bin/derper.rs +++ b/iroh-net/src/bin/derper.rs @@ -38,6 +38,11 @@ type BytesBody = http_body_util::Full; type HyperError = Box; type HyperResult = std::result::Result; +/// Creates a new [`BytesBody`] with no content. +fn body_empty() -> BytesBody { + http_body_util::Full::new(hyper::body::Bytes::new()) +} + /// A simple DERP server. #[derive(Parser, Debug, Clone)] #[clap(version, about, long_about = None)] @@ -572,12 +577,12 @@ async fn serve_captive_portal_service(addr: SocketAddr) -> Result> for CaptivePortalService { - type Response = Response>; +impl hyper::service::Service> for CaptivePortalService { + type Response = Response; type Error = HyperError; type Future = Pin> + Send>>; - fn call(&self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { match (req.method(), req.uri().path()) { // Captive Portal checker (&Method::GET, "/generate_204") => { @@ -596,9 +601,9 @@ impl hyper::service::Service> for CaptivePortalSe } fn derp_disabled_handler( - _r: Request, + _r: Request, response: ResponseBuilder, -) -> HyperResult>> { +) -> HyperResult> { Ok(response .status(StatusCode::NOT_FOUND) .body(DERP_DISABLED.into()) @@ -608,7 +613,7 @@ fn derp_disabled_handler( fn root_handler( _r: Request, response: ResponseBuilder, -) -> HyperResult>> { +) -> HyperResult> { let response = response .status(StatusCode::OK) .header("Content-Type", "text/html; charset=utf-8") @@ -620,22 +625,22 @@ fn root_handler( /// HTTP latency queries fn probe_handler( - _r: Request, + _r: Request, response: ResponseBuilder, -) -> HyperResult>> { +) -> HyperResult> { let response = response .status(StatusCode::OK) .header("Access-Control-Allow-Origin", "*") - .body(http_body_util::Full::new(hyper::body::Bytes::new())) + .body(body_empty()) .unwrap(); Ok(response) } fn robots_handler( - _r: Request, + _r: Request, response: ResponseBuilder, -) -> HyperResult>> { +) -> HyperResult> { Ok(response .status(StatusCode::OK) .body(ROBOTS_TXT.into()) @@ -898,16 +903,12 @@ mod tests { use iroh_net::derp::ReceivedMessage; use iroh_net::key::SecretKey; - fn empty_body() -> http_body_util::Full { - http_body_util::Full::new(hyper::body::Bytes::new()) - } - #[tokio::test] async fn test_serve_no_content_handler() { let challenge = "123az__."; let req = Request::builder() .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(http_body_util::Full::new(hyper::body::Bytes::new())) + .body(body_empty()) .unwrap(); let res = serve_no_content_handler(req, Response::builder()).unwrap(); diff --git a/iroh-net/src/derp/http/client.rs b/iroh-net/src/derp/http/client.rs index 22c94c983c..bd9736b34c 100644 --- a/iroh-net/src/derp/http/client.rs +++ b/iroh-net/src/derp/http/client.rs @@ -779,10 +779,10 @@ impl Actor { Ok((derp_client, receiver)) } - /// Sends the upgrade request to the derper. + /// Sends the HTTP upgrade request to the derper. async fn start_upgrade(io: T) -> Result, hyper::Error> where - T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { let io = hyper_util::rt::TokioIo::new(io); let (mut request_sender, connection) = hyper::client::conn::http1::Builder::new() diff --git a/iroh-net/src/derp/http/server.rs b/iroh-net/src/derp/http/server.rs index ec02cfa5ec..303c2ccf57 100644 --- a/iroh-net/src/derp/http/server.rs +++ b/iroh-net/src/derp/http/server.rs @@ -10,6 +10,7 @@ use futures::future::{Future, FutureExt}; use http::response::Builder as ResponseBuilder; use hyper::body::Incoming; use hyper::header::{HeaderValue, UPGRADE}; +use hyper::service::Service; use hyper::upgrade::Upgraded; use hyper::{HeaderMap, Method, Request, Response, StatusCode}; use tokio::net::{TcpListener, TcpStream}; @@ -18,42 +19,35 @@ use tokio_rustls_acme::AcmeAcceptor; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, warn, Instrument}; -use super::HTTP_UPGRADE_PROTOCOL; -use crate::{ - derp::{ - http::client::Client as HttpClient, - http::mesh_clients::{MeshAddrs, MeshClients}, - server::ClientConnHandler, - server::MaybeTlsStream, - types::MeshKey, - types::PacketForwarder, - MaybeTlsStreamServer, - }, - key::SecretKey, -}; +use crate::derp::http::client::Client as HttpClient; +use crate::derp::http::mesh_clients::{MeshAddrs, MeshClients}; +use crate::derp::http::HTTP_UPGRADE_PROTOCOL; +use crate::derp::server::{ClientConnHandler, MaybeTlsStream}; +use crate::derp::types::{MeshKey, PacketForwarder}; +use crate::derp::MaybeTlsStreamServer; +use crate::key::SecretKey; +type BytesBody = http_body_util::Full; type HyperError = Box; type HyperResult = std::result::Result; -// type HyperFn = Box< -// dyn Fn( -// Request, -// ResponseBuilder, -// ) -> HyperResult>> -// + Send -// + Sync -// + 'static, -// >; type HyperHandler = Box< - dyn Fn( - Request, - ResponseBuilder, - ) -> HyperResult>> + dyn Fn(Request, ResponseBuilder) -> HyperResult> + Send + Sync + 'static, >; type Headers = Vec<(&'static str, &'static str)>; +/// Creates a new [`BytesBody`] with no content. +fn body_empty() -> BytesBody { + http_body_util::Full::new(hyper::body::Bytes::new()) +} + +/// Creates a new [`BytesBody`] with given content. +fn body_full(content: impl Into) -> BytesBody { + http_body_util::Full::new(content.into()) +} + fn downcast_upgrade(upgraded: Upgraded) -> Result<(MaybeTlsStream, Bytes)> { match upgraded.downcast::>() { Ok(parts) => Ok((parts.io.into_inner(), parts.read_buf)), @@ -191,7 +185,7 @@ pub struct ServerBuilder { /// Defaults to `GET` request at "/derp". derp_endpoint: &'static str, /// Use a custom derp response handler. Typically used when you want to disable any derp connections. - #[debug("{}", derp_override.as_ref().map_or("None", |_| "Some(Box, ResponseBuilder) -> Result> + Send + Sync + 'static>)"))] + #[debug("{}", derp_override.as_ref().map_or("None", |_| "Some(Box, ResponseBuilder) -> Result + Send + Sync + 'static>)"))] derp_override: Option, /// Headers to use for HTTP or HTTPS messages. headers: Headers, @@ -324,19 +318,14 @@ impl ServerBuilder { let h = self.headers.clone(); let not_found_fn = match self.not_found_fn { Some(f) => f, - None => Box::new( - move |_req: Request, mut res: ResponseBuilder| { - for (k, v) in h.iter() { - res = res.header(*k, *v); - } - let body = http_body_util::Full::new(hyper::body::Bytes::from("Not Found")); - let r = res - .status(http::status::StatusCode::NOT_FOUND) - .body(body) - .unwrap(); - HyperResult::Ok(r) - }, - ), + None => Box::new(move |_req: Request, mut res: ResponseBuilder| { + for (k, v) in h.iter() { + res = res.header(*k, *v); + } + let body = body_full("Not Found"); + let r = res.status(StatusCode::NOT_FOUND).body(body).unwrap(); + HyperResult::Ok(r) + }), }; let service = DerpService::new( @@ -437,15 +426,15 @@ impl ServerState { } } -impl

hyper::service::Service> for ClientConnHandler

+impl

Service> for ClientConnHandler

where P: PacketForwarder, { - type Response = Response>; + type Response = Response; type Error = hyper::Error; type Future = Pin> + Send>>; - fn call(&self, mut req: Request) -> Self::Future { + fn call(&self, mut req: Request) -> Self::Future { // TODO: soooo much cloning. See if there is an alternative let closure_conn_handler = self.clone(); let mut builder = Response::builder(); @@ -455,9 +444,7 @@ where async move { { - let mut res = builder - .body(http_body_util::Full::from(hyper::body::Bytes::new())) - .unwrap(); + let mut res = builder.body(body_empty()).unwrap(); // Send a 400 to any request that doesn't have an `Upgrade` header. if !req.headers().contains_key(UPGRADE) { @@ -507,12 +494,12 @@ where } } -impl hyper::service::Service> for DerpService { - type Response = Response>; +impl Service> for DerpService { + type Response = Response; type Error = HyperError; type Future = Pin> + Send>>; - fn call(&self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { // if the request hits the derp endpoint if req.method() == hyper::Method::GET && req.uri().path() == self.0.derp_endpoint { match &self.0.derp_handler { @@ -548,7 +535,7 @@ struct DerpService(Arc); struct Inner { pub derp_handler: DerpHandler, pub derp_endpoint: &'static str, - #[debug("Box Result> + Send + Sync + 'static>")] + #[debug("Box Result> + Send + Sync + 'static>")] pub not_found_fn: HyperHandler, pub handlers: Handlers, pub headers: Headers, @@ -565,7 +552,7 @@ enum DerpHandler { Override( #[debug( "{}", - "Box, ResponseBuilder) -> Result> + Send + Sync + 'static>" + "Box, ResponseBuilder) -> Result + Send + Sync + 'static>" )] HyperHandler, ), From e07cffe331ed1f62b76d07a10d0d76f5eef7e50d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 5 Dec 2023 11:32:42 +0100 Subject: [PATCH 4/5] fix test, this body was never meant to be empty --- iroh-net/src/bin/derper.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iroh-net/src/bin/derper.rs b/iroh-net/src/bin/derper.rs index fe79046d52..21ddfbd212 100644 --- a/iroh-net/src/bin/derper.rs +++ b/iroh-net/src/bin/derper.rs @@ -1054,8 +1054,7 @@ mod tests { tracing::info!("send request for homepage"); let res = reqwest::get(derper_str_url).await?; assert!(res.status().is_success()); - let body = res.bytes().await?; - assert!(body.is_empty()); + tracing::info!("got OK"); // test captive portal tracing::info!("test captive portal response"); From ccb5c50f027e9b108755584cbb915783c14e109b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 5 Dec 2023 13:33:22 +0100 Subject: [PATCH 5/5] Only use a few tokio features --- iroh-metrics/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-metrics/Cargo.toml b/iroh-metrics/Cargo.toml index 3e8513a446..9ba991aada 100644 --- a/iroh-metrics/Cargo.toml +++ b/iroh-metrics/Cargo.toml @@ -23,7 +23,7 @@ hyper-util = { version = "0.1.1", features = ["tokio"] } once_cell = "1.17.0" prometheus-client = { version = "0.22.0", optional = true } struct_iterable = "0.1" -tokio = "1" +tokio = { version = "1", features = ["rt", "net"]} tracing = "0.1" [dev-dependencies]