Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion prometric-derive/tests/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ fn quantiles_with_batching_work() {
encoder.encode(&metric_families, &mut buffer).unwrap();
let output = String::from_utf8(buffer).unwrap();

println!("{}", output);
println!("{output}");

assert!(output.contains("test_summary"));
}
4 changes: 2 additions & 2 deletions prometric/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ prometheus = { workspace = true }
# Exporter
hyper = { version = "1.7.0", optional = true, features = ["http1", "server"] }
hyper-util = { version = "0.1.17", optional = true, features = ["tokio"] }
tokio = { version = "1.40.0", optional = true, features = ["net", "rt"] }
tokio = { version = "1.40.0", optional = true, features = ["net", "rt", "macros"] }

# Process
sysinfo = { version = "0.37.2", optional = true }

# Summary
arc-cell = {version = "0.3.3", optional = true }
arc-cell = { version = "0.3.3", optional = true }
metrics-util = { version = "0.20.0", optional = true }
metrics-exporter-prometheus = { version = "0.17.2", optional = true }
orx-concurrent-vec = { version = "3.10.0", optional = true }
Expand Down
48 changes: 39 additions & 9 deletions prometric/src/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, thread};
use std::{net::SocketAddr, thread, time::Duration};

use hyper::{
Request, Response, body::Incoming, header::CONTENT_TYPE, server::conn::http1,
Expand All @@ -13,6 +13,7 @@ pub struct ExporterBuilder {
address: String,
path: String,
global_prefix: Option<String>,
process_metrics_poll_interval: Option<Duration>,
}

impl Default for ExporterBuilder {
Expand All @@ -22,6 +23,7 @@ impl Default for ExporterBuilder {
address: "0.0.0.0:9090".to_owned(),
path: "/metrics".to_owned(),
global_prefix: None,
process_metrics_poll_interval: None,
}
}
}
Expand Down Expand Up @@ -66,6 +68,15 @@ impl ExporterBuilder {
self
}

/// Also collect process metrics, polling at the given interval in the background.
///
/// A 10 second interval is a good default for most applications.
#[cfg(feature = "process")]
pub fn with_process_metrics(mut self, poll_interval: Duration) -> Self {
self.process_metrics_poll_interval = Some(poll_interval);
self
}

fn path(&self) -> Result<String, ExporterError> {
if self.path.is_empty() {
return Err(ExporterError::InvalidPath(self.path.clone()));
Expand Down Expand Up @@ -101,18 +112,20 @@ impl ExporterBuilder {
let address = self.address()?;
let registry = self.registry.unwrap_or_else(|| prometheus::default_registry().clone());

// Build the serve function
// Build the serve and process collection futures.
let serve = serve(address, registry, path, self.global_prefix);
let collect = collect_process_metrics(self.process_metrics_poll_interval);
let fut = async { tokio::try_join!(serve, collect) };

// If a Tokio runtime is available, use it to spawn the listener. Otherwise,
// create a new single-threaded runtime and spawn the listener there.
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
runtime.spawn(serve);
runtime.spawn(fut);
} else {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build()?;

thread::spawn(move || {
runtime.block_on(serve).unwrap_or_else(|e| panic!("server error: {:?}", e));
runtime.block_on(fut).unwrap_or_else(|e| panic!("server error: {e:?}"));
});
}

Expand Down Expand Up @@ -176,6 +189,23 @@ async fn serve_req(
Ok(response)
}

/// If the "process" feature is enabled AND the poll interval is provided, collect
/// process metrics at the given interval. Otherwise, no-op.
///
/// NOTE: the return type is Result to use [`tokio::try_join!`] with [`serve`].
async fn collect_process_metrics(_poll_interval: Option<Duration>) -> Result<(), ExporterError> {
#[cfg(feature = "process")]
if let Some(interval) = _poll_interval {
let mut collector = crate::process::ProcessCollector::default();
loop {
collector.collect();
tokio::time::sleep(interval).await;
}
}

Ok(())
}

/// An error that can occur when building or installing the Prometheus HTTP exporter.
pub enum ExporterError {
BindError(std::io::Error),
Expand All @@ -189,10 +219,10 @@ impl std::error::Error for ExporterError {}
impl std::fmt::Display for ExporterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BindError(e) => write!(f, "Failed to bind to address: {:?}", e),
Self::ServeError(e) => write!(f, "HTTP server failed: {:?}", e),
Self::InvalidPath(path) => write!(f, "Invalid path: {}", path),
Self::InvalidAddress(address, e) => write!(f, "Invalid address: {}: {:?}", address, e),
Self::BindError(e) => write!(f, "Failed to bind to address: {e:?}"),
Self::ServeError(e) => write!(f, "HTTP server failed: {e:?}"),
Self::InvalidPath(path) => write!(f, "Invalid path: {path}"),
Self::InvalidAddress(address, e) => write!(f, "Invalid address: {address}: {e:?}"),
}
}
}
Expand All @@ -205,6 +235,6 @@ impl From<std::io::Error> for ExporterError {

impl std::fmt::Debug for ExporterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
write!(f, "{self}")
}
}