Skip to content

Commit

Permalink
Tokio runtime metrics improvements (#3462)
Browse files Browse the repository at this point in the history
* Update name/config of Tokio poll time histogram

* Update name from injection queue to global queue

* Fix Prometheus HTTP metrics test

* Add --features=prometheus to tests in CI

* Make stable Tokio runtime metrics always available
  • Loading branch information
divergentdave authored Oct 30, 2024
1 parent 236debf commit a3ef9df
Show file tree
Hide file tree
Showing 19 changed files with 318 additions and 233 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ jobs:
run: cargo test --package janus_client --features ohttp
# Note: keep Build & Test steps consecutive, and match flags other than `--no-run`.
- name: Build
run: cargo test --profile ci --locked --all-targets --no-run
run: cargo test --profile ci --locked --all-targets --features=prometheus --no-run
- name: Test
id: test
env:
RUST_LOG: info
JANUS_E2E_LOGS_PATH: ${{ github.workspace }}/test-logs
DIVVIUP_TS_INTEROP_CONTAINER: ${{ steps.default-input-values.outputs.divviup_ts_interop_container }}
run: cargo test --profile ci --locked --all-targets
run: cargo test --profile ci --locked --all-targets --features=prometheus
# Continue on error so we can upload logs
continue-on-error: true
- name: Build (Docker-specific tests)
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BinaryOptions for Options {
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
// TODO(#3293): remove aliases during next breaking changes window.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BinaryOptions for Options {
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ where
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl BinaryOptions for Options {
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ impl KubernetesSecretOptions {
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ConfigFile {
#[serde(flatten)]
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/key_rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl BinaryOptions for Options {
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
Expand Down
10 changes: 1 addition & 9 deletions aggregator/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,21 +270,13 @@ where
runtime_builder.enable_all();
if let Some(tokio_metrics_config) = config.common_config().metrics_config.tokio.as_ref() {
if tokio_metrics_config.enabled {
#[cfg(all(tokio_unstable, feature = "prometheus"))]
#[cfg(feature = "prometheus")]
{
crate::metrics::tokio_runtime::configure_runtime(
&mut runtime_builder,
tokio_metrics_config,
);
}
#[cfg(not(all(tokio_unstable, feature = "prometheus")))]
{
return Err(anyhow!(
"Tokio runtime metrics were enabled in the configuration file, but support \
was not enabled at compile time. Rebuild with \
`RUSTFLAGS=\"--cfg tokio_unstable\"`."
));
}
}
}
let runtime = runtime_builder.build()?;
Expand Down
22 changes: 14 additions & 8 deletions aggregator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use url::Url;
///
/// let _decoded: CommonConfig = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CommonConfig {
/// The database configuration.
Expand Down Expand Up @@ -333,7 +333,7 @@ mod tests {
test_util::{generate_db_config, generate_metrics_config, generate_trace_config},
CommonConfig, DbConfig, JobDriverConfig,
},
metrics::{HistogramScale, MetricsExporterConfiguration},
metrics::{MetricsExporterConfiguration, PollTimeHistogramConfiguration},
trace::OpenTelemetryTraceConfiguration,
};
use assert_matches::assert_matches;
Expand Down Expand Up @@ -438,16 +438,22 @@ metrics_config:
tokio:
enabled: true
enable_poll_time_histogram: true
poll_time_histogram_scale: log
poll_time_histogram_resolution_microseconds: 100
poll_time_histogram_buckets: 15
poll_time_histogram: !log
min_value_us: 100
max_value_us: 3000000
max_relative_error: 0.25
";
let config: CommonConfig = serde_yaml::from_str(input).unwrap();
let tokio_config = config.metrics_config.tokio.unwrap();
assert!(tokio_config.enabled);
assert!(tokio_config.enable_poll_time_histogram);
assert_eq!(tokio_config.poll_time_histogram_scale, HistogramScale::Log);
assert_eq!(tokio_config.poll_time_histogram_resolution_us, Some(100));
assert_eq!(tokio_config.poll_time_histogram_buckets, Some(15));
assert_eq!(
tokio_config.poll_time_histogram,
PollTimeHistogramConfiguration::Log {
min_value_us: Some(100),
max_value_us: Some(3_000_000),
max_relative_error: Some(0.25),
}
);
}
}
79 changes: 47 additions & 32 deletions aggregator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use {
},
};

#[cfg(all(tokio_unstable, feature = "prometheus"))]
#[cfg(feature = "prometheus")]
pub(crate) mod tokio_runtime;

#[cfg(test)]
Expand All @@ -66,7 +66,7 @@ pub enum Error {
}

/// Configuration for collection/exporting of application-level metrics.
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MetricsConfiguration {
/// Configuration for OpenTelemetry metrics, with a choice of exporters.
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct OtlpExporterConfiguration {
}

/// Configuration options for Tokio's (unstable) metrics feature.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TokioMetricsConfiguration {
    /// Enable collecting metrics from Tokio. The flag `--cfg tokio_unstable` must be passed
Expand All @@ -111,37 +111,53 @@ pub struct TokioMetricsConfiguration {
#[serde(default)]
pub enable_poll_time_histogram: bool,

/// Choose whether poll times should be tracked on a linear scale or a logarithmic scale.
/// If a linear scale is chosen, each histogram bucket will have an equal range. If a
/// logarithmic scale is chosen, an exponential histogram will be used, where each bucket
/// has double the width of the previous bucket.
/// Choose whether poll times should be tracked on a linear scale or a logarithmic scale, and
/// set the parameters for the histogram.
#[serde(default)]
pub poll_time_histogram_scale: HistogramScale,
pub poll_time_histogram: PollTimeHistogramConfiguration,
}

/// Resolution of the histogram tracking poll times, in microseconds. When using a linear
/// scale, every bucket will have this width. When using a logarithmic scale, the smallest
/// bucket will have this width.
// TODO(#3293): remove this alias during next breaking changes window.
#[serde(default, alias = "poll_time_histogram_resolution_microseconds")]
pub poll_time_histogram_resolution_us: Option<u64>,
/// Configuration for the poll time histogram.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum PollTimeHistogramConfiguration {
/// Linear histogram scale.
Linear {
/// Width of each histogram bucket, in microseconds.
#[serde(default = "default_linear_histogram_resolution_us")]
resolution_us: u64,
/// Number of histogram buckets.
#[serde(default = "default_linear_histogram_num_buckets")]
num_buckets: usize,
},
/// Logarithmic histogram scale.
Log {
/// Sets the minimum duration that can be accurately recorded, in microseconds.
min_value_us: Option<u64>,
/// Sets the maximum value that can be accurately recorded, in microseconds.
max_value_us: Option<u64>,
/// Sets the maximum relative error. This should be between 0.0 and 1.0.
max_relative_error: Option<f64>,
},
}

/// Chooses the number of buckets in the histogram used to track poll times. This number of
/// buckets includes the bucket with a range extending to positive infinity.
#[serde(default)]
pub poll_time_histogram_buckets: Option<usize>,
impl Default for PollTimeHistogramConfiguration {
/// This uses the default configuration values of
/// [`tokio::runtime::Builder`].
fn default() -> Self {
Self::Linear {
resolution_us: default_linear_histogram_resolution_us(),
num_buckets: default_linear_histogram_num_buckets(),
}
}
}

fn default_linear_histogram_resolution_us() -> u64 {
100
}

/// Selects whether to use a linear scale or a logarithmic scale for a histogram.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HistogramScale {
/// Linear histogram scale. Each bucket will cover a range of the same width.
#[default]
Linear,

/// Logarithmic histogram scale. Each successive bucket will cover a range twice as wide as its
/// predecessor.
Log,
fn default_linear_histogram_num_buckets() -> usize {
10
}

/// Choice of OpenTelemetry metrics exporter implementation.
Expand Down Expand Up @@ -254,12 +270,11 @@ impl View for CustomView {
#[cfg(feature = "prometheus")]
fn build_opentelemetry_prometheus_meter_provider(
registry: Registry,
_runtime_opt: Option<&Runtime>,
runtime_opt: Option<&Runtime>,
) -> Result<SdkMeterProvider, MetricsError> {
let mut reader_builder = opentelemetry_prometheus::exporter();
reader_builder = reader_builder.with_registry(registry);
#[cfg(tokio_unstable)]
if let Some(runtime) = _runtime_opt {
if let Some(runtime) = runtime_opt {
reader_builder = reader_builder
.with_producer(tokio_runtime::TokioRuntimeMetrics::new(runtime.metrics()));
}
Expand Down
21 changes: 5 additions & 16 deletions aggregator/src/metrics/tests/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ async fn http_metrics() {
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
let mut aggregator_config = default_aggregator_config();
aggregator_config.require_global_hpke_keys = false;
let handler = AggregatorHandlerBuilder::new(
datastore.clone(),
clock.clone(),
TestRuntime::default(),
&meter,
default_aggregator_config(),
aggregator_config,
)
.await
.unwrap()
Expand Down Expand Up @@ -117,19 +119,6 @@ async fn http_metrics() {
assert!(target_info_metric_labels.contains_key("process_runtime_name"));
assert!(target_info_metric_labels.contains_key("process_runtime_version"));

// Info metric, with one time series per instrumentation scope (i.e. unique instance of
// `Meter`), generated by opentelemetry-prometheus. It has a name label, and an optional version
// label.
assert_eq!(
metric_families["otel_scope_info"].get_field_type(),
MetricType::GAUGE
);
assert!(metric_families["otel_scope_info"].has_help());
assert_eq!(metric_families["otel_scope_info"].get_metric().len(), 1);
let otel_scope_info_metric_labels =
labels_to_map(&metric_families["otel_scope_info"].get_metric()[0]);
assert_eq!(otel_scope_info_metric_labels["otel_scope_name"], "tests");

// Custom sum metric for HTTP responses.
assert_eq!(
metric_families["janus_aggregator_responses_total"].get_field_type(),
Expand All @@ -150,7 +139,7 @@ async fn http_metrics() {
);
assert_eq!(
janus_aggregator_responses_total_metric_labels["route"],
"/hpke_config"
"hpke_config"
);
assert_eq!(
janus_aggregator_responses_total_metric_labels["error_code"],
Expand Down Expand Up @@ -181,7 +170,7 @@ async fn http_metrics() {
);
assert_eq!(
http_server_request_duration_seconds_metric_labels["http_route"],
"/hpke_config"
"hpke_config"
);
assert_eq!(
http_server_request_duration_seconds_metric_labels["http_request_method"],
Expand Down
Loading

0 comments on commit a3ef9df

Please sign in to comment.