diff --git a/.changelog/unreleased/improvements/ibc-relayer/4380-telemetry-namespace.md b/.changelog/unreleased/improvements/ibc-relayer/4380-telemetry-namespace.md new file mode 100644 index 0000000000..3a65eda16e --- /dev/null +++ b/.changelog/unreleased/improvements/ibc-relayer/4380-telemetry-namespace.md @@ -0,0 +1,3 @@ +- Add a new optional configuration `prefix` for telemetry, allowing users + to specify a prefixed namespace for Prometheus metrics. + ([\#4380](https://github.com/informalsystems/hermes/issues/4380)) \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7c5481b3d7..8808c10b25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4656,6 +4656,7 @@ dependencies = [ "once_cell", "opentelemetry", "opentelemetry-prometheus", + "opentelemetry_sdk", "prometheus", "serde", "serde_json", @@ -6844,9 +6845,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" dependencies = [ "opentelemetry_api", "opentelemetry_sdk", @@ -6854,25 +6855,27 @@ dependencies = [ [[package]] name = "opentelemetry-prometheus" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9f186f6293ebb693caddd0595e66b74a6068fa51048e26e0bf9c95478c639c" +checksum = "c7d81bc254e2d572120363a2b16cdb0d715d301b5789be0cfc26ad87e4e10e53" dependencies = [ - "opentelemetry", + "once_cell", + "opentelemetry_api", + "opentelemetry_sdk", "prometheus", "protobuf", ] [[package]] name = "opentelemetry_api" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" dependencies = [ - "fnv", "futures-channel", "futures-util", "indexmap 1.9.3", + "js-sys", "once_cell", "pin-project-lite", "thiserror 1.0.69", @@ -6881,21 +6884,21 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" dependencies = [ "async-trait", "crossbeam-channel", - "dashmap", - "fnv", "futures-channel", "futures-executor", "futures-util", "once_cell", "opentelemetry_api", + "ordered-float", "percent-encoding", "rand 0.8.5", + "regex", "thiserror 1.0.69", ] @@ -6905,6 +6908,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "3.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +dependencies = [ + "num-traits", +] + [[package]] name = "orion" version = "0.17.10" diff --git a/Cargo.toml b/Cargo.toml index c5c87afcc5..75d5936572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,8 +85,9 @@ num-bigint = "0.4" num-rational = "0.4.1" once_cell = "1.20.2" oneline-eyre = "0.1" -opentelemetry = "0.19.0" -opentelemetry-prometheus = "0.12.0" +opentelemetry = "0.20.0" +opentelemetry_sdk = { version = "0.20.0", features = ["metrics"] } +opentelemetry-prometheus = "0.13.0" primitive-types = { version = "0.12.1", default-features = false } prometheus = "0.13.4" prost = "0.13" diff --git a/config.toml b/config.toml index 224d49af7c..1c38c719fa 100644 --- a/config.toml +++ b/config.toml @@ -128,6 +128,18 @@ host = '127.0.0.1' # by the telemetry service. Default: 3001 port = 3001 +# Specify the prefix used for Prometheus metrics. This configuration is optional, if +# it is missing the result is equivalent to setting it to empty string: `prefix = ""` +# For example setting `prefix = "hermes"` will result in: +# * `hermes_acknowledgement_events_total` +# * `hermes_wallet_balance` +# * etc... +# The default is no prefix, which results in: +# * `acknowledgement_events_total` +# * `wallet_balance` +# * etc... +# prefix = "" + [telemetry.buckets] # Specify the range of the 10 histogram buckets in ms for the `tx_latency_submitted` metric. # Default: { start = 500, end = 10000, buckets = 10 } diff --git a/crates/relayer-cli/src/commands/start.rs b/crates/relayer-cli/src/commands/start.rs index cc9c9193f7..2493988deb 100644 --- a/crates/relayer-cli/src/commands/start.rs +++ b/crates/relayer-cli/src/commands/start.rs @@ -178,6 +178,7 @@ fn spawn_telemetry_server(config: &Config) { config.telemetry.buckets.latency_submitted.buckets, config.telemetry.buckets.latency_confirmed.range.clone(), config.telemetry.buckets.latency_confirmed.buckets, + config.telemetry.prefix.as_str(), ); let telemetry = config.telemetry.clone(); diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index e53c6a0d42..a2ec4e8a97 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -498,6 +498,8 @@ pub struct TelemetryConfig { pub port: u16, #[serde(default = "HistogramBuckets::default")] pub buckets: HistogramBuckets, + #[serde(default)] + pub prefix: String, } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -562,6 +564,7 @@ impl Default for TelemetryConfig { host: "127.0.0.1".to_string(), port: 3001, buckets: HistogramBuckets::default(), + prefix: Default::default(), } } } diff --git a/crates/telemetry/Cargo.toml b/crates/telemetry/Cargo.toml index d857f317b2..8e82dfe8b7 100644 --- a/crates/telemetry/Cargo.toml +++ b/crates/telemetry/Cargo.toml @@ -19,7 +19,8 @@ axum = { workspace = true } dashmap = { workspace = true } moka = { workspace = true, features = ["sync"] } once_cell = { workspace = true } -opentelemetry = { workspace = true, features = ["metrics"] } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["metrics"] } opentelemetry-prometheus = { workspace = true } prometheus = { workspace = true } serde = { workspace = true } diff --git a/crates/telemetry/src/lib.rs b/crates/telemetry/src/lib.rs index a26c7177e8..a52b710d8a 100644 --- a/crates/telemetry/src/lib.rs +++ b/crates/telemetry/src/lib.rs @@ -20,12 +20,14 @@ pub fn new_state( tx_latency_submitted_buckets: u64, tx_latency_confirmed_range: Range, tx_latency_confirmed_buckets: u64, + namespace: &str, ) -> Arc { Arc::new(TelemetryState::new( tx_latency_submitted_range, tx_latency_submitted_buckets, tx_latency_confirmed_range, tx_latency_confirmed_buckets, + namespace, )) } @@ -36,12 +38,14 @@ pub fn init( tx_latency_submitted_buckets: u64, tx_latency_confirmed_range: Range, tx_latency_confirmed_buckets: u64, + namespace: &str, ) -> &'static Arc { let new_state = new_state( tx_latency_submitted_range, tx_latency_submitted_buckets, tx_latency_confirmed_range, tx_latency_confirmed_buckets, + namespace, ); match GLOBAL_STATE.set(new_state) { Ok(_) => debug!("initialised telemetry global state"), @@ -68,6 +72,7 @@ pub fn global() -> &'static Arc { end: 20000, }, 10, + "", ) } } diff --git a/crates/telemetry/src/state.rs b/crates/telemetry/src/state.rs index 136b414077..759d84cdc4 100644 --- a/crates/telemetry/src/state.rs +++ b/crates/telemetry/src/state.rs @@ -8,11 +8,11 @@ use std::{ use dashmap::{DashMap, DashSet}; use opentelemetry::{ global, - metrics::{Counter, ObservableGauge, UpDownCounter}, - Context, KeyValue, + metrics::{Counter, Histogram, ObservableGauge, Unit, UpDownCounter}, + KeyValue, }; -use opentelemetry_prometheus::PrometheusExporter; -use prometheus::proto::MetricFamily; +use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, MeterProvider, Stream}; +use prometheus::{proto::MetricFamily, Registry}; use ibc_relayer_types::{ applications::transfer::Coin, @@ -93,7 +93,8 @@ impl Display for WorkerType { } pub struct TelemetryState { - exporter: PrometheusExporter, + registry: Registry, + _meter_provider: MeterProvider, /// Number of workers per type workers: UpDownCounter, @@ -138,12 +139,12 @@ pub struct TelemetryState { /// Indicates the latency for all transactions submitted to a specific chain, /// i.e. the difference between the moment when Hermes received a batch of events /// until the corresponding transaction(s) were submitted. Milliseconds. - tx_latency_submitted: ObservableGauge, + tx_latency_submitted: Histogram, /// Indicates the latency for all transactions submitted to a specific chain, /// i.e. the difference between the moment when Hermes received a batch of events /// until the corresponding transaction(s) were confirmed. Milliseconds. - tx_latency_confirmed: ObservableGauge, + tx_latency_confirmed: Histogram, /// Records the time at which we started processing an event batch. /// Used for computing the `tx_latency` metric. @@ -205,13 +206,13 @@ pub struct TelemetryState { simulate_errors: Counter, /// The EIP-1559 base fee queried - dynamic_gas_queried_fees: ObservableGauge, + dynamic_gas_queried_fees: Histogram, /// The EIP-1559 base fee paid - dynamic_gas_paid_fees: ObservableGauge, + dynamic_gas_paid_fees: Histogram, /// The EIP-1559 base fee successfully queried - dynamic_gas_queried_success_fees: ObservableGauge, + dynamic_gas_queried_success_fees: Histogram, /// Number of ICS-20 packets filtered because the memo and/or the receiver fields were exceeding the configured limits filtered_packets: Counter, @@ -232,27 +233,78 @@ impl TelemetryState { tx_latency_submitted_buckets: u64, tx_latency_confirmed_range: Range, tx_latency_confirmed_buckets: u64, + namespace: &str, ) -> Self { - use opentelemetry::sdk::export::metrics::aggregation; - use opentelemetry::sdk::metrics::{controllers, processors}; - - let controller = controllers::basic(processors::factory( - CustomAggregatorSelector::new( - tx_latency_submitted_range, - tx_latency_submitted_buckets, - tx_latency_confirmed_range, - tx_latency_confirmed_buckets, - ), - aggregation::cumulative_temporality_selector(), - )) - .build(); - - let exporter = opentelemetry_prometheus::ExporterBuilder::new(controller).init(); + let registry = Registry::new(); + + // Create views for custom histogram buckets + let tx_submitted_buckets = build_histogram_buckets( + tx_latency_submitted_range.start, + tx_latency_submitted_range.end, + tx_latency_submitted_buckets, + ); + + let tx_confirmed_buckets = build_histogram_buckets( + tx_latency_confirmed_range.start, + tx_latency_confirmed_range.end, + tx_latency_confirmed_buckets, + ); + + let tx_submitted_view = new_view( + Instrument::new().name("tx_latency_submitted"), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: tx_submitted_buckets, + record_min_max: true, + }), + ) + .unwrap(); + + let tx_confirmed_view = new_view( + Instrument::new().name("tx_latency_confirmed"), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: tx_confirmed_buckets, + record_min_max: true, + }), + ) + .unwrap(); + + let gas_fees_view = new_view( + Instrument::new().name("dynamic_gas_*_fees"), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: vec![0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0], + record_min_max: true, + }), + ) + .unwrap(); + + let raw_exporter = opentelemetry_prometheus::exporter().with_registry(registry.clone()); + + // Condition required to avoid prefixing `_` when using empty namespace + let exporter = if !namespace.is_empty() { + raw_exporter + .with_namespace(namespace) + .build() + .expect("Failed to create Prometheus exporter") + } else { + raw_exporter + .build() + .expect("Failed to create Prometheus exporter") + }; + + // Build MeterProvider with views + let meter_provider = MeterProvider::builder() + .with_reader(exporter) + .with_view(tx_submitted_view) + .with_view(tx_confirmed_view) + .with_view(gas_fees_view) + .build(); + global::set_meter_provider(meter_provider.clone()); let meter = global::meter("hermes"); Self { - exporter, + registry, + _meter_provider: meter_provider, workers: meter .i64_up_down_counter("workers") @@ -347,7 +399,7 @@ impl TelemetryState { .init(), tx_latency_submitted: meter - .u64_observable_gauge("tx_latency_submitted") + .u64_histogram("tx_latency_submitted") .with_unit(Unit::new("milliseconds")) .with_description("The latency for all transactions submitted to a specific chain, \ i.e. the difference between the moment when Hermes received a batch of events \ @@ -355,7 +407,7 @@ impl TelemetryState { .init(), tx_latency_confirmed: meter - .u64_observable_gauge("tx_latency_confirmed") + .u64_histogram("tx_latency_confirmed") .with_unit(Unit::new("milliseconds")) .with_description("The latency for all transactions submitted & confirmed to a specific chain, \ i.e. the difference between the moment when Hermes received a batch of events \ @@ -414,17 +466,17 @@ impl TelemetryState { .init(), dynamic_gas_queried_fees: meter - .f64_observable_gauge("dynamic_gas_queried_fees") + .f64_histogram("dynamic_gas_queried_fees") .with_description("The EIP-1559 base fee queried") .init(), dynamic_gas_paid_fees: meter - .f64_observable_gauge("dynamic_gas_paid_fees") + .f64_histogram("dynamic_gas_paid_fees") .with_description("The EIP-1559 base fee paid") .init(), dynamic_gas_queried_success_fees: meter - .f64_observable_gauge("dynamic_gas_queried_success_fees") + .f64_histogram("dynamic_gas_queried_success_fees") .with_description("The EIP-1559 base fee successfully queried") .init(), @@ -452,7 +504,7 @@ impl TelemetryState { /// Gather the metrics for export pub fn gather(&self) -> Vec { - self.exporter.registry().gather() + self.registry.gather() } pub fn init_worker_by_type(&self, worker_type: WorkerType) { @@ -460,13 +512,11 @@ impl TelemetryState { } pub fn init_per_chain(&self, chain_id: &ChainId) { - let cx = Context::current(); - let labels = &[KeyValue::new("chain", chain_id.to_string())]; - self.ws_reconnect.add(&cx, 0, labels); - self.ws_events.add(&cx, 0, labels); - self.messages_submitted.add(&cx, 0, labels); + self.ws_reconnect.add(0, labels); + self.ws_events.add(0, labels); + self.messages_submitted.add(0, labels); self.init_queries(chain_id); } @@ -480,8 +530,6 @@ impl TelemetryState { src_port: &PortId, dst_port: &PortId, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), @@ -491,9 +539,9 @@ impl TelemetryState { KeyValue::new("dst_port", dst_port.to_string()), ]; - self.receive_packets_confirmed.add(&cx, 0, labels); - self.acknowledgment_packets_confirmed.add(&cx, 0, labels); - self.timeout_packets_confirmed.add(&cx, 0, labels); + self.receive_packets_confirmed.add(0, labels); + self.acknowledgment_packets_confirmed.add(0, labels); + self.timeout_packets_confirmed.add(0, labels); } pub fn init_per_path( @@ -504,8 +552,6 @@ impl TelemetryState { port: &PortId, clear_packets: bool, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain.to_string()), KeyValue::new("counterparty", counterparty.to_string()), @@ -513,18 +559,18 @@ impl TelemetryState { KeyValue::new("port", port.to_string()), ]; - self.send_packet_events.add(&cx, 0, labels); - self.acknowledgement_events.add(&cx, 0, labels); - self.timeout_events.add(&cx, 0, labels); + self.send_packet_events.add(0, labels); + self.acknowledgement_events.add(0, labels); + self.timeout_events.add(0, labels); if clear_packets { - self.cleared_send_packet_events.add(&cx, 0, labels); - self.cleared_acknowledgment_events.add(&cx, 0, labels); + self.cleared_send_packet_events.add(0, labels); + self.cleared_acknowledgment_events.add(0, labels); } - self.backlog_oldest_sequence.observe(&cx, 0, labels); - self.backlog_latest_update_timestamp.observe(&cx, 0, labels); - self.backlog_size.observe(&cx, 0, labels); + self.backlog_oldest_sequence.observe(0, labels); + self.backlog_latest_update_timestamp.observe(0, labels); + self.backlog_size.observe(0, labels); } pub fn init_per_client( @@ -534,32 +580,28 @@ impl TelemetryState { client: &ClientId, misbehaviour: bool, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), KeyValue::new("client", client.to_string()), ]; - self.client_updates_submitted.add(&cx, 0, labels); - self.client_updates_skipped.add(&cx, 0, labels); + self.client_updates_submitted.add(0, labels); + self.client_updates_skipped.add(0, labels); if misbehaviour { - self.client_misbehaviours_submitted.add(&cx, 0, labels); + self.client_misbehaviours_submitted.add(0, labels); } } fn init_queries(&self, chain_id: &ChainId) { - let cx = Context::current(); - for query_type in QUERY_TYPES { let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("query_type", query_type), ]; - self.queries.add(&cx, 0, labels); + self.queries.add(0, labels); } for query_type in QUERY_TYPES_CACHE { @@ -568,15 +610,14 @@ impl TelemetryState { KeyValue::new("query_type", query_type), ]; - self.queries_cache_hits.add(&cx, 0, labels); + self.queries_cache_hits.add(0, labels); } } /// Update the number of workers per object pub fn worker(&self, worker_type: WorkerType, count: i64) { - let cx = Context::current(); let labels = &[KeyValue::new("type", worker_type.to_string())]; - self.workers.add(&cx, count, labels); + self.workers.add(count, labels); } /// Update the number of client updates per client @@ -587,15 +628,13 @@ impl TelemetryState { client: &ClientId, count: u64, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), KeyValue::new("client", client.to_string()), ]; - self.client_updates_submitted.add(&cx, count, labels); + self.client_updates_submitted.add(count, labels); } /// Update the number of client updates skipped per client @@ -606,15 +645,13 @@ impl TelemetryState { client: &ClientId, count: u64, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), KeyValue::new("client", client.to_string()), ]; - self.client_updates_skipped.add(&cx, count, labels); + self.client_updates_skipped.add(count, labels); } /// Number of client misbehaviours per client @@ -625,15 +662,13 @@ impl TelemetryState { client: &ClientId, count: u64, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), KeyValue::new("client", client.to_string()), ]; - self.client_misbehaviours_submitted.add(&cx, count, labels); + self.client_misbehaviours_submitted.add(count, labels); } /// Number of receive packets relayed, per channel @@ -648,8 +683,6 @@ impl TelemetryState { dst_port: &PortId, count: u64, ) { - let cx = Context::current(); - if count > 0 { let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), @@ -660,7 +693,7 @@ impl TelemetryState { KeyValue::new("dst_port", dst_port.to_string()), ]; - self.receive_packets_confirmed.add(&cx, count, labels); + self.receive_packets_confirmed.add(count, labels); } } @@ -676,8 +709,6 @@ impl TelemetryState { dst_port: &PortId, count: u64, ) { - let cx = Context::current(); - if count > 0 { let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), @@ -688,8 +719,7 @@ impl TelemetryState { KeyValue::new("dst_port", dst_port.to_string()), ]; - self.acknowledgment_packets_confirmed - .add(&cx, count, labels); + self.acknowledgment_packets_confirmed.add(count, labels); } } @@ -705,8 +735,6 @@ impl TelemetryState { dst_port: &PortId, count: u64, ) { - let cx = Context::current(); - if count > 0 { let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), @@ -717,73 +745,61 @@ impl TelemetryState { KeyValue::new("dst_port", dst_port.to_string()), ]; - self.timeout_packets_confirmed.add(&cx, count, labels); + self.timeout_packets_confirmed.add(count, labels); } } /// Number of queries emitted by the relayer, per chain and query type pub fn query(&self, chain_id: &ChainId, query_type: &'static str) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("query_type", query_type), ]; - self.queries.add(&cx, 1, labels); + self.queries.add(1, labels); } /// Number of cache hits for queries emitted by the relayer, per chain and query type pub fn queries_cache_hits(&self, chain_id: &ChainId, query_type: &'static str) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("query_type", query_type), ]; - self.queries_cache_hits.add(&cx, 1, labels); + self.queries_cache_hits.add(1, labels); } /// Number of time the relayer had to reconnect to the WebSocket endpoint, per chain pub fn ws_reconnect(&self, chain_id: &ChainId) { - let cx = Context::current(); - let labels = &[KeyValue::new("chain", chain_id.to_string())]; - self.ws_reconnect.add(&cx, 1, labels); + self.ws_reconnect.add(1, labels); } /// How many IBC events did Hermes receive via the WebSocket subscription, per chain pub fn ws_events(&self, chain_id: &ChainId, count: u64) { - let cx = Context::current(); - let labels = &[KeyValue::new("chain", chain_id.to_string())]; - self.ws_events.add(&cx, count, labels); + self.ws_events.add(count, labels); } /// How many messages Hermes submitted to the chain pub fn messages_submitted(&self, chain_id: &ChainId, count: u64) { - let cx = Context::current(); - let labels = &[KeyValue::new("chain", chain_id.to_string())]; - self.messages_submitted.add(&cx, count, labels); + self.messages_submitted.add(count, labels); } /// The balance in each wallet that Hermes is using, per account, denom and chain. /// The amount given is of unit: 10^6 * `denom` pub fn wallet_balance(&self, chain_id: &ChainId, account: &str, amount: f64, denom: &str) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("account", account.to_string()), KeyValue::new("denom", denom.to_string()), ]; - self.wallet_balance.observe(&cx, amount, labels); + self.wallet_balance.observe(amount, labels); } pub fn received_event_batch(&self, tracking_id: impl ToString) { @@ -800,8 +816,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let tracking_id = tracking_id.to_string(); if let Some(start) = self.in_flight_events.get(&tracking_id) { @@ -816,7 +830,7 @@ impl TelemetryState { ]; for _ in 0..tx_count { - self.tx_latency_submitted.observe(&cx, latency, labels); + self.tx_latency_submitted.record(latency, labels); } } } @@ -830,8 +844,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let tracking_id = tracking_id.to_string(); if let Some(start) = self.in_flight_events.get(&tracking_id) { @@ -846,7 +858,7 @@ impl TelemetryState { ]; for _ in 0..tx_count { - self.tx_latency_confirmed.observe(&cx, latency, labels); + self.tx_latency_confirmed.record(latency, labels); } } } @@ -860,8 +872,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("counterparty", counterparty_chain_id.to_string()), @@ -869,7 +879,7 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; - self.send_packet_events.add(&cx, 1, labels); + self.send_packet_events.add(1, labels); } pub fn acknowledgement_events( @@ -881,8 +891,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("counterparty", counterparty_chain_id.to_string()), @@ -890,7 +898,7 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; - self.acknowledgement_events.add(&cx, 1, labels); + self.acknowledgement_events.add(1, labels); } pub fn timeout_events( @@ -900,8 +908,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("counterparty", counterparty_chain_id.to_string()), @@ -909,7 +915,7 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; - self.timeout_events.add(&cx, 1, labels); + self.timeout_events.add(1, labels); } pub fn cleared_send_packet_events( @@ -921,8 +927,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let labels: &[KeyValue; 4] = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("counterparty", counterparty_chain_id.to_string()), @@ -930,7 +934,7 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; - self.cleared_send_packet_events.add(&cx, 1, labels); + self.cleared_send_packet_events.add(1, labels); } pub fn cleared_acknowledgment_events( @@ -942,8 +946,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - let labels: &[KeyValue; 4] = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("counterparty", counterparty_chain_id.to_string()), @@ -951,7 +953,7 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; - self.cleared_acknowledgment_events.add(&cx, 1, labels); + self.cleared_acknowledgment_events.add(1, labels); } /// Inserts in the backlog a new event for the given sequence number. @@ -964,8 +966,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - // Unique identifier for a chain/channel/port. let path_uid: PathIdentifier = PathIdentifier::new( chain_id.to_string(), @@ -1018,10 +1018,10 @@ impl TelemetryState { }; // Update metrics to reflect the new state of the backlog - self.backlog_oldest_sequence.observe(&cx, oldest_sn, labels); + self.backlog_oldest_sequence.observe(oldest_sn, labels); self.backlog_latest_update_timestamp - .observe(&cx, timestamp, labels); - self.backlog_size.observe(&cx, total, labels); + .observe(timestamp, labels); + self.backlog_size.observe(total, labels); } /// Inserts in the backlog a new event for the given sequence number. @@ -1077,8 +1077,6 @@ impl TelemetryState { port_id: &PortId, counterparty_chain_id: &ChainId, ) { - let cx = Context::current(); - // Unique identifier for a chain/channel/port path. let path_uid: PathIdentifier = PathIdentifier::new( chain_id.to_string(), @@ -1104,17 +1102,16 @@ impl TelemetryState { if path_backlog.remove(&seq_nr).is_some() { // If the entry was removed update the latest update timestamp. self.backlog_latest_update_timestamp - .observe(&cx, timestamp, labels); + .observe(timestamp, labels); // The oldest pending sequence number is the minimum key in the inner (path) backlog. if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() { - self.backlog_oldest_sequence.observe(&cx, min_key, labels); - self.backlog_size - .observe(&cx, path_backlog.len() as u64, labels); + self.backlog_oldest_sequence.observe(min_key, labels); + self.backlog_size.observe(path_backlog.len() as u64, labels); } else { // No minimum found, update the metrics to reflect an empty backlog self.backlog_oldest_sequence - .observe(&cx, EMPTY_BACKLOG_SYMBOL, labels); - self.backlog_size.observe(&cx, EMPTY_BACKLOG_SYMBOL, labels); + .observe(EMPTY_BACKLOG_SYMBOL, labels); + self.backlog_size.observe(EMPTY_BACKLOG_SYMBOL, labels); } } } @@ -1127,8 +1124,6 @@ impl TelemetryState { if !self.visible_fee_addresses.contains(&receiver.to_string()) { return; } - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("receiver", receiver.to_string()), @@ -1137,7 +1132,7 @@ impl TelemetryState { let fee_amount = fee_amounts.amount.0.as_u64(); - self.fee_amounts.add(&cx, fee_amount, labels); + self.fee_amounts.add(fee_amount, labels); let ephemeral_fee: moka::sync::Cache = moka::sync::Cache::builder() .time_to_live(FEE_LIFETIME) // Remove entries after 1 hour without insert @@ -1152,12 +1147,10 @@ impl TelemetryState { let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum(); - self.period_fees.observe(&cx, sum, labels); + self.period_fees.observe(sum, labels); } pub fn update_period_fees(&self, chain_id: &ChainId, receiver: &String, denom: &String) { - let cx = Context::current(); - let labels = &[ KeyValue::new("chain", chain_id.to_string()), KeyValue::new("receiver", receiver.to_string()), @@ -1170,7 +1163,7 @@ impl TelemetryState { let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum(); - self.period_fees.observe(&cx, sum, labels); + self.period_fees.observe(sum, labels); } // Add an address to the list of addresses which will record @@ -1182,7 +1175,6 @@ impl TelemetryState { /// Add an error and its description to the list of errors observed after broadcasting /// a Tx with a specific account. pub fn broadcast_errors(&self, address: &String, error_code: u32, error_description: &str) { - let cx = Context::current(); let broadcast_error = BroadcastError::new(error_code, error_description); let labels = &[ @@ -1191,46 +1183,37 @@ impl TelemetryState { KeyValue::new("error_description", broadcast_error.description), ]; - self.broadcast_errors.add(&cx, 1, labels); + self.broadcast_errors.add(1, labels); } /// Add an error and its description to the list of errors observed after simulating /// a Tx with a specific account. pub fn simulate_errors(&self, address: &String, recoverable: bool, error_description: String) { - let cx = Context::current(); - let labels = &[ KeyValue::new("account", address.to_string()), KeyValue::new("recoverable", recoverable.to_string()), KeyValue::new("error_description", error_description.to_owned()), ]; - self.simulate_errors.add(&cx, 1, labels); + self.simulate_errors.add(1, labels); } pub fn dynamic_gas_queried_fees(&self, chain_id: &ChainId, amount: f64) { - let cx = Context::current(); - let labels = &[KeyValue::new("identifier", chain_id.to_string())]; - self.dynamic_gas_queried_fees.observe(&cx, amount, labels); + self.dynamic_gas_queried_fees.record(amount, labels); } pub fn dynamic_gas_paid_fees(&self, chain_id: &ChainId, amount: f64) { - let cx = Context::current(); - let labels = &[KeyValue::new("identifier", chain_id.to_string())]; - self.dynamic_gas_paid_fees.observe(&cx, amount, labels); + self.dynamic_gas_paid_fees.record(amount, labels); } pub fn dynamic_gas_queried_success_fees(&self, chain_id: &ChainId, amount: f64) { - let cx = Context::current(); - let labels = &[KeyValue::new("identifier", chain_id.to_string())]; - self.dynamic_gas_queried_success_fees - .observe(&cx, amount, labels); + self.dynamic_gas_queried_success_fees.record(amount, labels); } /// Increment number of packets filtered because the memo field is too big @@ -1245,8 +1228,6 @@ impl TelemetryState { dst_port: &PortId, count: u64, ) { - let cx = Context::current(); - if count > 0 { let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), @@ -1257,20 +1238,18 @@ impl TelemetryState { KeyValue::new("dst_port", dst_port.to_string()), ]; - self.filtered_packets.add(&cx, count, labels); + self.filtered_packets.add(count, labels); } } pub fn cross_chain_queries(&self, src_chain: &ChainId, dst_chain: &ChainId, count: usize) { - let cx = Context::current(); - if count > 0 { let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), ]; - self.cross_chain_queries.add(&cx, count as u64, labels); + self.cross_chain_queries.add(count as u64, labels); } } @@ -1280,8 +1259,6 @@ impl TelemetryState { dst_chain: &ChainId, ccq_responses_codes: Vec, ) { - let cx = Context::current(); - let labels = &[ KeyValue::new("src_chain", src_chain.to_string()), KeyValue::new("dst_chain", dst_chain.to_string()), @@ -1289,62 +1266,14 @@ impl TelemetryState { for code in ccq_responses_codes.iter() { if code.is_ok() { - self.cross_chain_query_responses.add(&cx, 1, labels); + self.cross_chain_query_responses.add(1, labels); } else { - self.cross_chain_query_error_responses.add(&cx, 1, labels); + self.cross_chain_query_error_responses.add(1, labels); } } } } -use std::sync::Arc; - -use opentelemetry::metrics::Unit; -use opentelemetry::sdk::export::metrics::AggregatorSelector; -use opentelemetry::sdk::metrics::aggregators::Aggregator; -use opentelemetry::sdk::metrics::aggregators::{histogram, last_value, sum}; -use opentelemetry::sdk::metrics::sdk_api::Descriptor; - -#[derive(Debug)] -struct CustomAggregatorSelector { - tx_latency_submitted_range: Range, - tx_latency_submitted_buckets: u64, - tx_latency_confirmed_range: Range, - tx_latency_confirmed_buckets: u64, -} - -impl CustomAggregatorSelector { - pub fn new( - tx_latency_submitted_range: Range, - tx_latency_submitted_buckets: u64, - tx_latency_confirmed_range: Range, - tx_latency_confirmed_buckets: u64, - ) -> Self { - Self { - tx_latency_submitted_range, - tx_latency_submitted_buckets, - tx_latency_confirmed_range, - tx_latency_confirmed_buckets, - } - } - - pub fn get_submitted_range(&self) -> Vec { - build_histogram_buckets( - self.tx_latency_submitted_range.start, - self.tx_latency_submitted_range.end, - self.tx_latency_submitted_buckets, - ) - } - - pub fn get_confirmed_range(&self) -> Vec { - build_histogram_buckets( - self.tx_latency_confirmed_range.start, - self.tx_latency_confirmed_range.end, - self.tx_latency_confirmed_buckets, - ) - } -} - fn build_histogram_buckets(start: u64, end: u64, buckets: u64) -> Vec { let step = (end - start) / buckets; (0..=buckets) @@ -1352,33 +1281,6 @@ fn build_histogram_buckets(start: u64, end: u64, buckets: u64) -> Vec { .collect::>() } -impl AggregatorSelector for CustomAggregatorSelector { - fn aggregator_for(&self, descriptor: &Descriptor) -> Option> { - match descriptor.name() { - "wallet_balance" => Some(Arc::new(last_value())), - "backlog_oldest_sequence" => Some(Arc::new(last_value())), - "backlog_latest_update_timestamp" => Some(Arc::new(last_value())), - "backlog_size" => Some(Arc::new(last_value())), - // Prometheus' supports only collector for histogram, sum, and last value aggregators. - // https://docs.rs/opentelemetry-prometheus/0.10.0/src/opentelemetry_prometheus/lib.rs.html#411-418 - // TODO: Once quantile sketches are supported, replace histograms with that. - "tx_latency_submitted" => Some(Arc::new(histogram(&self.get_submitted_range()))), - "tx_latency_confirmed" => Some(Arc::new(histogram(&self.get_confirmed_range()))), - "dynamic_gas_queried_fees" => Some(Arc::new(histogram(&[ - 0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, - ]))), - "dynamic_gas_paid_fees" => Some(Arc::new(histogram(&[ - 0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, - ]))), - "dynamic_gas_queried_success_fees" => Some(Arc::new(histogram(&[ - 0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, - ]))), - "ics29_period_fees" => Some(Arc::new(last_value())), - _ => Some(Arc::new(sum())), - } - } -} - #[cfg(test)] mod tests { use prometheus::proto::Metric; @@ -1398,6 +1300,7 @@ mod tests { end: 5000, }, 5, + "hermes", ); let chain_id = ChainId::from_string("chain-test"); @@ -1413,10 +1316,10 @@ mod tests { state.backlog_remove(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id); state.backlog_remove(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id); - let metrics = state.exporter.registry().gather().clone(); + let metrics = state.registry.gather().clone(); let backlog_size = metrics .iter() - .find(|metric| metric.get_name() == "backlog_size") + .find(|metric| metric.get_name() == "hermes_backlog_size") .unwrap(); assert!( assert_metric_value(backlog_size.get_metric(), 3), @@ -1424,7 +1327,7 @@ mod tests { ); let backlog_oldest_sequence = metrics .iter() - .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .find(|&metric| metric.get_name() == "hermes_backlog_oldest_sequence") .unwrap(); assert!( assert_metric_value(backlog_oldest_sequence.get_metric(), 2), @@ -1445,6 +1348,7 @@ mod tests { end: 5000, }, 5, + "hermes", ); let chain_id = ChainId::from_string("chain-test"); @@ -1466,10 +1370,10 @@ mod tests { &counterparty_chain_id, ); - let metrics = state.exporter.registry().gather().clone(); + let metrics = state.registry.gather().clone(); let backlog_size = metrics .iter() - .find(|&metric| metric.get_name() == "backlog_size") + .find(|&metric| metric.get_name() == "hermes_backlog_size") .unwrap(); assert!( assert_metric_value(backlog_size.get_metric(), 1), @@ -1477,7 +1381,7 @@ mod tests { ); let backlog_oldest_sequence = metrics .iter() - .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .find(|&metric| metric.get_name() == "hermes_backlog_oldest_sequence") .unwrap(); assert!( assert_metric_value(backlog_oldest_sequence.get_metric(), 5), @@ -1498,6 +1402,7 @@ mod tests { end: 5000, }, 5, + "hermes_", ); let chain_id = ChainId::from_string("chain-test"); @@ -1519,10 +1424,10 @@ mod tests { &counterparty_chain_id, ); - let metrics = state.exporter.registry().gather().clone(); + let metrics = state.registry.gather().clone(); let backlog_size = metrics .iter() - .find(|&metric| metric.get_name() == "backlog_size") + .find(|&metric| metric.get_name() == "hermes_backlog_size") .unwrap(); assert!( assert_metric_value(backlog_size.get_metric(), 0), @@ -1530,7 +1435,7 @@ mod tests { ); let backlog_oldest_sequence = metrics .iter() - .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .find(|&metric| metric.get_name() == "hermes_backlog_oldest_sequence") .unwrap(); assert!( assert_metric_value(backlog_oldest_sequence.get_metric(), 0), diff --git a/guide/src/documentation/telemetry/index.md b/guide/src/documentation/telemetry/index.md index b0d8f926d1..72726f1fcd 100644 --- a/guide/src/documentation/telemetry/index.md +++ b/guide/src/documentation/telemetry/index.md @@ -16,6 +16,7 @@ The telemetry service is not active by default, and must be enabled in Hermes' c enabled = true # default = false host = '127.0.0.1' # default value port = 3001 # default value +prefix = "" # default value [telemetry.buckets] # default value latency_submitted = { start = 5000, end = 10000, buckets = 10 } # default value latency_confirmed = { start = 5000, end = 10000, buckets = 10 } # default value