From 93fb82a6c5524f244928d9843124a0d4120551fa Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Wed, 17 Mar 2021 12:41:49 -0700 Subject: [PATCH 01/12] Log basic metrics instead of extended statistics --- src/dataflow/src/source/kafka.rs | 6 +++++- test/bench/avro-upsert/create_views/views/upsert_views.sql | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 6f10a8df9ef41..4f8475344c72b 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -652,7 +652,11 @@ struct GlueConsumerContext(SyncActivator); impl ClientContext for GlueConsumerContext { fn stats(&self, statistics: Statistics) { - info!("Client stats: {:#?}", statistics); + info!( + "Client stats: name={}, tx={}, tx_bytes={}, rx={}, rx_bytes={}", + statistics.name, statistics.tx, statistics.tx_bytes, statistics.rx, statistics.rx_bytes, + ); + // info!("Client stats: {:#?}", statistics); } } diff --git a/test/bench/avro-upsert/create_views/views/upsert_views.sql b/test/bench/avro-upsert/create_views/views/upsert_views.sql index d89577fc25f34..7f816653ebc17 100644 --- a/test/bench/avro-upsert/create_views/views/upsert_views.sql +++ b/test/bench/avro-upsert/create_views/views/upsert_views.sql @@ -10,6 +10,7 @@ CREATE SOURCE source_upsertavrotest FROM KAFKA BROKER 'kafka:9092' TOPIC 'upsertavrotest' +WITH (statistics_interval_ms = 10000) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE UPSERT FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'; From ee8089004bfd86a416818d010775bceaa16e9799 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Wed, 17 Mar 2021 16:56:17 -0700 Subject: [PATCH 02/12] Add mz_kafka_statistics built-in log / source --- src/coord/src/catalog/builtin.rs | 13 ++++++++++++- src/dataflow-types/src/logging.rs | 19 +++++++++++++++++++ test/testdrive/catalog.td | 4 +++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/coord/src/catalog/builtin.rs b/src/coord/src/catalog/builtin.rs index 61f5c23398656..9109d3df5369f 100644 --- a/src/coord/src/catalog/builtin.rs +++ b/src/coord/src/catalog/builtin.rs @@ -27,7 +27,9 @@ use std::collections::{BTreeMap, BTreeSet}; use lazy_static::lazy_static; use postgres_types::{Kind, Type}; -use dataflow_types::logging::{DifferentialLog, LogVariant, MaterializedLog, TimelyLog}; +use dataflow_types::logging::{ + DifferentialLog, LogVariant, MaterializedLog, RDKafkaLog, TimelyLog, +}; use expr::GlobalId; use repr::{RelationDesc, ScalarType}; @@ -549,6 +551,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog { index_id: GlobalId::System(3029), }; +pub const MZ_KAFKA_STATISTICS: BuiltinLog = BuiltinLog { + name: "mz_kafka_statistics", + schema: MZ_CATALOG_SCHEMA, + variant: LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics), + id: GlobalId::System(3030), + index_id: GlobalId::System(3031), +}; + lazy_static! { pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable { name: "mz_view_keys", @@ -1265,6 +1275,7 @@ lazy_static! 
{ Builtin::Log(&MZ_PEEK_DURATIONS), Builtin::Log(&MZ_SOURCE_INFO), Builtin::Log(&MZ_MESSAGE_COUNTS), + Builtin::Log(&MZ_KAFKA_STATISTICS), Builtin::Table(&MZ_VIEW_KEYS), Builtin::Table(&MZ_VIEW_FOREIGN_KEYS), Builtin::Table(&MZ_KAFKA_SINKS), diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index 5e26bd6720534..1320d5eb75b15 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -28,6 +28,7 @@ pub enum LogVariant { Timely(TimelyLog), Differential(DifferentialLog), Materialized(MaterializedLog), + RDKafkaLog(RDKafkaLog), } #[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] @@ -56,6 +57,10 @@ pub enum MaterializedLog { PeekDuration, SourceInfo, } +#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +pub enum RDKafkaLog { + ConsumerStatistics, +} impl LogVariant { /// By which columns should the logs be indexed. @@ -174,6 +179,16 @@ impl LogVariant { .with_column("duration_ns", ScalarType::Int64.nullable(false)) .with_column("count", ScalarType::Int64.nullable(false)) .with_key(vec![0, 1]), + + LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => RelationDesc::empty() + .with_column("source_name", ScalarType::String.nullable(false)) + .with_column("source_id", ScalarType::String.nullable(false)) + .with_column("consumer_name", ScalarType::String.nullable(false)) + .with_column("rx", ScalarType::Int64.nullable(false)) + .with_column("rx_bytes", ScalarType::Int64.nullable(false)) + .with_column("tx", ScalarType::Int64.nullable(false)) + .with_column("tx_bytes", ScalarType::Int64.nullable(false)) + .with_key(vec![0, 1, 2]), } } @@ -222,6 +237,10 @@ impl LogVariant { LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![], LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![], LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![], + LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => vec![( + LogVariant::Materialized(MaterializedLog::SourceInfo), + vec![(0, 0), (1, 1)], + )], } } } diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 9582604dbae45..f8883be7fa522 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -301,6 +301,7 @@ mz_arrangement_sizes mz_dataflow_channels mz_dataflow_operator_addresses mz_dataflow_operators +mz_kafka_statistics mz_materialization_dependencies mz_materializations mz_message_counts @@ -320,6 +321,7 @@ mz_arrangement_sizes system true mz_dataflow_channels system true mz_dataflow_operator_addresses system true mz_dataflow_operators system true +mz_kafka_statistics system true mz_materialization_dependencies system true mz_materializations system true mz_message_counts system true @@ -388,7 +390,7 @@ SHOW EXTENDED TABLES not yet supported # There is one entry in mz_indexes for each field_number/expression of the index. 
> SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%' -36 +37 > SHOW VIEWS FROM mz_catalog mz_addresses_with_unit_length From 23c397029058ae4bfeba310fd6edd6699323e039 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Wed, 17 Mar 2021 17:02:26 -0700 Subject: [PATCH 03/12] Remove change for debugging --- test/bench/avro-upsert/create_views/views/upsert_views.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/test/bench/avro-upsert/create_views/views/upsert_views.sql b/test/bench/avro-upsert/create_views/views/upsert_views.sql index 7f816653ebc17..d89577fc25f34 100644 --- a/test/bench/avro-upsert/create_views/views/upsert_views.sql +++ b/test/bench/avro-upsert/create_views/views/upsert_views.sql @@ -10,7 +10,6 @@ CREATE SOURCE source_upsertavrotest FROM KAFKA BROKER 'kafka:9092' TOPIC 'upsertavrotest' -WITH (statistics_interval_ms = 10000) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE UPSERT FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'; From 0388f524ca12540f973fe24cc429b7f0bf6aed92 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Wed, 17 Mar 2021 17:05:46 -0700 Subject: [PATCH 04/12] Put consumer in the builtin log name --- src/coord/src/catalog/builtin.rs | 6 +++--- test/testdrive/catalog.td | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/coord/src/catalog/builtin.rs b/src/coord/src/catalog/builtin.rs index 9109d3df5369f..e3addc58897e4 100644 --- a/src/coord/src/catalog/builtin.rs +++ b/src/coord/src/catalog/builtin.rs @@ -551,8 +551,8 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog { index_id: GlobalId::System(3029), }; -pub const MZ_KAFKA_STATISTICS: BuiltinLog = BuiltinLog { - name: "mz_kafka_statistics", +pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog { + name: "mz_kafka_consumer_statistics", schema: MZ_CATALOG_SCHEMA, variant: LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics), id: GlobalId::System(3030), @@ -1275,7 +1275,7 @@ lazy_static! { Builtin::Log(&MZ_PEEK_DURATIONS), Builtin::Log(&MZ_SOURCE_INFO), Builtin::Log(&MZ_MESSAGE_COUNTS), - Builtin::Log(&MZ_KAFKA_STATISTICS), + Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS), Builtin::Table(&MZ_VIEW_KEYS), Builtin::Table(&MZ_VIEW_FOREIGN_KEYS), Builtin::Table(&MZ_KAFKA_SINKS), diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index f8883be7fa522..51ebe662851cc 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -301,7 +301,7 @@ mz_arrangement_sizes mz_dataflow_channels mz_dataflow_operator_addresses mz_dataflow_operators -mz_kafka_statistics +mz_kafka_consumer_statistics mz_materialization_dependencies mz_materializations mz_message_counts @@ -321,7 +321,7 @@ mz_arrangement_sizes system true mz_dataflow_channels system true mz_dataflow_operator_addresses system true mz_dataflow_operators system true -mz_kafka_statistics system true +mz_kafka_consumer_statistics system true mz_materialization_dependencies system true mz_materializations system true mz_message_counts system true From 8945be40711831689088fbf0dee8719ed7d57b1c Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Thu, 18 Mar 2021 16:05:39 -0700 Subject: [PATCH 05/12] Tracking kafka consumer statistics now works Instead of trying to create a new Logger class that implements Send + Sync, use crossbeam to send messages from the callback context into the source info context. This will activate the source, triggering the source to read messages from the callback and update the system log tables. 
When a source is dropped, the statistics are removed. --- src/coord/src/catalog/builtin.rs | 6 +- src/dataflow-types/src/logging.rs | 34 ++++----- src/dataflow/src/logging/materialized.rs | 91 ++++++++++++++++++----- src/dataflow/src/source/kafka.rs | 92 +++++++++++++++++++++--- test/testdrive/kafka-stats.td | 73 +++++++++++++++++++ 5 files changed, 247 insertions(+), 49 deletions(-) create mode 100644 test/testdrive/kafka-stats.td diff --git a/src/coord/src/catalog/builtin.rs b/src/coord/src/catalog/builtin.rs index e3addc58897e4..950716b7023d6 100644 --- a/src/coord/src/catalog/builtin.rs +++ b/src/coord/src/catalog/builtin.rs @@ -27,9 +27,7 @@ use std::collections::{BTreeMap, BTreeSet}; use lazy_static::lazy_static; use postgres_types::{Kind, Type}; -use dataflow_types::logging::{ - DifferentialLog, LogVariant, MaterializedLog, RDKafkaLog, TimelyLog, -}; +use dataflow_types::logging::{DifferentialLog, LogVariant, MaterializedLog, TimelyLog}; use expr::GlobalId; use repr::{RelationDesc, ScalarType}; @@ -554,7 +552,7 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog { pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog { name: "mz_kafka_consumer_statistics", schema: MZ_CATALOG_SCHEMA, - variant: LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics), + variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo), id: GlobalId::System(3030), index_id: GlobalId::System(3031), }; diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index 1320d5eb75b15..5b0d32cf38b57 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -28,7 +28,6 @@ pub enum LogVariant { Timely(TimelyLog), Differential(DifferentialLog), Materialized(MaterializedLog), - RDKafkaLog(RDKafkaLog), } #[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] @@ -53,14 +52,11 @@ pub enum MaterializedLog { DataflowCurrent, DataflowDependency, FrontierCurrent, + KafkaConsumerInfo, PeekCurrent, PeekDuration, SourceInfo, } -#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] -pub enum RDKafkaLog { - ConsumerStatistics, -} impl LogVariant { /// By which columns should the logs be indexed. 
@@ -167,6 +163,16 @@ impl LogVariant { .with_column("worker", ScalarType::Int64.nullable(false)) .with_column("time", ScalarType::Int64.nullable(false)), + LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty() + .with_column("source_name", ScalarType::String.nullable(false)) + .with_column("source_id", ScalarType::String.nullable(false)) + .with_column("consumer_name", ScalarType::String.nullable(false)) + .with_column("rx", ScalarType::Int64.nullable(false)) + .with_column("rx_bytes", ScalarType::Int64.nullable(false)) + .with_column("tx", ScalarType::Int64.nullable(false)) + .with_column("tx_bytes", ScalarType::Int64.nullable(false)) + .with_key(vec![0, 1, 2]), + LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty() .with_column("uuid", ScalarType::String.nullable(false)) .with_column("worker", ScalarType::Int64.nullable(false)) @@ -179,16 +185,6 @@ impl LogVariant { .with_column("duration_ns", ScalarType::Int64.nullable(false)) .with_column("count", ScalarType::Int64.nullable(false)) .with_key(vec![0, 1]), - - LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => RelationDesc::empty() - .with_column("source_name", ScalarType::String.nullable(false)) - .with_column("source_id", ScalarType::String.nullable(false)) - .with_column("consumer_name", ScalarType::String.nullable(false)) - .with_column("rx", ScalarType::Int64.nullable(false)) - .with_column("rx_bytes", ScalarType::Int64.nullable(false)) - .with_column("tx", ScalarType::Int64.nullable(false)) - .with_column("tx_bytes", ScalarType::Int64.nullable(false)) - .with_key(vec![0, 1, 2]), } } @@ -234,13 +230,13 @@ impl LogVariant { LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![], LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![], LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![], - LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![], - LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![], - LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![], - LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => vec![( + LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![( LogVariant::Materialized(MaterializedLog::SourceInfo), vec![(0, 0), (1, 1)], )], + LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![], + LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![], + LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![], } } } diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs index 9aac6d46d32fc..6aa93ec32d9f3 100644 --- a/src/dataflow/src/logging/materialized.rs +++ b/src/dataflow/src/logging/materialized.rs @@ -11,7 +11,8 @@ use std::time::Duration; -use differential_dataflow::{difference::DiffPair, operators::count::CountTotal}; +use differential_dataflow::difference::{DiffPair, DiffVector}; +use differential_dataflow::operators::count::CountTotal; use log::error; use timely::communication::Allocate; use timely::dataflow::operators::capture::EventLink; @@ -38,6 +39,23 @@ pub enum MaterializedEvent { /// Globally unique identifier for the source on which the dataflow depends. 
source: GlobalId, }, + /// Tracks the source name, id, partition id, and received/ingested offsets + KafkaConsumerInfo { + /// Materialize name of the source + source_name: String, + /// Materialize source identifier + source_id: SourceInstanceId, + /// Kafka name for the consumer + consumer_name: String, + /// Number of message sets received from Brokers + rx: i64, + /// Number of bytes received from Brokers + rx_bytes: i64, + /// Number of message sets sent to Brokers + tx: i64, + /// Number of bytes transmitted to Brokers + tx_bytes: i64, + }, /// Peek command, true for install and false for retire. Peek(Peek, bool), /// Tracks the source name, id, partition id, and received/ingested offsets @@ -103,9 +121,10 @@ pub fn construct( let mut input = demux.new_input(&logs, Pipeline); let (mut dataflow_out, dataflow) = demux.new_output(); let (mut dependency_out, dependency) = demux.new_output(); + let (mut frontier_out, frontier) = demux.new_output(); + let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output(); let (mut peek_out, peek) = demux.new_output(); let (mut source_info_out, source_info) = demux.new_output(); - let (mut frontier_out, frontier) = demux.new_output(); let mut demux_buffer = Vec::new(); demux.build(move |_capability| { @@ -114,18 +133,20 @@ pub fn construct( move |_frontiers| { let mut dataflow = dataflow_out.activate(); let mut dependency = dependency_out.activate(); + let mut frontier = frontier_out.activate(); + let mut kafka_consumer_info = kafka_consumer_info_out.activate(); let mut peek = peek_out.activate(); let mut source_info = source_info_out.activate(); - let mut frontier = frontier_out.activate(); input.for_each(|time, data| { data.swap(&mut demux_buffer); let mut dataflow_session = dataflow.session(&time); let mut dependency_session = dependency.session(&time); + let mut frontier_session = frontier.session(&time); + let mut kafka_consumer_info_session = kafka_consumer_info.session(&time); let mut peek_session = peek.session(&time); let mut source_info_session = source_info.session(&time); - let mut frontier_session = frontier.session(&time); for (time, worker, datum) in demux_buffer.drain(..) 
{ let time_ns = time.as_nanos() as Timestamp; @@ -176,6 +197,32 @@ pub fn construct( ), } } + MaterializedEvent::Frontier(name, logical, delta) => { + frontier_session.give(( + row_packer.pack(&[ + Datum::String(&name.to_string()), + Datum::Int64(worker as i64), + Datum::Int64(logical as i64), + ]), + time_ms, + delta as isize, + )); + } + MaterializedEvent::KafkaConsumerInfo { + source_name, + source_id, + consumer_name, + rx, + rx_bytes, + tx, + tx_bytes, + } => { + kafka_consumer_info_session.give(( + (source_name, source_id, consumer_name), + time_ms, + DiffVector::new(vec![rx, rx_bytes, tx, tx_bytes]), + )); + } MaterializedEvent::Peek(peek, is_install) => { peek_session.give((peek, worker, is_install, time_ns)) } @@ -192,17 +239,6 @@ pub fn construct( DiffPair::new(offset, timestamp), )); } - MaterializedEvent::Frontier(name, logical, delta) => { - frontier_session.give(( - row_packer.pack(&[ - Datum::String(&name.to_string()), - Datum::Int64(worker as i64), - Datum::Int64(logical as i64), - ]), - time_ms, - delta as isize, - )); - } } } }); @@ -245,6 +281,24 @@ pub fn construct( } }); + let frontier_current = frontier.as_collection(); + + use differential_dataflow::operators::Count; + let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({ + let mut row_packer = repr::RowPacker::new(); + move |((source_name, source_id, consumer_name), diff_vector)| { + row_packer.pack(&[ + Datum::String(&source_name), + Datum::String(&source_id.to_string()), + Datum::String(&consumer_name), + Datum::Int64(diff_vector[0]), + Datum::Int64(diff_vector[1]), + Datum::Int64(diff_vector[2]), + Datum::Int64(diff_vector[3]), + ]) + } + }); + let peek_current = peek .map(move |(name, worker, is_install, time_ns)| { let time_ms = (time_ns / 1_000_000) as Timestamp; @@ -265,7 +319,6 @@ pub fn construct( } }); - use differential_dataflow::operators::Count; let source_info_current = source_info.as_collection().count().map({ let mut row_packer = repr::RowPacker::new(); move |((name, id, pid), pair)| { @@ -282,8 +335,6 @@ pub fn construct( } }); - let frontier_current = frontier.as_collection(); - // Duration statistics derive from the non-rounded event times. 
let peek_duration = peek .unary( @@ -361,6 +412,10 @@ pub fn construct( LogVariant::Materialized(MaterializedLog::FrontierCurrent), frontier_current, ), + ( + LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo), + kafka_consumer_info_current, + ), ( LogVariant::Materialized(MaterializedLog::PeekCurrent), peek_current, diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index f94260a7eb681..b7fbb36603eb2 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -34,11 +34,22 @@ use repr::{CachedRecord, CachedRecordIter, Timestamp}; use tokio::sync::mpsc; use uuid::Uuid; +use crate::logging::materialized::{Logger, MaterializedEvent}; +use crate::server::CacheMessage; use crate::source::cache::{RecordFileMetadata, WorkerCacheData}; use crate::source::{ ConsistencyInfo, NextMessage, PartitionMetrics, SourceConstructor, SourceInfo, SourceMessage, }; -use crate::{logging::materialized::Logger, server::CacheMessage}; + +/// Values recorded from the last rdkafka statistics callback, used to generate a +/// diff of values for logging +pub struct PreviousStats { + consumer_name: Option, + rx: i64, + rx_bytes: i64, + tx: i64, + tx_bytes: i64, +} /// Contains all information necessary to ingest data from Kafka pub struct KafkaSourceInfo { @@ -62,6 +73,10 @@ pub struct KafkaSourceInfo { cached_files: Vec, /// Timely worker logger for source events logger: Option, + /// Channel to receive Kafka statistics objects from the stats callback + stats_rx: crossbeam_channel::Receiver, + /// Memoized Statistics for a consumer + previous_stats: PreviousStats, } impl SourceConstructor> for KafkaSourceInfo { @@ -157,6 +172,33 @@ impl SourceInfo> for KafkaSourceInfo { } } + // Read any statistics objects generated via the GlueConsumerContext::stats callback + while let Ok(statistics) = self.stats_rx.try_recv() { + if let Some(logger) = self.logger.as_mut() { + // If this is the first callback, initialize our consumer name + // so that we can later retract this when the source is dropped + match self.previous_stats.consumer_name { + None => self.previous_stats.consumer_name = Some(statistics.name.clone()), + _ => (), + } + + logger.log(MaterializedEvent::KafkaConsumerInfo { + source_name: self.source_name.to_string(), + source_id: self.id, + consumer_name: statistics.name, + rx: statistics.rx - self.previous_stats.rx, + rx_bytes: statistics.rx_bytes - self.previous_stats.rx_bytes, + tx: statistics.tx - self.previous_stats.tx, + tx_bytes: statistics.tx_bytes - self.previous_stats.tx_bytes, + }); + + self.previous_stats.rx = statistics.rx; + self.previous_stats.rx_bytes = statistics.rx_bytes; + self.previous_stats.tx = statistics.tx; + self.previous_stats.tx_bytes = statistics.tx_bytes; + } + } + let mut next_message = NextMessage::Pending; let consumer_count = self.get_partition_consumers_count(); let mut attempts = 0; @@ -329,8 +371,12 @@ impl KafkaSourceInfo { cluster_id, &config_options, ); + let (stats_tx, stats_rx) = crossbeam_channel::unbounded(); let consumer: BaseConsumer = kafka_config - .create_with_context(GlueConsumerContext(consumer_activator)) + .create_with_context(GlueConsumerContext { + activator: consumer_activator, + stats_tx: stats_tx, + }) .expect("Failed to create Kafka Consumer"); let cached_files = kc .cached_files @@ -385,6 +431,14 @@ impl KafkaSourceInfo { worker_count, cached_files, logger, + stats_rx, + previous_stats: PreviousStats { + consumer_name: None, + rx: 0, + rx_bytes: 0, + tx: 0, + tx_bytes: 0, + }, } } @@ -493,6 
+547,25 @@ impl KafkaSourceInfo { } } +impl Drop for KafkaSourceInfo { + fn drop(&mut self) { + // Retract any metrics logged for this source + if let Some(logger) = self.logger.as_mut() { + if let Some(consumer_name) = self.previous_stats.consumer_name.as_ref() { + logger.log(MaterializedEvent::KafkaConsumerInfo { + source_name: self.source_name.to_string(), + source_id: self.id, + consumer_name: consumer_name.to_string(), + rx: -self.previous_stats.rx, + rx_bytes: -self.previous_stats.rx_bytes, + tx: -self.previous_stats.tx, + tx_bytes: -self.previous_stats.tx_bytes, + }); + } + } + } +} + /// Creates a Kafka config. fn create_kafka_config( name: &str, @@ -621,21 +694,24 @@ impl PartitionConsumer { /// An implementation of [`ConsumerContext`] that unparks the wrapped thread /// when the message queue switches from nonempty to empty. -struct GlueConsumerContext(SyncActivator); +struct GlueConsumerContext { + activator: SyncActivator, + stats_tx: crossbeam_channel::Sender, +} impl ClientContext for GlueConsumerContext { fn stats(&self, statistics: Statistics) { - info!( - "Client stats: name={}, tx={}, tx_bytes={}, rx={}, rx_bytes={}", - statistics.name, statistics.tx, statistics.tx_bytes, statistics.rx, statistics.rx_bytes, - ); + self.stats_tx + .send(statistics) + .expect("timely operator hung up while Kafka source active"); + self.activate(); // info!("Client stats: {:#?}", statistics); } } impl GlueConsumerContext { fn activate(&self) { - self.0 + self.activator .activate() .expect("timely operator hung up while Kafka source active"); } diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td new file mode 100644 index 0000000000000..97a3d79091101 --- /dev/null +++ b/test/testdrive/kafka-stats.td @@ -0,0 +1,73 @@ +# Copyright Materialize, Inc. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test the creation and removal of entries in mz_kafka_consumer_statistics + +$ set schema={ + "type": "record", + "name": "row", + "fields": [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "long"} + ] + } + +$ kafka-create-topic topic=data + +> CREATE SOURCE data + FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}' + WITH (statistics_interval_ms = 1000) + FORMAT AVRO USING SCHEMA '${schema}' + +> CREATE MATERIALIZED VIEW test1 AS + SELECT b, sum(a) FROM data GROUP BY b + +> SELECT * FROM test1 +b sum +------ + +$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1 +{"a": 1, "b": 1} +{"a": 2, "b": 1} +{"a": 3, "b": 1} +{"a": 1, "b": 2} + +> SELECT * FROM test1 +b sum +------ +1 6 +2 1 + +# It would be lovely to try and assert on the actual value for metrics here. Unfortunately, +# librdkafka is chatty and the metrics continue to change even when no data is being written. 
As +# such, we'll need to settle for a query that simply asserts the presence of a single source in +# the table: +> SELECT count(*) FROM mz_kafka_consumer_statistics +1 + +# and that each of the metrics should be non-zero: +> SELECT count(*) FROM mz_kafka_consumer_statistics where rx = 0 +0 + +> SELECT count(*) FROM mz_kafka_consumer_statistics where rx_bytes = 0 +0 + +> SELECT count(*) FROM mz_kafka_consumer_statistics where tx = 0 +0 + +> SELECT count(*) FROM mz_kafka_consumer_statistics where tx_bytes = 0 +0 + +# Drop the sources and verify that metrics have been removed +> DROP VIEW test1 + +> DROP SOURCE data + +> SELECT count(*) FROM mz_kafka_consumer_statistics +0 From f73c6f67b37ffd84ef8fdf4924d060fc35b5e1cf Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Thu, 18 Mar 2021 16:47:13 -0700 Subject: [PATCH 06/12] Fixup test_persistence --- src/materialized/tests/server.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/materialized/tests/server.rs b/src/materialized/tests/server.rs index ce537cc7d1429..92302fc087251 100644 --- a/src/materialized/tests/server.rs +++ b/src/materialized/tests/server.rs @@ -89,15 +89,15 @@ fn test_persistence() -> Result<(), Box> { "s3000", "s3001", "s3002", "s3003", "s3004", "s3005", "s3006", "s3007", "s3008", "s3009", "s3010", "s3011", "s3012", "s3013", "s3014", "s3015", "s3016", "s3017", "s3018", "s3019", "s3020", "s3021", "s3022", "s3023", "s3024", "s3025", "s3026", - "s3027", "s3028", "s3029", "s4001", "s4002", "s4003", "s4004", "s4005", "s4006", - "s4007", "s4008", "s4009", "s4010", "s4011", "s4012", "s4013", "s4014", "s4015", - "s4016", "s4017", "s4018", "s4019", "s4020", "s4021", "s4022", "s4023", "s4024", - "s4025", "s4026", "s4027", "s4028", "s4029", "s4030", "s4031", "s4032", "s4033", - "s4034", "s4035", "s4036", "s4037", "s4038", "s4039", "s4040", "s4041", "s4042", - "s5000", "s5001", "s5002", "s5003", "s5004", "s5005", "s5006", "s5007", "s5008", - "s5009", "s5010", "s5011", "s5012", "s5013", "s5014", "s5015", "s5016", "s5017", - "s5018", "s5019", "s5020", "s5021", "s5022", "s5023", "s5024", "u1", "u2", "u3", - "u4", "u5", "u6" + "s3027", "s3028", "s3029", "s3030", "s3031", "s4001", "s4002", "s4003", "s4004", + "s4005", "s4006", "s4007", "s4008", "s4009", "s4010", "s4011", "s4012", "s4013", + "s4014", "s4015", "s4016", "s4017", "s4018", "s4019", "s4020", "s4021", "s4022", + "s4023", "s4024", "s4025", "s4026", "s4027", "s4028", "s4029", "s4030", "s4031", + "s4032", "s4033", "s4034", "s4035", "s4036", "s4037", "s4038", "s4039", "s4040", + "s4041", "s4042", "s5000", "s5001", "s5002", "s5003", "s5004", "s5005", "s5006", + "s5007", "s5008", "s5009", "s5010", "s5011", "s5012", "s5013", "s5014", "s5015", + "s5016", "s5017", "s5018", "s5019", "s5020", "s5021", "s5022", "s5023", "s5024", + "u1", "u2", "u3", "u4", "u5", "u6" ] ); } From 9f34b96180cbee7b160365aed0fb3064c319629d Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Mon, 22 Mar 2021 15:41:40 -0700 Subject: [PATCH 07/12] Log per-partition kafka consumer metrics Fixes #5666 --- src/dataflow-types/src/logging.rs | 12 +-- src/dataflow/src/logging/materialized.rs | 65 ++++++++------ src/dataflow/src/source/kafka.rs | 104 +++++++++++++---------- test/testdrive/kafka-stats.td | 27 +++--- 4 files changed, 125 insertions(+), 83 deletions(-) diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index 5b0d32cf38b57..2871a5f942442 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -164,13 +164,15 
@@ impl LogVariant { .with_column("time", ScalarType::Int64.nullable(false)), LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty() - .with_column("source_name", ScalarType::String.nullable(false)) - .with_column("source_id", ScalarType::String.nullable(false)) .with_column("consumer_name", ScalarType::String.nullable(false)) - .with_column("rx", ScalarType::Int64.nullable(false)) + .with_column("source_id", ScalarType::String.nullable(false)) + .with_column("partition_id", ScalarType::String.nullable(false)) + .with_column("rx_msgs", ScalarType::Int64.nullable(false)) .with_column("rx_bytes", ScalarType::Int64.nullable(false)) - .with_column("tx", ScalarType::Int64.nullable(false)) + .with_column("tx_msgs", ScalarType::Int64.nullable(false)) .with_column("tx_bytes", ScalarType::Int64.nullable(false)) + .with_column("app_offset", ScalarType::Int64.nullable(false)) + .with_column("consumer_lag", ScalarType::Int64.nullable(false)) .with_key(vec![0, 1, 2]), LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty() @@ -232,7 +234,7 @@ impl LogVariant { LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![], LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![( LogVariant::Materialized(MaterializedLog::SourceInfo), - vec![(0, 0), (1, 1)], + vec![(1, 1), (2, 3)], )], LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![], LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![], diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs index 6aa93ec32d9f3..4dd3f4855b5be 100644 --- a/src/dataflow/src/logging/materialized.rs +++ b/src/dataflow/src/logging/materialized.rs @@ -11,7 +11,7 @@ use std::time::Duration; -use differential_dataflow::difference::{DiffPair, DiffVector}; +use differential_dataflow::difference::DiffPair; use differential_dataflow::operators::count::CountTotal; use log::error; use timely::communication::Allocate; @@ -39,22 +39,27 @@ pub enum MaterializedEvent { /// Globally unique identifier for the source on which the dataflow depends. source: GlobalId, }, - /// Tracks the source name, id, partition id, and received/ingested offsets + /// Tracks statistics for a particular Kafka consumer / partition pair + /// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md KafkaConsumerInfo { - /// Materialize name of the source - source_name: String, - /// Materialize source identifier - source_id: SourceInstanceId, /// Kafka name for the consumer consumer_name: String, + /// Materialize source identifier + source_id: SourceInstanceId, + /// The Kafka partition ID for these metrics (may be multiple per consumer) + partition_id: String, /// Number of message sets received from Brokers - rx: i64, + rxmsgs: i64, /// Number of bytes received from Brokers - rx_bytes: i64, + rxbytes: i64, /// Number of message sets sent to Brokers - tx: i64, + txmsgs: i64, /// Number of bytes transmitted to Brokers - tx_bytes: i64, + txbytes: i64, + /// How far into the topic our consumer has read + app_offset: i64, + /// How many messages remain until our consumer reaches the (hi|lo) watermark + consumer_lag: i64, }, /// Peek command, true for install and false for retire. 
Peek(Peek, bool), @@ -209,18 +214,26 @@ pub fn construct( )); } MaterializedEvent::KafkaConsumerInfo { - source_name, - source_id, consumer_name, - rx, - rx_bytes, - tx, - tx_bytes, + source_id, + partition_id, + rxmsgs, + rxbytes, + txmsgs, + txbytes, + app_offset, + consumer_lag, } => { kafka_consumer_info_session.give(( - (source_name, source_id, consumer_name), + (consumer_name, source_id, partition_id), time_ms, - DiffVector::new(vec![rx, rx_bytes, tx, tx_bytes]), + DiffPair::new( + DiffPair::new( + DiffPair::new(rxmsgs, rxbytes), + DiffPair::new(txmsgs, txbytes), + ), + DiffPair::new(app_offset, consumer_lag), + ), )); } MaterializedEvent::Peek(peek, is_install) => { @@ -286,15 +299,17 @@ pub fn construct( use differential_dataflow::operators::Count; let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({ let mut row_packer = repr::RowPacker::new(); - move |((source_name, source_id, consumer_name), diff_vector)| { + move |((consumer_name, source_id, partition_id), pairs)| { row_packer.pack(&[ - Datum::String(&source_name), - Datum::String(&source_id.to_string()), Datum::String(&consumer_name), - Datum::Int64(diff_vector[0]), - Datum::Int64(diff_vector[1]), - Datum::Int64(diff_vector[2]), - Datum::Int64(diff_vector[3]), + Datum::String(&source_id.to_string()), + Datum::String(&partition_id), + Datum::Int64(pairs.element1.element1.element1), + Datum::Int64(pairs.element1.element1.element2), + Datum::Int64(pairs.element1.element2.element1), + Datum::Int64(pairs.element1.element2.element2), + Datum::Int64(pairs.element2.element1), + Datum::Int64(pairs.element2.element2), ]) } }); diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 6bca50be817a3..13c7d04633ef0 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -38,10 +38,12 @@ use crate::source::{ /// diff of values for logging pub struct PreviousStats { consumer_name: Option, - rx: i64, - rx_bytes: i64, - tx: i64, - tx_bytes: i64, + rxmsgs: i64, + rxbytes: i64, + txmsgs: i64, + txbytes: i64, + app_offset: i64, + consumer_lag: i64, } /// Contains all information necessary to ingest data from Kafka @@ -68,8 +70,6 @@ pub struct KafkaSourceInfo { logger: Option, /// Channel to receive Kafka statistics objects from the stats callback stats_rx: crossbeam_channel::Receiver, - /// Memoized Statistics for a consumer - previous_stats: PreviousStats, } impl SourceConstructor> for KafkaSourceInfo { @@ -153,27 +153,37 @@ impl SourceInfo> for KafkaSourceInfo { // Read any statistics objects generated via the GlueConsumerContext::stats callback while let Ok(statistics) = self.stats_rx.try_recv() { if let Some(logger) = self.logger.as_mut() { - // If this is the first callback, initialize our consumer name - // so that we can later retract this when the source is dropped - match self.previous_stats.consumer_name { - None => self.previous_stats.consumer_name = Some(statistics.name.clone()), - _ => (), - } + for mut part in self.partition_consumers.iter_mut() { + // If this is the first callback, initialize our consumer name + // so that we can later retract this when the source is dropped + match part.previous_stats.consumer_name { + None => part.previous_stats.consumer_name = Some(statistics.name.clone()), + _ => (), + } + + let partition_stats = + &statistics.topics[self.topic_name.as_str()].partitions[&part.pid]; + + logger.log(MaterializedEvent::KafkaConsumerInfo { + consumer_name: statistics.name.to_string(), + source_id: self.id, + partition_id: 
partition_stats.partition.to_string(), + rxmsgs: partition_stats.rxmsgs - part.previous_stats.rxmsgs, + rxbytes: partition_stats.rxbytes - part.previous_stats.rxbytes, + txmsgs: partition_stats.txmsgs - part.previous_stats.txmsgs, + txbytes: partition_stats.txbytes - part.previous_stats.txbytes, + app_offset: partition_stats.app_offset - part.previous_stats.app_offset, + consumer_lag: partition_stats.consumer_lag + - part.previous_stats.consumer_lag, + }); - logger.log(MaterializedEvent::KafkaConsumerInfo { - source_name: self.source_name.to_string(), - source_id: self.id, - consumer_name: statistics.name, - rx: statistics.rx - self.previous_stats.rx, - rx_bytes: statistics.rx_bytes - self.previous_stats.rx_bytes, - tx: statistics.tx - self.previous_stats.tx, - tx_bytes: statistics.tx_bytes - self.previous_stats.tx_bytes, - }); - - self.previous_stats.rx = statistics.rx; - self.previous_stats.rx_bytes = statistics.rx_bytes; - self.previous_stats.tx = statistics.tx; - self.previous_stats.tx_bytes = statistics.tx_bytes; + part.previous_stats.rxmsgs = partition_stats.rxmsgs; + part.previous_stats.rxbytes = partition_stats.rxbytes; + part.previous_stats.txmsgs = partition_stats.txmsgs; + part.previous_stats.txbytes = partition_stats.txbytes; + part.previous_stats.app_offset = partition_stats.app_offset; + part.previous_stats.consumer_lag = partition_stats.consumer_lag; + } } } @@ -327,13 +337,6 @@ impl KafkaSourceInfo { start_offsets, logger, stats_rx, - previous_stats: PreviousStats { - consumer_name: None, - rx: 0, - rx_bytes: 0, - tx: 0, - tx_bytes: 0, - }, } } @@ -436,16 +439,20 @@ impl Drop for KafkaSourceInfo { fn drop(&mut self) { // Retract any metrics logged for this source if let Some(logger) = self.logger.as_mut() { - if let Some(consumer_name) = self.previous_stats.consumer_name.as_ref() { - logger.log(MaterializedEvent::KafkaConsumerInfo { - source_name: self.source_name.to_string(), - source_id: self.id, - consumer_name: consumer_name.to_string(), - rx: -self.previous_stats.rx, - rx_bytes: -self.previous_stats.rx_bytes, - tx: -self.previous_stats.tx, - tx_bytes: -self.previous_stats.tx_bytes, - }); + for part in self.partition_consumers.iter_mut() { + if let Some(consumer_name) = part.previous_stats.consumer_name.as_ref() { + logger.log(MaterializedEvent::KafkaConsumerInfo { + consumer_name: consumer_name.to_string(), + source_id: self.id, + partition_id: part.pid.to_string(), + rxmsgs: -part.previous_stats.rxmsgs, + rxbytes: -part.previous_stats.rxbytes, + txmsgs: -part.previous_stats.txmsgs, + txbytes: -part.previous_stats.txbytes, + app_offset: -part.previous_stats.app_offset, + consumer_lag: -part.previous_stats.consumer_lag, + }); + } } } } @@ -547,6 +554,8 @@ struct PartitionConsumer { pid: i32, /// The underlying Kafka partition queue partition_queue: PartitionQueue, + /// Memoized Statistics for a partition consumer + previous_stats: PreviousStats, } impl PartitionConsumer { @@ -555,6 +564,15 @@ impl PartitionConsumer { PartitionConsumer { pid, partition_queue, + previous_stats: PreviousStats { + consumer_name: None, + rxmsgs: 0, + rxbytes: 0, + txmsgs: 0, + txbytes: 0, + app_offset: 0, + consumer_lag: 0, + }, } } diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td index 97a3d79091101..7f66a1c57601c 100644 --- a/test/testdrive/kafka-stats.td +++ b/test/testdrive/kafka-stats.td @@ -44,25 +44,32 @@ b sum 1 6 2 1 -# It would be lovely to try and assert on the actual value for metrics here. 
Unfortunately, -# librdkafka is chatty and the metrics continue to change even when no data is being written. As -# such, we'll need to settle for a query that simply asserts the presence of a single source in -# the table: +# There should only be metrics from a single partition consumer > SELECT count(*) FROM mz_kafka_consumer_statistics 1 -# and that each of the metrics should be non-zero: -> SELECT count(*) FROM mz_kafka_consumer_statistics where rx = 0 -0 +# We should have read 4 messages +> SELECT count(*) FROM mz_kafka_consumer_statistics where rx_msgs = 4 +1 +# and they should have non-zero bytes > SELECT count(*) FROM mz_kafka_consumer_statistics where rx_bytes = 0 0 -> SELECT count(*) FROM mz_kafka_consumer_statistics where tx = 0 -0 +# We have not transmitted anything +> SELECT count(*) FROM mz_kafka_consumer_statistics where tx_msgs = 0 +1 > SELECT count(*) FROM mz_kafka_consumer_statistics where tx_bytes = 0 -0 +1 + +# We should have read 4 records +> SELECT count(*) FROM mz_kafka_consumer_statistics where app_offset = 4 +1 + +# And we should not be lagging +> SELECT count(*) FROM mz_kafka_consumer_statistics where consumer_lag = 0 +1 # Drop the sources and verify that metrics have been removed > DROP VIEW test1 From 3713dbe3213f7a175f62582938503a7abe3960f3 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Mon, 22 Mar 2021 16:28:42 -0700 Subject: [PATCH 08/12] Derive default values for stats object --- src/dataflow/src/source/kafka.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 1678205b7a92a..186539fb63a53 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -32,6 +32,7 @@ use crate::source::{NextMessage, SourceMessage, SourceReader}; /// Values recorded from the last rdkafka statistics callback, used to generate a /// diff of values for logging +#[derive(Default)] pub struct PreviousStats { consumer_name: Option, rxmsgs: i64, @@ -535,15 +536,7 @@ impl PartitionConsumer { PartitionConsumer { pid, partition_queue, - previous_stats: PreviousStats { - consumer_name: None, - rxmsgs: 0, - rxbytes: 0, - txmsgs: 0, - txbytes: 0, - app_offset: 0, - consumer_lag: 0, - }, + previous_stats: PreviousStats::default(), } } From 77d3c5ed099db5a3492ef1b41f077d1369dc0167 Mon Sep 17 00:00:00 2001 From: Chris Golden <551285+cirego@users.noreply.github.com> Date: Mon, 22 Mar 2021 16:45:04 -0700 Subject: [PATCH 09/12] Remove commented out log line We no longer need the print message (that's what this PR replaces!) 
--- src/dataflow/src/source/kafka.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 186539fb63a53..522351cf6c703 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -572,7 +572,6 @@ impl ClientContext for GlueConsumerContext { .send(statistics) .expect("timely operator hung up while Kafka source active"); self.activate(); - // info!("Client stats: {:#?}", statistics); } } From 03a09285304f126bc6677884b3b06da13a7e02a7 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Tue, 23 Mar 2021 10:23:13 -0700 Subject: [PATCH 10/12] Add lo/hi/ls offsets --- src/dataflow-types/src/logging.rs | 3 ++ src/dataflow/src/logging/materialized.rs | 46 ++++++++++++++++-------- src/dataflow/src/source/kafka.rs | 12 +++++++ test/testdrive/kafka-stats.td | 12 +++++++ 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index 2871a5f942442..a020e919d44ce 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -171,6 +171,9 @@ impl LogVariant { .with_column("rx_bytes", ScalarType::Int64.nullable(false)) .with_column("tx_msgs", ScalarType::Int64.nullable(false)) .with_column("tx_bytes", ScalarType::Int64.nullable(false)) + .with_column("lo_offset", ScalarType::Int64.nullable(false)) + .with_column("hi_offset", ScalarType::Int64.nullable(false)) + .with_column("ls_offset", ScalarType::Int64.nullable(false)) .with_column("app_offset", ScalarType::Int64.nullable(false)) .with_column("consumer_lag", ScalarType::Int64.nullable(false)) .with_key(vec![0, 1, 2]), diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs index 4dd3f4855b5be..2525ec1047ff0 100644 --- a/src/dataflow/src/logging/materialized.rs +++ b/src/dataflow/src/logging/materialized.rs @@ -11,7 +11,7 @@ use std::time::Duration; -use differential_dataflow::difference::DiffPair; +use differential_dataflow::difference::{DiffPair, DiffVector}; use differential_dataflow::operators::count::CountTotal; use log::error; use timely::communication::Allocate; @@ -56,6 +56,12 @@ pub enum MaterializedEvent { txmsgs: i64, /// Number of bytes transmitted to Brokers txbytes: i64, + /// Partition's low watermark offset on the broker + lo_offset: i64, + /// Partition's high watermark offset on the broker + hi_offset: i64, + /// Last stable offset on the broker + ls_offset: i64, /// How far into the topic our consumer has read app_offset: i64, /// How many messages remain until our consumer reaches the (hi|lo) watermark @@ -221,19 +227,26 @@ pub fn construct( rxbytes, txmsgs, txbytes, + lo_offset, + hi_offset, + ls_offset, app_offset, consumer_lag, } => { kafka_consumer_info_session.give(( (consumer_name, source_id, partition_id), time_ms, - DiffPair::new( - DiffPair::new( - DiffPair::new(rxmsgs, rxbytes), - DiffPair::new(txmsgs, txbytes), - ), - DiffPair::new(app_offset, consumer_lag), - ), + DiffVector::new(vec![ + rxmsgs, + rxbytes, + txmsgs, + txbytes, + lo_offset, + hi_offset, + ls_offset, + app_offset, + consumer_lag, + ]), )); } MaterializedEvent::Peek(peek, is_install) => { @@ -299,17 +312,20 @@ pub fn construct( use differential_dataflow::operators::Count; let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({ let mut row_packer = repr::RowPacker::new(); - move |((consumer_name, source_id, partition_id), pairs)| { + move |((consumer_name, source_id, partition_id), diff_vector)| { 
row_packer.pack(&[ Datum::String(&consumer_name), Datum::String(&source_id.to_string()), Datum::String(&partition_id), - Datum::Int64(pairs.element1.element1.element1), - Datum::Int64(pairs.element1.element1.element2), - Datum::Int64(pairs.element1.element2.element1), - Datum::Int64(pairs.element1.element2.element2), - Datum::Int64(pairs.element2.element1), - Datum::Int64(pairs.element2.element2), + Datum::Int64(diff_vector[0]), + Datum::Int64(diff_vector[1]), + Datum::Int64(diff_vector[2]), + Datum::Int64(diff_vector[3]), + Datum::Int64(diff_vector[4]), + Datum::Int64(diff_vector[5]), + Datum::Int64(diff_vector[6]), + Datum::Int64(diff_vector[7]), + Datum::Int64(diff_vector[8]), ]) } }); diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 522351cf6c703..81d9f0060f552 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -39,6 +39,9 @@ pub struct PreviousStats { rxbytes: i64, txmsgs: i64, txbytes: i64, + lo_offset: i64, + hi_offset: i64, + ls_offset: i64, app_offset: i64, consumer_lag: i64, } @@ -158,6 +161,9 @@ impl SourceReader> for KafkaSourceReader { rxbytes: partition_stats.rxbytes - part.previous_stats.rxbytes, txmsgs: partition_stats.txmsgs - part.previous_stats.txmsgs, txbytes: partition_stats.txbytes - part.previous_stats.txbytes, + lo_offset: partition_stats.lo_offset - part.previous_stats.lo_offset, + hi_offset: partition_stats.hi_offset - part.previous_stats.hi_offset, + ls_offset: partition_stats.ls_offset - part.previous_stats.ls_offset, app_offset: partition_stats.app_offset - part.previous_stats.app_offset, consumer_lag: partition_stats.consumer_lag - part.previous_stats.consumer_lag, @@ -167,6 +173,9 @@ impl SourceReader> for KafkaSourceReader { part.previous_stats.rxbytes = partition_stats.rxbytes; part.previous_stats.txmsgs = partition_stats.txmsgs; part.previous_stats.txbytes = partition_stats.txbytes; + part.previous_stats.lo_offset = partition_stats.lo_offset; + part.previous_stats.hi_offset = partition_stats.hi_offset; + part.previous_stats.ls_offset = partition_stats.ls_offset; part.previous_stats.app_offset = partition_stats.app_offset; part.previous_stats.consumer_lag = partition_stats.consumer_lag; } @@ -421,6 +430,9 @@ impl Drop for KafkaSourceReader { rxbytes: -part.previous_stats.rxbytes, txmsgs: -part.previous_stats.txmsgs, txbytes: -part.previous_stats.txbytes, + lo_offset: -part.previous_stats.lo_offset, + hi_offset: -part.previous_stats.hi_offset, + ls_offset: -part.previous_stats.ls_offset, app_offset: -part.previous_stats.app_offset, consumer_lag: -part.previous_stats.consumer_lag, }); diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td index 7f66a1c57601c..ec4119abbcfbb 100644 --- a/test/testdrive/kafka-stats.td +++ b/test/testdrive/kafka-stats.td @@ -63,6 +63,18 @@ b sum > SELECT count(*) FROM mz_kafka_consumer_statistics where tx_bytes = 0 1 +# Lo Offset should not exceed Hi offset +> SELECT count(*) FROM mz_kafka_consumer_statistics where lo_offset > hi_offset +0 + +# Lo Offset should not exceed Ls offset +> SELECT count(*) FROM mz_kafka_consumer_statistics where lo_offset > ls_offset +0 + +# Ls Offset should not exceed Hi Offset +> SELECT count(*) FROM mz_kafka_consumer_statistics where ls_offset > hi_offset +0 + # We should have read 4 records > SELECT count(*) FROM mz_kafka_consumer_statistics where app_offset = 4 1 From d09093e04943740fe57d759b109c46080bf84d54 Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Tue, 23 Mar 2021 12:33:31 -0700 Subject: [PATCH 
11/12] Fix source id and verify join against mz_source_info --- src/dataflow/src/logging/materialized.rs | 2 +- test/testdrive/kafka-stats.td | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs index 2525ec1047ff0..0e517f1b45f94 100644 --- a/src/dataflow/src/logging/materialized.rs +++ b/src/dataflow/src/logging/materialized.rs @@ -315,7 +315,7 @@ pub fn construct( move |((consumer_name, source_id, partition_id), diff_vector)| { row_packer.pack(&[ Datum::String(&consumer_name), - Datum::String(&source_id.to_string()), + Datum::String(&source_id.source_id.to_string()), Datum::String(&partition_id), Datum::Int64(diff_vector[0]), Datum::Int64(diff_vector[1]), diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td index ec4119abbcfbb..cf403bd6a33b0 100644 --- a/test/testdrive/kafka-stats.td +++ b/test/testdrive/kafka-stats.td @@ -83,6 +83,18 @@ b sum > SELECT count(*) FROM mz_kafka_consumer_statistics where consumer_lag = 0 1 +# If we change the message encoding and/or the broker version, these results may change +> SELECT partition_id, rx_msgs, rx_bytes, tx_msgs, tx_bytes, lo_offset, hi_offset, ls_offset, app_offset, consumer_lag FROM mz_kafka_consumer_statistics; +partition_id rx_msgs rx_bytes tx_msgs tx_bytes lo_offset hi_offset ls_offset app_offset consumer_lag +------------------------------------------------------------------------------------------------------------- +0 4 28 0 0 0 4 4 4 0 + +# Verify that we can join against mz_source_info +> SELECT mz_kafka_consumer_statistics.rx_msgs FROM mz_kafka_consumer_statistics INNER JOIN mz_source_info USING (source_id, partition_id) WHERE mz_source_info.source_name like 'kafka-%%'; +rx_msgs +------- +4 + # Drop the sources and verify that metrics have been removed > DROP VIEW test1 From e871578374290f5e5677e4b0ac9c4a20cd9a3fae Mon Sep 17 00:00:00 2001 From: Chris Golden Date: Tue, 23 Mar 2021 14:23:45 -0700 Subject: [PATCH 12/12] Add dataflow ID to correctly identify source --- src/dataflow-types/src/logging.rs | 3 ++- src/dataflow/src/logging/materialized.rs | 1 + test/testdrive/kafka-stats.td | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index a020e919d44ce..cbeb178dcf5d4 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -166,6 +166,7 @@ impl LogVariant { LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty() .with_column("consumer_name", ScalarType::String.nullable(false)) .with_column("source_id", ScalarType::String.nullable(false)) + .with_column("dataflow_id", ScalarType::Int64.nullable(false)) .with_column("partition_id", ScalarType::String.nullable(false)) .with_column("rx_msgs", ScalarType::Int64.nullable(false)) .with_column("rx_bytes", ScalarType::Int64.nullable(false)) @@ -237,7 +238,7 @@ impl LogVariant { LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![], LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![( LogVariant::Materialized(MaterializedLog::SourceInfo), - vec![(1, 1), (2, 3)], + vec![(1, 1), (2, 2), (3, 3)], )], LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![], LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![], diff --git a/src/dataflow/src/logging/materialized.rs b/src/dataflow/src/logging/materialized.rs index 0e517f1b45f94..6a8900c7dc176 100644 --- 
a/src/dataflow/src/logging/materialized.rs +++ b/src/dataflow/src/logging/materialized.rs @@ -316,6 +316,7 @@ pub fn construct( row_packer.pack(&[ Datum::String(&consumer_name), Datum::String(&source_id.source_id.to_string()), + Datum::Int64(source_id.dataflow_id as i64), Datum::String(&partition_id), Datum::Int64(diff_vector[0]), Datum::Int64(diff_vector[1]), diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td index cf403bd6a33b0..c46cf43213e73 100644 --- a/test/testdrive/kafka-stats.td +++ b/test/testdrive/kafka-stats.td @@ -90,7 +90,7 @@ partition_id rx_msgs rx_bytes tx_msgs tx_bytes lo_offset hi_offset ls_off 0 4 28 0 0 0 4 4 4 0 # Verify that we can join against mz_source_info -> SELECT mz_kafka_consumer_statistics.rx_msgs FROM mz_kafka_consumer_statistics INNER JOIN mz_source_info USING (source_id, partition_id) WHERE mz_source_info.source_name like 'kafka-%%'; +> SELECT mz_kafka_consumer_statistics.rx_msgs FROM mz_kafka_consumer_statistics INNER JOIN mz_source_info USING (source_id, dataflow_id, partition_id) WHERE mz_source_info.source_name like 'kafka-%%'; rx_msgs ------- 4
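
Usage sketch (illustrative only, not part of the patches above): after this series, the per-partition consumer metrics can be joined against mz_source_info to label each partition with its source name, mirroring the join exercised in test/testdrive/kafka-stats.td. The relation and column names below match the final revision of mz_kafka_consumer_statistics; the table aliases are arbitrary.

    SELECT
        info.source_name,
        stats.partition_id,
        stats.rx_msgs,
        stats.rx_bytes,
        stats.app_offset,
        stats.consumer_lag
    FROM mz_kafka_consumer_statistics AS stats
    INNER JOIN mz_source_info AS info
        USING (source_id, dataflow_id, partition_id);

Because the Drop implementation retracts a consumer's rows when its source goes away, a query like this only reports partitions for Kafka sources that are currently running.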