diff --git a/src/coord/src/catalog/builtin.rs b/src/coord/src/catalog/builtin.rs
index 1fd4d1b747617..61fe51022da7a 100644
--- a/src/coord/src/catalog/builtin.rs
+++ b/src/coord/src/catalog/builtin.rs
@@ -554,6 +554,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
     index_id: GlobalId::System(3029),
 };
 
+pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
+    name: "mz_kafka_consumer_statistics",
+    schema: MZ_CATALOG_SCHEMA,
+    variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
+    id: GlobalId::System(3030),
+    index_id: GlobalId::System(3031),
+};
+
 lazy_static! {
     pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
         name: "mz_view_keys",
@@ -1271,6 +1279,7 @@ lazy_static! {
         Builtin::Log(&MZ_PEEK_DURATIONS),
         Builtin::Log(&MZ_SOURCE_INFO),
         Builtin::Log(&MZ_MESSAGE_COUNTS),
+        Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
         Builtin::Table(&MZ_VIEW_KEYS),
         Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
         Builtin::Table(&MZ_KAFKA_SINKS),
diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs
index 5e26bd6720534..cbeb178dcf5d4 100644
--- a/src/dataflow-types/src/logging.rs
+++ b/src/dataflow-types/src/logging.rs
@@ -52,6 +52,7 @@ pub enum MaterializedLog {
     DataflowCurrent,
     DataflowDependency,
     FrontierCurrent,
+    KafkaConsumerInfo,
     PeekCurrent,
     PeekDuration,
     SourceInfo,
@@ -162,6 +163,22 @@ impl LogVariant {
                 .with_column("worker", ScalarType::Int64.nullable(false))
                 .with_column("time", ScalarType::Int64.nullable(false)),
 
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty()
+                .with_column("consumer_name", ScalarType::String.nullable(false))
+                .with_column("source_id", ScalarType::String.nullable(false))
+                .with_column("dataflow_id", ScalarType::Int64.nullable(false))
+                .with_column("partition_id", ScalarType::String.nullable(false))
+                .with_column("rx_msgs", ScalarType::Int64.nullable(false))
+                .with_column("rx_bytes", ScalarType::Int64.nullable(false))
+                .with_column("tx_msgs", ScalarType::Int64.nullable(false))
+                .with_column("tx_bytes", ScalarType::Int64.nullable(false))
+                .with_column("lo_offset", ScalarType::Int64.nullable(false))
+                .with_column("hi_offset", ScalarType::Int64.nullable(false))
+                .with_column("ls_offset", ScalarType::Int64.nullable(false))
+                .with_column("app_offset", ScalarType::Int64.nullable(false))
+                .with_column("consumer_lag", ScalarType::Int64.nullable(false))
+                .with_key(vec![0, 1, 2]),
+
             LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty()
                 .with_column("uuid", ScalarType::String.nullable(false))
                 .with_column("worker", ScalarType::Int64.nullable(false))
@@ -219,6 +236,10 @@ impl LogVariant {
             LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![],
             LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![],
             LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![],
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![(
+                LogVariant::Materialized(MaterializedLog::SourceInfo),
+                vec![(1, 1), (2, 2), (3, 3)],
+            )],
             LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
             LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
             LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs
index 9aac6d46d32fc..6a8900c7dc176 100644
--- a/src/dataflow/src/logging/materialized.rs
+++ b/src/dataflow/src/logging/materialized.rs
@@ -11,7 +11,8 @@
 
 use std::time::Duration;
 
-use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
+use differential_dataflow::difference::{DiffPair, DiffVector};
+use differential_dataflow::operators::count::CountTotal;
 use log::error;
 use timely::communication::Allocate;
 use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,34 @@ pub enum MaterializedEvent {
         /// Globally unique identifier for the source on which the dataflow depends.
         source: GlobalId,
     },
+    /// Tracks statistics for a particular Kafka consumer / partition pair
+    /// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
+    KafkaConsumerInfo {
+        /// Kafka name for the consumer
+        consumer_name: String,
+        /// Materialize source identifier
+        source_id: SourceInstanceId,
+        /// The Kafka partition ID for these metrics (may be multiple per consumer)
+        partition_id: String,
+        /// Number of message sets received from Brokers
+        rxmsgs: i64,
+        /// Number of bytes received from Brokers
+        rxbytes: i64,
+        /// Number of message sets sent to Brokers
+        txmsgs: i64,
+        /// Number of bytes transmitted to Brokers
+        txbytes: i64,
+        /// Partition's low watermark offset on the broker
+        lo_offset: i64,
+        /// Partition's high watermark offset on the broker
+        hi_offset: i64,
+        /// Last stable offset on the broker
+        ls_offset: i64,
+        /// How far into the topic our consumer has read
+        app_offset: i64,
+        /// How many messages remain until our consumer reaches the (hi|lo) watermark
+        consumer_lag: i64,
+    },
     /// Peek command, true for install and false for retire.
     Peek(Peek, bool),
     /// Tracks the source name, id, partition id, and received/ingested offsets
@@ -103,9 +132,10 @@
         let mut input = demux.new_input(&logs, Pipeline);
         let (mut dataflow_out, dataflow) = demux.new_output();
         let (mut dependency_out, dependency) = demux.new_output();
+        let (mut frontier_out, frontier) = demux.new_output();
+        let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
         let (mut peek_out, peek) = demux.new_output();
         let (mut source_info_out, source_info) = demux.new_output();
-        let (mut frontier_out, frontier) = demux.new_output();
 
         let mut demux_buffer = Vec::new();
         demux.build(move |_capability| {
@@ -114,18 +144,20 @@
             move |_frontiers| {
                 let mut dataflow = dataflow_out.activate();
                 let mut dependency = dependency_out.activate();
+                let mut frontier = frontier_out.activate();
+                let mut kafka_consumer_info = kafka_consumer_info_out.activate();
                 let mut peek = peek_out.activate();
                 let mut source_info = source_info_out.activate();
-                let mut frontier = frontier_out.activate();
 
                 input.for_each(|time, data| {
                     data.swap(&mut demux_buffer);
                     let mut dataflow_session = dataflow.session(&time);
                     let mut dependency_session = dependency.session(&time);
+                    let mut frontier_session = frontier.session(&time);
+                    let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
                     let mut peek_session = peek.session(&time);
                     let mut source_info_session = source_info.session(&time);
-                    let mut frontier_session = frontier.session(&time);
 
                     for (time, worker, datum) in demux_buffer.drain(..) {
                        let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +208,47 @@ pub fn construct(
                                    ),
                                }
                            }
+                            MaterializedEvent::Frontier(name, logical, delta) => {
+                                frontier_session.give((
+                                    row_packer.pack(&[
+                                        Datum::String(&name.to_string()),
+                                        Datum::Int64(worker as i64),
+                                        Datum::Int64(logical as i64),
+                                    ]),
+                                    time_ms,
+                                    delta as isize,
+                                ));
+                            }
+                            MaterializedEvent::KafkaConsumerInfo {
+                                consumer_name,
+                                source_id,
+                                partition_id,
+                                rxmsgs,
+                                rxbytes,
+                                txmsgs,
+                                txbytes,
+                                lo_offset,
+                                hi_offset,
+                                ls_offset,
+                                app_offset,
+                                consumer_lag,
+                            } => {
+                                kafka_consumer_info_session.give((
+                                    (consumer_name, source_id, partition_id),
+                                    time_ms,
+                                    DiffVector::new(vec![
+                                        rxmsgs,
+                                        rxbytes,
+                                        txmsgs,
+                                        txbytes,
+                                        lo_offset,
+                                        hi_offset,
+                                        ls_offset,
+                                        app_offset,
+                                        consumer_lag,
+                                    ]),
+                                ));
+                            }
                            MaterializedEvent::Peek(peek, is_install) => {
                                peek_session.give((peek, worker, is_install, time_ns))
                            }
@@ -192,17 +265,6 @@ pub fn construct(
                                    DiffPair::new(offset, timestamp),
                                ));
                            }
-                            MaterializedEvent::Frontier(name, logical, delta) => {
-                                frontier_session.give((
-                                    row_packer.pack(&[
-                                        Datum::String(&name.to_string()),
-                                        Datum::Int64(worker as i64),
-                                        Datum::Int64(logical as i64),
-                                    ]),
-                                    time_ms,
-                                    delta as isize,
-                                ));
-                            }
                        }
                    }
                });
@@ -245,6 +307,30 @@ pub fn construct(
            }
        });
 
+        let frontier_current = frontier.as_collection();
+
+        use differential_dataflow::operators::Count;
+        let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
+            let mut row_packer = repr::RowPacker::new();
+            move |((consumer_name, source_id, partition_id), diff_vector)| {
+                row_packer.pack(&[
+                    Datum::String(&consumer_name),
+                    Datum::String(&source_id.source_id.to_string()),
+                    Datum::Int64(source_id.dataflow_id as i64),
+                    Datum::String(&partition_id),
+                    Datum::Int64(diff_vector[0]),
+                    Datum::Int64(diff_vector[1]),
+                    Datum::Int64(diff_vector[2]),
+                    Datum::Int64(diff_vector[3]),
+                    Datum::Int64(diff_vector[4]),
+                    Datum::Int64(diff_vector[5]),
+                    Datum::Int64(diff_vector[6]),
+                    Datum::Int64(diff_vector[7]),
+                    Datum::Int64(diff_vector[8]),
+                ])
+            }
+        });
+
        let peek_current = peek
            .map(move |(name, worker, is_install, time_ns)| {
                let time_ms = (time_ns / 1_000_000) as Timestamp;
@@ -265,7 +351,6 @@ pub fn construct(
            }
        });
 
-        use differential_dataflow::operators::Count;
        let source_info_current = source_info.as_collection().count().map({
            let mut row_packer = repr::RowPacker::new();
            move |((name, id, pid), pair)| {
@@ -282,8 +367,6 @@ pub fn construct(
            }
        });
 
-        let frontier_current = frontier.as_collection();
-
        // Duration statistics derive from the non-rounded event times.
        let peek_duration = peek
            .unary(
@@ -361,6 +444,10 @@ pub fn construct(
                LogVariant::Materialized(MaterializedLog::FrontierCurrent),
                frontier_current,
            ),
+            (
+                LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
+                kafka_consumer_info_current,
+            ),
            (
                LogVariant::Materialized(MaterializedLog::PeekCurrent),
                peek_current,
diff --git a/src/dataflow/src/source/file.rs b/src/dataflow/src/source/file.rs
index 3d3191cd722a8..d8635de9d7b9d 100644
--- a/src/dataflow/src/source/file.rs
+++ b/src/dataflow/src/source/file.rs
@@ -26,6 +26,7 @@ use expr::{PartitionId, SourceInstanceId};
 use mz_avro::types::Value;
 use mz_avro::{AvroRead, Schema, Skip};
 
+use crate::logging::materialized::Logger;
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
 /// Contains all information necessary to ingest data from file sources (either
@@ -63,6 +64,7 @@ impl SourceReader<Value> for FileSourceReader<Value> {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         encoding: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(FileSourceReader<Value>, Option<PartitionId>), anyhow::Error> {
         let receiver = match connector {
             ExternalSourceConnector::AvroOcf(oc) => {
@@ -139,6 +141,7 @@ impl SourceReader<Vec<u8>> for FileSourceReader<Vec<u8>> {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         _: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(FileSourceReader<Vec<u8>>, Option<PartitionId>), anyhow::Error> {
         let receiver = match connector {
             ExternalSourceConnector::File(fc) => {
diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index 826d73a561f03..81d9f0060f552 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -27,8 +27,25 @@ use kafka_util::KafkaAddrs;
 use log::{error, info, log_enabled, warn};
 use uuid::Uuid;
 
+use crate::logging::materialized::{Logger, MaterializedEvent};
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
+/// Values recorded from the last rdkafka statistics callback, used to generate a
+/// diff of values for logging
+#[derive(Default)]
+pub struct PreviousStats {
+    consumer_name: Option<String>,
+    rxmsgs: i64,
+    rxbytes: i64,
+    txmsgs: i64,
+    txbytes: i64,
+    lo_offset: i64,
+    hi_offset: i64,
+    ls_offset: i64,
+    app_offset: i64,
+    consumer_lag: i64,
+}
+
 /// Contains all information necessary to ingest data from Kafka
 pub struct KafkaSourceReader {
     /// Name of the topic on which this source is backed on
@@ -49,6 +66,10 @@ pub struct KafkaSourceReader {
     last_offsets: HashMap<i32, i64>,
     /// Map from partition -> offset to start reading at
     start_offsets: HashMap<i32, i64>,
+    /// Timely worker logger for source events
+    logger: Option<Logger>,
+    /// Channel to receive Kafka statistics objects from the stats callback
+    stats_rx: crossbeam_channel::Receiver<Statistics>,
 }
 
 impl SourceReader<Vec<u8>> for KafkaSourceReader {
@@ -60,10 +81,18 @@ impl SourceReader<Vec<u8>> for KafkaSourceReader {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         _: DataEncoding,
+        logger: Option<Logger>,
     ) -> Result<(KafkaSourceReader, Option<PartitionId>), anyhow::Error> {
         match connector {
             ExternalSourceConnector::Kafka(kc) => Ok((
-                KafkaSourceReader::new(source_name, source_id, worker_id, consumer_activator, kc),
+                KafkaSourceReader::new(
+                    source_name,
+                    source_id,
+                    worker_id,
+                    consumer_activator,
+                    kc,
+                    logger,
+                ),
                 None,
             )),
             _ => unreachable!(),
@@ -110,6 +139,49 @@ impl SourceReader<Vec<u8>> for KafkaSourceReader {
            }
        }
 
+        // Read any statistics objects generated via the GlueConsumerContext::stats callback
+        while let Ok(statistics) = self.stats_rx.try_recv() {
+            if let Some(logger) = self.logger.as_mut() {
+                for mut part in self.partition_consumers.iter_mut() {
+                    // If this is the first callback, initialize our consumer name
+                    // so that we can later retract this when the source is dropped
+                    match part.previous_stats.consumer_name {
+                        None => part.previous_stats.consumer_name = Some(statistics.name.clone()),
+                        _ => (),
+                    }
+
+                    let partition_stats =
+                        &statistics.topics[self.topic_name.as_str()].partitions[&part.pid];
+
+                    logger.log(MaterializedEvent::KafkaConsumerInfo {
+                        consumer_name: statistics.name.to_string(),
+                        source_id: self.id,
+                        partition_id: partition_stats.partition.to_string(),
+                        rxmsgs: partition_stats.rxmsgs - part.previous_stats.rxmsgs,
+                        rxbytes: partition_stats.rxbytes - part.previous_stats.rxbytes,
+                        txmsgs: partition_stats.txmsgs - part.previous_stats.txmsgs,
+                        txbytes: partition_stats.txbytes - part.previous_stats.txbytes,
+                        lo_offset: partition_stats.lo_offset - part.previous_stats.lo_offset,
+                        hi_offset: partition_stats.hi_offset - part.previous_stats.hi_offset,
+                        ls_offset: partition_stats.ls_offset - part.previous_stats.ls_offset,
+                        app_offset: partition_stats.app_offset - part.previous_stats.app_offset,
+                        consumer_lag: partition_stats.consumer_lag
+                            - part.previous_stats.consumer_lag,
+                    });
+
+                    part.previous_stats.rxmsgs = partition_stats.rxmsgs;
+                    part.previous_stats.rxbytes = partition_stats.rxbytes;
+                    part.previous_stats.txmsgs = partition_stats.txmsgs;
+                    part.previous_stats.txbytes = partition_stats.txbytes;
+                    part.previous_stats.lo_offset = partition_stats.lo_offset;
+                    part.previous_stats.hi_offset = partition_stats.hi_offset;
+                    part.previous_stats.ls_offset = partition_stats.ls_offset;
+                    part.previous_stats.app_offset = partition_stats.app_offset;
+                    part.previous_stats.consumer_lag = partition_stats.consumer_lag;
+                }
+            }
+        }
+
        let mut next_message = NextMessage::Pending;
        let consumer_count = self.get_partition_consumers_count();
        let mut attempts = 0;
@@ -206,6 +278,7 @@ impl KafkaSourceReader {
         worker_id: usize,
         consumer_activator: SyncActivator,
         kc: KafkaSourceConnector,
+        logger: Option<Logger>,
     ) -> KafkaSourceReader {
         let KafkaSourceConnector {
             addrs,
@@ -223,8 +296,12 @@ impl KafkaSourceReader {
             cluster_id,
             &config_options,
         );
+        let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
         let consumer: BaseConsumer<GlueConsumerContext> = kafka_config
-            .create_with_context(GlueConsumerContext(consumer_activator))
+            .create_with_context(GlueConsumerContext {
+                activator: consumer_activator,
+                stats_tx: stats_tx,
+            })
             .expect("Failed to create Kafka Consumer");
 
         let start_offsets = kc.start_offsets.iter().map(|(k, v)| (*k, v - 1)).collect();
@@ -239,6 +316,8 @@ impl KafkaSourceReader {
             worker_id,
             last_offsets: HashMap::new(),
             start_offsets,
+            logger,
+            stats_rx,
        }
    }
 
@@ -337,6 +416,32 @@ impl KafkaSourceReader {
    }
}
 
+impl Drop for KafkaSourceReader {
+    fn drop(&mut self) {
+        // Retract any metrics logged for this source
+        if let Some(logger) = self.logger.as_mut() {
+            for part in self.partition_consumers.iter_mut() {
+                if let Some(consumer_name) = part.previous_stats.consumer_name.as_ref() {
+                    logger.log(MaterializedEvent::KafkaConsumerInfo {
+                        consumer_name: consumer_name.to_string(),
+                        source_id: self.id,
+                        partition_id: part.pid.to_string(),
+                        rxmsgs: -part.previous_stats.rxmsgs,
+                        rxbytes: -part.previous_stats.rxbytes,
+                        txmsgs: -part.previous_stats.txmsgs,
+                        txbytes: -part.previous_stats.txbytes,
+                        lo_offset: -part.previous_stats.lo_offset,
+                        hi_offset: -part.previous_stats.hi_offset,
+                        ls_offset: -part.previous_stats.ls_offset,
+                        app_offset: -part.previous_stats.app_offset,
+                        consumer_lag: -part.previous_stats.consumer_lag,
+                    });
+                }
+            }
+        }
+    }
+}
+
 /// Creates a Kafka config.
 fn create_kafka_config(
     name: &str,
@@ -433,6 +538,8 @@ struct PartitionConsumer {
     pid: i32,
     /// The underlying Kafka partition queue
     partition_queue: PartitionQueue<GlueConsumerContext>,
+    /// Memoized Statistics for a partition consumer
+    previous_stats: PreviousStats,
 }
 
 impl PartitionConsumer {
@@ -441,6 +548,7 @@ impl PartitionConsumer {
         PartitionConsumer {
             pid,
             partition_queue,
+            previous_stats: PreviousStats::default(),
        }
    }
 
@@ -465,17 +573,23 @@ impl PartitionConsumer {
 
 /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
 /// when the message queue switches from nonempty to empty.
-struct GlueConsumerContext(SyncActivator);
+struct GlueConsumerContext {
+    activator: SyncActivator,
+    stats_tx: crossbeam_channel::Sender<Statistics>,
+}
 
 impl ClientContext for GlueConsumerContext {
     fn stats(&self, statistics: Statistics) {
-        info!("Client stats: {:#?}", statistics);
+        self.stats_tx
+            .send(statistics)
+            .expect("timely operator hung up while Kafka source active");
+        self.activate();
    }
}
 
 impl GlueConsumerContext {
     fn activate(&self) {
-        self.0
+        self.activator
            .activate()
            .expect("timely operator hung up while Kafka source active");
    }
diff --git a/src/dataflow/src/source/kinesis.rs b/src/dataflow/src/source/kinesis.rs
index b5290e14eab7f..0dc176e131a1b 100644
--- a/src/dataflow/src/source/kinesis.rs
+++ b/src/dataflow/src/source/kinesis.rs
@@ -24,6 +24,7 @@ use timely::scheduling::SyncActivator;
 use dataflow_types::{DataEncoding, ExternalSourceConnector, KinesisSourceConnector, MzOffset};
 use expr::{PartitionId, SourceInstanceId};
 
+use crate::logging::materialized::Logger;
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
 lazy_static! {
@@ -101,6 +102,7 @@ impl SourceReader<Vec<u8>> for KinesisSourceReader {
         _consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         _encoding: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(Self, Option<PartitionId>), anyhow::Error> {
         let kc = match connector {
             ExternalSourceConnector::Kinesis(kc) => kc,
diff --git a/src/dataflow/src/source/mod.rs b/src/dataflow/src/source/mod.rs
index 29b5d11800c7b..a77c16c162d2f 100644
--- a/src/dataflow/src/source/mod.rs
+++ b/src/dataflow/src/source/mod.rs
@@ -275,6 +275,7 @@ pub(crate) trait SourceReader {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         encoding: DataEncoding,
+        logger: Option<Logger>,
     ) -> Result<(Self, Option<PartitionId>), anyhow::Error>
     where
         Self: Sized;
@@ -1040,6 +1041,7 @@ where
                 scope.sync_activator_for(&info.address[..]),
                 source_connector.clone(),
                 encoding,
+                logger,
             ) {
                 Ok((source_reader, partition)) => {
                     if let Some(pid) = partition {
diff --git a/src/dataflow/src/source/s3.rs b/src/dataflow/src/source/s3.rs
index 533ff622901dd..4ba90ab6859c5 100644
--- a/src/dataflow/src/source/s3.rs
+++ b/src/dataflow/src/source/s3.rs
@@ -35,6 +35,7 @@ use aws_util::aws;
 use dataflow_types::{Compression, DataEncoding, ExternalSourceConnector, MzOffset, S3KeySource};
 use expr::{PartitionId, SourceInstanceId};
 
+use crate::logging::materialized::Logger;
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
 use self::metrics::ScanBucketMetrics;
@@ -600,6 +601,7 @@ impl SourceReader<Vec<u8>> for S3SourceReader {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         _encoding: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(S3SourceReader, Option<PartitionId>), anyhow::Error> {
         let s3_conn = match connector {
             ExternalSourceConnector::S3(s3_conn) => s3_conn,
diff --git a/src/materialized/tests/server.rs b/src/materialized/tests/server.rs
index 3be4e518bc71f..16e94a7921a20 100644
--- a/src/materialized/tests/server.rs
+++ b/src/materialized/tests/server.rs
@@ -89,15 +89,15 @@ fn test_persistence() -> Result<(), Box<dyn Error>> {
             "s3000", "s3001", "s3002", "s3003", "s3004", "s3005", "s3006", "s3007", "s3008",
             "s3009", "s3010", "s3011", "s3012", "s3013", "s3014", "s3015", "s3016", "s3017",
             "s3018", "s3019", "s3020", "s3021", "s3022", "s3023", "s3024", "s3025", "s3026",
-            "s3027", "s3028", "s3029", "s4001", "s4002", "s4003", "s4004", "s4005", "s4006",
-            "s4007", "s4008", "s4009", "s4010", "s4011", "s4012", "s4013", "s4014", "s4015",
-            "s4016", "s4017", "s4018", "s4019", "s4020", "s4021", "s4022", "s4023", "s4024",
-            "s4025", "s4026", "s4027", "s4028", "s4029", "s4030", "s4031", "s4032", "s4033",
-            "s4034", "s4035", "s4036", "s4037", "s4038", "s4039", "s4040", "s4041", "s4042",
-            "s5000", "s5001", "s5002", "s5003", "s5004", "s5005", "s5006", "s5007", "s5008",
-            "s5009", "s5010", "s5011", "s5012", "s5013", "s5014", "s5015", "s5016", "s5017",
-            "s5018", "s5019", "s5020", "s5021", "s5022", "s5023", "s5024", "u1", "u2", "u3",
-            "u4", "u5", "u6"
+            "s3027", "s3028", "s3029", "s3030", "s3031", "s4001", "s4002", "s4003", "s4004",
+            "s4005", "s4006", "s4007", "s4008", "s4009", "s4010", "s4011", "s4012", "s4013",
+            "s4014", "s4015", "s4016", "s4017", "s4018", "s4019", "s4020", "s4021", "s4022",
+            "s4023", "s4024", "s4025", "s4026", "s4027", "s4028", "s4029", "s4030", "s4031",
+            "s4032", "s4033", "s4034", "s4035", "s4036", "s4037", "s4038", "s4039", "s4040",
+            "s4041", "s4042", "s5000", "s5001", "s5002", "s5003", "s5004", "s5005", "s5006",
+            "s5007", "s5008", "s5009", "s5010", "s5011", "s5012", "s5013", "s5014", "s5015",
+            "s5016", "s5017", "s5018", "s5019", "s5020", "s5021", "s5022", "s5023", "s5024",
+            "u1", "u2", "u3", "u4", "u5", "u6"
         ]
     );
 }
diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td
index 9582604dbae45..51ebe662851cc 100644
--- a/test/testdrive/catalog.td
+++ b/test/testdrive/catalog.td
@@ -301,6 +301,7 @@ mz_arrangement_sizes
 mz_dataflow_channels
 mz_dataflow_operator_addresses
 mz_dataflow_operators
+mz_kafka_consumer_statistics
 mz_materialization_dependencies
 mz_materializations
 mz_message_counts
@@ -320,6 +321,7 @@ mz_arrangement_sizes system true
 mz_dataflow_channels system true
 mz_dataflow_operator_addresses system true
 mz_dataflow_operators system true
+mz_kafka_consumer_statistics system true
 mz_materialization_dependencies system true
 mz_materializations system true
 mz_message_counts system true
@@ -388,7 +390,7 @@ SHOW EXTENDED TABLES not yet supported
 
 # There is one entry in mz_indexes for each field_number/expression of the index.
 > SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%'
-36
+37
 
 > SHOW VIEWS FROM mz_catalog
 mz_addresses_with_unit_length
diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td
new file mode 100644
index 0000000000000..c46cf43213e73
--- /dev/null
+++ b/test/testdrive/kafka-stats.td
@@ -0,0 +1,104 @@
+# Copyright Materialize, Inc. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Test the creation and removal of entries in mz_kafka_consumer_statistics
+
+$ set schema={
+    "type": "record",
+    "name": "row",
+    "fields": [
+      {"name": "a", "type": "long"},
+      {"name": "b", "type": "long"}
+    ]
+  }
+
+$ kafka-create-topic topic=data
+
+> CREATE SOURCE data
+  FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
+  WITH (statistics_interval_ms = 1000)
+  FORMAT AVRO USING SCHEMA '${schema}'
+
+> CREATE MATERIALIZED VIEW test1 AS
+  SELECT b, sum(a) FROM data GROUP BY b
+
+> SELECT * FROM test1
+b sum
+------
+
+$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
+{"a": 1, "b": 1}
+{"a": 2, "b": 1}
+{"a": 3, "b": 1}
+{"a": 1, "b": 2}
+
+> SELECT * FROM test1
+b sum
+------
+1 6
+2 1
+
+# There should only be metrics from a single partition consumer
+> SELECT count(*) FROM mz_kafka_consumer_statistics
+1
+
+# We should have read 4 messages
+> SELECT count(*) FROM mz_kafka_consumer_statistics where rx_msgs = 4
+1
+
+# and they should have non-zero bytes
+> SELECT count(*) FROM mz_kafka_consumer_statistics where rx_bytes = 0
+0
+
+# We have not transmitted anything
+> SELECT count(*) FROM mz_kafka_consumer_statistics where tx_msgs = 0
+1
+
+> SELECT count(*) FROM mz_kafka_consumer_statistics where tx_bytes = 0
+1
+
+# Lo Offset should not exceed Hi offset
+> SELECT count(*) FROM mz_kafka_consumer_statistics where lo_offset > hi_offset
+0
+
+# Lo Offset should not exceed Ls offset
+> SELECT count(*) FROM mz_kafka_consumer_statistics where lo_offset > ls_offset
+0
+
+# Ls Offset should not exceed Hi Offset
+> SELECT count(*) FROM mz_kafka_consumer_statistics where ls_offset > hi_offset
+0
+
+# We should have read 4 records
+> SELECT count(*) FROM mz_kafka_consumer_statistics where app_offset = 4
+1
+
+# And we should not be lagging
+> SELECT count(*) FROM mz_kafka_consumer_statistics where consumer_lag = 0
+1
+
+# If we change the message encoding and/or the broker version, these results may change
+> SELECT partition_id, rx_msgs, rx_bytes, tx_msgs, tx_bytes, lo_offset, hi_offset, ls_offset, app_offset, consumer_lag FROM mz_kafka_consumer_statistics;
+partition_id rx_msgs rx_bytes tx_msgs tx_bytes lo_offset hi_offset ls_offset app_offset consumer_lag
+-------------------------------------------------------------------------------------------------------------
+0 4 28 0 0 0 4 4 4 0
+
+# Verify that we can join against mz_source_info
+> SELECT mz_kafka_consumer_statistics.rx_msgs FROM mz_kafka_consumer_statistics INNER JOIN mz_source_info USING (source_id, dataflow_id, partition_id) WHERE mz_source_info.source_name like 'kafka-%%';
+rx_msgs
+-------
+4
+
+# Drop the sources and verify that metrics have been removed
+> DROP VIEW test1
+
+> DROP SOURCE data
+
+> SELECT count(*) FROM mz_kafka_consumer_statistics
+0
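
Usage sketch (not part of the patch): with this change applied, the per-partition counters can be inspected interactively. The query below mirrors the join exercised in kafka-stats.td; the column names come from the RelationDesc added in logging.rs, and the actual values depend on the broker version and message encoding, as the test itself notes.

-- Illustrative query: per-partition read progress and lag for each Kafka
-- source, joined to mz_source_info for the human-readable source name.
SELECT
    mz_source_info.source_name,
    partition_id,
    rx_msgs,
    app_offset,
    hi_offset,
    consumer_lag
FROM mz_kafka_consumer_statistics
INNER JOIN mz_source_info USING (source_id, dataflow_id, partition_id)
ORDER BY consumer_lag DESC;

Because each KafkaConsumerInfo event carries the delta since the previous librdkafka callback and the logging dataflow sums those deltas via DiffVector, the rows reflect the consumer's cumulative totals; the Drop impl logs the negated totals, so the rows are retracted when the source is dropped.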