Skip to content

Commit

Permalink
Move static labels to Registry to clean up Prometheus metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
oliy authored and mendess committed Oct 18, 2023
1 parent 422d3d0 commit 2fb4b88
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 242 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daphne/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ assert_matches.workspace = true
criterion.workspace = true
matchit.workspace = true
paste.workspace = true
regex = "1.10.0"
tokio.workspace = true

[features]
Expand Down
81 changes: 24 additions & 57 deletions daphne/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
use crate::{fatal_error, DapError};
use prometheus::{
exponential_buckets, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, HistogramVec, IntCounterVec, Registry,
exponential_buckets, register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_counter_with_registry, Histogram, IntCounter, IntCounterVec, Registry,
};

pub struct DaphneMetrics {
Expand All @@ -21,42 +21,35 @@ pub struct DaphneMetrics {
aggregation_job_counter: IntCounterVec,

/// Helper: Number of records in an incoming AggregationJobInitReq.
aggregation_job_batch_size_histogram: HistogramVec,
aggregation_job_batch_size_histogram: Histogram,

/// Helper: Number of times replays caused the aggregation to be retried.
aggregation_job_continue_repeats_due_to_replays: IntCounterVec,
aggregation_job_continue_repeats_due_to_replays: IntCounter,
}

impl DaphneMetrics {
/// Register Daphne metrics with the specified registry. If a prefix is provided, then
/// "{prefix}_" is prepended to each metric name.
pub fn register(registry: &Registry, prefix: Option<&str>) -> Result<Self, DapError> {
let front = if let Some(prefix) = prefix {
format!("{prefix}_")
} else {
"".into()
};

pub fn register(registry: &Registry) -> Result<Self, DapError> {
let inbound_request_counter = register_int_counter_vec_with_registry!(
format!("{front}inbound_request_counter"),
"inbound_request_counter",
"Total number of successful inbound requests.",
&["host", "type"],
&["type"],
registry
)
.map_err(|e| fatal_error!(err = ?e, "failed to regsiter inbound_request_counter"))?;

let report_counter = register_int_counter_vec_with_registry!(
format!("{front}report_counter"),
"report_counter",
"Total number reports rejected, aggregated, and collected.",
&["host", "status"],
&["status"],
registry
)
.map_err(|e| fatal_error!(err = ?e, "failed to register report_counter"))?;

let aggregation_job_batch_size_histogram = register_histogram_vec_with_registry!(
format!("{front}aggregation_job_batch_size"),
let aggregation_job_batch_size_histogram = register_histogram_with_registry!(
"aggregation_job_batch_size",
"Number of records in an incoming AggregationJobInitReq.",
&["host"],
// <1, <2, <4, <8, ... <256, +Inf
exponential_buckets(1.0, 2.0, 8)
.expect("this shouldn't panic for these hardcoded values"),
Expand All @@ -65,18 +58,17 @@ impl DaphneMetrics {
.map_err(|e| fatal_error!(err = ?e, "failed to register aggregation_job_batch_size"))?;

let aggregation_job_counter = register_int_counter_vec_with_registry!(
format!("{front}aggregation_job_counter"),
format!("aggregation_job_counter"),
"Total number of aggregation jobs started and completed.",
&["host", "status"],
&["status"],
registry
)
.map_err(|e| fatal_error!(err = ?e, "failed to register aggregation_job_counter"))?;

let aggregation_job_continue_repeats_due_to_replays =
register_int_counter_vec_with_registry!(
format!("{front}aggregation_continuation_repeats_due_to_replays"),
register_int_counter_with_registry!(
"aggregation_continuation_repeats_due_to_replays",
"Total number of times the aggregation continuation was restarted due to replayed reports",
&["host"],
registry
)
.map_err(|e| fatal_error!(err = ?e, "failed to register aggregation_continuation_repeats_due_to_replays"))?;
Expand All @@ -90,20 +82,6 @@ impl DaphneMetrics {
})
}

pub fn with_host<'req>(&'req self, host: &'req str) -> ContextualizedDaphneMetrics<'req> {
ContextualizedDaphneMetrics {
metrics: self,
host,
}
}
}

pub struct ContextualizedDaphneMetrics<'req> {
metrics: &'req DaphneMetrics,
host: &'req str,
}

impl ContextualizedDaphneMetrics<'_> {
pub fn inbound_req_inc(&self, request_type: DaphneRequestType) {
let request_type_str = match request_type {
DaphneRequestType::HpkeConfig => "hpke_config",
Expand All @@ -112,45 +90,34 @@ impl ContextualizedDaphneMetrics<'_> {
DaphneRequestType::Collect => "collect",
};

self.metrics
.inbound_request_counter
.with_label_values(&[self.host, request_type_str])
self.inbound_request_counter
.with_label_values(&[request_type_str])
.inc();
}

pub fn report_inc_by(&self, status: &str, val: u64) {
self.metrics
.report_counter
.with_label_values(&[self.host, status])
.inc_by(val);
self.report_counter.with_label_values(&[status]).inc_by(val);
}

pub fn agg_job_observe_batch_size(&self, val: usize) {
self.metrics
.aggregation_job_batch_size_histogram
.with_label_values(&[self.host])
self.aggregation_job_batch_size_histogram
.observe(val as f64);
}

pub fn agg_job_started_inc(&self) {
self.metrics
.aggregation_job_counter
.with_label_values(&[self.host, "started"])
self.aggregation_job_counter
.with_label_values(&["started"])
.inc();
}

pub fn agg_job_completed_inc(&self) {
self.metrics
.aggregation_job_counter
.with_label_values(&[self.host, "completed"])
self.aggregation_job_counter
.with_label_values(&["completed"])
.inc();
}

pub fn agg_job_cont_restarted_inc(&self) {
self.metrics
.aggregation_job_continue_repeats_due_to_replays
.with_label_values(&[self.host])
.inc();
self.aggregation_job_continue_repeats_due_to_replays.inc();
}
}

Expand Down
2 changes: 1 addition & 1 deletion daphne/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub trait DapAggregator<S>: HpkeDecrypter + DapReportInitializer + Sized {
return Err(DapAbort::version_unknown());
}

let metrics = self.metrics().with_host(req.host());
let metrics = self.metrics();

// Parse the task ID from the query string, ensuring that it is the only query parameter.
let mut id = None;
Expand Down
29 changes: 11 additions & 18 deletions daphne/src/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
constant_time_eq, AggregateShare, AggregateShareReq, AggregationJobContinueReq,
AggregationJobInitReq, Draft02AggregationJobId, PartialBatchSelector, TaskId,
},
metrics::{ContextualizedDaphneMetrics, DaphneRequestType},
metrics::{DaphneMetrics, DaphneRequestType},
DapError, DapHelperState, DapHelperTransition, DapRequest, DapResource, DapResponse,
DapTaskConfig, DapVersion, MetaAggregationJobId,
};
Expand Down Expand Up @@ -46,7 +46,7 @@ pub trait DapHelper<S>: DapAggregator<S> {
async fn handle_agg_job_init_req<'req>(
&self,
req: &'req DapRequest<S>,
metrics: ContextualizedDaphneMetrics<'req>,
metrics: &DaphneMetrics,
task_id: &TaskId,
) -> Result<DapResponse, DapAbort> {
let agg_job_init_req =
Expand Down Expand Up @@ -118,14 +118,7 @@ pub trait DapHelper<S>: DapAggregator<S> {

let transition = task_config
.vdaf
.handle_agg_job_init_req(
self,
self,
task_id,
task_config,
&agg_job_init_req,
&metrics,
)
.handle_agg_job_init_req(self, self, task_id, task_config, &agg_job_init_req, metrics)
.map_err(DapError::Abort)
.await?;

Expand Down Expand Up @@ -167,7 +160,7 @@ pub trait DapHelper<S>: DapAggregator<S> {
async fn handle_agg_job_cont_req<'req>(
&self,
req: &'req DapRequest<S>,
metrics: ContextualizedDaphneMetrics<'req>,
metrics: &DaphneMetrics,
task_id: &TaskId,
) -> Result<DapResponse, DapAbort> {
if let Some(taskprov_version) = self.get_global_config().taskprov_version {
Expand Down Expand Up @@ -235,7 +228,7 @@ pub trait DapHelper<S>: DapAggregator<S> {
|id| replayed_reports.contains(id),
&agg_job_id,
&agg_job_cont_req,
&metrics,
metrics,
)?;

let out_shares_count = agg_share_span.report_count().try_into().unwrap();
Expand Down Expand Up @@ -272,7 +265,7 @@ pub trait DapHelper<S>: DapAggregator<S> {

/// Handle a request pertaining to an aggregation job.
async fn handle_agg_job_req(&self, req: &DapRequest<S>) -> Result<DapResponse, DapAbort> {
let metrics = self.metrics().with_host(req.host());
let metrics = self.metrics();
let task_id = req.task_id()?;

// Check whether the DAP version indicated by the sender is supported.
Expand All @@ -296,7 +289,7 @@ pub trait DapHelper<S>: DapAggregator<S> {
/// collection job.
async fn handle_agg_share_req(&self, req: &DapRequest<S>) -> Result<DapResponse, DapAbort> {
let now = self.get_current_time();
let metrics = self.metrics().with_host(req.host());
let metrics = self.metrics();
let task_id = req.task_id()?;

// Check whether the DAP version indicated by the sender is supported.
Expand Down Expand Up @@ -504,7 +497,7 @@ mod tests {
));

helper
.handle_agg_job_init_req(&req, helper.metrics.with_host("test"), &task_id)
.handle_agg_job_init_req(&req, &helper.metrics, &task_id)
.await
.unwrap();

Expand All @@ -524,7 +517,7 @@ mod tests {
.await;

let resp = helper
.handle_agg_job_cont_req(&req, helper.metrics.with_host("test"), &task_id)
.handle_agg_job_cont_req(&req, &helper.metrics, &task_id)
.await
.unwrap();

Expand Down Expand Up @@ -553,7 +546,7 @@ mod tests {
.await;

let resp = helper
.handle_agg_job_cont_req(&req, helper.metrics.with_host("test"), &task_id)
.handle_agg_job_cont_req(&req, &helper.metrics, &task_id)
.await
.unwrap();

Expand All @@ -569,7 +562,7 @@ mod tests {
};

let Some(metric) = test
.prometheus_registry
.helper_registry
.gather()
.into_iter()
.find(|metric| metric.get_name().ends_with("report_counter"))
Expand Down
32 changes: 9 additions & 23 deletions daphne/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {

/// Handle a report from a Client.
async fn handle_upload_req(&self, req: &DapRequest<S>) -> Result<(), DapAbort> {
let metrics = self.metrics().with_host(req.host());
let metrics = self.metrics();
let task_id = req.task_id()?;
debug!("upload for task {task_id}");

Expand Down Expand Up @@ -226,7 +226,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
/// poll later on to get the collection.
async fn handle_collect_job_req(&self, req: &DapRequest<S>) -> Result<Url, DapAbort> {
let now = self.get_current_time();
let metrics = self.metrics().with_host(req.host());
let metrics = self.metrics();
let task_id = req.task_id()?;
debug!("collect for task {task_id}");

Expand Down Expand Up @@ -327,9 +327,8 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
task_config: &DapTaskConfig,
part_batch_sel: &PartialBatchSelector,
reports: Vec<Report>,
host: &str,
) -> Result<u64, DapAbort> {
let metrics = self.metrics().with_host(host);
let metrics = self.metrics();

// Prepare AggregationJobInitReq.
let agg_job_id = MetaAggregationJobId::gen_for_version(&task_config.version);
Expand All @@ -343,7 +342,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
&agg_job_id,
part_batch_sel,
reports,
&metrics,
metrics,
)
.await?;
let (state, agg_job_init_req) = match transition {
Expand Down Expand Up @@ -393,7 +392,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
state,
agg_job_resp,
task_config.version,
&metrics,
metrics,
)?;
let (uncommited, agg_job_cont_req) = match transition {
DapLeaderTransition::Uncommitted(uncommited, agg_job_cont_req) => {
Expand Down Expand Up @@ -428,7 +427,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
task_config,
uncommited,
agg_job_resp,
&metrics,
metrics,
)?;
let out_shares_count = agg_share_span.report_count() as u64;

Expand Down Expand Up @@ -461,9 +460,8 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
collect_id: &CollectionJobId,
task_config: &DapTaskConfig,
collect_req: &CollectionReq,
host: &str,
) -> Result<u64, DapAbort> {
let metrics = self.metrics().with_host(host);
let metrics = self.metrics();

debug!("collecting id {collect_id}");
let batch_selector = BatchSelector::try_from(collect_req.query.clone())?;
Expand Down Expand Up @@ -594,13 +592,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
"RUNNING run_agg_job FOR TID {task_id} AND {part_batch_sel:?} AND {host}"
);
telem.reports_aggregated += self
.run_agg_job(
&task_id,
task_config.as_ref(),
&part_batch_sel,
reports,
host,
)
.run_agg_job(&task_id, task_config.as_ref(), &part_batch_sel, reports)
.await?;
}
}
Expand All @@ -619,13 +611,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {

tracing::debug!("RUNNING run_collect_job FOR TID {task_id} AND {collect_id} AND {collect_req:?} AND {host}");
telem.reports_collected += self
.run_collect_job(
&task_id,
&collect_id,
task_config.as_ref(),
&collect_req,
host,
)
.run_collect_job(&task_id, &collect_id, task_config.as_ref(), &collect_req)
.await?;
}

Expand Down
Loading

0 comments on commit 2fb4b88

Please sign in to comment.