From a75602e4b94b2c34a0061892e682808b24b2122f Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Mon, 23 Dec 2024 13:45:15 -0800 Subject: [PATCH] Remove explicit use of hyper in integration/ducks I can't figure out how to update hyper in our project given the set of interlinked crates that all have to update in one shot. As a result, I'm removing explicit references to hyper and started with ducks. This commit also removes the tower layers as it was not clear to me that these are required for integration test purposes. Signed-off-by: Brian L. Troutwine --- Cargo.lock | 81 ++++++++++++++++++++++++++++--- Cargo.toml | 2 + integration/ducks/Cargo.toml | 8 +--- integration/ducks/src/main.rs | 90 ++++++++++++++--------------------- 4 files changed, 115 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 140811ea9..31cd8b960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -216,6 +216,36 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tokio", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -233,6 +263,45 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-server" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56bac90848f6a9393ac03c63c640925c4b7c8ca21654de40d53f55964667c7d8" +dependencies = [ + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower 0.4.13", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -667,16 +736,16 @@ name = "ducks" version = "0.1.0" dependencies = [ "anyhow", + "axum 0.7.9", + "axum-server", "bytes", "entropy", - "hyper 0.14.31", "once_cell", "shared", "sketches-ddsketch 0.3.0", "tokio", "tokio-stream", "tonic 0.9.2", - "tower 0.5.2", "tracing", "tracing-subscriber", ] @@ -1181,7 +1250,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -3054,7 +3123,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.13.1", "bytes", "futures-core", @@ -3085,7 +3154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index a4ad49abd..e3c5a4944 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ members = [ ] [workspace.dependencies] +axum = { version = "0.7", default-features = false } +axum-server = { version = "0.7", default-features = false } bytes = { version = "1.8", default-features = false, features = ["std"] } byte-unit = { version = "4.0", features = ["serde"] } metrics = { version = "0.23.0" } diff --git a/integration/ducks/Cargo.toml b/integration/ducks/Cargo.toml index f8bacd6bf..df5a09347 100644 --- a/integration/ducks/Cargo.toml +++ b/integration/ducks/Cargo.toml @@ -8,10 +8,11 @@ license = "MIT" publish = false [dependencies] +axum = { workspace = true, features = ["tokio", "http1", "http2"] } +axum-server = { workspace = true } anyhow = "1.0" bytes = { workspace = true } entropy = "0.4" -hyper = { workspace = true, features = ["server", "backports", "deprecated"] } once_cell = { workspace = true } shared = { path = "../shared" } sketches-ddsketch = "0.3" @@ -30,10 +31,5 @@ tonic = { workspace = true, default-features = false, features = [ "transport", "prost", ] } -tower = { workspace = true, features = [ - "timeout", - "limit", - "load-shed", -] } tracing = { version = "0.1", features = ["std", "attributes"] } tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] } diff --git a/integration/ducks/src/main.rs b/integration/ducks/src/main.rs index a93063525..205ea4413 100644 --- a/integration/ducks/src/main.rs +++ b/integration/ducks/src/main.rs @@ -14,13 +14,13 @@ //! - Receive data on other protocols & formats use anyhow::Context; -use bytes::BytesMut; -use hyper::{ - body::HttpBody, - server::conn::{AddrIncoming, AddrStream}, - service::{make_service_fn, service_fn}, - Body, Method, Request, StatusCode, +use axum::{ + http::{Method, StatusCode}, + response::IntoResponse, + routing::any, + Router, }; +use bytes::BytesMut; use once_cell::sync::OnceCell; use shared::{ integration_api::{ @@ -39,7 +39,6 @@ use tokio::{ }; use tokio_stream::{wrappers::UnixListenerStream, Stream}; use tonic::Status; -use tower::ServiceBuilder; use tracing::{debug, trace, warn}; static HTTP_COUNTERS: OnceCell>> = OnceCell::new(); @@ -124,33 +123,6 @@ impl From<&SocketCounters> for SocketMetrics { } } -#[tracing::instrument(level = "trace")] -async fn http_req_handler(req: Request) -> Result, hyper::Error> { - let (parts, body) = req.into_parts(); - let body = body.collect().await?.to_bytes(); - - { - let metric = HTTP_COUNTERS.get().expect("HTTP_COUNTERS not initialized"); - let mut m = metric.lock().await; - m.request_count += 1; - - m.total_bytes = body.len() as u64; - m.entropy.add(entropy::metric_entropy(&body) as f64); - - m.body_size.add(body.len() as f64); - - let method_counter = m.methods.entry(parts.method).or_default(); - *method_counter += 1; - } - - let mut okay = hyper::Response::default(); - *okay.status_mut() = StatusCode::OK; - - let body_bytes = vec![]; - *okay.body_mut() = Body::from(body_bytes); - Ok(okay) -} - /// Tracks state for a ducks instance pub struct DucksTarget { /// Shutdown channel. Send on this to exit the process immediately. @@ -190,11 +162,9 @@ impl IntegrationTarget for DucksTarget { match config.listen { shared::ListenConfig::Http => { - // bind to a random open TCP port - let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let addr = AddrIncoming::bind(&bind_addr) - .map_err(|_e| Status::internal("unable to bind a port"))?; - let port = addr.local_addr().port() as u32; + // Bind a random open TCP port on localhost + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let port = addr.port() as u32; tokio::spawn(Self::http_listen(config, addr)); Ok(tonic::Response::new(ListenInfo { port })) @@ -254,25 +224,37 @@ impl IntegrationTarget for DucksTarget { } } +#[tracing::instrument(level = "trace")] +async fn req_handle(method: Method, body: axum::body::Bytes) -> impl IntoResponse { + if let Some(metric) = HTTP_COUNTERS.get() { + let mut m = metric.lock().await; + m.request_count += 1; + + m.total_bytes += body.len() as u64; + m.entropy.add(entropy::metric_entropy(&body) as f64); + + m.body_size.add(body.len() as f64); + + let method_counter = m.methods.entry(method.clone()).or_default(); + *method_counter += 1; + } else { + warn!("HTTP_COUNTERS not initialized"); + } + + (StatusCode::OK, body) +} + impl DucksTarget { - async fn http_listen(_config: DucksConfig, addr: AddrIncoming) -> Result<(), anyhow::Error> { + async fn http_listen(_config: DucksConfig, addr: SocketAddr) -> Result<(), anyhow::Error> { debug!("HTTP listener active"); HTTP_COUNTERS.get_or_init(|| Arc::new(Mutex::new(HttpCounters::default()))); - let service = make_service_fn(|_: &AddrStream| async move { - Ok::<_, hyper::Error>(service_fn(move |request: Request| { - trace!("REQUEST: {:?}", request); - http_req_handler(request) - })) - }); - let svc = ServiceBuilder::new() - .load_shed() - .concurrency_limit(1_000) - .timeout(Duration::from_secs(1)) - .service(service); - - let server = hyper::Server::builder(addr).serve(svc); - server.await?; + let app = Router::new().route("/*path", any(req_handle)); + + axum_server::bind(addr) + .serve(app.into_make_service()) + .await?; + Ok(()) }