Skip to content

Commit

Permalink
Replace gauges with observable gauges (#3452)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Oct 25, 2024
1 parent 149fd18 commit 4a107a7
Showing 1 changed file with 59 additions and 35 deletions.
94 changes: 59 additions & 35 deletions aggregator/src/aggregator/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use itertools::Itertools;
use opentelemetry::metrics::{Gauge, Meter};
use opentelemetry::metrics::{Meter, MetricsError};
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -66,11 +66,10 @@ impl LIFORequestQueue {

let (message_tx, message_rx) = mpsc::unbounded_channel();
let id_counter = Default::default();
let metrics = Metrics::new(meter, meter_prefix);
metrics.max_outstanding_requests.record(
u64::try_from(usize::try_from(concurrency).unwrap() + depth).unwrap(),
&[],
);
let max_outstanding_requests =
u64::try_from(usize::try_from(concurrency).unwrap() + depth).unwrap();
let metrics = Metrics::new(meter, meter_prefix, max_outstanding_requests)
.map_err(|e| Error::Internal(e.to_string()))?;
Self::dispatcher(message_rx, concurrency, depth, metrics);

Ok(Self {
Expand Down Expand Up @@ -164,11 +163,11 @@ impl LIFORequestQueue {

// Unwrap safety: only fails on architectures where usize is less than 32 bits, or
// greater than 64 bits.
metrics.outstanding_requests.record(
metrics.outstanding_requests.store(
(stack.len() + usize::try_from(concurrency).unwrap() - permits.num_permits())
.try_into()
.unwrap(),
&[],
Ordering::Relaxed,
);
}
})
Expand Down Expand Up @@ -290,39 +289,64 @@ struct Metrics {
/// since the queue length may have changed before the measurement is taken. In practice, the
/// error should only be +/- 1. It is also more or less suitable for synchronization during
/// tests.
outstanding_requests: Gauge<u64>,

/// The maximum number of requests the queue will service at a time. This is always set to
/// `depth + concurrency`.
max_outstanding_requests: Gauge<u64>,
outstanding_requests: Arc<AtomicU64>,
}

impl Metrics {
const OUTSTANDING_REQUESTS_METRIC_NAME: &'static str = "outstanding_requests";
const MAX_OUTSTANDING_REQUESTS_METRIC_NAME: &'static str = "max_outstanding_requests";

fn new(meter: &Meter, prefix: &str) -> Self {
Self {
outstanding_requests: meter
.u64_gauge(Self::get_outstanding_requests_name(prefix))
.with_description(concat!(
"The approximate number of requests currently being serviced by the ",
"aggregator."
))
.with_unit("{request}")
.init(),
max_outstanding_requests: meter
.u64_gauge(
[prefix, Self::MAX_OUTSTANDING_REQUESTS_METRIC_NAME]
.into_iter()
.join("_"),
)
.with_description(concat!(
"The maximum number of requests that the aggregator can service at a time."
))
.with_unit("{request}")
.init(),
}
fn new(
meter: &Meter,
prefix: &str,
max_outstanding_requests: u64,
) -> Result<Self, MetricsError> {
let outstanding_requests = Arc::new(AtomicU64::new(0));
let outstanding_requests_gauge = meter
.u64_observable_gauge(Self::get_outstanding_requests_name(prefix))
.with_description(concat!(
"The approximate number of requests currently being serviced by the ",
"aggregator."
))
.with_unit("{request}")
.init();
let max_outstanding_requests_gauge = meter
.u64_observable_gauge(
[prefix, Self::MAX_OUTSTANDING_REQUESTS_METRIC_NAME]
.into_iter()
.join("_"),
)
.with_description(concat!(
"The maximum number of requests that the aggregator can service at a time."
))
.with_unit("{request}")
.init();

meter.register_callback(
&[
outstanding_requests_gauge.as_any(),
max_outstanding_requests_gauge.as_any(),
],
{
let outstanding_requests = Arc::clone(&outstanding_requests);
move |observer| {
observer.observe_u64(
&outstanding_requests_gauge,
outstanding_requests.load(Ordering::Relaxed),
&[],
);
observer.observe_u64(
&max_outstanding_requests_gauge,
max_outstanding_requests,
&[],
);
}
},
)?;

Ok(Self {
outstanding_requests,
})
}

fn get_outstanding_requests_name(prefix: &str) -> String {
Expand Down

0 comments on commit 4a107a7

Please sign in to comment.