Skip to content

Commit 802229d

Browse files
authored
sql_server: add source-specific prometheus metrics (#33962)
Adds SQL Server specific metrics. The list isn't exhaustive. The expectation is we will add additional metrics as needed. ### Motivation implements MaterializeInc/database-issues#9506
1 parent 795e76b commit 802229d

File tree

8 files changed

+288
-17
lines changed

8 files changed

+288
-17
lines changed

src/sql-server-util/examples/cdc.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use futures::StreamExt;
4242
use mz_ore::future::InTask;
4343
use mz_sql_server_util::cdc::CdcEvent;
4444
use mz_sql_server_util::config::TunnelConfig;
45-
use mz_sql_server_util::{Client, Config};
45+
use mz_sql_server_util::{Client, Config, LoggingSqlServerCdcMetrics};
4646
use tracing_subscriber::EnvFilter;
4747

4848
#[tokio::main]
@@ -65,7 +65,8 @@ async fn main() -> Result<(), anyhow::Error> {
6565
tracing::info!("connection 1 successful!");
6666

6767
let capture_instances = ["materialize_t1", "materialize_t2"];
68-
let mut cdc_handle = client_1.cdc(capture_instances);
68+
let metrics = LoggingSqlServerCdcMetrics;
69+
let mut cdc_handle = client_1.cdc(capture_instances, metrics);
6970

7071
cdc_handle.wait_for_ready().await?;
7172

src/sql-server-util/src/cdc.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ use tiberius::numeric::Numeric;
7575

7676
use crate::desc::{SqlServerQualifiedTableName, SqlServerTableRaw};
7777
use crate::inspect::DDLEvent;
78-
use crate::{Client, SqlServerError, TransactionIsolationLevel};
78+
use crate::{Client, SqlServerCdcMetrics, SqlServerError, TransactionIsolationLevel};
7979

8080
/// A stream of changes from a table in SQL Server that has CDC enabled.
8181
///
8282
/// SQL Server does not have an API to push or notify consumers of changes, so we periodically
8383
/// poll the upstream source.
8484
///
8585
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/change-data-capture-tables-transact-sql?view=sql-server-ver16>
86-
pub struct CdcStream<'a> {
86+
pub struct CdcStream<'a, M: SqlServerCdcMetrics> {
8787
/// Client we use for querying SQL Server.
8888
client: &'a mut Client,
8989
/// Upstream capture instances we'll list changes from.
@@ -97,18 +97,22 @@ pub struct CdcStream<'a> {
9797
/// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
9898
/// ready to go.
9999
max_lsn_wait: Duration,
100+
/// Metrics.
101+
metrics: M,
100102
}
101103

102-
impl<'a> CdcStream<'a> {
104+
impl<'a, M: SqlServerCdcMetrics> CdcStream<'a, M> {
103105
pub(crate) fn new(
104106
client: &'a mut Client,
105107
capture_instances: BTreeMap<Arc<str>, Option<Lsn>>,
108+
metrics: M,
106109
) -> Self {
107110
CdcStream {
108111
client,
109112
capture_instances,
110113
poll_interval: Duration::from_secs(1),
111114
max_lsn_wait: Duration::from_secs(10),
115+
metrics,
112116
}
113117
}
114118

@@ -164,6 +168,13 @@ impl<'a> CdcStream<'a> {
164168
// as it will be just be locking the table(s).
165169
let mut fencing_client = self.client.new_connection().await?;
166170
let mut fence_txn = fencing_client.transaction().await?;
171+
let qualified_table_name = format!(
172+
"{schema_name}.{table_name}",
173+
schema_name = &table.schema_name,
174+
table_name = &table.name
175+
);
176+
self.metrics
177+
.snapshot_table_lock_start(&qualified_table_name);
167178
fence_txn
168179
.lock_table_shared(&table.schema_name, &table.name)
169180
.await?;
@@ -204,7 +215,7 @@ impl<'a> CdcStream<'a> {
204215
// the table no longer needs to be locked. Any writes that happen to the upstream table
205216
// will have an LSN higher than our captured LSN, and will be read from CDC.
206217
fence_txn.rollback().await?;
207-
218+
self.metrics.snapshot_table_lock_end(&qualified_table_name);
208219
let lsn = txn.get_lsn().await?;
209220

210221
tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");
@@ -225,7 +236,9 @@ impl<'a> CdcStream<'a> {
225236
}
226237

227238
/// Consume `self` returning a [`Stream`] of [`CdcEvent`]s.
228-
pub fn into_stream(mut self) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a> {
239+
pub fn into_stream(
240+
mut self,
241+
) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a, M> {
229242
async_stream::try_stream! {
230243
// Initialize all of our start LSNs.
231244
self.initialize_start_lsns().await?;

src/sql-server-util/src/lib.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,16 +341,17 @@ impl Client {
341341
/// `capture_instances`.
342342
///
343343
/// [`CdcStream`]: crate::cdc::CdcStream
344-
pub fn cdc<I>(&mut self, capture_instances: I) -> crate::cdc::CdcStream<'_>
344+
pub fn cdc<I, M>(&mut self, capture_instances: I, metrics: M) -> crate::cdc::CdcStream<'_, M>
345345
where
346346
I: IntoIterator,
347347
I::Item: Into<Arc<str>>,
348+
M: SqlServerCdcMetrics,
348349
{
349350
let instances = capture_instances
350351
.into_iter()
351352
.map(|i| (i.into(), None))
352353
.collect();
353-
crate::cdc::CdcStream::new(self, instances)
354+
crate::cdc::CdcStream::new(self, instances, metrics)
354355
}
355356
}
356357

@@ -945,6 +946,27 @@ pub fn quote_identifier(ident: &str) -> String {
945946
quoted
946947
}
947948

949+
pub trait SqlServerCdcMetrics {
950+
/// Called before the table lock is aquired
951+
fn snapshot_table_lock_start(&self, table_name: &str);
952+
/// Called after the table lock is released
953+
fn snapshot_table_lock_end(&self, table_name: &str);
954+
}
955+
956+
/// A simple implementation of [`SqlServerCdcMetrics`] that uses the tracing framework to log
957+
/// the start and end conditions.
958+
pub struct LoggingSqlServerCdcMetrics;
959+
960+
impl SqlServerCdcMetrics for LoggingSqlServerCdcMetrics {
961+
fn snapshot_table_lock_start(&self, table_name: &str) {
962+
tracing::info!("snapshot_table_lock_start: {table_name}");
963+
}
964+
965+
fn snapshot_table_lock_end(&self, table_name: &str) {
966+
tracing::info!("snapshot_table_lock_end: {table_name}");
967+
}
968+
}
969+
948970
#[cfg(test)]
949971
mod test {
950972
use super::*;

src/storage/src/metrics.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl StorageMetrics {
138138
source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
139139
}
140140

141-
/// Get a `PgMetrics` for the given id.
141+
/// Get a `PgSourceMetrics` for the given id.
142142
pub(crate) fn get_postgres_source_metrics(
143143
&self,
144144
id: GlobalId,
@@ -154,6 +154,19 @@ impl StorageMetrics {
154154
source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
155155
}
156156

157+
/// Get a `SqlServerSourceMetrics` for the given id.
158+
pub(crate) fn get_sql_server_source_metrics(
159+
&self,
160+
source_id: GlobalId,
161+
worker_id: usize,
162+
) -> source::sql_server::SqlServerSourceMetrics {
163+
source::sql_server::SqlServerSourceMetrics::new(
164+
&self.source_defs.sql_server_defs,
165+
source_id,
166+
worker_id,
167+
)
168+
}
169+
157170
/// Get an `OffsetCommitMetrics` for the given id.
158171
pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
159172
source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)

src/storage/src/metrics/source.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use prometheus::core::{AtomicI64, AtomicU64};
2525
pub mod kafka;
2626
pub mod mysql;
2727
pub mod postgres;
28+
pub mod sql_server;
2829

2930
/// Definitions for general metrics about sources that are not specific to the source type.
3031
///
@@ -225,6 +226,7 @@ pub(crate) struct SourceMetricDefs {
225226
pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
226227
pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
227228
pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
229+
pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
228230
/// A cluster-wide counter shared across all sources.
229231
pub(crate) bytes_read: IntCounter,
230232
}
@@ -236,6 +238,7 @@ impl SourceMetricDefs {
236238
postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
237239
mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
238240
kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
241+
sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
239242
bytes_read: registry.register(metric!(
240243
name: "mz_bytes_read_total",
241244
help: "Count of bytes read from sources",
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Metrics for Postgres.
11+
12+
use std::collections::BTreeMap;
13+
use std::rc::Rc;
14+
use std::sync::Mutex;
15+
16+
use mz_ore::metric;
17+
use mz_ore::metrics::{
18+
DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, IntGaugeVec, MetricsRegistry,
19+
UIntGaugeVec,
20+
};
21+
use mz_repr::GlobalId;
22+
use prometheus::core::{AtomicF64, AtomicI64, AtomicU64};
23+
24+
/// Definitions for Postgres source metrics.
25+
#[derive(Clone, Debug)]
26+
pub(crate) struct SqlServerSourceMetricDefs {
27+
pub(crate) ignored_messages: IntCounterVec,
28+
pub(crate) insert_messages: IntCounterVec,
29+
pub(crate) update_messages: IntCounterVec,
30+
pub(crate) delete_messages: IntCounterVec,
31+
pub(crate) snapshot_table_count: UIntGaugeVec,
32+
pub(crate) snapshot_table_size_latency: GaugeVec,
33+
pub(crate) snapshot_table_lock: IntGaugeVec,
34+
}
35+
36+
impl SqlServerSourceMetricDefs {
37+
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
38+
// Every metric must have a worker specific id associated with it. These are later wrapped
39+
// in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
40+
// would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
41+
// workers may still be running, but the metrics registry will no longer record or report
42+
// metrics for that `source_id`.
43+
Self {
44+
ignored_messages: registry.register(metric!(
45+
name: "mz_sql_server_per_source_ignored_messages",
46+
help: "The number of messages ignored because of an irrelevant type or relation_id",
47+
var_labels: ["source_id", "worker_id"],
48+
)),
49+
insert_messages: registry.register(metric!(
50+
name: "mz_sql_server_per_source_inserts",
51+
help: "The number of inserts for all tables in this source",
52+
var_labels: ["source_id", "worker_id"],
53+
)),
54+
update_messages: registry.register(metric!(
55+
name: "mz_sql_server_per_source_updates",
56+
help: "The number of updates for all tables in this source",
57+
var_labels: ["source_id", "worker_id"],
58+
)),
59+
delete_messages: registry.register(metric!(
60+
name: "mz_sql_server_per_source_deletes",
61+
help: "The number of deletes for all tables in this source",
62+
var_labels: ["source_id", "worker_id"],
63+
)),
64+
snapshot_table_count: registry.register(metric!(
65+
name: "mz_sql_server_snapshot_table_count",
66+
help: "The number of tables that SQL Server still needs to snapshot",
67+
var_labels: ["source_id", "worker_id"],
68+
)),
69+
snapshot_table_size_latency: registry.register(metric!(
70+
name: "mz_sql_server_snapshot_count_latency",
71+
help: "The wall time used to obtain snapshot sizes.",
72+
var_labels: ["source_id", "worker_id", "table_name"],
73+
)),
74+
snapshot_table_lock: registry.register(metric!(
75+
name: "mz_sql_server_snapshot_table_lock",
76+
help: "The upstream tables locked for snapshot.",
77+
var_labels: ["source_id", "worker_id", "table_name"],
78+
)),
79+
}
80+
}
81+
}
82+
#[derive(Clone)]
83+
/// Metrics for Postgres sources.
84+
pub(crate) struct SqlServerSourceMetrics {
85+
// stored as String to avoid having to convert them repeatedly.
86+
source_id: String,
87+
worker_id: String,
88+
defs: SqlServerSourceMetricDefs,
89+
// Currently, this structure is not accessed across threads.
90+
snapshot_table_size_latency:
91+
Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
92+
snapshot_table_lock_count:
93+
Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicI64, Vec<String>>>>>,
94+
95+
pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
96+
pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
97+
pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
98+
pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
99+
pub(crate) snapshot_table_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
100+
}
101+
102+
impl SqlServerSourceMetrics {
103+
/// Create a `SqlServerSourceMetrics` from the `SqlServerSourceMetricDefs`.
104+
pub(crate) fn new(
105+
defs: &SqlServerSourceMetricDefs,
106+
source_id: GlobalId,
107+
worker_id: usize,
108+
) -> Self {
109+
let source_id_labels = &[source_id.to_string(), worker_id.to_string()];
110+
Self {
111+
source_id: source_id.to_string(),
112+
worker_id: worker_id.to_string(),
113+
defs: defs.clone(),
114+
inserts: defs
115+
.insert_messages
116+
.get_delete_on_drop_metric(source_id_labels.to_vec()),
117+
updates: defs
118+
.update_messages
119+
.get_delete_on_drop_metric(source_id_labels.to_vec()),
120+
deletes: defs
121+
.delete_messages
122+
.get_delete_on_drop_metric(source_id_labels.to_vec()),
123+
ignored: defs
124+
.ignored_messages
125+
.get_delete_on_drop_metric(source_id_labels.to_vec()),
126+
snapshot_table_count: defs
127+
.snapshot_table_count
128+
.get_delete_on_drop_metric(source_id_labels.to_vec()),
129+
snapshot_table_size_latency: Default::default(),
130+
snapshot_table_lock_count: Default::default(),
131+
}
132+
}
133+
134+
pub fn set_snapshot_table_size_latency(&self, table_name: &str, latency: f64) {
135+
let mut snapshot_table_size_latency =
136+
self.snapshot_table_size_latency.lock().expect("poisoned");
137+
match snapshot_table_size_latency.entry(table_name.to_string()) {
138+
std::collections::btree_map::Entry::Vacant(vacant_entry) => {
139+
let labels = vec![
140+
self.source_id.clone(),
141+
self.worker_id.clone(),
142+
table_name.to_string(),
143+
];
144+
let metric = self
145+
.defs
146+
.snapshot_table_size_latency
147+
.get_delete_on_drop_metric(labels);
148+
vacant_entry.insert(metric).set(latency);
149+
}
150+
std::collections::btree_map::Entry::Occupied(occupied_entry) => {
151+
occupied_entry.get().set(latency)
152+
}
153+
}
154+
}
155+
156+
pub fn update_snapshot_table_lock_count(&self, table_name: &str, delta: i64) {
157+
let mut snapshot_table_lock_count =
158+
self.snapshot_table_lock_count.lock().expect("poisoned");
159+
match snapshot_table_lock_count.entry(table_name.to_string()) {
160+
std::collections::btree_map::Entry::Vacant(vacant_entry) => {
161+
let labels = vec![
162+
self.source_id.clone(),
163+
self.worker_id.clone(),
164+
table_name.to_string(),
165+
];
166+
let metric = self
167+
.defs
168+
.snapshot_table_lock
169+
.get_delete_on_drop_metric(labels);
170+
vacant_entry.insert(metric).add(delta);
171+
}
172+
std::collections::btree_map::Entry::Occupied(occupied_entry) => {
173+
occupied_entry.get().add(delta);
174+
}
175+
}
176+
}
177+
}

src/storage/src/source/sql_server.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,16 @@ impl SourceRender for SqlServerSourceConnection {
157157
source_outputs.insert(*id, output_info);
158158
}
159159

160+
let metrics = config
161+
.metrics
162+
.get_sql_server_source_metrics(config.id, config.worker_id);
163+
160164
let (repl_updates, uppers, repl_errs, repl_token) = replication::render(
161165
scope.clone(),
162166
config.clone(),
163167
source_outputs.clone(),
164168
self.clone(),
169+
metrics,
165170
);
166171

167172
let (progress_errs, progress_probes, progress_token) = progress::render(

0 commit comments

Comments
 (0)