-
Notifications
You must be signed in to change notification settings - Fork 645
feat: add ENV to control Kafka sdk log level #21865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Introduced a new function `read_kafka_log_level` to read the log level from the environment variable `RISINGWAVE_KAFKA_LOG_LEVEL`. - Updated `KafkaConnection`, `KafkaSinkWriter`, and `KafkaSplitReader` to utilize the dynamic log level configuration. - Ensured that an invalid log level falls back to the default level of INFO with a warning log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces dynamic control of the Kafka SDK log level via the new environment variable RISINGWAVE_KAFKA_LOG_LEVEL.
- Introduces the function read_kafka_log_level to read and parse the log level from the environment.
- Updates multiple Kafka-related modules (consumer, sink writer, and enumerator) to use the dynamic log level configuration.
- Adjusts module exports to incorporate the new log level reader for better configurability.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
File | Description |
---|---|
src/connector/src/source/kafka/source/reader.rs | Updated consumer initialization to use dynamic log level. |
src/connector/src/source/kafka/enumerator.rs | Configured enumerator with dynamic log level. |
src/connector/src/sink/kafka.rs | Updated sink writer to set dynamic log level. |
src/connector/src/connector_common/mod.rs | Added export for the read_kafka_log_level function. |
src/connector/src/connector_common/connection.rs | New function read_kafka_log_level reading environment variable and falling back to INFO. |
Comments suppressed due to low confidence (1)
src/connector/src/connector_common/connection.rs:116
- The PR description specifies that an invalid log level should fall back to INFO with a warning log; consider using 'tracing::warn!' here instead of 'tracing::info!' to accurately reflect the intended log severity.
tracing::info!("Invalid RISINGWAVE_KAFKA_LOG_LEVEL: {}, using INFO instead", log_level);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks!
src/connector/src/sink/kafka.rs
Outdated
@@ -477,7 +477,9 @@ impl KafkaSinkWriter { | |||
.await?; | |||
let producer_ctx = RwProducerContext::new(ctx_common); | |||
// Generate the producer | |||
c.create_with_context(producer_ctx).await? | |||
c.set_log_level(read_kafka_log_level()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can set sth like RUST_LOG="librdkafka=trace"
, why need a separate env var? 👀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Modified `read_kafka_log_level` to return an `Option<RDKafkaLogLevel>` instead of a direct value. - Updated all instances in `KafkaConnection`, `KafkaSinkWriter`, `KafkaSplitReader`, and `KafkaSplitEnumerator` to handle the new return type, ensuring log level is set only if valid. - Improved error handling by returning `None` for invalid log levels instead of defaulting to INFO.
Co-authored-by: tab <[email protected]>
read_kafka_log_level
to read the log level from the environment variableRISINGWAVE_KAFKA_LOG_LEVEL
.KafkaConnection
,KafkaSinkWriter
, andKafkaSplitReader
to utilize the dynamic log level configuration.I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
proposed by @arkbriar
introduce
RISINGWAVE_KAFKA_LOG_LEVEL
to control the log level of kafka sdkaccept
What's changed and what's your intention?
Checklist
Documentation
Release note