feat(metrics): add counter for retry create stream reader #20258

Open · wants to merge 3 commits into base: main
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -927,6 +927,16 @@ def section_streaming(outer_panels):
)
],
),
panels.timeseries_count(
"Source Reader Retry Count",
"The number of times the source reader has retried to connect to the source. Basically the same as `Source Upstream Status` but from Compute Node.",
[
panels.target(
f"sum({metric('source_init_stream_reader_retry_count')}) by (source_name)",
"source_name={{source_name}}",
)
],
),
panels.timeseries_ops(
"Source Split Change Events frequency(events/s)",
"Source Split Change Events frequency by source_id and actor_id",
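For readers skimming the dashboard change without the rendered JSON: the new panel sums the raw counter across actors per source, in the spirit of PromQL's `sum(...) by (source_name)`. A minimal sketch of that aggregation with hypothetical sample values (not real RisingWave data):

```python
from collections import defaultdict

# Hypothetical scrape: one sample per (actor_id, source_name) label set,
# as exported by source_init_stream_reader_retry_count.
samples = [
    ({"actor_id": "1", "source_name": "kafka_src"}, 3),
    ({"actor_id": "2", "source_name": "kafka_src"}, 5),
    ({"actor_id": "7", "source_name": "pg_cdc"}, 0),
]

def sum_by(samples, key):
    """Mimic PromQL `sum(metric) by (<key>)`: drop all other labels
    and add up the sample values per kept label value."""
    out = defaultdict(int)
    for labels, value in samples:
        out[labels[key]] += value
    return dict(out)

print(sum_by(samples, "source_name"))  # {'kafka_src': 8, 'pg_cdc': 0}
```

With this grouping, a single series per source appears in the panel regardless of how many actors report the counter.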
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/connector/src/source/monitor/metrics.rs
@@ -60,6 +60,7 @@ impl Default for EnumeratorMetrics {
#[derive(Debug, Clone)]
pub struct SourceMetrics {
pub partition_input_count: LabelGuardedIntCounterVec<5>,
pub init_stream_reader_retry_count: LabelGuardedIntCounterVec<2>,

// **Note**: for normal messages, the metric is the message's payload size.
// For messages from load generator, the metric is the size of stream chunk.
@@ -89,6 +90,13 @@ impl SourceMetrics {
registry
)
.unwrap();
let init_stream_reader_retry_count = register_guarded_int_counter_vec_with_registry!(
"source_init_stream_reader_retry_count",
"Total number of retries to initialize stream reader",
&["actor_id", "source_name",],
registry
)
.unwrap();
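For context on how this counter vec is used later in the PR: each `(actor_id, source_name)` label pair gets its own counter that can be incremented and reset independently. A toy Python model of those semantics (hypothetical class names; a loose analogue of `LabelGuardedIntCounterVec`, not the real API):

```python
class CounterVec:
    """Toy labeled counter vec: one integer counter per label-value tuple."""
    def __init__(self, name, label_names):
        self.name = name
        self.label_names = tuple(label_names)
        self._values = {}

    def with_label_values(self, values):
        assert len(values) == len(self.label_names)
        return _Counter(self._values, tuple(values))

class _Counter:
    """Handle bound to one label-value tuple inside the shared store."""
    def __init__(self, store, key):
        self._store, self._key = store, key
    def inc(self):
        self._store[self._key] = self._store.get(self._key, 0) + 1
    def reset(self):
        self._store[self._key] = 0
    def get(self):
        return self._store.get(self._key, 0)

retries = CounterVec("source_init_stream_reader_retry_count",
                     ["actor_id", "source_name"])
c = retries.with_label_values(["42", "kafka_src"])
c.inc(); c.inc()
print(c.get())  # 2
c.reset()
print(c.get())  # 0
```

The "guarded" part of the real type additionally removes a label set from the metric family when its last handle is dropped, which this sketch does not model.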
Member commented on lines +93 to +99:
should we add this to grafana dashboard?

Contributor Author replied:
I think it overlaps with `source_status_is_up` (from the source enumerator), so I didn't add it to Grafana.
But here we focus more on retries on the compute node; we plan to add it to a cloud alert.

Member replied:
Oh, I didn't know we have `source_status_is_up`!

Then what about adding it to the same panel (Source Upstream Status), so that we can see whether the source is up on both meta and each CN? 🤔

let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
"source_partition_input_bytes",
"Total bytes that have been input from specific partition",
@@ -121,6 +129,7 @@ impl SourceMetrics {
let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
SourceMetrics {
partition_input_count,
init_stream_reader_retry_count,
partition_input_bytes,
latest_message_id,
rdkafka_native_metric,
18 changes: 16 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
@@ -543,14 +543,20 @@ impl<S: StateStore> SourceExecutor<S> {
let source_reader = source_desc.source.clone();
let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
let source_ctx = Arc::new(source_ctx);
let source_ctx_clone = source_ctx.clone();
let mut build_source_stream_fut = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
let source_ctx_inner = source_ctx_clone.clone();
source_ctx_inner.metrics.init_stream_reader_retry_count.with_guarded_label_values(&[
&source_ctx_inner.actor_id.to_string(),
&source_ctx_inner.source_name,
]).inc();
Member commented on lines +550 to +554:
Could we simply move this into source_reader.build_stream? And perhaps rename to stream_reader_build_count.

Member replied:
Actually, I feel the figure is a little hard to understand. Why did it go up and down 🤔

[screenshot: the retry-count panel fluctuating up and down]

Contributor Author replied:
I am struggling with it too; I suspect something applies a rate somewhere.

Contributor Author replied:
I believe the reason is that RW reports the metric from multiple nodes, and the count value from each node varies, leading to the fluctuation.
But it is irrelevant to the alerting part; we always alert when the value is non-zero.
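That explanation can be illustrated with hypothetical per-node readings (invented numbers, not real data): a panel that effectively plots whichever node reported last jitters between nodes, while summing per scrape is stable, and the non-zero alert condition holds either way:

```python
# Hypothetical cumulative counter readings from two compute nodes
# over three consecutive scrapes.
node_counts = {
    "cn-1": [2, 2, 3],
    "cn-2": [5, 6, 6],
}

# If the chart interleaves the two nodes' samples into one series,
# it jumps between their values and appears to "go up and down":
interleaved = [node_counts["cn-1"][0], node_counts["cn-2"][0],
               node_counts["cn-1"][1], node_counts["cn-2"][1]]
print(interleaved)  # [2, 5, 2, 6]

# Summing across nodes per scrape yields a monotonic series, and the
# alert condition (value > 0) is unaffected either way:
summed = [a + b for a, b in zip(node_counts["cn-1"], node_counts["cn-2"])]
print(summed)                      # [7, 8, 9]
print(all(v > 0 for v in summed))  # True
```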

match source_reader
.build_stream(
recover_state.clone(),
column_ids.clone(),
source_ctx.clone(),
source_ctx_inner,
false, // not need to seek to latest since source state is initialized
)
.await {
Expand Down Expand Up @@ -619,7 +625,15 @@ impl<S: StateStore> SourceExecutor<S> {
);
}
} else {
assert!(reader_and_splits.is_some());
debug_assert!(reader_and_splits.is_some());
source_ctx
.metrics
.init_stream_reader_retry_count
.with_guarded_label_values(&[
&source_ctx.actor_id.to_string(),
&source_ctx.source_name,
])
.reset();
tracing::info!("source stream created successfully");
break;
}
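The control flow added in this hunk, boiled down: retry building the stream reader under an effectively infinite (capped) backoff, increment the counter before each attempt, and reset it once the reader is built. A rough Python sketch of that loop (stand-in names and a synchronous shape; the real code is async Rust using `tokio_retry::Retry::spawn`):

```python
def build_stream(attempt):
    """Stand-in for source_reader.build_stream: fail twice, then succeed."""
    if attempt < 3:
        raise ConnectionError("upstream not ready")
    return "reader"

def build_with_retries(max_delay=1.0):
    """Retry forever with capped exponential backoff, counting attempts."""
    retry_count = 0
    delay = 0.01
    attempt = 0
    while True:
        attempt += 1
        retry_count += 1            # inc() before each build attempt
        try:
            reader = build_stream(attempt)
        except ConnectionError:
            delay = min(delay * 2, max_delay)  # backoff grows, then caps
            continue
        retry_count = 0             # reset() once the stream is created
        return reader, retry_count

reader, retries = build_with_retries()
print(reader, retries)  # reader 0
```

Resetting on success is what makes "value > 0" a usable alert condition: a healthy source settles back to zero instead of carrying its historical retry total forever.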