Skip to content

Commit

Permalink
Remove explicit use of hyper in integration/ducks
Browse files Browse the repository at this point in the history
I can't figure out how to update hyper in our project given the
set of interlinked crates that all have to update in one shot. As
a result, I'm removing explicit references to hyper and started with
ducks. This commit also removes the tower layers as it was not clear
to me that these are required for integration test purposes.

Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt committed Dec 23, 2024
1 parent 14f19e3 commit a75602e
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 66 deletions.
81 changes: 75 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ members = [
]

[workspace.dependencies]
axum = { version = "0.7", default-features = false }
axum-server = { version = "0.7", default-features = false }
bytes = { version = "1.8", default-features = false, features = ["std"] }
byte-unit = { version = "4.0", features = ["serde"] }
metrics = { version = "0.23.0" }
Expand Down
8 changes: 2 additions & 6 deletions integration/ducks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ license = "MIT"
publish = false

[dependencies]
axum = { workspace = true, features = ["tokio", "http1", "http2"] }
axum-server = { workspace = true }
anyhow = "1.0"
bytes = { workspace = true }
entropy = "0.4"
hyper = { workspace = true, features = ["server", "backports", "deprecated"] }
once_cell = { workspace = true }
shared = { path = "../shared" }
sketches-ddsketch = "0.3"
Expand All @@ -30,10 +31,5 @@ tonic = { workspace = true, default-features = false, features = [
"transport",
"prost",
] }
tower = { workspace = true, features = [
"timeout",
"limit",
"load-shed",
] }
tracing = { version = "0.1", features = ["std", "attributes"] }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
90 changes: 36 additions & 54 deletions integration/ducks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
//! - Receive data on other protocols & formats
use anyhow::Context;
use bytes::BytesMut;
use hyper::{
body::HttpBody,
server::conn::{AddrIncoming, AddrStream},
service::{make_service_fn, service_fn},
Body, Method, Request, StatusCode,
use axum::{
http::{Method, StatusCode},
response::IntoResponse,
routing::any,
Router,
};
use bytes::BytesMut;
use once_cell::sync::OnceCell;
use shared::{
integration_api::{
Expand All @@ -39,7 +39,6 @@ use tokio::{
};
use tokio_stream::{wrappers::UnixListenerStream, Stream};
use tonic::Status;
use tower::ServiceBuilder;
use tracing::{debug, trace, warn};

static HTTP_COUNTERS: OnceCell<Arc<Mutex<HttpCounters>>> = OnceCell::new();
Expand Down Expand Up @@ -124,33 +123,6 @@ impl From<&SocketCounters> for SocketMetrics {
}
}

#[tracing::instrument(level = "trace")]
async fn http_req_handler(req: Request<Body>) -> Result<hyper::Response<Body>, hyper::Error> {
let (parts, body) = req.into_parts();
let body = body.collect().await?.to_bytes();

{
let metric = HTTP_COUNTERS.get().expect("HTTP_COUNTERS not initialized");
let mut m = metric.lock().await;
m.request_count += 1;

m.total_bytes = body.len() as u64;
m.entropy.add(entropy::metric_entropy(&body) as f64);

m.body_size.add(body.len() as f64);

let method_counter = m.methods.entry(parts.method).or_default();
*method_counter += 1;
}

let mut okay = hyper::Response::default();
*okay.status_mut() = StatusCode::OK;

let body_bytes = vec![];
*okay.body_mut() = Body::from(body_bytes);
Ok(okay)
}

/// Tracks state for a ducks instance
pub struct DucksTarget {
/// Shutdown channel. Send on this to exit the process immediately.
Expand Down Expand Up @@ -190,11 +162,9 @@ impl IntegrationTarget for DucksTarget {

match config.listen {
shared::ListenConfig::Http => {
// bind to a random open TCP port
let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0));
let addr = AddrIncoming::bind(&bind_addr)
.map_err(|_e| Status::internal("unable to bind a port"))?;
let port = addr.local_addr().port() as u32;
// Bind a random open TCP port on localhost
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let port = addr.port() as u32;
tokio::spawn(Self::http_listen(config, addr));

Ok(tonic::Response::new(ListenInfo { port }))
Expand Down Expand Up @@ -254,25 +224,37 @@ impl IntegrationTarget for DucksTarget {
}
}

#[tracing::instrument(level = "trace")]
async fn req_handle(method: Method, body: axum::body::Bytes) -> impl IntoResponse {
if let Some(metric) = HTTP_COUNTERS.get() {
let mut m = metric.lock().await;
m.request_count += 1;

m.total_bytes += body.len() as u64;
m.entropy.add(entropy::metric_entropy(&body) as f64);

m.body_size.add(body.len() as f64);

let method_counter = m.methods.entry(method.clone()).or_default();
*method_counter += 1;
} else {
warn!("HTTP_COUNTERS not initialized");
}

(StatusCode::OK, body)
}

impl DucksTarget {
async fn http_listen(_config: DucksConfig, addr: AddrIncoming) -> Result<(), anyhow::Error> {
async fn http_listen(_config: DucksConfig, addr: SocketAddr) -> Result<(), anyhow::Error> {
debug!("HTTP listener active");
HTTP_COUNTERS.get_or_init(|| Arc::new(Mutex::new(HttpCounters::default())));

let service = make_service_fn(|_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
trace!("REQUEST: {:?}", request);
http_req_handler(request)
}))
});
let svc = ServiceBuilder::new()
.load_shed()
.concurrency_limit(1_000)
.timeout(Duration::from_secs(1))
.service(service);

let server = hyper::Server::builder(addr).serve(svc);
server.await?;
let app = Router::new().route("/*path", any(req_handle));

axum_server::bind(addr)
.serve(app.into_make_service())
.await?;

Ok(())
}

Expand Down

0 comments on commit a75602e

Please sign in to comment.