Skip to content

Commit

Permalink
Switch to Hyper 1
Browse files Browse the repository at this point in the history
  • Loading branch information
nox committed Nov 22, 2024
1 parent e1d1a5a commit d67d549
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 150 deletions.
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ darling = "0.20.10"
erased-serde = "0.3.28"
futures-util = "0.3.28"
governor = "0.6"
hyper = { version = "0.14", default-features = false }
http-body-util = "0.1"
hyper = { version = "1", default-features = false }
hyper-util = { version = "0.1", default-features = false }
indexmap = "2.0.0"
ipnetwork = "0.20"
once_cell = "1.5"
tonic = { version = "0.11.0", default-features = false }
opentelemetry-proto = "0.5.0"
tonic = { version = "0.12", default-features = false }
opentelemetry-proto = "0.7"
parking_lot = "0.12.1"
proc-macro2 = { version = "1", default-features = false }
prometheus = { version = "0.13.3", default-features = false }
Expand All @@ -68,6 +70,7 @@ tokio = "1.41.0"
thread_local = "1.1"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
tower-service = "0.3"
yaml-merge-keys = "0.5"

# needed for minver
Expand Down
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ publish = false
anyhow = { workspace = true }
foundations = { workspace = true }
futures-util = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true, features = ["server", "tokio"] }
tokio = { workspace = true, features = ["full"]}

[[example]]
Expand Down
15 changes: 10 additions & 5 deletions examples/http_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use foundations::settings::collections::Map;
use foundations::telemetry::{self, log, tracing, TelemetryConfig, TelemetryContext};
use foundations::BootstrapResult;
use futures_util::stream::{FuturesUnordered, StreamExt};
use hyper::server::conn::Http;
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use hyper::{Request, Response};
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::Arc;
Expand Down Expand Up @@ -193,7 +195,10 @@ async fn serve_connection(
}
});

if let Err(e) = Http::new().serve_connection(conn, on_request).await {
if let Err(e) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(conn), on_request)
.await
{
log::error!("failed to serve HTTP"; "error" => ?e);
metrics::http_server::failed_connections_total(&endpoint_name).inc();
}
Expand All @@ -204,9 +209,9 @@ async fn serve_connection(
#[tracing::span_fn("respond to request")]
async fn respond(
endpoint_name: Arc<String>,
req: Request<Body>,
req: Request<Incoming>,
routes: Arc<Map<String, ResponseSettings>>,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<Full<Bytes>>, Infallible> {
log::add_fields! {
"request_uri" => req.uri().to_string(),
"method" => req.method().to_string()
Expand Down
14 changes: 7 additions & 7 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ client-telemetry = ["logging", "metrics", "tracing", "dep:futures-util"]

# Enables the telemetry server.
telemetry-server = [
"dep:http-body-util",
"dep:hyper",
"dep:hyper-util",
"dep:socket2",
"dep:percent-encoding"
"dep:percent-encoding",
]

# Enables telemetry reporting over gRPC
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio", "dep:hyper"]
telemetry-otlp-grpc = ["dep:tonic", "tonic/prost", "dep:tokio", "dep:hyper"]

# Enables experimental tokio runtime metrics
tokio-runtime-metrics = [
Expand Down Expand Up @@ -177,11 +179,9 @@ clap = { workspace = true, optional = true }
erased-serde = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
governor = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = [
"http1",
"runtime",
"server",
] }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = ["http1", "server"] }
hyper-util = { workspace = true, optional = true, features = ["tokio"] }
indexmap = { workspace = true, optional = true, features = ["serde"] }
once_cell = { workspace = true, optional = true }
opentelemetry-proto = { workspace = true, optional = true, features = ["gen-tonic-messages", "trace"] }
Expand Down
18 changes: 9 additions & 9 deletions foundations/src/telemetry/driver.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::utils::feature_use;
use crate::BootstrapResult;
use futures_util::future::BoxFuture;
use futures_util::stream::{FuturesUnordered, Stream};
use futures_util::FutureExt;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, Stream};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

feature_use!(cfg(feature = "telemetry-server"), {
use super::server::TelemetryServerFuture;
use anyhow::anyhow;
use hyper::Server;
use std::net::SocketAddr;
});

Expand Down Expand Up @@ -38,7 +36,7 @@ impl TelemetryDriver {
) -> Self {
Self {
#[cfg(feature = "telemetry-server")]
server_addr: server_fut.as_ref().map(Server::local_addr),
server_addr: server_fut.as_ref().map(|fut| fut.local_addr()),

#[cfg(feature = "telemetry-server")]
server_fut,
Expand Down Expand Up @@ -66,9 +64,11 @@ impl TelemetryDriver {
#[cfg(feature = "telemetry-server")]
{
if let Some(server_fut) = self.server_fut.take() {
self.tele_futures.push(
async move { Ok(server_fut.with_graceful_shutdown(signal).await?) }.boxed(),
);
self.tele_futures.push(Box::pin(async move {
server_fut.with_graceful_shutdown(signal).await;

Ok(())
}));

return;
}
Expand All @@ -93,7 +93,7 @@ impl Future for TelemetryDriver {
#[cfg(feature = "telemetry-server")]
if let Some(server_fut) = &mut self.server_fut {
if let Poll::Ready(res) = Pin::new(server_fut).poll(cx) {
ready_res.push(res.map_err(|err| anyhow!(err)));
match res {}
}
}

Expand Down
9 changes: 7 additions & 2 deletions foundations/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ pub use self::testing::TestTelemetryContext;
pub use self::memory_profiler::MemoryProfiler;

#[cfg(feature = "telemetry-server")]
pub use self::server::{TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute};
pub use self::server::{
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
};

pub use self::driver::TelemetryDriver;
pub use self::telemetry_context::{
Expand Down Expand Up @@ -290,7 +292,10 @@ pub fn init(config: TelemetryConfig) -> BootstrapResult<TelemetryDriver> {

#[cfg(feature = "telemetry-server")]
{
let server_fut = self::server::init(config.settings.clone(), config.custom_server_routes)?;
let server_fut = server::TelemetryServerFuture::new(
config.settings.clone(),
config.custom_server_routes,
)?;

Ok(TelemetryDriver::new(server_fut, tele_futures))
}
Expand Down
203 changes: 203 additions & 0 deletions foundations/src/telemetry/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#[cfg(feature = "metrics")]
use super::metrics;
use super::settings::TelemetrySettings;
use crate::telemetry::log;
use crate::BootstrapResult;
use anyhow::Context as _;
use futures_util::future::FutureExt;
use futures_util::{pin_mut, ready};
use hyper_util::rt::TokioIo;
use socket2::{Domain, SockAddr, Socket, Type};
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpListener;
use tokio::sync::watch;

mod router;

use router::Router;
pub use router::{
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
};

pub(super) struct TelemetryServerFuture {
listener: TcpListener,
router: Router,
}

impl TelemetryServerFuture {
pub(super) fn new(
settings: TelemetrySettings,
custom_routes: Vec<TelemetryServerRoute>,
) -> BootstrapResult<Option<TelemetryServerFuture>> {
if !settings.server.enabled {
return Ok(None);
}

let settings = Arc::new(settings);

// Eagerly init the memory profiler so it gets set up before syscalls are sandboxed with seccomp.
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
if settings.memory_profiler.enabled {
memory_profiling::profiler(Arc::clone(&settings))
.map_err(|err| anyhow::anyhow!(err))?;
}

let addr = settings.server.addr;

#[cfg(feature = "settings")]
let addr = SocketAddr::from(addr);

let router = Router::new(custom_routes, settings);

let listener = {
let std_listener = std::net::TcpListener::from(
bind_socket(addr).with_context(|| format!("binding to socket {addr:?}"))?,
);

std_listener.set_nonblocking(true)?;

tokio::net::TcpListener::from_std(std_listener)?
};

Ok(Some(TelemetryServerFuture { listener, router }))
}
pub(super) fn local_addr(&self) -> SocketAddr {
self.listener.local_addr().unwrap()
}

// Adapted from Hyper 0.14 Server stuff and axum::serve::serve.
pub(super) async fn with_graceful_shutdown(
self,
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
) {
let (signal_tx, signal_rx) = watch::channel(());
let signal_tx = Arc::new(signal_tx);

tokio::spawn(async move {
shutdown_signal.await;

drop(signal_rx);
});

let (close_tx, close_rx) = watch::channel(());
let listener = self.listener;

pin_mut!(listener);

loop {
let socket = tokio::select! {
conn = listener.accept() => match conn {
Ok((conn, _)) => TokioIo::new(conn),
Err(e) => {
log::warn!("failed to accept connection"; "error" => e);

continue;
}
},
_ = signal_tx.closed() => { break },
};

let router = self.router.clone();
let signal_tx = Arc::clone(&signal_tx);
let close_rx = close_rx.clone();

tokio::spawn(async move {
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, router)
.with_upgrades();

let signal_closed = signal_tx.closed().fuse();

pin_mut!(conn);
pin_mut!(signal_closed);

loop {
tokio::select! {
_ = conn.as_mut() => break,
_ = &mut signal_closed => conn.as_mut().graceful_shutdown(),
}
}

drop(close_rx);
});
}

drop(close_rx);

close_tx.closed().await;
}
}

impl Future for TelemetryServerFuture {
type Output = Infallible;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;

loop {
let socket = match ready!(Pin::new(&mut this.listener).poll_accept(cx)) {
Ok((conn, _)) => TokioIo::new(conn),
Err(e) => {
log::warn!("failed to accept connection"; "error" => e);

continue;
}
};

let router = this.router.clone();

tokio::spawn(
hyper::server::conn::http1::Builder::new()
// upgrades needed for websockets
.serve_connection(socket, router)
.with_upgrades(),
);
}
}
}

fn bind_socket(addr: SocketAddr) -> BootstrapResult<Socket> {
let socket = Socket::new(
if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
},
Type::STREAM,
None,
)?;

socket.set_reuse_address(true)?;
#[cfg(unix)]
socket.set_reuse_port(true)?;
socket.bind(&SockAddr::from(addr))?;
socket.listen(1024)?;

Ok(socket)
}

#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
mod memory_profiling {
use super::*;
use crate::telemetry::MemoryProfiler;
use crate::Result;

pub(super) fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
MemoryProfiler::get_or_init_with(&settings.memory_profiler)?.ok_or_else(|| {
"profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var".into()
})
}

pub(super) async fn heap_profile(settings: Arc<TelemetrySettings>) -> Result<String> {
profiler(settings)?.heap_profile().await
}

pub(super) async fn heap_stats(settings: Arc<TelemetrySettings>) -> Result<String> {
profiler(settings)?.heap_stats()
}
}
Loading

0 comments on commit d67d549

Please sign in to comment.