Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions crates/relayer/src/link/packet_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::path::PathIdentifiers;
use crate::util::collate::CollatedIterExt;
use crate::telemetry::TelemetryState;

/// Returns an iterator on batches of packet events.
pub fn query_packet_events_with<'a, ChainA, QueryFn>(
Expand All @@ -22,6 +23,7 @@ pub fn query_packet_events_with<'a, ChainA, QueryFn>(
path: &'a PathIdentifiers,
chunk_size: usize,
query_fn: QueryFn,
telemetry_state: &'a crate::telemetry::TelemetryState,
) -> impl Iterator<Item = Vec<IbcEventWithHeight>> + 'a
where
ChainA: ChainHandle,
Expand All @@ -45,6 +47,15 @@ where
warn!("no packet data was pulled at height {query_height} for sequences {}, this might be due to the data not being available on the configured endpoint. \
Please verify that the RPC endpoint has the required packet data, for more details see https://hermes.informal.systems/advanced/troubleshooting/cross-comp-config.html#uncleared-pending-packets",
chunk.iter().copied().collated().format(", "));
telemetry_state.persistent_packet_data_query_failures(
&src_chain.id(),
&src_chain.id(),
&path.counterparty_channel_id,
&path.channel_id,
&path.counterparty_port_id,
&path.port_id,
chunk.len() as u64,
);
} else {
info!(
events.total = %events_total,
Expand Down
7 changes: 6 additions & 1 deletion crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,11 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
chunk_size,
clear_limit,
tracking_id,
&crate::telemetry::TelemetryState::default(),
);

let cleared_ack =
self.schedule_packet_ack_msgs(height, chunk_size, clear_limit, tracking_id);
self.schedule_packet_ack_msgs(height, chunk_size, clear_limit, tracking_id, &crate::telemetry::TelemetryState::default());

match cleared_recv.and(cleared_ack) {
Ok(()) => return Ok(()),
Expand Down Expand Up @@ -1155,6 +1156,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
chunk_size: usize,
clear_limit: usize,
tracking_id: TrackingId,
telemetry_state: &crate::telemetry::TelemetryState,
) -> Result<(), LinkError> {
let _span = span!(
Level::ERROR,
Expand Down Expand Up @@ -1204,6 +1206,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
&self.path_id,
chunk_size,
query_send_packet_events,
telemetry_state,
) {
// Update telemetry info
telemetry!({
Expand All @@ -1229,6 +1232,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
chunk_size: usize,
clear_limit: usize,
tracking_id: TrackingId,
telemetry_state: &crate::telemetry::TelemetryState,
) -> Result<(), LinkError> {
let _span = span!(
Level::ERROR,
Expand Down Expand Up @@ -1280,6 +1284,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
&self.path_id,
chunk_size,
query_write_ack_events,
telemetry_state,
) {
telemetry!(self.record_cleared_acknowledgments(events_chunk.iter()));
self.events_to_operational_data(TrackedEvents::new(events_chunk, tracking_id))?;
Expand Down
34 changes: 34 additions & 0 deletions crates/telemetry/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ pub struct TelemetryState {

/// Observed ICS31 CrossChainQuery error Responses
cross_chain_query_error_responses: Counter<u64>,

/// Number of persistent packet data query failures per channel
persistent_packet_data_query_failures: Counter<u64>,
}

impl TelemetryState {
Expand Down Expand Up @@ -447,6 +450,11 @@ impl TelemetryState {
.u64_counter("cross_chain_query_error_responses")
.with_description("Number of ICS-31 error query responses")
.init(),

persistent_packet_data_query_failures: meter
.u64_counter("persistent_packet_data_query_failures")
.with_description("Number of persistent or repeated failures to query packet data for relaying. Helps distinguish transient from long-standing issues.")
.init(),
}
}

Expand Down Expand Up @@ -1295,6 +1303,32 @@ impl TelemetryState {
}
}
}

/// Records packet data query failures for a single channel path.
///
/// Adds `count` to the `persistent_packet_data_query_failures` counter,
/// attaching the source/destination chain, channel and port identifiers
/// as labels. When `count` is zero nothing is recorded.
// One label per identifier plus the count makes the argument list wide by design.
#[allow(clippy::too_many_arguments)]
pub fn persistent_packet_data_query_failures(
    &self,
    src_chain: &ChainId,
    dst_chain: &ChainId,
    src_channel: &ChannelId,
    dst_channel: &ChannelId,
    src_port: &PortId,
    dst_port: &PortId,
    count: u64,
) {
    // Nothing failed: leave the counter untouched.
    if count == 0 {
        return;
    }

    // Every identifier is stringified so it can be attached as a label value.
    let attributes = [
        KeyValue::new("src_chain", src_chain.to_string()),
        KeyValue::new("dst_chain", dst_chain.to_string()),
        KeyValue::new("src_channel", src_channel.to_string()),
        KeyValue::new("dst_channel", dst_channel.to_string()),
        KeyValue::new("src_port", src_port.to_string()),
        KeyValue::new("dst_port", dst_port.to_string()),
    ];

    self.persistent_packet_data_query_failures
        .add(&Context::current(), count, &attributes);
}
}

use std::sync::Arc;
Expand Down