From 253c9162d80e322f6b07d994ecebc714645931f6 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 1 Oct 2025 18:02:47 -0700 Subject: [PATCH 1/3] KAFKA-19542: Consumer.close() does not remove all added sensors from Metrics Introduces AbstractConsumerMetricsManager as a shared base class for consumer metrics managers, consolidating metric and sensor registration and cleanup logic. Updates all relevant metrics manager classes to extend this base, reducing code duplication and improving maintainability. --- .../AbstractConsumerMetricsManager.java | 65 +++++++++++++ .../metrics/AsyncConsumerMetrics.java | 92 ++++++------------- .../ConsumerRebalanceMetricsManager.java | 30 +++--- .../metrics/HeartbeatMetricsManager.java | 18 ++-- .../metrics/KafkaConsumerMetrics.java | 45 +++------ .../metrics/KafkaShareConsumerMetrics.java | 33 ++----- .../metrics/OffsetCommitMetricsManager.java | 20 ++-- .../RebalanceCallbackMetricsManager.java | 28 +++--- .../metrics/RebalanceMetricsManager.java | 12 +-- .../metrics/ShareRebalanceMetricsManager.java | 8 +- 10 files changed, 166 insertions(+), 185 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java new file mode 100644 index 0000000000000..7fbdd9afbbb30 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +public abstract class AbstractConsumerMetricsManager implements AutoCloseable { + + private final Metrics metrics; + private final String metricGroupName; + private final Set<MetricName> metricNames; + private final List<Sensor> sensors; + + protected AbstractConsumerMetricsManager(Metrics metrics, String metricGroupName) { + this.metrics = Objects.requireNonNull(metrics); + this.metricGroupName = Objects.requireNonNull(metricGroupName); + this.metricNames = new HashSet<>(); + this.sensors = new ArrayList<>(); + } + + protected MetricName metricName(String name, String description) { + MetricName metricName = metrics.metricName(name, metricGroupName, description); + metricNames.add(metricName); + return metricName; + } + + protected void addMetric(MetricName metricName, Measurable measurable) { + metrics.addMetric(metricName, measurable); + } + + protected Sensor sensor(String name) { + Sensor sensor = metrics.sensor(name); + sensors.add(sensor); + return sensor; + } + + @Override + public final void close() { +
metricNames.forEach(metrics::removeMetric); + sensors.forEach(s -> metrics.removeSensor(s.name())); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java index 2f90440a66244..186ee430eb3dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java @@ -22,10 +22,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Value; -import java.util.Arrays; - -public class AsyncConsumerMetrics implements AutoCloseable { - private final Metrics metrics; +public class AsyncConsumerMetrics extends AbstractConsumerMetricsManager { public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; @@ -49,149 +46,134 @@ public class AsyncConsumerMetrics implements AutoCloseable { private final Sensor unsentRequestsQueueTimeSensor; public AsyncConsumerMetrics(Metrics metrics, String groupName) { - this.metrics = metrics; - this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); + super(metrics, groupName); + + this.timeBetweenNetworkThreadPollSensor = sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); this.timeBetweenNetworkThreadPollSensor.add( - metrics.metricName( + metricName( "time-between-network-thread-poll-avg", - groupName, "The average time taken, in milliseconds, between each poll in the network thread." 
), new Avg() ); this.timeBetweenNetworkThreadPollSensor.add( - metrics.metricName( + metricName( "time-between-network-thread-poll-max", - groupName, "The maximum time taken, in milliseconds, between each poll in the network thread." ), new Max() ); - this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.applicationEventQueueSizeSensor = sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); this.applicationEventQueueSizeSensor.add( - metrics.metricName( + metricName( APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of events in the queue to send from the application thread to the background thread." ), new Value() ); - this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); + this.applicationEventQueueTimeSensor = sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); this.applicationEventQueueTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-time-avg", - groupName, "The average time, in milliseconds, that application events are taking to be dequeued." ), new Avg() ); this.applicationEventQueueTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-time-max", - groupName, "The maximum time, in milliseconds, that an application event took to be dequeued." ), new Max() ); - this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.applicationEventQueueProcessingTimeSensor = sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); this.applicationEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-processing-time-avg", - groupName, "The average time, in milliseconds, that the background thread takes to process all available application events." 
), new Avg() ); this.applicationEventQueueProcessingTimeSensor.add( - metrics.metricName("application-event-queue-processing-time-max", - groupName, + metricName("application-event-queue-processing-time-max", "The maximum time, in milliseconds, that the background thread took to process all available application events." ), new Max() ); - this.applicationEventExpiredSizeSensor = metrics.sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME); + this.applicationEventExpiredSizeSensor = sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME); this.applicationEventExpiredSizeSensor.add( - metrics.metricName( + metricName( APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME, - groupName, "The current number of expired application events." ), new Value() ); - this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); + this.unsentRequestsQueueSizeSensor = sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); this.unsentRequestsQueueSizeSensor.add( - metrics.metricName( + metricName( UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of unsent requests in the background thread." ), new Value() ); - this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); + this.unsentRequestsQueueTimeSensor = sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); this.unsentRequestsQueueTimeSensor.add( - metrics.metricName( + metricName( "unsent-requests-queue-time-avg", - groupName, "The average time, in milliseconds, that requests are taking to be sent in the background thread." ), new Avg() ); this.unsentRequestsQueueTimeSensor.add( - metrics.metricName( + metricName( "unsent-requests-queue-time-max", - groupName, "The maximum time, in milliseconds, that a request remained unsent in the background thread." 
), new Max() ); - this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.backgroundEventQueueSizeSensor = sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); this.backgroundEventQueueSizeSensor.add( - metrics.metricName( + metricName( BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of events in the queue to send from the background thread to the application thread." ), new Value() ); - this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); + this.backgroundEventQueueTimeSensor = sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); this.backgroundEventQueueTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-time-avg", - groupName, "The average time, in milliseconds, that background events are taking to be dequeued." ), new Avg() ); this.backgroundEventQueueTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-time-max", - groupName, "The maximum time, in milliseconds, that background events are taking to be dequeued." ), new Max() ); - this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.backgroundEventQueueProcessingTimeSensor = sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); this.backgroundEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-processing-time-avg", - groupName, "The average time, in milliseconds, that the consumer took to process all available background events." ), new Avg() ); this.backgroundEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-processing-time-max", - groupName, "The maximum time, in milliseconds, that the consumer took to process all available background events." 
), new Max() @@ -237,20 +219,4 @@ public void recordBackgroundEventQueueTime(long time) { public void recordBackgroundEventQueueProcessingTime(long processingTime) { this.backgroundEventQueueProcessingTimeSensor.record(processingTime); } - - @Override - public void close() { - Arrays.asList( - timeBetweenNetworkThreadPollSensor.name(), - applicationEventQueueSizeSensor.name(), - applicationEventQueueTimeSensor.name(), - applicationEventQueueProcessingTimeSensor.name(), - applicationEventExpiredSizeSensor.name(), - backgroundEventQueueSizeSensor.name(), - backgroundEventQueueTimeSensor.name(), - backgroundEventQueueProcessingTimeSensor.name(), - unsentRequestsQueueSizeSensor.name(), - unsentRequestsQueueTimeSensor.name() - ).forEach(metrics::removeSensor); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java index e271dee526172..037bd35919e98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java @@ -51,38 +51,36 @@ public final class ConsumerRebalanceMetricsManager extends RebalanceMetricsManag public final MetricName assignedPartitionsCount; private long lastRebalanceEndMs = -1L; private long lastRebalanceStartMs = -1L; - private final Metrics metrics; public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscriptions) { - super(CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - this.metrics = metrics; + super(metrics, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg", + rebalanceLatencyAvg = metricName("rebalance-latency-avg", "The average time in ms taken for a group to complete a 
rebalance"); - rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max", + rebalanceLatencyMax = metricName("rebalance-latency-max", "The max time in ms taken for a group to complete a rebalance"); - rebalanceLatencyTotal = createMetric(metrics, "rebalance-latency-total", + rebalanceLatencyTotal = metricName("rebalance-latency-total", "The total number of milliseconds spent in rebalances"); - rebalanceTotal = createMetric(metrics, "rebalance-total", + rebalanceTotal = metricName("rebalance-total", "The total number of rebalance events"); - rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour", + rebalanceRatePerHour = metricName("rebalance-rate-per-hour", "The number of rebalance events per hour"); - failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total", + failedRebalanceTotal = metricName("failed-rebalance-total", "The total number of failed rebalance events"); - failedRebalanceRate = createMetric(metrics, "failed-rebalance-rate-per-hour", + failedRebalanceRate = metricName("failed-rebalance-rate-per-hour", "The number of failed rebalance events per hour"); - assignedPartitionsCount = createMetric(metrics, "assigned-partitions", + assignedPartitionsCount = metricName("assigned-partitions", "The number of partitions currently assigned to this consumer"); registerAssignedPartitionCount(subscriptions); - successfulRebalanceSensor = metrics.sensor("rebalance-latency"); + successfulRebalanceSensor = sensor("rebalance-latency"); successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg()); successfulRebalanceSensor.add(rebalanceLatencyMax, new Max()); successfulRebalanceSensor.add(rebalanceLatencyTotal, new CumulativeSum()); successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount()); successfulRebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount())); - failedRebalanceSensor = metrics.sensor("failed-rebalance"); + failedRebalanceSensor = sensor("failed-rebalance"); 
failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum()); failedRebalanceSensor.add(failedRebalanceRate, new Rate(TimeUnit.HOURS, new WindowedCount())); @@ -92,10 +90,10 @@ public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscr else return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); }; - lastRebalanceSecondsAgo = createMetric(metrics, + lastRebalanceSecondsAgo = metricName( "last-rebalance-seconds-ago", "The number of seconds since the last rebalance event"); - metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance); + addMetric(lastRebalanceSecondsAgo, lastRebalance); } public void recordRebalanceStarted(long nowMs) { @@ -125,6 +123,6 @@ public boolean rebalanceStarted() { */ private void registerAssignedPartitionCount(SubscriptionState subscriptions) { Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); - metrics.addMetric(assignedPartitionsCount, numParts); + addMetric(assignedPartitionsCount, numParts); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java index 926e267d98907..2565a8de3b2d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java @@ -29,7 +29,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class HeartbeatMetricsManager { +public class HeartbeatMetricsManager extends AbstractConsumerMetricsManager { // MetricName visible for testing final MetricName heartbeatResponseTimeMax; final MetricName heartbeatRate; @@ -43,16 +43,15 @@ 
public HeartbeatMetricsManager(Metrics metrics) { } public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) { - final String metricGroupName = metricGroupPrefix + COORDINATOR_METRICS_SUFFIX; - heartbeatSensor = metrics.sensor("heartbeat-latency"); - heartbeatResponseTimeMax = metrics.metricName("heartbeat-response-time-max", - metricGroupName, + super(metrics, metricGroupPrefix + COORDINATOR_METRICS_SUFFIX); + heartbeatSensor = sensor("heartbeat-latency"); + heartbeatResponseTimeMax = metricName("heartbeat-response-time-max", "The max time taken to receive a response to a heartbeat request"); heartbeatSensor.add(heartbeatResponseTimeMax, new Max()); // windowed meters - heartbeatRate = metrics.metricName("heartbeat-rate", metricGroupName, "The number of heartbeats per second"); - heartbeatTotal = metrics.metricName("heartbeat-total", metricGroupName, "The total number of heartbeats"); + heartbeatRate = metricName("heartbeat-rate", "The number of heartbeats per second"); + heartbeatTotal = metricName("heartbeat-total", "The total number of heartbeats"); heartbeatSensor.add(new Meter(new WindowedCount(), heartbeatRate, heartbeatTotal)); @@ -65,10 +64,9 @@ public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) { else return TimeUnit.SECONDS.convert(now - lastHeartbeatSend, TimeUnit.MILLISECONDS); }; - lastHeartbeatSecondsAgo = metrics.metricName("last-heartbeat-seconds-ago", - metricGroupName, + lastHeartbeatSecondsAgo = metricName("last-heartbeat-seconds-ago", "The number of seconds since the last coordinator heartbeat was sent"); - metrics.addMetric(lastHeartbeatSecondsAgo, lastHeartbeat); + addMetric(lastHeartbeatSecondsAgo, lastHeartbeat); } public void recordHeartbeatSentMs(long timeMs) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index 
1b2bb4518f979..e9dd57baa78ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -28,9 +28,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; -public class KafkaConsumerMetrics implements AutoCloseable { - private final Metrics metrics; - private final MetricName lastPollMetricName; +public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager { private final Sensor timeBetweenPollSensor; private final Sensor pollIdleSensor; private final Sensor committedSensor; @@ -40,8 +38,7 @@ public class KafkaConsumerMetrics implements AutoCloseable { private long timeSinceLastPollMs; public KafkaConsumerMetrics(Metrics metrics) { - this.metrics = metrics; - final String metricGroupName = CONSUMER_METRIC_GROUP; + super(metrics, CONSUMER_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { if (lastPollMs == 0L) // if no poll is ever triggered, just return -1. 
@@ -49,41 +46,36 @@ public KafkaConsumerMetrics(Metrics metrics) { else return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS); }; - this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago", - metricGroupName, "The number of seconds since the last poll() invocation."); - metrics.addMetric(lastPollMetricName, lastPoll); + MetricName lastPollMetricName = metricName("last-poll-seconds-ago", + "The number of seconds since the last poll() invocation."); + addMetric(lastPollMetricName, lastPoll); - this.timeBetweenPollSensor = metrics.sensor("time-between-poll"); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg", - metricGroupName, + this.timeBetweenPollSensor = sensor("time-between-poll"); + this.timeBetweenPollSensor.add(metricName("time-between-poll-avg", "The average delay between invocations of poll() in milliseconds."), new Avg()); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-max", - metricGroupName, + this.timeBetweenPollSensor.add(metricName("time-between-poll-max", "The max delay between invocations of poll() in milliseconds."), new Max()); - this.pollIdleSensor = metrics.sensor("poll-idle-ratio-avg"); - this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg", - metricGroupName, + this.pollIdleSensor = sensor("poll-idle-ratio-avg"); + this.pollIdleSensor.add(metricName("poll-idle-ratio-avg", "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); - this.commitSyncSensor = metrics.sensor("commit-sync-time-ns-total"); + this.commitSyncSensor = sensor("commit-sync-time-ns-total"); this.commitSyncSensor.add( - metrics.metricName( + metricName( "commit-sync-time-ns-total", - metricGroupName, "The total time the consumer has spent in commitSync in nanoseconds" ), new CumulativeSum() ); - this.committedSensor = metrics.sensor("committed-time-ns-total"); + this.committedSensor = 
sensor("committed-time-ns-total"); this.committedSensor.add( - metrics.metricName( + metricName( "committed-time-ns-total", - metricGroupName, "The total time the consumer has spent in committed in nanoseconds" ), new CumulativeSum() @@ -110,13 +102,4 @@ public void recordCommitSync(long duration) { public void recordCommitted(long duration) { this.committedSensor.record(duration); } - - @Override - public void close() { - metrics.removeMetric(lastPollMetricName); - metrics.removeSensor(timeBetweenPollSensor.name()); - metrics.removeSensor(pollIdleSensor.name()); - metrics.removeSensor(commitSyncSensor.name()); - metrics.removeSensor(committedSensor.name()); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index e154b97da5a80..653da0871fdd9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -27,9 +27,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP; -public class KafkaShareConsumerMetrics implements AutoCloseable { - private final Metrics metrics; - private final MetricName lastPollMetricName; +public class KafkaShareConsumerMetrics extends AbstractConsumerMetricsManager { private final Sensor timeBetweenPollSensor; private final Sensor pollIdleSensor; private long lastPollMs; @@ -37,8 +35,7 @@ public class KafkaShareConsumerMetrics implements AutoCloseable { private long timeSinceLastPollMs; public KafkaShareConsumerMetrics(Metrics metrics) { - this.metrics = metrics; - final String metricGroupName = CONSUMER_SHARE_METRIC_GROUP; + super(metrics, CONSUMER_SHARE_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { if (lastPollMs == 0L) // if no poll 
is ever triggered, just return -1. @@ -46,23 +43,20 @@ public KafkaShareConsumerMetrics(Metrics metrics) { else return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS); }; - this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago", - metricGroupName, "The number of seconds since the last poll() invocation."); - metrics.addMetric(lastPollMetricName, lastPoll); + MetricName lastPollMetricName = metricName("last-poll-seconds-ago", + "The number of seconds since the last poll() invocation."); + addMetric(lastPollMetricName, lastPoll); - this.timeBetweenPollSensor = metrics.sensor("time-between-poll"); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg", - metricGroupName, + this.timeBetweenPollSensor = sensor("time-between-poll"); + this.timeBetweenPollSensor.add(metricName("time-between-poll-avg", "The average delay between invocations of poll() in milliseconds."), new Avg()); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-max", - metricGroupName, + this.timeBetweenPollSensor.add(metricName("time-between-poll-max", "The max delay between invocations of poll() in milliseconds."), new Max()); - this.pollIdleSensor = metrics.sensor("poll-idle-ratio-avg"); - this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg", - metricGroupName, + this.pollIdleSensor = sensor("poll-idle-ratio-avg"); + this.pollIdleSensor.add(metricName("poll-idle-ratio-avg", "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); } @@ -79,11 +73,4 @@ public void recordPollEnd(long pollEndMs) { double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); this.pollIdleSensor.record(pollIdleRatio); } - - @Override - public void close() { - metrics.removeMetric(lastPollMetricName); - metrics.removeSensor(timeBetweenPollSensor.name()); - metrics.removeSensor(pollIdleSensor.name()); - } } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java index d700299ef1801..94df5af86513c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java @@ -27,29 +27,25 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class OffsetCommitMetricsManager { +public class OffsetCommitMetricsManager extends AbstractConsumerMetricsManager { final MetricName commitLatencyAvg; final MetricName commitLatencyMax; final MetricName commitRate; final MetricName commitTotal; private final Sensor commitSensor; - public OffsetCommitMetricsManager(Metrics metrics) { - final String metricGroupName = CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX; - commitSensor = metrics.sensor("commit-latency"); - commitLatencyAvg = metrics.metricName("commit-latency-avg", - metricGroupName, + public OffsetCommitMetricsManager(Metrics metrics) { + super(metrics, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); + commitSensor = sensor("commit-latency"); + commitLatencyAvg = metricName("commit-latency-avg", "The average time taken for a commit request"); commitSensor.add(commitLatencyAvg, new Avg()); - commitLatencyMax = metrics.metricName("commit-latency-max", - metricGroupName, + commitLatencyMax = metricName("commit-latency-max", "The max time taken for a commit request"); commitSensor.add(commitLatencyMax, new Max()); - commitRate = metrics.metricName("commit-rate", - metricGroupName, + commitRate = metricName("commit-rate", "The number of commit calls per second"); - commitTotal =
metrics.metricName("commit-total", - metricGroupName, + commitTotal = metricName("commit-total", "The total number of commit calls"); commitSensor.add(new Meter(new WindowedCount(), commitRate, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java index f70b891864f1c..292ca9a971c5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java @@ -25,7 +25,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class RebalanceCallbackMetricsManager { +public class RebalanceCallbackMetricsManager extends AbstractConsumerMetricsManager { final MetricName partitionRevokeLatencyAvg; final MetricName partitionAssignLatencyAvg; final MetricName partitionLostLatencyAvg; @@ -41,34 +41,28 @@ public RebalanceCallbackMetricsManager(Metrics metrics) { } public RebalanceCallbackMetricsManager(Metrics metrics, String grpMetricsPrefix) { - final String metricGroupName = grpMetricsPrefix + COORDINATOR_METRICS_SUFFIX; - partitionRevokeCallbackSensor = metrics.sensor("partition-revoked-latency"); - partitionRevokeLatencyAvg = metrics.metricName("partition-revoked-latency-avg", - metricGroupName, + super(metrics, grpMetricsPrefix + COORDINATOR_METRICS_SUFFIX); + partitionRevokeCallbackSensor = sensor("partition-revoked-latency"); + partitionRevokeLatencyAvg = metricName("partition-revoked-latency-avg", "The average time taken for a partition-revoked rebalance listener callback"); partitionRevokeCallbackSensor.add(partitionRevokeLatencyAvg, new Avg()); - 
partitionRevokeLatencyMax = metrics.metricName("partition-revoked-latency-max", - metricGroupName, + partitionRevokeLatencyMax = metricName("partition-revoked-latency-max", "The max time taken for a partition-revoked rebalance listener callback"); partitionRevokeCallbackSensor.add(partitionRevokeLatencyMax, new Max()); - partitionAssignCallbackSensor = metrics.sensor("partition-assigned-latency"); - partitionAssignLatencyAvg = metrics.metricName("partition-assigned-latency-avg", - metricGroupName, + partitionAssignCallbackSensor = sensor("partition-assigned-latency"); + partitionAssignLatencyAvg = metricName("partition-assigned-latency-avg", "The average time taken for a partition-assigned rebalance listener callback"); partitionAssignCallbackSensor.add(partitionAssignLatencyAvg, new Avg()); - partitionAssignLatencyMax = metrics.metricName("partition-assigned-latency-max", - metricGroupName, + partitionAssignLatencyMax = metricName("partition-assigned-latency-max", "The max time taken for a partition-assigned rebalance listener callback"); partitionAssignCallbackSensor.add(partitionAssignLatencyMax, new Max()); - partitionLostCallbackSensor = metrics.sensor("partition-lost-latency"); - partitionLostLatencyAvg = metrics.metricName("partition-lost-latency-avg", - metricGroupName, + partitionLostCallbackSensor = sensor("partition-lost-latency"); + partitionLostLatencyAvg = metricName("partition-lost-latency-avg", "The average time taken for a partition-lost rebalance listener callback"); partitionLostCallbackSensor.add(partitionLostLatencyAvg, new Avg()); - partitionLostLatencyMax = metrics.metricName("partition-lost-latency-max", - metricGroupName, + partitionLostLatencyMax = metricName("partition-lost-latency-max", "The max time taken for a partition-lost rebalance listener callback"); partitionLostCallbackSensor.add(partitionLostLatencyMax, new Max()); } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java index 16ad1b39817f5..51ea2e4ff620b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java @@ -16,18 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals.metrics; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; -public abstract class RebalanceMetricsManager { - protected final String metricGroupName; +public abstract class RebalanceMetricsManager extends AbstractConsumerMetricsManager { - RebalanceMetricsManager(String metricGroupName) { - this.metricGroupName = metricGroupName; - } - - protected MetricName createMetric(Metrics metrics, String name, String description) { - return metrics.metricName(name, metricGroupName, description); + RebalanceMetricsManager(Metrics metrics, String metricGroupName) { + super(metrics, metricGroupName); } public abstract void recordRebalanceStarted(long nowMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java index 0760eaa6d5cfd..3169ffb0209fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java @@ -36,14 +36,14 @@ public final class ShareRebalanceMetricsManager extends RebalanceMetricsManager private long lastRebalanceStartMs = -1L; public ShareRebalanceMetricsManager(Metrics metrics) { - super(CONSUMER_SHARE_METRIC_GROUP_PREFIX + 
COORDINATOR_METRICS_SUFFIX); + super(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - rebalanceTotal = createMetric(metrics, "rebalance-total", + rebalanceTotal = metricName("rebalance-total", "The total number of rebalance events"); - rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour", + rebalanceRatePerHour = metricName("rebalance-rate-per-hour", "The number of rebalance events per hour"); - rebalanceSensor = metrics.sensor("rebalance-latency"); + rebalanceSensor = sensor("rebalance-latency"); rebalanceSensor.add(rebalanceTotal, new CumulativeCount()); rebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount())); } From 191de7e7daa1452e307b1ad02716f7736916b6fe Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 1 Oct 2025 18:49:11 -0700 Subject: [PATCH 2/3] Refactor metrics manager and add base test class Changed AbstractConsumerMetricsManager to use Set for sensors and added debug output in close(). Introduced AbstractConsumerMetricsManagerTest as a base test class for metrics cleanup verification. Updated AsyncConsumerMetricsTest and ConsumerRebalanceMetricsManagerTest to extend the new base test and implement required methods. 
--- .../AbstractConsumerMetricsManager.java | 22 ++++++++-- .../AbstractConsumerMetricsManagerTest.java | 41 +++++++++++++++++++ .../metrics/AsyncConsumerMetricsTest.java | 7 +++- .../ConsumerRebalanceMetricsManagerTest.java | 8 +++- 4 files changed, 72 insertions(+), 6 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java index 7fbdd9afbbb30..b5026d7ab29de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java @@ -21,24 +21,25 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Objects; import java.util.Set; +/** + * Base class for consumer metrics managers that tracks every metric and sensor registered through it so that all of them can be removed from the underlying {@code Metrics} registry when {@link #close()} is called. + */ public abstract class AbstractConsumerMetricsManager implements AutoCloseable { private final Metrics metrics; private final String metricGroupName; private final Set metricNames; - private final List sensors; + private final Set sensors; protected AbstractConsumerMetricsManager(Metrics metrics, String metricGroupName) { this.metrics = Objects.requireNonNull(metrics); this.metricGroupName = Objects.requireNonNull(metricGroupName); this.metricNames = new HashSet<>(); - this.sensors = new ArrayList<>(); + this.sensors = new HashSet<>(); } protected MetricName metricName(String name, String description) { @@ -59,7 +60,20 @@ protected Sensor sensor(String name) { @Override public final void close() { + System.out.println("Metrics before:"); + 
metrics.metrics().keySet().forEach(System.out::println); + metricNames.forEach(metrics::removeMetric); + + System.out.println("Metrics after:"); + metrics.metrics().keySet().forEach(System.out::println); + + System.out.println("Sensors before:"); + sensors.stream().filter(s -> metrics.getSensor(s.name()) != null).forEach(System.out::println); + sensors.forEach(s -> metrics.removeSensor(s.name())); + + System.out.println("Sensors after:"); + sensors.stream().filter(s -> metrics.getSensor(s.name()) != null).forEach(System.out::println); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java new file mode 100644 index 0000000000000..96845c45e615c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public abstract class AbstractConsumerMetricsManagerTest { + + protected abstract AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription); + + @Test + public void testCleanup() { + try (Metrics metrics = new Metrics()) { + int metricCount = metrics.metrics().size(); + + try (AbstractConsumerMetricsManager metricsManager = metricsManager(metrics, "test")) { + assertTrue(metrics.metrics().size() > metricCount); + } + + assertEquals(metricCount, metrics.metrics().size()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java index 876bc3ffa12da..bec5ec2785d4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java @@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class AsyncConsumerMetricsTest { +public class AsyncConsumerMetricsTest extends AbstractConsumerMetricsManagerTest { private static final long METRIC_VALUE = 123L; private final Metrics metrics = new Metrics(); @@ -53,6 +53,11 @@ public void tearDown() { metrics.close(); } + @Override + protected AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription) { + return new AsyncConsumerMetrics(metrics, groupDescription); + } + @ParameterizedTest @MethodSource("groupNameProvider") public void shouldMetricNames(String groupName) { diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java index a7d9122776750..fcd950f59a21c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java @@ -34,11 +34,17 @@ import static org.mockito.Mockito.mock; -class ConsumerRebalanceMetricsManagerTest { +class ConsumerRebalanceMetricsManagerTest extends AbstractConsumerMetricsManagerTest { private final Time time = new MockTime(); private final Metrics metrics = new Metrics(time); + @Override + protected AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription) { + SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST); + return new ConsumerRebalanceMetricsManager(metrics, subscriptionState); + } + @Test public void testAssignedPartitionCountMetric() { SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST); From 90c2836fef75971959cb3e90d2fc3ea1acd28085 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 2 Oct 2025 10:32:50 -0700 Subject: [PATCH 3/3] Refactor metrics management and cleanup in consumer internals Moved sensor and metric management logic into AbstractConsumerMetricsManager, replacing SensorBuilder with an inner class. Updated FetchMetricsManager, ShareFetchMetricsManager, and related classes to extend AbstractConsumerMetricsManager and use its utilities. Ensured proper cleanup of metrics and sensors on close, added tests to verify metrics removal, and improved constructor usage with @SuppressWarnings. 
This change centralizes and simplifies metrics lifecycle management for Kafka consumers. --- .../internals/AbstractCoordinator.java | 84 +++++------ .../internals/AsyncKafkaConsumer.java | 18 ++- .../internals/ClassicKafkaConsumer.java | 8 +- .../internals/ConsumerCoordinator.java | 31 ++-- .../internals/FetchMetricsManager.java | 57 +++---- .../internals/FetchMetricsRegistry.java | 7 +- .../consumer/internals/SensorBuilder.java | 115 -------------- .../internals/ShareFetchMetricsManager.java | 37 ++--- .../internals/ShareFetchMetricsRegistry.java | 8 +- .../AbstractConsumerMetricsManager.java | 141 ++++++++++++++++-- .../metrics/AsyncConsumerMetrics.java | 1 + .../ConsumerRebalanceMetricsManager.java | 1 + .../metrics/HeartbeatMetricsManager.java | 1 + .../metrics/KafkaConsumerMetrics.java | 1 + .../metrics/KafkaShareConsumerMetrics.java | 1 + .../metrics/OffsetCommitMetricsManager.java | 5 +- .../RebalanceCallbackMetricsManager.java | 1 + .../metrics/ShareRebalanceMetricsManager.java | 1 + .../clients/consumer/KafkaConsumerTest.java | 13 ++ .../internals/AsyncKafkaConsumerTest.java | 3 + .../AbstractConsumerMetricsManagerTest.java | 1 + 21 files changed, 287 insertions(+), 248 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a07e12a518abb..04b4f26caddc1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CloseOptions; +import 
org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; @@ -1145,6 +1146,8 @@ protected void close(Timer timer, CloseOptions.GroupMembershipOperation membersh log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", client.pendingRequestCount(coordinator)); } + + Utils.closeQuietly(sensors, "coordinator metrics"); } } @@ -1330,14 +1333,6 @@ boolean generationUnchanged() { } } - protected final Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) { - return new Meter(new WindowedCount(), - metrics.metricName(baseName + "-rate", groupName, - String.format("The number of %s per second", descriptiveName)), - metrics.metricName(baseName + "-total", groupName, - String.format("The total number of %s", descriptiveName))); - } - /** * Visible for testing. 
*/ @@ -1345,82 +1340,69 @@ protected BaseHeartbeatThread heartbeatThread() { return heartbeatThread; } - private class GroupCoordinatorMetrics { - public final String metricGrpName; - + private class GroupCoordinatorMetrics extends AbstractConsumerMetricsManager { public final Sensor heartbeatSensor; public final Sensor joinSensor; public final Sensor syncSensor; public final Sensor successfulRebalanceSensor; public final Sensor failedRebalanceSensor; + @SuppressWarnings({"this-escape"}) public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { - this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; + super(metrics, metricGrpPrefix + "-coordinator-metrics"); - this.heartbeatSensor = metrics.sensor("heartbeat-latency"); - this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max", - this.metricGrpName, + this.heartbeatSensor = sensor("heartbeat-latency"); + this.heartbeatSensor.add(metricName("heartbeat-response-time-max", "The max time taken to receive a response to a heartbeat request"), new Max()); - this.heartbeatSensor.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); + this.heartbeatSensor.add(createMeter("heartbeat", "heartbeats")); - this.joinSensor = metrics.sensor("join-latency"); - this.joinSensor.add(metrics.metricName("join-time-avg", - this.metricGrpName, + this.joinSensor = sensor("join-latency"); + this.joinSensor.add(metricName("join-time-avg", "The average time taken for a group rejoin"), new Avg()); - this.joinSensor.add(metrics.metricName("join-time-max", - this.metricGrpName, + this.joinSensor.add(metricName("join-time-max", "The max time taken for a group rejoin"), new Max()); - this.joinSensor.add(createMeter(metrics, metricGrpName, "join", "group joins")); + this.joinSensor.add(createMeter("join", "group joins")); - this.syncSensor = metrics.sensor("sync-latency"); - this.syncSensor.add(metrics.metricName("sync-time-avg", - this.metricGrpName, + this.syncSensor = 
sensor("sync-latency"); + this.syncSensor.add(metricName("sync-time-avg", "The average time taken for a group sync"), new Avg()); - this.syncSensor.add(metrics.metricName("sync-time-max", - this.metricGrpName, + this.syncSensor.add(metricName("sync-time-max", "The max time taken for a group sync"), new Max()); - this.syncSensor.add(createMeter(metrics, metricGrpName, "sync", "group syncs")); + this.syncSensor.add(createMeter("sync", "group syncs")); - this.successfulRebalanceSensor = metrics.sensor("rebalance-latency"); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg", - this.metricGrpName, + this.successfulRebalanceSensor = sensor("rebalance-latency"); + this.successfulRebalanceSensor.add(metricName("rebalance-latency-avg", "The average time taken for a group to complete a successful rebalance, which may be composed of " + "several failed re-trials until it succeeded"), new Avg()); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max", - this.metricGrpName, + this.successfulRebalanceSensor.add(metricName("rebalance-latency-max", "The max time taken for a group to complete a successful rebalance, which may be composed of " + "several failed re-trials until it succeeded"), new Max()); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-total", - this.metricGrpName, + this.successfulRebalanceSensor.add(metricName("rebalance-latency-total", "The total number of milliseconds this consumer has spent in successful rebalances since creation"), new CumulativeSum()); this.successfulRebalanceSensor.add( - metrics.metricName("rebalance-total", - this.metricGrpName, + metricName("rebalance-total", "The total number of successful rebalance events, each event is composed of " + "several failed re-trials until it succeeded"), new CumulativeCount() ); this.successfulRebalanceSensor.add( - metrics.metricName( + metricName( "rebalance-rate-per-hour", - this.metricGrpName, "The number of successful 
rebalance events per hour, each event is composed of " + "several failed re-trials until it succeeded"), new Rate(TimeUnit.HOURS, new WindowedCount()) ); - this.failedRebalanceSensor = metrics.sensor("failed-rebalance"); + this.failedRebalanceSensor = sensor("failed-rebalance"); this.failedRebalanceSensor.add( - metrics.metricName("failed-rebalance-total", - this.metricGrpName, + metricName("failed-rebalance-total", "The total number of failed rebalance events"), new CumulativeCount() ); this.failedRebalanceSensor.add( - metrics.metricName( + metricName( "failed-rebalance-rate-per-hour", - this.metricGrpName, "The number of failed rebalance events per hour"), new Rate(TimeUnit.HOURS, new WindowedCount()) ); @@ -1432,8 +1414,7 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { else return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); }; - metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago", - this.metricGrpName, + addMetric(metricName("last-rebalance-seconds-ago", "The number of seconds since the last successful rebalance event"), lastRebalance); @@ -1444,11 +1425,18 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { else return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); }; - metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", - this.metricGrpName, + addMetric(metricName("last-heartbeat-seconds-ago", "The number of seconds since the last coordinator heartbeat was sent"), lastHeartbeat); } + + protected final Meter createMeter(String baseName, String descriptiveName) { + return new Meter(new WindowedCount(), + metricName(baseName + "-rate", + String.format("The number of %s per second", descriptiveName)), + metricName(baseName + "-total", + String.format("The total number of %s", descriptiveName))); + } } private class HeartbeatThread extends BaseHeartbeatThread { diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 938ae909027d0..4c12967fd5580 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -287,6 +287,8 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() { private final ApplicationEventHandler applicationEventHandler; private final Time time; private final AtomicReference> groupMetadata = new AtomicReference<>(Optional.empty()); + private final FetchMetricsManager fetchMetricsManager; + private final RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager; private final AsyncConsumerMetrics asyncConsumerMetrics; private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; @@ -416,7 +418,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, final List addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); - FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics); + this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); this.isolationLevel = fetchConfig.isolationLevel; @@ -475,11 +477,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config, requestManagersSupplier, asyncConsumerMetrics ); + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + rebalanceCallbackMetricsManager ); this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s -> new StreamsRebalanceListenerInvoker(logContext, s)); @@ -518,6 +521,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, String clientId, 
Deserializers deserializers, FetchBuffer fetchBuffer, + FetchMetricsManager fetchMetricsManager, + RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager, FetchCollector fetchCollector, ConsumerInterceptors interceptors, Time time, @@ -537,6 +542,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.subscriptions = subscriptions; this.clientId = clientId; this.fetchBuffer = fetchBuffer; + this.fetchMetricsManager = fetchMetricsManager; + this.rebalanceCallbackMetricsManager = rebalanceCallbackMetricsManager; this.fetchCollector = fetchCollector; this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = Objects.requireNonNull(interceptors); @@ -591,7 +598,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.clientTelemetryReporter = Optional.empty(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(); - FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); + this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); this.fetchCollector = new FetchCollector<>(logContext, metadata, subscriptions, @@ -616,11 +623,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config, time, asyncConsumerMetrics ); + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + rebalanceCallbackMetricsManager ); ApiVersions apiVersions = new ApiVersions(); Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate( @@ -1463,6 +1471,8 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(asyncConsumerMetrics, "async consumer metrics", firstException); + 
closeQuietly(fetchMetricsManager, "consumer fetch metrics", firstException); + closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance callback metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 787d710535e0c..f52ec096a85ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -125,6 +125,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { private final Optional groupId; private final ConsumerCoordinator coordinator; private final Deserializers deserializers; + private final FetchMetricsManager fetchMetricsManager; private final Fetcher fetcher; private final OffsetFetcher offsetFetcher; private final TopicMetadataFetcher topicMetadataFetcher; @@ -191,7 +192,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { List addresses = ClientUtils.parseAndValidateAddresses(config); this.metadata.bootstrap(addresses); - FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics); + this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); this.isolationLevel = fetchConfig.isolationLevel; @@ -362,7 +363,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG); ConsumerMetrics metricsRegistry = new ConsumerMetrics(); - FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); 
+ this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); ApiVersions apiVersions = new ApiVersions(); FetchConfig fetchConfig = new FetchConfig( minBytes, @@ -381,7 +382,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { subscriptions, fetchConfig, deserializers, - metricsManager, + fetchMetricsManager, time, apiVersions ); @@ -1179,6 +1180,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); + closeQuietly(fetchMetricsManager, "kafka fetch metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(client, "consumer network client", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4956d64228dbb..88dd984be1e7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -55,6 +56,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import 
org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -108,6 +111,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final List assignors; private final ConsumerMetadata metadata; private final ConsumerCoordinatorMetrics coordinatorMetrics; + private final RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager; private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; @@ -271,11 +275,12 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, protocol = null; } + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix) + rebalanceCallbackMetricsManager ); this.metadata.requestUpdate(true); } @@ -1024,6 +1029,8 @@ public void close(final Timer timer, CloseOptions.GroupMembershipOperation membe } } finally { super.close(timer, membershipOperation); + Utils.closeQuietly(coordinatorMetrics, "consumer coordinator metrics"); + Utils.closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance callback metrics"); } } @@ -1621,24 +1628,26 @@ public String toString() { } } - private class ConsumerCoordinatorMetrics { + private class ConsumerCoordinatorMetrics extends AbstractConsumerMetricsManager { private final Sensor commitSensor; + @SuppressWarnings({"this-escape"}) private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { - String metricGrpName = metricGrpPrefix + COORDINATOR_METRICS_SUFFIX; + super(metrics, metricGrpPrefix + 
COORDINATOR_METRICS_SUFFIX); - this.commitSensor = metrics.sensor("commit-latency"); - this.commitSensor.add(metrics.metricName("commit-latency-avg", - metricGrpName, + this.commitSensor = sensor("commit-latency"); + this.commitSensor.add(metricName("commit-latency-avg", "The average time taken for a commit request"), new Avg()); - this.commitSensor.add(metrics.metricName("commit-latency-max", - metricGrpName, + this.commitSensor.add(metricName("commit-latency-max", "The max time taken for a commit request"), new Max()); - this.commitSensor.add(createMeter(metrics, metricGrpName, "commit", "commit calls")); + this.commitSensor.add(new Meter(new WindowedCount(), + metricName("commit-rate", + String.format("The number of %s per second", "commit calls")), + metricName("commit-total", + String.format("The total number of %s", "commit calls")))); Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); - metrics.addMetric(metrics.metricName("assigned-partitions", - metricGrpName, + addMetric(metricName("assigned-partitions", "The number of partitions currently assigned to this consumer"), numParts); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java index 98644180e8b0b..b6853e8e5d3ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Gauge; @@ -35,9 +36,8 @@ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it 
* records matches up with the topic-partitions in use. */ -public class FetchMetricsManager { +public class FetchMetricsManager extends AbstractConsumerMetricsManager { - private final Metrics metrics; private final FetchMetricsRegistry metricsRegistry; private final Sensor throttleTime; private final Sensor bytesFetched; @@ -49,32 +49,33 @@ public class FetchMetricsManager { private int assignmentId = 0; private Set assignedPartitions = Collections.emptySet(); + @SuppressWarnings({"this-escape"}) public FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) { - this.metrics = metrics; + super(metrics, metricsRegistry.groupName()); this.metricsRegistry = metricsRegistry; - this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time") + this.throttleTime = sensorBuilder("fetch-throttle-time") .withAvg(metricsRegistry.fetchThrottleTimeAvg) .withMax(metricsRegistry.fetchThrottleTimeMax) .build(); - this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") + this.bytesFetched = sensorBuilder("bytes-fetched") .withAvg(metricsRegistry.fetchSizeAvg) .withMax(metricsRegistry.fetchSizeMax) .withMeter(metricsRegistry.bytesConsumedRate, metricsRegistry.bytesConsumedTotal) .build(); - this.recordsFetched = new SensorBuilder(metrics, "records-fetched") + this.recordsFetched = sensorBuilder("records-fetched") .withAvg(metricsRegistry.recordsPerRequestAvg) .withMeter(metricsRegistry.recordsConsumedRate, metricsRegistry.recordsConsumedTotal) .build(); - this.fetchLatency = new SensorBuilder(metrics, "fetch-latency") + this.fetchLatency = sensorBuilder("fetch-latency") .withAvg(metricsRegistry.fetchLatencyAvg) .withMax(metricsRegistry.fetchLatencyMax) .withMeter(new WindowedCount(), metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal) .build(); - this.recordsLag = new SensorBuilder(metrics, "records-lag") + this.recordsLag = sensorBuilder("records-lag") .withMax(metricsRegistry.recordsLagMax) .build(); - this.recordsLead = new 
SensorBuilder(metrics, "records-lead") + this.recordsLead = sensorBuilder("records-lead") .withMin(metricsRegistry.recordsLeadMin) .build(); } @@ -87,7 +88,7 @@ void recordLatency(String node, long requestLatencyMs) { fetchLatency.record(requestLatencyMs); if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; - Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); + Sensor nodeRequestTime = getSensor(nodeTimeName); if (nodeRequestTime != null) nodeRequestTime.record(requestLatencyMs); } @@ -105,7 +106,7 @@ void recordBytesFetched(String topic, int bytes) { String name = topicBytesFetchedMetricName(topic); maybeRecordDeprecatedBytesFetched(name, topic, bytes); - Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) + Sensor bytesFetched = sensorBuilder(name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) .withMax(metricsRegistry.topicFetchSizeMax) .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) @@ -117,7 +118,7 @@ void recordRecordsFetched(String topic, int records) { String name = topicRecordsFetchedMetricName(topic); maybeRecordDeprecatedRecordsFetched(name, topic, records); - Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) + Sensor recordsFetched = sensorBuilder(name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) .build(); @@ -130,7 +131,7 @@ void recordPartitionLag(TopicPartition tp, long lag) { String name = partitionRecordsLagMetricName(tp); maybeRecordDeprecatedPartitionLag(name, tp, lag); - Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) + Sensor recordsLag = sensorBuilder(name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", 
String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLag) .withMax(metricsRegistry.partitionRecordsLagMax) .withAvg(metricsRegistry.partitionRecordsLagAvg) @@ -145,7 +146,7 @@ void recordPartitionLead(TopicPartition tp, long lead) { String name = partitionRecordsLeadMetricName(tp); maybeRecordDeprecatedPartitionLead(name, tp, lead); - Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) + Sensor recordsLead = sensorBuilder(name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLead) .withMin(metricsRegistry.partitionRecordsLeadMin) .withAvg(metricsRegistry.partitionRecordsLeadAvg) @@ -169,13 +170,13 @@ void maybeUpdateAssignment(SubscriptionState subscription) { for (TopicPartition tp : this.assignedPartitions) { if (!newAssignedPartitions.contains(tp)) { - metrics.removeSensor(partitionRecordsLagMetricName(tp)); - metrics.removeSensor(partitionRecordsLeadMetricName(tp)); - metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); + removeSensor(partitionRecordsLagMetricName(tp)); + removeSensor(partitionRecordsLeadMetricName(tp)); + removeMetric(partitionPreferredReadReplicaMetricName(tp)); // Remove deprecated metrics. 
- metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); - metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); - metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); + removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); + removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); + removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); } } @@ -184,7 +185,7 @@ void maybeUpdateAssignment(SubscriptionState subscription) { maybeRecordDeprecatedPreferredReadReplica(tp, subscription); MetricName metricName = partitionPreferredReadReplicaMetricName(tp); - metrics.addMetricIfAbsent( + addMetricIfAbsent( metricName, null, (Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) @@ -200,7 +201,7 @@ void maybeUpdateAssignment(SubscriptionState subscription) { @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) { if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + Sensor deprecatedBytesFetched = sensorBuilder(deprecatedMetricName(name), () -> topicTags(topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) .withMax(metricsRegistry.topicFetchSizeMax) .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) @@ -212,7 +213,7 @@ private void maybeRecordDeprecatedBytesFetched(String name, String topic, int by @Deprecated // To be removed in Kafka 5.0 release. 
private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) { if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + Sensor deprecatedRecordsFetched = sensorBuilder(deprecatedMetricName(name), () -> topicTags(topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) .build(); @@ -223,7 +224,7 @@ private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) { if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + Sensor deprecatedRecordsLag = sensorBuilder(deprecatedMetricName(name), () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLag) .withMax(metricsRegistry.partitionRecordsLagMax) .withAvg(metricsRegistry.partitionRecordsLagAvg) @@ -236,7 +237,7 @@ private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, l @Deprecated // To be removed in Kafka 5.0 release. 
private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) { if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + Sensor deprecatedRecordsLead = sensorBuilder(deprecatedMetricName(name), () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLead) .withMin(metricsRegistry.partitionRecordsLeadMin) .withAvg(metricsRegistry.partitionRecordsLeadAvg) @@ -250,7 +251,7 @@ private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) { if (shouldReportDeprecatedMetric(tp.topic())) { MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp); - metrics.addMetricIfAbsent( + addMetricIfAbsent( metricName, null, (Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) @@ -284,13 +285,13 @@ private static boolean shouldReportDeprecatedMetric(String topic) { private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) { Map metricTags = mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))); - return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + return metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); } @Deprecated private MetricName deprecatedPartitionPreferredReadReplicaMetricName(TopicPartition tp) { Map metricTags = topicPartitionTags(tp); - return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + return metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); } @Deprecated diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java index 
589cb6736b367..c0a66339c582e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java @@ -29,6 +29,7 @@ public class FetchMetricsRegistry { private static final String DEPRECATED_TOPIC_METRICS_MESSAGE = "Note: For topic names with periods (.), an additional " + "metric with underscores is emitted. However, the periods replaced metric is deprecated. Please use the metric with actual topic name instead."; + private final String groupName; public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesConsumedRate; @@ -70,7 +71,7 @@ public FetchMetricsRegistry(String metricGrpPrefix) { public FetchMetricsRegistry(Set tags, String metricGrpPrefix) { /* Client level */ - String groupName = metricGrpPrefix + "-fetch-manager-metrics"; + this.groupName = metricGrpPrefix + "-fetch-manager-metrics"; this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request", tags); @@ -148,6 +149,10 @@ public FetchMetricsRegistry(Set tags, String metricGrpPrefix) { "The current read replica for the partition, or -1 if reading from leader. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags); } + public String groupName() { + return groupName; + } + public List getAllTemplates() { return Arrays.asList( fetchSizeAvg, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java deleted file mode 100644 index a2346a3b376bb..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricNameTemplate; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Meter; -import org.apache.kafka.common.metrics.stats.Min; -import org.apache.kafka.common.metrics.stats.SampledStat; -import org.apache.kafka.common.metrics.stats.Value; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Supplier; - -/** - * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording - * {@link Metric metrics}. 
- */ -public class SensorBuilder { - - private final Metrics metrics; - - private final Sensor sensor; - - private final boolean preexisting; - - private final Map tags; - - public SensorBuilder(Metrics metrics, String name) { - this(metrics, name, Collections::emptyMap); - } - - public SensorBuilder(Metrics metrics, String name, Supplier> tagsSupplier) { - this.metrics = metrics; - Sensor s = metrics.getSensor(name); - - if (s != null) { - sensor = s; - tags = Collections.emptyMap(); - preexisting = true; - } else { - sensor = metrics.sensor(name); - tags = tagsSupplier.get(); - preexisting = false; - } - } - - SensorBuilder withAvg(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Avg()); - - return this; - } - - SensorBuilder withMin(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Min()); - - return this; - } - - SensorBuilder withMax(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Max()); - - return this; - } - - SensorBuilder withValue(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Value()); - - return this; - } - - SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) { - if (!preexisting) { - sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); - } - - return this; - } - - SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) { - if (!preexisting) { - sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); - } - - return this; - } - - Sensor build() { - return sensor; - } - -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java index d3e60a3dfaaee..4a59884d2a2c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java @@ -16,15 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.WindowedCount; -import java.io.IOException; -import java.util.Arrays; - -public class ShareFetchMetricsManager implements AutoCloseable { - private final Metrics metrics; +public class ShareFetchMetricsManager extends AbstractConsumerMetricsManager { private final Sensor throttleTime; private final Sensor bytesFetched; private final Sensor recordsFetched; @@ -32,35 +29,35 @@ public class ShareFetchMetricsManager implements AutoCloseable { private final Sensor sentAcknowledgements; private final Sensor failedAcknowledgements; + @SuppressWarnings({"this-escape"}) public ShareFetchMetricsManager(Metrics metrics, ShareFetchMetricsRegistry metricsRegistry) { - this.metrics = metrics; - - this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") + super(metrics, metricsRegistry.groupName()); + this.bytesFetched = sensorBuilder("bytes-fetched") .withAvg(metricsRegistry.fetchSizeAvg) .withMax(metricsRegistry.fetchSizeMax) .withMeter(metricsRegistry.bytesFetchedRate, metricsRegistry.bytesFetchedTotal) .build(); - this.recordsFetched = new SensorBuilder(metrics, "records-fetched") + this.recordsFetched = sensorBuilder("records-fetched") .withAvg(metricsRegistry.recordsPerRequestAvg) .withMax(metricsRegistry.recordsPerRequestMax) .withMeter(metricsRegistry.recordsFetchedRate, metricsRegistry.recordsFetchedTotal) .build(); - 
this.sentAcknowledgements = new SensorBuilder(metrics, "sent-acknowledgements") + this.sentAcknowledgements = sensorBuilder("sent-acknowledgements") .withMeter(metricsRegistry.acknowledgementSendRate, metricsRegistry.acknowledgementSendTotal) .build(); - this.failedAcknowledgements = new SensorBuilder(metrics, "failed-acknowledgements") + this.failedAcknowledgements = sensorBuilder("failed-acknowledgements") .withMeter(metricsRegistry.acknowledgementErrorRate, metricsRegistry.acknowledgementErrorTotal) .build(); - this.fetchLatency = new SensorBuilder(metrics, "fetch-latency") + this.fetchLatency = sensorBuilder("fetch-latency") .withAvg(metricsRegistry.fetchLatencyAvg) .withMax(metricsRegistry.fetchLatencyMax) .withMeter(new WindowedCount(), metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal) .build(); - this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time") + this.throttleTime = sensorBuilder("fetch-throttle-time") .withAvg(metricsRegistry.fetchThrottleTimeAvg) .withMax(metricsRegistry.fetchThrottleTimeMax) .build(); @@ -74,7 +71,7 @@ void recordLatency(String node, long requestLatencyMs) { fetchLatency.record(requestLatencyMs); if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; - Sensor nodeRequestTime = metrics.getSensor(nodeTimeName); + Sensor nodeRequestTime = getSensor(nodeTimeName); if (nodeRequestTime != null) nodeRequestTime.record(requestLatencyMs); } @@ -95,16 +92,4 @@ void recordAcknowledgementSent(int acknowledgements) { void recordFailedAcknowledgements(int acknowledgements) { failedAcknowledgements.record(acknowledgements); } - - @Override - public void close() throws IOException { - Arrays.asList( - throttleTime.name(), - bytesFetched.name(), - recordsFetched.name(), - fetchLatency.name(), - sentAcknowledgements.name(), - failedAcknowledgements.name() - ).forEach(metrics::removeSensor); - } } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java index 2ae7952b6671d..c810d99accf57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java @@ -23,6 +23,8 @@ public class ShareFetchMetricsRegistry { + private final String groupName; + public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesFetchedRate; @@ -53,7 +55,7 @@ public ShareFetchMetricsRegistry(String metricGrpPrefix) { public ShareFetchMetricsRegistry(Set tags, String metricGrpPrefix) { /* Client level */ - String groupName = metricGrpPrefix + "-fetch-manager-metrics"; + this.groupName = metricGrpPrefix + "-fetch-manager-metrics"; this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request", tags); @@ -98,4 +100,8 @@ public ShareFetchMetricsRegistry(Set tags, String metricGrpPrefix) { this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, "The maximum throttle time in ms", tags); } + + public String groupName() { + return groupName; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java index b5026d7ab29de..2de6b882a2c0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java @@ -16,14 +16,27 @@ */ package org.apache.kafka.clients.consumer.internals.metrics; +import org.apache.kafka.common.Metric; 
import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.MetricValueProvider; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.SampledStat; +import org.apache.kafka.common.metrics.stats.Value; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; /** * Utility class that serves as a common abstraction point @@ -48,8 +61,25 @@ protected MetricName metricName(String name, String description) { return metricName; } + protected MetricName metricInstance(MetricNameTemplate template, Map tags) { + MetricName metricName = metrics.metricInstance(template, tags); + metricNames.add(metricName); + return metricName; + } + + protected void addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider metricValueProvider) { + metrics.addMetricIfAbsent(metricName, config, metricValueProvider); + metricNames.add(metricName); + } + protected void addMetric(MetricName metricName, Measurable measurable) { metrics.addMetric(metricName, measurable); + metricNames.add(metricName); + } + + protected void removeMetric(MetricName metricName) { + metrics.removeMetric(metricName); + metricNames.remove(metricName); } protected Sensor sensor(String name) { @@ -58,22 +88,115 @@ protected Sensor sensor(String name) { return sensor; } + protected Sensor getSensor(String name) { + Sensor sensor = metrics.getSensor(name); + + if (sensor != null) + sensors.add(sensor); + + return sensor; + } + + 
protected void removeSensor(String name) { + Sensor s = getSensor(name); + metrics.removeSensor(name); + sensors.remove(s); + } + + protected SensorBuilder sensorBuilder(String name) { + return new SensorBuilder(name); + } + + protected SensorBuilder sensorBuilder(String name, Supplier> tagsSupplier) { + return new SensorBuilder(name, tagsSupplier); + } + @Override public final void close() { - System.out.println("Metrics before:"); - metrics.metrics().keySet().forEach(System.out::println); + sensors.forEach(s -> { + metrics.removeSensor(s.name()); + }); metricNames.forEach(metrics::removeMetric); + } + + /** + * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording + * {@link Metric metrics}. + */ + public class SensorBuilder { + + private final Sensor sensor; + + private final boolean preexisting; + + private final Map tags; + + public SensorBuilder(String name) { + this(name, Collections::emptyMap); + } + + public SensorBuilder(String name, Supplier> tagsSupplier) { + Sensor s = getSensor(name); + + if (s != null) { + sensor = s; + tags = Collections.emptyMap(); + preexisting = true; + } else { + sensor = sensor(name); + sensors.add(sensor); + tags = tagsSupplier.get(); + preexisting = false; + } + } + + public SensorBuilder withAvg(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Avg()); + + return this; + } + + public SensorBuilder withMin(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Min()); + + return this; + } + + public SensorBuilder withMax(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Max()); + + return this; + } + + public SensorBuilder withValue(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Value()); + + return this; + } + + public SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) { + if 
(!preexisting) { + sensor.add(new Meter(metricInstance(rateName, tags), metricInstance(totalName, tags))); + } - System.out.println("Metrics after:"); - metrics.metrics().keySet().forEach(System.out::println); + return this; + } - System.out.println("Sensors before:"); - sensors.stream().filter(s -> metrics.getSensor(s.name()) != null).forEach(System.out::println); + public SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) { + if (!preexisting) { + sensor.add(new Meter(sampledStat, metricInstance(rateName, tags), metricInstance(totalName, tags))); + } - sensors.forEach(s -> metrics.removeSensor(s.name())); + return this; + } - System.out.println("Sensors after:"); - sensors.stream().filter(s -> metrics.getSensor(s.name()) != null).forEach(System.out::println); + public Sensor build() { + return sensor; + } } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java index 186ee430eb3dc..271c29c0163df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java @@ -45,6 +45,7 @@ public class AsyncConsumerMetrics extends AbstractConsumerMetricsManager { private final Sensor unsentRequestsQueueSizeSensor; private final Sensor unsentRequestsQueueTimeSensor; + @SuppressWarnings({"this-escape"}) public AsyncConsumerMetrics(Metrics metrics, String groupName) { super(metrics, groupName); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java index 037bd35919e98..d8b38725b82ea 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java @@ -52,6 +52,7 @@ public final class ConsumerRebalanceMetricsManager extends RebalanceMetricsManag private long lastRebalanceEndMs = -1L; private long lastRebalanceStartMs = -1L; + @SuppressWarnings({"this-escape"}) public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscriptions) { super(metrics, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java index 2565a8de3b2d5..0dd34bf55eacf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java @@ -42,6 +42,7 @@ public HeartbeatMetricsManager(Metrics metrics) { this(metrics, CONSUMER_METRIC_GROUP_PREFIX); } + @SuppressWarnings({"this-escape"}) public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) { super(metrics, metricGroupPrefix + COORDINATOR_METRICS_SUFFIX); heartbeatSensor = sensor("heartbeat-latency"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index e9dd57baa78ad..3c454d99a127a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -37,6 +37,7 @@ public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager { 
private long pollStartMs; private long timeSinceLastPollMs; + @SuppressWarnings({"this-escape"}) public KafkaConsumerMetrics(Metrics metrics) { super(metrics, CONSUMER_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index 653da0871fdd9..df99a012cc12f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -34,6 +34,7 @@ public class KafkaShareConsumerMetrics extends AbstractConsumerMetricsManager { private long pollStartMs; private long timeSinceLastPollMs; + @SuppressWarnings({"this-escape"}) public KafkaShareConsumerMetrics(Metrics metrics) { super(metrics, CONSUMER_SHARE_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java index 94df5af86513c..f9c2d7456f834 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java @@ -34,8 +34,9 @@ public class OffsetCommitMetricsManager extends AbstractConsumerMetricsManager { final MetricName commitTotal; private final Sensor commitSensor; - public OffsetCommitMetricsManager(Metrics metrics_) { - super(metrics_, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); + @SuppressWarnings({"this-escape"}) + public OffsetCommitMetricsManager(Metrics metrics) { + super(metrics, CONSUMER_METRIC_GROUP_PREFIX + 
COORDINATOR_METRICS_SUFFIX); commitSensor = sensor("commit-latency"); commitLatencyAvg = metricName("commit-latency-avg", "The average time taken for a commit request"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java index 292ca9a971c5a..fafc7c926eccb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java @@ -40,6 +40,7 @@ public RebalanceCallbackMetricsManager(Metrics metrics) { this(metrics, CONSUMER_METRIC_GROUP_PREFIX); } + @SuppressWarnings({"this-escape"}) public RebalanceCallbackMetricsManager(Metrics metrics, String grpMetricsPrefix) { super(metrics, grpMetricsPrefix + COORDINATOR_METRICS_SUFFIX); partitionRevokeCallbackSensor = sensor("partition-revoked-latency"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java index 3169ffb0209fc..bac33a2b6bdd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java @@ -35,6 +35,7 @@ public final class ShareRebalanceMetricsManager extends RebalanceMetricsManager private long lastRebalanceEndMs = -1L; private long lastRebalanceStartMs = -1L; + @SuppressWarnings({"this-escape"}) public ShareRebalanceMetricsManager(Metrics metrics) { super(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f8e..9334fe610250b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -389,6 +389,19 @@ public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol consumer.close(CloseOptions.timeout(Duration.ZERO)); } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testMetricsRemovedOnClose(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + assertTrue(consumer.metrics().size() > 1, "The consumer should have created many metrics"); + consumer.close(CloseOptions.timeout(Duration.ZERO)); + assertTrue(consumer.metrics().size() <= 1, "The consumer should have removed all of its metrics"); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8e44b3fcc25d5..f9d39313c7bf3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -59,6 +59,7 @@ import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; @@ -260,6 +261,8 @@ private AsyncKafkaConsumer newConsumer( "client-id", new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics), fetchBuffer, + mock(FetchMetricsManager.class), + mock(RebalanceCallbackMetricsManager.class), fetchCollector, interceptors, time, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java index 96845c45e615c..1917ddd6df9b0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals.metrics; import org.apache.kafka.common.metrics.Metrics; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals;