Skip to content

Commit 676e0f2

Browse files
authored
KAFKA-19139 Plugin#wrapInstance should use LinkedHashMap instead of Map (#19519)
There will be an update to the PluginMetrics#metricName method: the type of the tags parameter will be changed from Map to LinkedHashMap. This change is necessary because the order of metric tags is important 1. If the tag order is inconsistent, identical metrics may be treated as distinct ones by the metrics backend 2. KAFKA-18390 is updating metric naming to use LinkedHashMap. For consistency, we should follow the same approach here. Reviewers: TengYao Chi <[email protected]>, Jhen-Yung Hsu <[email protected]>, lllilllilllilili
1 parent 431cffc commit 676e0f2

File tree

15 files changed

+78
-59
lines changed

15 files changed

+78
-59
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public static class MonitorableCustomQuotaCallback extends CustomQuotaCallback i
173173

174174
@Override
175175
public void withPluginMetrics(PluginMetrics metrics) {
176-
MetricName metricName = metrics.metricName(METRIC_NAME, METRIC_DESCRIPTION, Map.of());
176+
MetricName metricName = metrics.metricName(METRIC_NAME, METRIC_DESCRIPTION, new LinkedHashMap<>());
177177
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> 1);
178178
}
179179

clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import org.apache.kafka.common.MetricName;
2020

21-
import java.util.Map;
21+
import java.util.LinkedHashMap;
2222

2323
/**
2424
* This allows plugins to register metrics and sensors.
@@ -35,7 +35,7 @@ public interface PluginMetrics {
3535
* @param tags Additional tags for the metric
3636
* @throws IllegalArgumentException if any of the tag names collide with the default tags for the plugin
3737
*/
38-
MetricName metricName(String name, String description, Map<String, String> tags);
38+
MetricName metricName(String name, String description, LinkedHashMap<String, String> tags);
3939

4040
/**
4141
* Add a metric to monitor an object that implements {@link MetricValueProvider}. This metric won't be associated with any

clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public PluginMetricsImpl(Metrics metrics, Map<String, String> tags) {
4545
}
4646

4747
@Override
48-
public MetricName metricName(String name, String description, Map<String, String> tags) {
48+
public MetricName metricName(String name, String description, LinkedHashMap<String, String> tags) {
4949
if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
5050
for (String tagName : tags.keySet()) {
5151
if (this.tags.containsKey(tagName)) {

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -3746,9 +3746,13 @@ private MetricName expectedMetricName(String clientId, String config, Class<?> c
37463746

37473747
private static final String NAME = "name";
37483748
private static final String DESCRIPTION = "description";
3749-
private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
3749+
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
37503750
private static final double VALUE = 123.0;
37513751

3752+
static {
3753+
TAGS.put("t1", "v1");
3754+
}
3755+
37523756
public static class MonitorableDeserializer extends MockDeserializer implements Monitorable {
37533757

37543758
@Override

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

+21-24
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,7 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
238238

239239
@Test
240240
public void testAcksAndIdempotenceForIdempotentProducers() {
241-
Properties baseProps = new Properties() {{
242-
setProperty(
243-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
244-
setProperty(
245-
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
246-
setProperty(
247-
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
248-
}};
241+
Properties baseProps = baseProperties();
249242

250243
Properties validProps = new Properties() {{
251244
putAll(baseProps);
@@ -348,11 +341,7 @@ public void testAcksAndIdempotenceForIdempotentProducers() {
348341

349342
@Test
350343
public void testRetriesAndIdempotenceForIdempotentProducers() {
351-
Properties baseProps = new Properties() {{
352-
setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
353-
setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
354-
setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
355-
}};
344+
Properties baseProps = baseProperties();
356345

357346
Properties validProps = new Properties() {{
358347
putAll(baseProps);
@@ -414,13 +403,17 @@ public void testRetriesAndIdempotenceForIdempotentProducers() {
414403
"Must set retries to non-zero when using the transactional producer.");
415404
}
416405

406+
private Properties baseProperties() {
407+
Properties baseProps = new Properties();
408+
baseProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
409+
baseProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
410+
baseProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
411+
return baseProps;
412+
}
413+
417414
@Test
418415
public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
419-
Properties baseProps = new Properties() {{
420-
setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
421-
setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
422-
setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
423-
}};
416+
Properties baseProps = baseProperties();
424417

425418
Properties validProps = new Properties() {{
426419
putAll(baseProps);
@@ -1590,7 +1583,7 @@ public void testMeasureAbortTransactionDuration() {
15901583
}
15911584

15921585
@Test
1593-
public void testCommitTransactionWithRecordTooLargeException() throws Exception {
1586+
public void testCommitTransactionWithRecordTooLargeException() {
15941587
Map<String, Object> configs = new HashMap<>();
15951588
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
15961589
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -1620,7 +1613,7 @@ public void testCommitTransactionWithRecordTooLargeException() throws Exception
16201613
}
16211614

16221615
@Test
1623-
public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
1616+
public void testCommitTransactionWithMetadataTimeoutForMissingTopic() {
16241617
Map<String, Object> configs = new HashMap<>();
16251618
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
16261619
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@@ -1657,7 +1650,7 @@ public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exc
16571650
}
16581651

16591652
@Test
1660-
public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
1653+
public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() {
16611654
Map<String, Object> configs = new HashMap<>();
16621655
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
16631656
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@@ -1694,7 +1687,7 @@ public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() thr
16941687
}
16951688

16961689
@Test
1697-
public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
1690+
public void testCommitTransactionWithSendToInvalidTopic() {
16981691
Map<String, Object> configs = new HashMap<>();
16991692
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
17001693
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -2131,7 +2124,7 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
21312124
}
21322125

21332126
@Test
2134-
public void testSendToInvalidTopic() throws Exception {
2127+
public void testSendToInvalidTopic() {
21352128
Map<String, Object> configs = new HashMap<>();
21362129
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
21372130
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
@@ -2937,9 +2930,13 @@ private MetricName expectedMetricName(String clientId, String config, Class<?> c
29372930

29382931
private static final String NAME = "name";
29392932
private static final String DESCRIPTION = "description";
2940-
private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
2933+
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
29412934
private static final double VALUE = 123.0;
29422935

2936+
static {
2937+
TAGS.put("t1", "v1");
2938+
}
2939+
29432940
public static class MonitorableSerializer extends MockSerializer implements Monitorable {
29442941

29452942
@Override

clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import java.io.Closeable;
2727
import java.io.IOException;
2828
import java.util.Arrays;
29-
import java.util.Collections;
29+
import java.util.LinkedHashMap;
3030
import java.util.List;
3131

3232
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -129,7 +129,7 @@ public void testUsePluginMetricsAfterClose() throws Exception {
129129
Plugin<SomeMonitorablePlugin> plugin = Plugin.wrapInstance(new SomeMonitorablePlugin(), METRICS, CONFIG);
130130
PluginMetrics pluginMetrics = plugin.get().pluginMetrics;
131131
plugin.close();
132-
assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", Collections.emptyMap()));
132+
assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", new LinkedHashMap<>()));
133133
assertThrows(IllegalStateException.class, () -> pluginMetrics.addMetric(null, null));
134134
assertThrows(IllegalStateException.class, () -> pluginMetrics.removeMetric(null));
135135
assertThrows(IllegalStateException.class, () -> pluginMetrics.addSensor(""));

clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.jupiter.api.Test;
2828

2929
import java.io.IOException;
30-
import java.util.Collections;
3130
import java.util.LinkedHashMap;
3231
import java.util.Map;
3332

@@ -36,11 +35,15 @@
3635

3736
public class PluginMetricsImplTest {
3837

39-
private final Map<String, String> extraTags = Collections.singletonMap("my-tag", "my-value");
38+
private static final LinkedHashMap<String, String> EXTRA_TAGS = new LinkedHashMap<>();
4039
private Map<String, String> tags;
4140
private Metrics metrics;
4241
private int initialMetrics;
4342

43+
static {
44+
EXTRA_TAGS.put("my-tag", "my-value");
45+
}
46+
4447
@BeforeEach
4548
void setup() {
4649
metrics = new Metrics();
@@ -53,26 +56,28 @@ void setup() {
5356
@Test
5457
void testMetricName() {
5558
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
56-
MetricName metricName = pmi.metricName("name", "description", extraTags);
59+
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
5760
assertEquals("name", metricName.name());
5861
assertEquals("plugins", metricName.group());
5962
assertEquals("description", metricName.description());
6063
Map<String, String> expectedTags = new LinkedHashMap<>(tags);
61-
expectedTags.putAll(extraTags);
64+
expectedTags.putAll(EXTRA_TAGS);
6265
assertEquals(expectedTags, metricName.tags());
6366
}
6467

6568
@Test
6669
void testDuplicateTagName() {
6770
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
71+
LinkedHashMap<String, String> tags = new LinkedHashMap<>();
72+
tags.put("k1", "value");
6873
assertThrows(IllegalArgumentException.class,
69-
() -> pmi.metricName("name", "description", Collections.singletonMap("k1", "value")));
74+
() -> pmi.metricName("name", "description", tags));
7075
}
7176

7277
@Test
7378
void testAddRemoveMetrics() {
7479
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
75-
MetricName metricName = pmi.metricName("name", "description", extraTags);
80+
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
7681
pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0);
7782
assertEquals(initialMetrics + 1, metrics.metrics().size());
7883

@@ -88,7 +93,7 @@ void testAddRemoveMetrics() {
8893
void testAddRemoveSensor() {
8994
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
9095
String sensorName = "my-sensor";
91-
MetricName metricName = pmi.metricName("name", "description", extraTags);
96+
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
9297
Sensor sensor = pmi.addSensor(sensorName);
9398
assertEquals(initialMetrics, metrics.metrics().size());
9499
sensor.add(metricName, new Rate());
@@ -107,10 +112,10 @@ void testAddRemoveSensor() {
107112
void testClose() throws IOException {
108113
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
109114
String sensorName = "my-sensor";
110-
MetricName metricName1 = pmi.metricName("name1", "description", extraTags);
115+
MetricName metricName1 = pmi.metricName("name1", "description", EXTRA_TAGS);
111116
Sensor sensor = pmi.addSensor(sensorName);
112117
sensor.add(metricName1, new Rate());
113-
MetricName metricName2 = pmi.metricName("name2", "description", extraTags);
118+
MetricName metricName2 = pmi.metricName("name2", "description", EXTRA_TAGS);
114119
pmi.addMetric(metricName2, (Measurable) (config, now) -> 1.0);
115120

116121
assertEquals(initialMetrics + 2, metrics.metrics().size());

connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.connect.sink.SinkRecord;
2525

2626
import java.util.Collection;
27+
import java.util.LinkedHashMap;
2728
import java.util.Map;
2829

2930
public class MonitorableSinkConnector extends TestableSinkConnector {
@@ -35,7 +36,7 @@ public class MonitorableSinkConnector extends TestableSinkConnector {
3536
public void start(Map<String, String> props) {
3637
super.start(props);
3738
PluginMetrics pluginMetrics = context.pluginMetrics();
38-
metricsName = pluginMetrics.metricName("start", "description", Map.of());
39+
metricsName = pluginMetrics.metricName("start", "description", new LinkedHashMap<>());
3940
pluginMetrics.addMetric(metricsName, (Gauge<Object>) (config, now) -> VALUE);
4041
}
4142

@@ -53,7 +54,7 @@ public static class MonitorableSinkTask extends TestableSinkTask {
5354
public void start(Map<String, String> props) {
5455
super.start(props);
5556
PluginMetrics pluginMetrics = context.pluginMetrics();
56-
metricsName = pluginMetrics.metricName("put", "description", Map.of());
57+
metricsName = pluginMetrics.metricName("put", "description", new LinkedHashMap<>());
5758
pluginMetrics.addMetric(metricsName, (Measurable) (config, now) -> count);
5859
}
5960

connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.connect.connector.Task;
2424
import org.apache.kafka.connect.source.SourceRecord;
2525

26+
import java.util.LinkedHashMap;
2627
import java.util.List;
2728
import java.util.Map;
2829

@@ -35,7 +36,7 @@ public class MonitorableSourceConnector extends TestableSourceConnector {
3536
public void start(Map<String, String> props) {
3637
super.start(props);
3738
PluginMetrics pluginMetrics = context.pluginMetrics();
38-
metricsName = pluginMetrics.metricName("start", "description", Map.of());
39+
metricsName = pluginMetrics.metricName("start", "description", new LinkedHashMap<>());
3940
pluginMetrics.addMetric(metricsName, (Gauge<Object>) (config, now) -> VALUE);
4041
}
4142

@@ -53,7 +54,7 @@ public static class MonitorableSourceTask extends TestableSourceTask {
5354
public void start(Map<String, String> props) {
5455
super.start(props);
5556
PluginMetrics pluginMetrics = context.pluginMetrics();
56-
metricsName = pluginMetrics.metricName("poll", "description", Map.of());
57+
metricsName = pluginMetrics.metricName("poll", "description", new LinkedHashMap<>());
5758
pluginMetrics.addMetric(metricsName, (Measurable) (config, now) -> count);
5859
}
5960

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.Collections;
7979
import java.util.HashMap;
8080
import java.util.HashSet;
81+
import java.util.LinkedHashMap;
8182
import java.util.List;
8283
import java.util.Map;
8384
import java.util.Optional;
@@ -799,7 +800,7 @@ protected boolean isAllowed(ConfigValue configValue) {
799800

800801
@Override
801802
public void withPluginMetrics(PluginMetrics metrics) {
802-
metricName = metrics.metricName("name", "description", Map.of());
803+
metricName = metrics.metricName("name", "description", new LinkedHashMap<>());
803804
metrics.addMetric(metricName, (Measurable) (config, now) -> count);
804805
}
805806
}

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,13 @@ public class ConnectMetricsTest {
6464
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter",
6565
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
6666
private static final ConnectorTaskId CONNECTOR_TASK_ID = new ConnectorTaskId("connector", 0);
67-
private static final Map<String, String> TAGS = Map.of("t1", "v1");
68-
67+
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
6968
private ConnectMetrics metrics;
70-
69+
70+
static {
71+
TAGS.put("t1", "v1");
72+
}
73+
7174
@BeforeEach
7275
public void setUp() {
7376
metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime(), "cluster-1");

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.util.Collection;
6565
import java.util.Collections;
6666
import java.util.HashMap;
67+
import java.util.LinkedHashMap;
6768
import java.util.List;
6869
import java.util.Map;
6970

@@ -402,7 +403,7 @@ public void register(ConnectRestExtensionContext restPluginContext) {
402403

403404
@Override
404405
public void withPluginMetrics(PluginMetrics metrics) {
405-
metricName = metrics.metricName("name", "description", Map.of());
406+
metricName = metrics.metricName("name", "description", new LinkedHashMap<>());
406407
metrics.addMetric(metricName, (Gauge<Boolean>) (config, now) -> called);
407408
}
408409
}

metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.ArrayList;
4040
import java.util.HashMap;
4141
import java.util.HashSet;
42+
import java.util.LinkedHashMap;
4243
import java.util.List;
4344
import java.util.Map;
4445
import java.util.Set;
@@ -230,21 +231,21 @@ private class AuthorizerMetrics {
230231
private AuthorizerMetrics(PluginMetrics metrics) {
231232
authorizationAllowedSensor = metrics.addSensor("authorizer-authorization-allowed");
232233
authorizationAllowedSensor.add(
233-
metrics.metricName("authorization-allowed-rate-per-minute", "The number of authorization allowed per minute", Map.of()),
234+
metrics.metricName("authorization-allowed-rate-per-minute", "The number of authorization allowed per minute", new LinkedHashMap<>()),
234235
new Rate(TimeUnit.MINUTES, new WindowedCount()));
235236

236237
authorizationDeniedSensor = metrics.addSensor("authorizer-authorization-denied");
237238
authorizationDeniedSensor.add(
238-
metrics.metricName("authorization-denied-rate-per-minute", "The number of authorization denied per minute", Map.of()),
239+
metrics.metricName("authorization-denied-rate-per-minute", "The number of authorization denied per minute", new LinkedHashMap<>()),
239240
new Rate(TimeUnit.MINUTES, new WindowedCount()));
240241

241242
authorizationRequestSensor = metrics.addSensor("authorizer-authorization-request");
242243
authorizationRequestSensor.add(
243-
metrics.metricName("authorization-request-rate-per-minute", "The number of authorization request per minute", Map.of()),
244+
metrics.metricName("authorization-request-rate-per-minute", "The number of authorization request per minute", new LinkedHashMap<>()),
244245
new Rate(TimeUnit.MINUTES, new WindowedCount()));
245246

246247
metrics.addMetric(
247-
metrics.metricName("acls-total-count", "The number of acls defined", Map.of()),
248+
metrics.metricName("acls-total-count", "The number of acls defined", new LinkedHashMap<>()),
248249
(Gauge<Integer>) (config, now) -> aclCount());
249250
}
250251

0 commit comments

Comments
 (0)