|
1 |
| -use std::{future::Future, io, net::SocketAddr, pin::Pin}; |
| 1 | +use std::net::SocketAddr; |
2 | 2 |
|
3 |
| -use hyper::{ |
4 |
| - service::{make_service_fn, service_fn}, |
5 |
| - Body, Error, Request, Response, Server, |
6 |
| -}; |
7 |
| - |
8 |
| -use tracing::info; |
| 3 | +use anyhow::{anyhow, Result}; |
| 4 | +use hyper::service::service_fn; |
| 5 | +use hyper::{Request, Response}; |
| 6 | +use tokio::net::TcpListener; |
| 7 | +use tracing::{error, info}; |
9 | 8 |
|
10 | 9 | use crate::core::Core;
|
11 | 10 |
|
| 11 | +type BytesBody = http_body_util::Full<hyper::body::Bytes>; |
| 12 | + |
12 | 13 | /// Start a HTTP server to report metrics.
|
13 |
| -pub async fn run(metrics_addr: SocketAddr) -> Result<(), Error> { |
| 14 | +pub async fn run(metrics_addr: SocketAddr) -> Result<()> { |
14 | 15 | info!("Starting metrics server on {metrics_addr}");
|
15 |
| - Server::bind(&metrics_addr) |
16 |
| - .serve(make_service_fn(move |_conn| async move { |
17 |
| - let handler = make_handler(); |
18 |
| - Ok::<_, io::Error>(service_fn(handler)) |
19 |
| - })) |
20 |
| - .await |
| 16 | + let listener = TcpListener::bind(metrics_addr).await?; |
| 17 | + loop { |
| 18 | + let (stream, _addr) = listener.accept().await?; |
| 19 | + let io = hyper_util::rt::TokioIo::new(stream); |
| 20 | + tokio::spawn(async move { |
| 21 | + if let Err(err) = hyper::server::conn::http1::Builder::new() |
| 22 | + .serve_connection(io, service_fn(handler)) |
| 23 | + .await |
| 24 | + { |
| 25 | + error!("Error serving metrics connection: {err:#}"); |
| 26 | + } |
| 27 | + }); |
| 28 | + } |
21 | 29 | }
|
22 | 30 |
|
23 |
| -/// This function returns an HTTP handler fn that will respond with the |
24 |
| -/// OpenMetrics encoding of our metrics. |
25 |
| -fn make_handler( |
26 |
| -) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> { |
27 |
| - // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. |
28 |
| - move |_req: Request<Body>| { |
29 |
| - Box::pin(async move { |
30 |
| - let core = Core::get() |
31 |
| - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "metrics disabled"))?; |
32 |
| - core.encode() |
33 |
| - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) |
34 |
| - .map(|r| { |
35 |
| - let body = Body::from(r); |
36 |
| - Response::builder() |
37 |
| - .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8") |
38 |
| - .body(body) |
39 |
| - .expect("Failed to build response") |
40 |
| - }) |
41 |
| - }) |
42 |
| - } |
| 31 | +/// HTTP handler that will respond with the OpenMetrics encoding of our metrics. |
| 32 | +async fn handler(_req: Request<hyper::body::Incoming>) -> Result<Response<BytesBody>> { |
| 33 | + let core = Core::get().ok_or_else(|| anyhow!("metrics disabled"))?; |
| 34 | + core.encode().map_err(anyhow::Error::new).map(|r| { |
| 35 | + Response::builder() |
| 36 | + .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8") |
| 37 | + .body(body_full(r)) |
| 38 | + .expect("Failed to build response") |
| 39 | + }) |
| 40 | +} |
| 41 | + |
| 42 | +/// Creates a new [`BytesBody`] with given content. |
| 43 | +fn body_full(content: impl Into<hyper::body::Bytes>) -> BytesBody { |
| 44 | + http_body_util::Full::new(content.into()) |
43 | 45 | }
|
0 commit comments