From 36cb999ab75459a293f1cf1d47aefc6795ce6758 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 21 Apr 2025 13:27:24 +0800 Subject: [PATCH 01/14] KAFKA-17747: Add compute topic and group hash Signed-off-by: PoAn Yang --- build.gradle | 1 + .../import-control-group-coordinator.xml | 1 + gradle/dependencies.gradle | 2 + .../apache/kafka/coordinator/group/Group.java | 56 ++++++ .../kafka/coordinator/group/GroupTest.java | 189 ++++++++++++++++++ 5 files changed, 249 insertions(+) create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java diff --git a/build.gradle b/build.gradle index 2e35057165c53..61b68edfe67de 100644 --- a/build.gradle +++ b/build.gradle @@ -1420,6 +1420,7 @@ project(':group-coordinator') { implementation libs.hdrHistogram implementation libs.re2j implementation libs.slf4jApi + implementation libs.guava testImplementation project(':clients').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 8b6a8d99f5eaa..341ac8984ab93 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -77,6 +77,7 @@ + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index f6f7bd68e8363..a4e94e44080a5 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -61,6 +61,7 @@ versions += [ classgraph: "4.8.173", gradle: "8.10.2", grgit: "4.1.1", + guava: "33.4.0-jre", httpclient: "4.5.14", jackson: "2.16.2", jacoco: "0.8.10", @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 54d7e98d4b7be..34b17ff2533f4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -19,11 +19,21 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.metadata.BrokerRegistration; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + + /** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. 
Key is topic name and value is the topic hash. + * @return The hash of the group. + */ + static long computeGroupHash(Map topicHashes) { + return Hashing.combineOrdered( + topicHashes.entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> HashCode.fromLong(e.getValue())) + .toList() + ).asLong(); + } + + /** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ + static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(topicImage.id().hashCode()) // topic Id + .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name + .putInt(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { + topicHasher.putInt(entry.getKey()); // partition id + String racks = Arrays.stream(entry.getValue().replicas) + .mapToObj(clusterImage::broker) + .filter(Objects::nonNull) + .map(BrokerRegistration::rack) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted() + .collect(Collectors.joining(";")); + topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";" + }); + return topicHasher.hash().asLong(); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java new file mode 100644 index 0000000000000..679bcfdf3e1e4 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.image.MetadataImage; + +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class GroupTest { + private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); + private static final String FOO_TOPIC_NAME = "foo"; + private static final String BAR_TOPIC_NAME = "bar"; + private static final int FOO_NUM_PARTITIONS = 2; + private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .addRacks() + .build(); + + @Test + void testComputeTopicHash() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertEquals(topicHasher.hash().asLong(), result); + } + + @Test + void testComputeTopicHashWithDifferentMagicByte() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 1) // different magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertNotEquals(topicHasher.hash().asLong(), result); + } + + @Test + void testComputeTopicHashWithDifferentPartitionOrder() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + // different partition order + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1 + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0 + assertNotEquals(topicHasher.hash().asLong(), result); + } + + @Test + void testComputeTopicHashWithDifferentRackOrder() { + long result = 
Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertNotEquals(topicHasher.hash().asLong(), result); + } + + @ParameterizedTest + @MethodSource("differentFieldGenerator") + void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + assertNotEquals( + Group.computeTopicHash( + differentImage.topics().getTopic(topicId), + differentImage.cluster() + ), + result + ); + } + + private static Stream differentFieldGenerator() { + Uuid differentTopicId = Uuid.randomUuid(); + return Stream.of( + Arguments.of(new MetadataImageBuilder() // different topic id + .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .addRacks() + .build(), + differentTopicId + ), + Arguments.of(new MetadataImageBuilder() // different topic name + .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS) + .addRacks() + .build(), + FOO_TOPIC_ID + ), + Arguments.of(new MetadataImageBuilder() // different partitions + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1) + .addRacks() + .build(), + FOO_TOPIC_ID + ), + Arguments.of(new MetadataImageBuilder() // different racks + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .build(), + FOO_TOPIC_ID + ) + ); + } + + @Test + void testComputeGroupHash() { + long result = Group.computeGroupHash(Map.of( + BAR_TOPIC_NAME, 123L, + FOO_TOPIC_NAME, 456L + )); + + long expected = Hashing.combineOrdered(List.of( + HashCode.fromLong(123L), + HashCode.fromLong(456L) + )).asLong(); + assertEquals(expected, result); + } + + @Test + void testComputeGroupHashWithDifferentOrder() { + long result = Group.computeGroupHash(Map.of( + BAR_TOPIC_NAME, 123L, + FOO_TOPIC_NAME, 456L + )); + + long unexpected = Hashing.combineOrdered(List.of( + HashCode.fromLong(456L), + HashCode.fromLong(123L) + )).asLong(); + assertNotEquals(unexpected, result); + } +} From f4c97491fe93fe5e4a23260f08b7d333f3e8d776 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 21 Apr 2025 17:08:51 +0800 Subject: [PATCH 02/14] KAFKA-17747: add guava license Signed-off-by: PoAn Yang --- LICENSE-binary | 8 +++++++- committer-tools/verify_license.py | 2 +- licenses/checker-qual-MIT | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 licenses/checker-qual-MIT diff --git a/LICENSE-binary b/LICENSE-binary index b5ccf97fe003c..3c53426dc6e8f 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -212,7 +212,10 @@ License Version 2.0: - commons-lang3-3.12.0 - commons-logging-1.3.2 - commons-validator-1.9.0 -- error_prone_annotations-2.21.1 +- error_prone_annotations-2.36.0 +- failureaccess-1.0.2 +- guava-33.4.0-jre +- j2objc-annotations-3.0.0 - jackson-annotations-2.16.2 - jackson-core-2.16.2 - jackson-databind-2.16.2 @@ -237,6 +240,7 @@ License Version 2.0: - jetty-session-12.0.15 - jetty-util-12.0.15 - jose4j-0.9.4 +- 
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava - log4j-api-2.24.3 - log4j-core-2.24.3 - log4j-slf4j-impl-2.24.3 @@ -298,6 +302,7 @@ see: licenses/CDDL+GPL-1.1 MIT License - argparse4j-0.7.0, see: licenses/argparse-MIT +- checker-qual-3.43.0, see: licenses/checker-qual-MIT - classgraph-4.8.173, see: licenses/classgraph-MIT - jopt-simple-5.0.4, see: licenses/jopt-simple-MIT - slf4j-api-1.7.36, see: licenses/slf4j-MIT @@ -313,6 +318,7 @@ BSD 2-Clause BSD 3-Clause - jline-3.25.1, see: licenses/jline-BSD-3-clause +- jsr305-3.0.2, see: licenses/jsr305-BSD-3-clause - protobuf-java-3.25.5, see: licenses/protobuf-java-BSD-3-clause - jakarta.activation-2.0.1, see: licenses/jakarta-BSD-3-clause diff --git a/committer-tools/verify_license.py b/committer-tools/verify_license.py index c8489008cae67..61bdc07b473dd 100644 --- a/committer-tools/verify_license.py +++ b/committer-tools/verify_license.py @@ -31,7 +31,7 @@ # DependencyName-x.y, DependencyName-x.y.z, or DependencyName-x.y.z.w # Optionally, a trailing suffix (e.g., "-alpha") is captured. LICENSE_DEP_PATTERN = re.compile( - r'^\s*-\s*([A-Za-z0-9_.+-]+-[0-9]+\.[0-9]+(?:\.[0-9]+){0,2}(?:[-.][A-Za-z0-9]+)?)', + r'^\s*-\s*([A-Za-z0-9_.+-]+-[0-9]+\.[0-9]+(?:\.[0-9]+){0,2}(?:[-.][A-Za-z0-9]+)*)', re.MULTILINE ) diff --git a/licenses/checker-qual-MIT b/licenses/checker-qual-MIT new file mode 100644 index 0000000000000..9837c6b69fdab --- /dev/null +++ b/licenses/checker-qual-MIT @@ -0,0 +1,22 @@ +Checker Framework qualifiers +Copyright 2004-present by the Checker Framework developers + +MIT License: + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. From 77b99da961092b35db9ebd7deb92b9cfa9e41888 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 28 Apr 2025 15:05:48 +0800 Subject: [PATCH 03/14] Revert "KAFKA-17747: add guava license" This reverts commit f4c97491fe93fe5e4a23260f08b7d333f3e8d776. 
--- LICENSE-binary | 8 +------- committer-tools/verify_license.py | 2 +- licenses/checker-qual-MIT | 22 ---------------------- 3 files changed, 2 insertions(+), 30 deletions(-) delete mode 100644 licenses/checker-qual-MIT diff --git a/LICENSE-binary b/LICENSE-binary index 26fc0f1b0f1e9..6175d3ed7d479 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -212,10 +212,7 @@ License Version 2.0: - commons-lang3-3.12.0 - commons-logging-1.3.2 - commons-validator-1.9.0 -- error_prone_annotations-2.36.0 -- failureaccess-1.0.2 -- guava-33.4.0-jre -- j2objc-annotations-3.0.0 +- error_prone_annotations-2.21.1 - jackson-annotations-2.16.2 - jackson-core-2.16.2 - jackson-databind-2.16.2 @@ -240,7 +237,6 @@ License Version 2.0: - jetty-session-12.0.15 - jetty-util-12.0.15 - jose4j-0.9.4 -- listenablefuture-9999.0-empty-to-avoid-conflict-with-guava - log4j-api-2.24.3 - log4j-core-2.24.3 - log4j-slf4j-impl-2.24.3 @@ -302,7 +298,6 @@ see: licenses/CDDL+GPL-1.1 MIT License - argparse4j-0.7.0, see: licenses/argparse-MIT -- checker-qual-3.43.0, see: licenses/checker-qual-MIT - classgraph-4.8.173, see: licenses/classgraph-MIT - jopt-simple-5.0.4, see: licenses/jopt-simple-MIT - slf4j-api-1.7.36, see: licenses/slf4j-MIT @@ -318,7 +313,6 @@ BSD 2-Clause BSD 3-Clause - jline-3.25.1, see: licenses/jline-BSD-3-clause -- jsr305-3.0.2, see: licenses/jsr305-BSD-3-clause - protobuf-java-3.25.5, see: licenses/protobuf-java-BSD-3-clause - jakarta.activation-2.0.1, see: licenses/jakarta-BSD-3-clause diff --git a/committer-tools/verify_license.py b/committer-tools/verify_license.py index 61bdc07b473dd..c8489008cae67 100644 --- a/committer-tools/verify_license.py +++ b/committer-tools/verify_license.py @@ -31,7 +31,7 @@ # DependencyName-x.y, DependencyName-x.y.z, or DependencyName-x.y.z.w # Optionally, a trailing suffix (e.g., "-alpha") is captured. LICENSE_DEP_PATTERN = re.compile( - r'^\s*-\s*([A-Za-z0-9_.+-]+-[0-9]+\.[0-9]+(?:\.[0-9]+){0,2}(?:[-.][A-Za-z0-9]+)*)', + r'^\s*-\s*([A-Za-z0-9_.+-]+-[0-9]+\.[0-9]+(?:\.[0-9]+){0,2}(?:[-.][A-Za-z0-9]+)?)', re.MULTILINE ) diff --git a/licenses/checker-qual-MIT b/licenses/checker-qual-MIT deleted file mode 100644 index 9837c6b69fdab..0000000000000 --- a/licenses/checker-qual-MIT +++ /dev/null @@ -1,22 +0,0 @@ -Checker Framework qualifiers -Copyright 2004-present by the Checker Framework developers - -MIT License: - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. 
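Note: the license additions above are reverted because the Guava dependency itself is dropped in the patches that follow; patch 05 inlines the combination logic instead of calling Guava's Hashing#combineOrdered. For reference, a minimal standalone sketch of that Guava-free combination step — the class and method names here are illustrative, not part of the patch, while the mixing constants mirror combineOrdered so results stay compatible:

import java.util.Map;
import java.util.TreeMap;

public class GroupHashSketch { // illustrative name, not in the patch

    // Combines per-topic hashes in topic-name order, mirroring the byte-wise
    // mixing of Guava's Hashing#combineOrdered: b = b * 37 ^ next.
    static long combineByTopicName(Map<String, Long> topicHashes) {
        byte[] result = new byte[8];
        for (long hash : new TreeMap<>(topicHashes).values()) { // sorted by topic name
            for (int i = 0; i < 8; i++) {
                byte next = (byte) (hash >> (i * 8));        // little-endian byte i of the hash
                result[i] = (byte) (result[i] * 37 ^ next);  // ordered combination step
            }
        }
        long combined = result[0] & 0xFF;                    // reassemble bytes into a long
        for (int i = 1; i < 8; i++) {
            combined |= (result[i] & 0xFFL) << (i * 8);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Same entries, different insertion order -> same combined hash.
        long a = combineByTopicName(Map.of("bar", 123L, "foo", 456L));
        long b = combineByTopicName(Map.of("foo", 456L, "bar", 123L));
        System.out.println(a == b); // true
    }
}

Sorting by topic name before the order-sensitive combine is what makes the group hash deterministic for any input map ordering, which is exactly what testComputeGroupHashWithDifferentOrder in patch 05 verifies.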
From aff55ce51ed0e5bbe3ddf52595ec8d02f35fbdbc Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 28 Apr 2025 19:03:25 +0800 Subject: [PATCH 04/14] KAFKA-17747: Move Murmur3 to org.apache.kafka.common.internals Signed-off-by: PoAn Yang --- .../main/java/org/apache/kafka/common}/internals/Murmur3.java | 2 +- .../java/org/apache/kafka/common}/internals/Murmur3Test.java | 2 +- gradle/spotbugs-exclude.xml | 4 ++-- streams/integration-tests/src/test/.bsp/sbt.json | 1 + .../foreignkeyjoin/ResponseJoinProcessorSupplier.java | 2 +- .../foreignkeyjoin/SubscriptionSendProcessorSupplier.java | 2 +- .../foreignkeyjoin/ResponseJoinProcessorSupplierTest.java | 2 +- .../foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java | 2 +- .../foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java | 3 +-- .../foreignkeyjoin/SubscriptionWrapperSerdeTest.java | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) rename {streams/src/main/java/org/apache/kafka/streams/state => clients/src/main/java/org/apache/kafka/common}/internals/Murmur3.java (99%) rename {streams/src/test/java/org/apache/kafka/streams/state => clients/src/test/java/org/apache/kafka/common}/internals/Murmur3Test.java (98%) create mode 100644 streams/integration-tests/src/test/.bsp/sbt.json diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java b/clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java similarity index 99% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java rename to clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java index 5581a03648312..74ced520e739f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.common.internals; /** * This class was taken from Hive org.apache.hive.common.util; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java b/clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java rename to clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java index b583148ab47fb..b20f3155f3cff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java +++ b/clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.common.internals; import org.junit.jupiter.api.Test; diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 080681e8db376..6c28b3225518d 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -321,7 +321,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + @@ -331,7 +331,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + diff --git a/streams/integration-tests/src/test/.bsp/sbt.json b/streams/integration-tests/src/test/.bsp/sbt.json new file mode 100644 index 0000000000000..0f357de335da0 --- /dev/null +++ b/streams/integration-tests/src/test/.bsp/sbt.json @@ -0,0 +1 @@ +{"name":"sbt","version":"1.10.7","bspVersion":"2.1.0-M1","languages":["scala"],"argv":["/Users/poanyang/Library/Java/JavaVirtualMachines/corretto-21.0.5/Contents/Home/bin/java","-Xms100m","-Xmx100m","-classpath","/Users/poanyang/Library/Application Support/JetBrains/IdeaIC2025.1/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/Users/poanyang/Library/Application%20Support/JetBrains/IdeaIC2025.1/plugins/Scala/launcher/sbt-launch.jar"]} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java index a81700b6c0165..2c9f90c294835 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -30,7 +31,6 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.apache.kafka.streams.state.internals.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index b161ce092c4d9..bfa9529f383d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; -import org.apache.kafka.streams.state.internals.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java index d5052247d2ece..915a98b81aaf2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.internals.KTableValueGetter; @@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.apache.kafka.streams.state.internals.Murmur3; import org.apache.kafka.test.MockInternalProcessorContext; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java index 276600fd106e2..ff9be3b338db3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java @@ -17,11 +17,11 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.state.internals.Murmur3; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java index d49df2f5cfdb7..f1dd6d05b040e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java @@ -16,14 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; - +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.api.Processor; import 
org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.internals.Murmur3; import org.apache.kafka.test.MockInternalProcessorContext; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java index db2842401b55d..4833c402cce2e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.internals.Murmur3; import org.junit.jupiter.api.Test; From 7f7649aa1b3700085c79abd0492ca47befc03cc0 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 28 Apr 2025 19:04:43 +0800 Subject: [PATCH 05/14] KAFKA-17747: Replace guava with Murmur3 Signed-off-by: PoAn Yang --- build.gradle | 1 - gradle/dependencies.gradle | 2 - .../apache/kafka/coordinator/group/Group.java | 106 ++++++++---- .../kafka/coordinator/group/GroupTest.java | 154 +++++++++--------- 4 files changed, 151 insertions(+), 112 deletions(-) diff --git a/build.gradle b/build.gradle index 7a2ec95f8eb5d..57e73bc13ccee 100644 --- a/build.gradle +++ b/build.gradle @@ -1418,7 +1418,6 @@ project(':group-coordinator') { implementation libs.hdrHistogram implementation libs.re2j implementation libs.slf4jApi - implementation libs.guava testImplementation project(':clients').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7704c2ee9c637..fba1023fe485f 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -61,7 +61,6 @@ versions += [ classgraph: "4.8.173", gradle: "8.10.2", grgit: "4.1.1", - guava: "33.4.0-jre", httpclient: "4.5.14", jackson: "2.16.2", jacoco: "0.8.10", @@ -148,7 +147,6 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", - guava: "com.google.guava:guava:$versions.guava", jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 34b17ff2533f4..5e9a83bffacc2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -17,18 +17,16 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.message.ListGroupsResponseData; import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.metadata.BrokerRegistration; -import com.google.common.hash.HashCode; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - -import java.nio.charset.StandardCharsets; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Locale; @@ -37,7 +35,9 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.LongFunction; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Interface common for all groups. @@ -220,6 +220,11 @@ default boolean shouldExpire() { return true; } + /** + * The magic byte used to identify the version of topic hash function. + */ + byte TOPIC_HASH_MAGIC_BYTE = 0x00; + /** * Computes the hash of the topics in a group. * @@ -227,13 +232,41 @@ default boolean shouldExpire() { * @return The hash of the group. */ static long computeGroupHash(Map topicHashes) { - return Hashing.combineOrdered( - topicHashes.entrySet() - .stream() - .sorted(Map.Entry.comparingByKey()) - .map(e -> HashCode.fromLong(e.getValue())) - .toList() - ).asLong(); + // Convert long to byte array. This is taken from guava LongHashCode#asBytes. + // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 + LongFunction longToBytes = (long value) -> new byte[] { + (byte) value, + (byte) (value >> 8), + (byte) (value >> 16), + (byte) (value >> 24), + (byte) (value >> 32), + (byte) (value >> 40), + (byte) (value >> 48), + (byte) (value >> 56) + }; + + // Combine the sorted topic hashes. + byte[] resultBytes = new byte[8]; + topicHashes.entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) // sort by topic name + .map(Map.Entry::getValue) + .map(longToBytes::apply) + .forEach(nextBytes -> { + // Combine ordered hashes. This is taken from guava Hashing#combineOrdered. + // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 + for (int i = 0; i < nextBytes.length; i++) { + resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); + } + }); + + // Convert the byte array to long. This is taken from guava BytesHashCode#asLong. + // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295 + long retVal = (resultBytes[0] & 0xFF); + for (int i = 1; i < resultBytes.length; i++) { + retVal |= (resultBytes[i] & 0xFFL) << (i * 8); + } + return retVal; } /** @@ -243,26 +276,31 @@ static long computeGroupHash(Map topicHashes) { * @param clusterImage The cluster image. * @return The hash of the topic. 
*/ - static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { - HashFunction hf = Hashing.murmur3_128(); - Hasher topicHasher = hf.newHasher() - .putByte((byte) 0) // magic byte - .putLong(topicImage.id().hashCode()) // topic Id - .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name - .putInt(topicImage.partitions().size()); // number of partitions - - topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { - topicHasher.putInt(entry.getKey()); // partition id - String racks = Arrays.stream(entry.getValue().replicas) - .mapToObj(clusterImage::broker) - .filter(Objects::nonNull) - .map(BrokerRegistration::rack) - .filter(Optional::isPresent) - .map(Optional::get) - .sorted() - .collect(Collectors.joining(";")); - topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";" - }); - return topicHasher.hash().asLong(); + static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte + dos.writeLong(topicImage.id().hashCode()); // topic ID + dos.writeUTF(topicImage.name()); // topic name + dos.writeInt(topicImage.partitions().size()); // number of partitions + for (int i = 0; i < topicImage.partitions().size(); i++) { + dos.writeInt(i); // partition id + List sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas) + .mapToObj(clusterImage::broker) + .filter(Objects::nonNull) + .map(BrokerRegistration::rack) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted() + .toList(); + + String racks = IntStream.range(0, sortedRacksList.size()) + .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value" + .collect(Collectors.joining(",")); // Separator between "index:value" pairs + dos.writeUTF(racks); // sorted racks + } + dos.flush(); + return Murmur3.hash64(baos.toByteArray()); + } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java index 679bcfdf3e1e4..2a60952f7d632 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java @@ -17,20 +17,18 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.image.MetadataImage; -import com.google.common.hash.HashCode; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.nio.charset.StandardCharsets; -import java.util.List; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.LinkedHashMap; import java.util.Map; import java.util.stream.Stream; @@ -48,77 +46,85 @@ public class GroupTest { .build(); @Test - void testComputeTopicHash() { + void testComputeTopicHash() throws IOException { long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); - 
HashFunction hf = Hashing.murmur3_128(); - Hasher topicHasher = hf.newHasher() - .putByte((byte) 0) // magic byte - .putLong(FOO_TOPIC_ID.hashCode()) // topic Id - .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name - .putInt(FOO_NUM_PARTITIONS) // number of partitions - .putInt(0) // partition 0 - .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 - .putInt(1) // partition 1 - .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 - assertEquals(topicHasher.hash().asLong(), result); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeByte(0); // magic byte + dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID + dos.writeUTF(FOO_TOPIC_NAME); // topic name + dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions + dos.writeInt(0); // partition 0 + dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0 + dos.writeInt(1); // partition 1 + dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1 + dos.flush(); + assertEquals(Murmur3.hash64(baos.toByteArray()), result); + } } @Test - void testComputeTopicHashWithDifferentMagicByte() { + void testComputeTopicHashWithDifferentMagicByte() throws IOException { long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); - HashFunction hf = Hashing.murmur3_128(); - Hasher topicHasher = hf.newHasher() - .putByte((byte) 1) // different magic byte - .putLong(FOO_TOPIC_ID.hashCode()) // topic Id - .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name - .putInt(FOO_NUM_PARTITIONS) // number of partitions - .putInt(0) // partition 0 - .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 - .putInt(1) // partition 1 - .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 - assertNotEquals(topicHasher.hash().asLong(), result); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeByte(1); // different magic byte + dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID + dos.writeUTF(FOO_TOPIC_NAME); // topic name + dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions + dos.writeInt(0); // partition 0 + dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0 + dos.writeInt(1); // partition 1 + dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1 + dos.flush(); + assertNotEquals(Murmur3.hash64(baos.toByteArray()), result); + } } @Test - void testComputeTopicHashWithDifferentPartitionOrder() { + void testComputeTopicHashWithDifferentPartitionOrder() throws IOException { long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); - HashFunction hf = Hashing.murmur3_128(); - Hasher topicHasher = hf.newHasher() - .putByte((byte) 0) // magic byte - .putLong(FOO_TOPIC_ID.hashCode()) // topic Id - .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name - .putInt(FOO_NUM_PARTITIONS) // number of partitions + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeByte(0); // magic byte + dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID + dos.writeUTF(FOO_TOPIC_NAME); // topic name + dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions // different partition order - .putInt(1) // partition 1 - .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1 - .putInt(0) // partition 0 - 
.putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0 - assertNotEquals(topicHasher.hash().asLong(), result); + dos.writeInt(1); // partition 1 + dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1 + dos.writeInt(0); // partition 0 + dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0 + dos.flush(); + assertNotEquals(Murmur3.hash64(baos.toByteArray()), result); + } } @Test - void testComputeTopicHashWithDifferentRackOrder() { + void testComputeTopicHashWithDifferentRackOrder() throws IOException { long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); - HashFunction hf = Hashing.murmur3_128(); - Hasher topicHasher = hf.newHasher() - .putByte((byte) 0) // magic byte - .putLong(FOO_TOPIC_ID.hashCode()) // topic Id - .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name - .putInt(FOO_NUM_PARTITIONS) // number of partitions - .putInt(0) // partition 0 - .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0 - .putInt(1) // partition 1 - .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 - assertNotEquals(topicHasher.hash().asLong(), result); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeByte(0); // magic byte + dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID + dos.writeUTF(FOO_TOPIC_NAME); // topic name + dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions + dos.writeInt(0); // partition 0 + dos.writeUTF("0:rack1,1:rack0"); // different rack order of partition 0 + dos.writeInt(1); // partition 1 + dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1 + dos.flush(); + assertNotEquals(Murmur3.hash64(baos.toByteArray()), result); + } } @ParameterizedTest @MethodSource("differentFieldGenerator") - void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) { + void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) throws IOException { long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); assertNotEquals( @@ -160,30 +166,28 @@ private static Stream differentFieldGenerator() { } @Test - void testComputeGroupHash() { - long result = Group.computeGroupHash(Map.of( - BAR_TOPIC_NAME, 123L, - FOO_TOPIC_NAME, 456L - )); - - long expected = Hashing.combineOrdered(List.of( - HashCode.fromLong(123L), - HashCode.fromLong(456L) - )).asLong(); - assertEquals(expected, result); + void testComputeGroupHashWithDifferentOrder() { + Map ascendTopicHashes = new LinkedHashMap<>(); + ascendTopicHashes.put(BAR_TOPIC_NAME, 123L); + ascendTopicHashes.put(FOO_TOPIC_NAME, 456L); + + Map descendTopicHashes = new LinkedHashMap<>(); + descendTopicHashes.put(FOO_TOPIC_NAME, 456L); + descendTopicHashes.put(BAR_TOPIC_NAME, 123L); + assertEquals(Group.computeGroupHash(ascendTopicHashes), Group.computeGroupHash(descendTopicHashes)); } @Test - void testComputeGroupHashWithDifferentOrder() { - long result = Group.computeGroupHash(Map.of( + void testComputeGroupHashWithSameKeyButDifferentValue() { + Map map1 = Map.of( BAR_TOPIC_NAME, 123L, FOO_TOPIC_NAME, 456L - )); + ); - long unexpected = Hashing.combineOrdered(List.of( - HashCode.fromLong(456L), - HashCode.fromLong(123L) - )).asLong(); - assertNotEquals(unexpected, result); + Map map2 = Map.of( + BAR_TOPIC_NAME, 456L, + FOO_TOPIC_NAME, 123L + ); + assertNotEquals(Group.computeGroupHash(map1), 
Group.computeGroupHash(map2)); } } From 8fc5b33258ccbd1621d01d1d22ee40a92b10bbc6 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 28 Apr 2025 19:05:48 +0800 Subject: [PATCH 06/14] remove unused package in import-control-group-coordinator.xml Signed-off-by: PoAn Yang --- checkstyle/import-control-group-coordinator.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 341ac8984ab93..8b6a8d99f5eaa 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -77,7 +77,6 @@ - From d2427f99a592fb9136b9802c962f4e0cc9de11b7 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 28 Apr 2025 19:18:44 +0800 Subject: [PATCH 07/14] remove sbt.json Signed-off-by: PoAn Yang --- streams/integration-tests/src/test/.bsp/sbt.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 streams/integration-tests/src/test/.bsp/sbt.json diff --git a/streams/integration-tests/src/test/.bsp/sbt.json b/streams/integration-tests/src/test/.bsp/sbt.json deleted file mode 100644 index 0f357de335da0..0000000000000 --- a/streams/integration-tests/src/test/.bsp/sbt.json +++ /dev/null @@ -1 +0,0 @@ -{"name":"sbt","version":"1.10.7","bspVersion":"2.1.0-M1","languages":["scala"],"argv":["/Users/poanyang/Library/Java/JavaVirtualMachines/corretto-21.0.5/Contents/Home/bin/java","-Xms100m","-Xmx100m","-classpath","/Users/poanyang/Library/Application Support/JetBrains/IdeaIC2025.1/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/Users/poanyang/Library/Application%20Support/JetBrains/IdeaIC2025.1/plugins/Scala/launcher/sbt-launch.jar"]} \ No newline at end of file From b6ccef3d408444c6e23a32febcce020e9cf050f3 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 29 Apr 2025 19:15:37 +0800 Subject: [PATCH 08/14] Revert "KAFKA-17747: Move Murmur3 to org.apache.kafka.common.internals" This reverts commit aff55ce51ed0e5bbe3ddf52595ec8d02f35fbdbc. 
--- gradle/spotbugs-exclude.xml | 4 ++-- .../foreignkeyjoin/ResponseJoinProcessorSupplier.java | 2 +- .../foreignkeyjoin/SubscriptionSendProcessorSupplier.java | 2 +- .../org/apache/kafka/streams/state}/internals/Murmur3.java | 2 +- .../foreignkeyjoin/ResponseJoinProcessorSupplierTest.java | 2 +- .../foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java | 2 +- .../foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java | 3 ++- .../foreignkeyjoin/SubscriptionWrapperSerdeTest.java | 2 +- .../apache/kafka/streams/state}/internals/Murmur3Test.java | 2 +- 9 files changed, 11 insertions(+), 10 deletions(-) rename {clients/src/main/java/org/apache/kafka/common => streams/src/main/java/org/apache/kafka/streams/state}/internals/Murmur3.java (99%) rename {clients/src/test/java/org/apache/kafka/common => streams/src/test/java/org/apache/kafka/streams/state}/internals/Murmur3Test.java (98%) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 6c28b3225518d..080681e8db376 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -321,7 +321,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + @@ -331,7 +331,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java index 2c9f90c294835..a81700b6c0165 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index bfa9529f383d9..b161ce092c4d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; -import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -29,6 +28,7 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.internals.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java index 74ced520e739f..5581a03648312 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Murmur3.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.internals; +package org.apache.kafka.streams.state.internals; /** * This class was taken from Hive org.apache.hive.common.util; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java index 915a98b81aaf2..d5052247d2ece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.internals.KTableValueGetter; @@ -29,6 +28,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.Murmur3; import org.apache.kafka.test.MockInternalProcessorContext; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java index ff9be3b338db3..276600fd106e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java @@ -17,11 +17,11 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.state.internals.Murmur3; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java index f1dd6d05b040e..d49df2f5cfdb7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java @@ -16,13 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; -import org.apache.kafka.common.internals.Murmur3; + import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.internals.Murmur3; import org.apache.kafka.test.MockInternalProcessorContext; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java index 4833c402cce2e..db2842401b55d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.internals.Murmur3; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java similarity index 98% rename from clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java index b20f3155f3cff..b583148ab47fb 100644 --- a/clients/src/test/java/org/apache/kafka/common/internals/Murmur3Test.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
 */
-package org.apache.kafka.common.internals;
+package org.apache.kafka.streams.state.internals;
 
 import org.junit.jupiter.api.Test;

From a1be1f6d6c722f998100b726f8f4733994c53788 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Tue, 29 Apr 2025 19:31:03 +0800
Subject: [PATCH 09/14] KAFKA-17747: Replace Murmur3 with XXHash64

Signed-off-by: PoAn Yang
---
 build.gradle                                    |  1 +
 checkstyle/import-control-group-coordinator.xml |  1 +
 .../apache/kafka/coordinator/group/Group.java   |  8 ++++++--
 .../kafka/coordinator/group/GroupTest.java      | 17 ++++++++++++-----
 4 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/build.gradle b/build.gradle
index 57e73bc13ccee..bf586c167e154 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1414,6 +1414,7 @@ project(':group-coordinator') {
   implementation project(':coordinator-common')
   implementation libs.jacksonDatabind
   implementation libs.jacksonJDK8Datatypes
+  implementation libs.lz4
   implementation libs.metrics
   implementation libs.hdrHistogram
   implementation libs.re2j
diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 8b6a8d99f5eaa..dfa5a2d1595ae 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -51,6 +51,7 @@
+    <allow pkg="net.jpountz" />
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 5e9a83bffacc2..b98519cb3f1d3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -17,13 +17,15 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.Murmur3;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.metadata.BrokerRegistration;
 
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -224,6 +226,7 @@ default boolean shouldExpire() {
      * The magic byte used to identify the version of topic hash function.
      */
     byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
 
     /**
      * Computes the hash of the topics in a group.
@@ -300,7 +303,8 @@ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) t
                 dos.writeUTF(racks); // sorted racks
             }
             dos.flush();
-            return Murmur3.hash64(baos.toByteArray());
+            byte[] topicBytes = baos.toByteArray();
+            return LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0);
         }
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
index 2a60952f7d632..d2f2fb5c6e3b8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.internals.Murmur3;
 import org.apache.kafka.image.MetadataImage;
 
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -44,6 +46,7 @@ public class GroupTest {
         .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
         .addRacks()
         .build();
+    private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
 
     @Test
     void testComputeTopicHash() throws IOException {
@@ -60,7 +63,8 @@ void testComputeTopicHash() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            assertEquals(Murmur3.hash64(baos.toByteArray()), result);
+            byte[] topicBytes = baos.toByteArray();
+            assertEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
         }
     }
@@ -79,7 +83,8 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            assertNotEquals(Murmur3.hash64(baos.toByteArray()), result);
+            byte[] topicBytes = baos.toByteArray();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
         }
     }
@@ -99,7 +104,8 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
             dos.writeInt(0); // partition 0
             dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0
             dos.flush();
-            assertNotEquals(Murmur3.hash64(baos.toByteArray()), result);
+            byte[] topicBytes = baos.toByteArray();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
         }
     }
@@ -118,7 +124,8 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            assertNotEquals(Murmur3.hash64(baos.toByteArray()), result);
+            byte[] topicBytes = baos.toByteArray();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
         }
     }

From 9b4294fab9457e685229d615425cecb73e4af12b Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Tue, 29 Apr 2025 21:38:38 +0800
Subject: [PATCH 10/14] KAFKA-17747: Move static function to Utils

Signed-off-by: PoAn Yang
---
 .../apache/kafka/coordinator/group/Group.java | 98 -------------------
 .../apache/kafka/coordinator/group/Utils.java | 92 +++++++++++++++++
 .../group/{GroupTest.java => UtilsTest.java}  | 40 ++++----
 3 files changed, 112 insertions(+), 118 deletions(-)
 rename group-coordinator/src/test/java/org/apache/kafka/coordinator/group/{GroupTest.java => UtilsTest.java} (87%)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index b98519cb3f1d3..54d7e98d4b7be 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,27 +19,15 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
-import org.apache.kafka.image.ClusterImage;
-import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.metadata.BrokerRegistration;
 
-import net.jpountz.xxhash.XXHash64;
-import net.jpountz.xxhash.XXHashFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.function.LongFunction;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Interface common for all groups.
@@ -221,90 +209,4 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
-
-    /**
-     * The magic byte used to identify the version of topic hash function.
-     */
-    byte TOPIC_HASH_MAGIC_BYTE = 0x00;
-    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
-
-    /**
-     * Computes the hash of the topics in a group.
-     *
-     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
-     * @return The hash of the group.
-     */
-    static long computeGroupHash(Map<String, Long> topicHashes) {
-        // Convert long to byte array. This is taken from guava LongHashCode#asBytes.
-        // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
-        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
-            (byte) value,
-            (byte) (value >> 8),
-            (byte) (value >> 16),
-            (byte) (value >> 24),
-            (byte) (value >> 32),
-            (byte) (value >> 40),
-            (byte) (value >> 48),
-            (byte) (value >> 56)
-        };
-
-        // Combine the sorted topic hashes.
-        byte[] resultBytes = new byte[8];
-        topicHashes.entrySet()
-            .stream()
-            .sorted(Map.Entry.comparingByKey()) // sort by topic name
-            .map(Map.Entry::getValue)
-            .map(longToBytes::apply)
-            .forEach(nextBytes -> {
-                // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
-                // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
-                for (int i = 0; i < nextBytes.length; i++) {
-                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
-                }
-            });
-
-        // Convert the byte array to long. This is taken from guava BytesHashCode#asLong.
-        // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
-        long retVal = (resultBytes[0] & 0xFF);
-        for (int i = 1; i < resultBytes.length; i++) {
-            retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
-        }
-        return retVal;
-    }
-
-    /**
-     * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
-     *
-     * @param topicImage The topic image.
-     * @param clusterImage The cluster image.
-     * @return The hash of the topic.
-     */
-    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             DataOutputStream dos = new DataOutputStream(baos)) {
-            dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
-            dos.writeLong(topicImage.id().hashCode()); // topic ID
-            dos.writeUTF(topicImage.name()); // topic name
-            dos.writeInt(topicImage.partitions().size()); // number of partitions
-            for (int i = 0; i < topicImage.partitions().size(); i++) {
-                dos.writeInt(i); // partition id
-                List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
-                    .mapToObj(clusterImage::broker)
-                    .filter(Objects::nonNull)
-                    .map(BrokerRegistration::rack)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .sorted()
-                    .toList();
-
-                String racks = IntStream.range(0, sortedRacksList.size())
-                    .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
-                    .collect(Collectors.joining(",")); // Separator between "index:value" pairs
-                dos.writeUTF(racks); // sorted racks
-            }
-            dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            return LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0);
-        }
-    }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 1736aab9d8869..03920df6ad5ec 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -25,14 +25,23 @@
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import com.google.re2j.Pattern;
 import com.google.re2j.PatternSyntaxException;
 
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,11 +49,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.LongFunction;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class Utils {
     private Utils() {}
@@ -324,4 +336,84 @@ static void throwIfRegularExpressionIsInvalid(
             regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava LongHashCode#asBytes.
+        // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+                // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
+                }
+            });
+
+        return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     *
+     * @param topicImage The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream dos = new DataOutputStream(baos)) {
+            dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+            dos.writeLong(topicImage.id().hashCode()); // topic ID
+            dos.writeUTF(topicImage.name()); // topic name
+            dos.writeInt(topicImage.partitions().size()); // number of partitions
+            for (int i = 0; i < topicImage.partitions().size(); i++) {
+                dos.writeInt(i); // partition id
+                List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
+                    .mapToObj(clusterImage::broker)
+                    .filter(Objects::nonNull)
+                    .map(BrokerRegistration::rack)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .sorted()
+                    .toList();
+
+                String racks = IntStream.range(0, sortedRacksList.size())
+                    .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
+                    .collect(Collectors.joining(",")); // Separator between "index:value" pairs
+                dos.writeUTF(racks); // sorted racks
+            }
+            dos.flush();
+            byte[] topicBytes = baos.toByteArray();
+            return LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0);
+        }
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
similarity index 87%
rename from group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
rename to group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
index d2f2fb5c6e3b8..080df41da2f13 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -37,7 +37,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
-public class GroupTest {
+public class UtilsTest {
     private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
     private static final String FOO_TOPIC_NAME = "foo";
     private static final String BAR_TOPIC_NAME = "bar";
@@ -50,7 +50,7 @@ public class UtilsTest {
     @Test
     void testComputeTopicHash() throws IOException {
-        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
@@ -70,7 +70,7 @@ void testComputeTopicHash() throws IOException {
     @Test
     void testComputeTopicHashWithDifferentMagicByte() throws IOException {
-        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
@@ -90,7 +90,7 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
     @Test
     void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
-        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
@@ -111,7 +111,7 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
     @Test
     void testComputeTopicHashWithDifferentRackOrder() throws IOException {
-        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
@@ -132,10 +132,10 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
     @ParameterizedTest
     @MethodSource("differentFieldGenerator")
     void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) throws IOException {
-        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
         assertNotEquals(
-            Group.computeTopicHash(
+            Utils.computeTopicHash(
                 differentImage.topics().getTopic(topicId),
                 differentImage.cluster()
             ),
             result);
     }
@@ -147,26 +147,26 @@ private static Stream<Arguments> differentFieldGenerator() {
         Uuid differentTopicId = Uuid.randomUuid();
         return Stream.of(
             Arguments.of(new MetadataImageBuilder() // different topic id
-                    .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
-                    .addRacks()
-                    .build(),
+                .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+                .addRacks()
+                .build(),
                 differentTopicId
             ),
             Arguments.of(new MetadataImageBuilder() // different topic name
-                    .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
-                    .addRacks()
-                    .build(),
+                .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
+                .addRacks()
+                .build(),
                 FOO_TOPIC_ID
             ),
             Arguments.of(new MetadataImageBuilder() // different partitions
-                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
-                    .addRacks()
-                    .build(),
+                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
+                .addRacks()
+                .build(),
                 FOO_TOPIC_ID
             ),
             Arguments.of(new MetadataImageBuilder() // different racks
-                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
-                    .build(),
+                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+                .build(),
                 FOO_TOPIC_ID
             )
         );
@@ -181,7 +181,7 @@ void testComputeGroupHashWithDifferentOrder() {
         Map<String, Long> descendTopicHashes = new LinkedHashMap<>();
         descendTopicHashes.put(FOO_TOPIC_NAME, 456L);
         descendTopicHashes.put(BAR_TOPIC_NAME, 123L);
-        assertEquals(Group.computeGroupHash(ascendTopicHashes), Group.computeGroupHash(descendTopicHashes));
+        assertEquals(Utils.computeGroupHash(ascendTopicHashes), Utils.computeGroupHash(descendTopicHashes));
     }
 
     @Test
@@ -195,6 +195,6 @@ void testComputeGroupHashWithSameKeyButDifferentValue() {
             BAR_TOPIC_NAME, 456L,
             FOO_TOPIC_NAME, 123L
         );
-        assertNotEquals(Group.computeGroupHash(map1), Group.computeGroupHash(map2));
+        assertNotEquals(Utils.computeGroupHash(map1), Utils.computeGroupHash(map2));
     }
 }

From 9fc8e8eff1cfca5e9c193bf3e142c9384e45d3ab Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Wed, 30 Apr 2025 22:20:33 +0800
Subject: [PATCH 11/14] KAFKA-17747: Replace ByteArrayOutputStream with ByteBufferOutputStream

Signed-off-by: PoAn Yang
---
 .../apache/kafka/coordinator/group/Utils.java | 11 +++---
 .../kafka/coordinator/group/UtilsTest.java    | 35 ++++++++++---------
 2 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 03920df6ad5ec..3a38da2d95f91 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.message.ConsumerProtocolAssignment;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.image.ClusterImage;
@@ -37,9 +38,9 @@
 import net.jpountz.xxhash.XXHash64;
 import net.jpountz.xxhash.XXHashFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -389,8 +390,8 @@ static long computeGroupHash(Map<String, Long> topicHashes) {
      * @return The hash of the topic.
      */
     static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(baos)) {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+            DataOutputStream dos = new DataOutputStream(bbos)) {
             dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
             dos.writeLong(topicImage.id().hashCode()); // topic ID
             dos.writeUTF(topicImage.name()); // topic name
@@ -412,8 +413,8 @@ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) t
                 dos.writeUTF(racks); // sorted racks
             }
             dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            return LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0);
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            return LZ4_HASH_INSTANCE.hash(topicBytes, 0);
         }
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
index 080df41da2f13..0a81a390ae3cc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.image.MetadataImage;
 
 import net.jpountz.xxhash.XXHash64;
@@ -27,9 +28,9 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -52,8 +53,8 @@ public class UtilsTest {
     void testComputeTopicHash() throws IOException {
         long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(baos)) {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+            DataOutputStream dos = new DataOutputStream(bbos)) {
             dos.writeByte(0); // magic byte
             dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID
             dos.writeUTF(FOO_TOPIC_NAME); // topic name
@@ -63,8 +64,8 @@ void testComputeTopicHash() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            assertEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            assertEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
         }
     }
@@ -72,8 +73,8 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
     void testComputeTopicHashWithDifferentMagicByte() throws IOException {
         long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(baos)) {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+            DataOutputStream dos = new DataOutputStream(bbos)) {
             dos.writeByte(1); // different magic byte
             dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID
             dos.writeUTF(FOO_TOPIC_NAME); // topic name
@@ -83,8 +84,8 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
         }
     }
@@ -92,8 +93,8 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
     void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
         long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(baos)) {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+            DataOutputStream dos = new DataOutputStream(bbos)) {
             dos.writeByte(0); // magic byte
             dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID
             dos.writeUTF(FOO_TOPIC_NAME); // topic name
@@ -104,8 +105,8 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
             dos.writeInt(0); // partition 0
             dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0
             dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
         }
     }
@@ -113,8 +114,8 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
     void testComputeTopicHashWithDifferentRackOrder() throws IOException {
         long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(baos)) {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+            DataOutputStream dos = new DataOutputStream(bbos)) {
             dos.writeByte(0); // magic byte
             dos.writeLong(FOO_TOPIC_ID.hashCode()); // topic ID
             dos.writeUTF(FOO_TOPIC_NAME); // topic name
@@ -124,8 +125,8 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
             dos.writeInt(1); // partition 1
             dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
             dos.flush();
-            byte[] topicBytes = baos.toByteArray();
-            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0, topicBytes.length, 0), result);
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
         }
     }

From 167cd4a4af97bdb8042e63b827cb1fd8f154ad29 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Wed, 30 Apr 2025 22:57:16 +0800
Subject: [PATCH 12/14] KAFKA-17747: Add comment

Signed-off-by: PoAn Yang
---
 .../main/java/org/apache/kafka/coordinator/group/Utils.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 3a38da2d95f91..7c567f0c48381 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -408,6 +408,9 @@ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) t
                     .toList();
 
                 String racks = IntStream.range(0, sortedRacksList.size())
+                    // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
+                    // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
+                    // Add index before the rack string to avoid the edge case.
                     .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
                     .collect(Collectors.joining(",")); // Separator between "index:value" pairs
                 dos.writeUTF(racks); // sorted racks

From 08cdd372f6fa38b9e584cda9c1113e22de38ecc7 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Thu, 1 May 2025 15:18:27 +0800
Subject: [PATCH 13/14] KAFKA-17747: Change index to length in rack string

Signed-off-by: PoAn Yang
---
 .../apache/kafka/coordinator/group/Utils.java | 32 +++++++++++++++----
 .../kafka/coordinator/group/UtilsTest.java    | 16 +++++-----
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 7c567f0c48381..54067c88ba67f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -57,7 +57,6 @@
 import java.util.Set;
 import java.util.function.LongFunction;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class Utils {
     private Utils() {}
@@ -346,6 +345,13 @@ static void throwIfRegularExpressionIsInvalid(
     /**
      * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue.
+     * <p>
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Convert each long hash value into a byte array.
+     * 3. Combine the sorted byte arrays to produce a final hash for the group.
      *
      * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
      * @return The hash of the group.
@@ -384,6 +390,19 @@ static long computeGroupHash(Map<String, Long> topicHashes) {
     /**
      * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     * <p>
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * The hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the UTF-8 encoded topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    - Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators.
      *
      * @param topicImage The topic image.
      * @param clusterImage The cluster image.
      * @return The hash of the topic.
@@ -407,12 +426,11 @@ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) t
                     .sorted()
                     .toList();
 
-                String racks = IntStream.range(0, sortedRacksList.size())
-                    // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
-                    // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
-                    // Add index before the rack string to avoid the edge case.
-                    .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
-                    .collect(Collectors.joining(",")); // Separator between "index:value" pairs
+                // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
+                // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
+                // Add length before the rack string to avoid the edge case.
+                String racks = sortedRacksList.stream().map(s -> s.length() + ":" + s) // Format: "length:value"
+                    .collect(Collectors.joining(",")); // Separator between "length:value" pairs
                 dos.writeUTF(racks); // sorted racks
             }
             dos.flush();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
index 0a81a390ae3cc..852979fc233c2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -60,9 +60,9 @@ void testComputeTopicHash() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0
+        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
+        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -80,9 +80,9 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0
+        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
+        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -101,9 +101,9 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         // different partition order
         dos.writeInt(1); // partition 1
-        dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
+        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
         dos.writeInt(0); // partition 0
-        dos.writeUTF("0:rack0,1:rack1"); // rack of partition 0
+        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -121,9 +121,9 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("0:rack1,1:rack0"); // different rack order of partition 0
+        dos.writeUTF("5:rack1,5:rack0"); // different rack order of partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("0:rack1,1:rack2"); // rack of partition 1
+        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
     }

From cde92d86b0720de8bdf1c6fdd8c5dd005e952b1c Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Sun, 4 May 2025 22:51:24 +0800
Subject: [PATCH 14/14] KAFKA-17747: remove : and , in rack string and change streams to for-loop

Signed-off-by: PoAn Yang
---
 .../apache/kafka/coordinator/group/Utils.java | 58 ++++++++++---------
 .../kafka/coordinator/group/UtilsTest.java    | 41 ++++++++++---
 2 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 54067c88ba67f..c928db21a3775 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -42,7 +42,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,7 +49,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -372,18 +370,21 @@ static long computeGroupHash(Map<String, Long> topicHashes) {
         // Combine the sorted topic hashes.
         byte[] resultBytes = new byte[8];
-        topicHashes.entrySet()
-            .stream()
-            .sorted(Map.Entry.comparingByKey()) // sort by topic name
-            .map(Map.Entry::getValue)
-            .map(longToBytes::apply)
-            .forEach(nextBytes -> {
-                // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
-                // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
-                for (int i = 0; i < nextBytes.length; i++) {
-                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
-                }
-            });
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            Long value = entry.getValue();
+            byte[] nextBytes = longToBytes.apply(value);
+
+            // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+            // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+            for (int i = 0; i < nextBytes.length; i++) {
+                resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
+            }
+        }
 
         return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
     }
@@ -402,7 +403,7 @@ static long computeGroupHash(Map<String, Long> topicHashes) {
      * 3. Write the UTF-8 encoded topic name.
      * 4. Write the number of partitions associated with the topic.
      * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
-     *    - Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators.
+     *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
      * @param topicImage The topic image.
     * @param clusterImage The cluster image.
@@ -417,21 +418,24 @@ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) t
             dos.writeInt(topicImage.partitions().size()); // number of partitions
             for (int i = 0; i < topicImage.partitions().size(); i++) {
                 dos.writeInt(i); // partition id
-                List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
-                    .mapToObj(clusterImage::broker)
-                    .filter(Objects::nonNull)
-                    .map(BrokerRegistration::rack)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .sorted()
-                    .toList();
-                // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
-                // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
-                // Add length before the rack string to avoid the edge case.
-                String racks = sortedRacksList.stream().map(s -> s.length() + ":" + s) // Format: "length:value"
-                    .collect(Collectors.joining(",")); // Separator between "length:value" pairs
-                dos.writeUTF(racks); // sorted racks
+                List<String> racks = new ArrayList<>();
+                for (int replicaId : topicImage.partitions().get(i).replicas) {
+                    BrokerRegistration broker = clusterImage.broker(replicaId);
+                    if (broker != null) {
+                        Optional<String> rackOptional = broker.rack();
+                        rackOptional.ifPresent(racks::add);
+                    }
+                }
+
+                Collections.sort(racks);
+                for (String rack : racks) {
+                    // Format: "<length><value>"
+                    dos.writeInt(rack.length());
+                    dos.writeUTF(rack);
+                }
             }
             dos.flush();
             ByteBuffer topicBytes = bbos.buffer().flip();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
index 852979fc233c2..21e6837198c27 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -60,9 +60,15 @@ void testComputeTopicHash() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
+        dos.writeInt(5); // length of rack0
+        dos.writeUTF("rack0"); // The first rack in partition 0
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The second rack in partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The first rack in partition 1
+        dos.writeInt(5); // length of rack2
+        dos.writeUTF("rack2"); // The second rack in partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -80,9 +86,15 @@ void testComputeTopicHashWithDifferentMagicByte() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
+        dos.writeInt(5); // length of rack0
+        dos.writeUTF("rack0"); // The first rack in partition 0
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The second rack in partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The first rack in partition 1
+        dos.writeInt(5); // length of rack2
+        dos.writeUTF("rack2"); // The second rack in partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -101,9 +113,15 @@ void testComputeTopicHashWithDifferentPartitionOrder() throws IOException {
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         // different partition order
         dos.writeInt(1); // partition 1
-        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The first rack in partition 1
+        dos.writeInt(5); // length of rack2
+        dos.writeUTF("rack2"); // The second rack in partition 1
         dos.writeInt(0); // partition 0
-        dos.writeUTF("5:rack0,5:rack1"); // rack of partition 0
+        dos.writeInt(5); // length of rack0
+        dos.writeUTF("rack0"); // The first rack in partition 0
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The second rack in partition 0
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
@@ -121,9 +139,16 @@ void testComputeTopicHashWithDifferentRackOrder() throws IOException {
         dos.writeUTF(FOO_TOPIC_NAME); // topic name
         dos.writeInt(FOO_NUM_PARTITIONS); // number of partitions
         dos.writeInt(0); // partition 0
-        dos.writeUTF("5:rack1,5:rack0"); // different rack order of partition 0
+        // different rack order of partition 0
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The second rack in partition 0
+        dos.writeInt(5); // length of rack0
+        dos.writeUTF("rack0"); // The first rack in partition 0
         dos.writeInt(1); // partition 1
-        dos.writeUTF("5:rack1,5:rack2"); // rack of partition 1
+        dos.writeInt(5); // length of rack1
+        dos.writeUTF("rack1"); // The first rack in partition 1
+        dos.writeInt(5); // length of rack2
+        dos.writeUTF("rack2"); // The second rack in partition 1
         dos.flush();
         ByteBuffer topicBytes = bbos.buffer().flip();
         assertNotEquals(LZ4_HASH_INSTANCE.hash(topicBytes, 0), result);
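
Taken together, the series converges on this scheme: computeTopicHash streams the magic byte, the topic ID's hash code, the topic name, the partition count, and per-partition sorted, length-prefixed racks through a DataOutputStream, then hashes the bytes with XXHash64; computeGroupHash sorts the per-topic hashes by topic name, folds each long into an 8-byte accumulator, and hashes the result. Below is a minimal standalone sketch of the group-level combination, assuming the lz4-java XXHash64 API used in the patches; the class name GroupHashSketch and the sample topic names and hash values are illustrative only, not part of the change:

    import net.jpountz.xxhash.XXHash64;
    import net.jpountz.xxhash.XXHashFactory;

    import java.util.Map;
    import java.util.TreeMap;

    public class GroupHashSketch {
        private static final XXHash64 XX_HASH = XXHashFactory.fastestInstance().hash64();

        // Mirrors computeGroupHash: sort by topic name, fold each long hash into an
        // 8-byte accumulator (guava combineOrdered-style), then hash the 8 bytes.
        static long combine(Map<String, Long> topicHashes) {
            byte[] result = new byte[8];
            // A TreeMap iterates in key (topic name) order, matching the sorted entries above.
            for (long value : new TreeMap<>(topicHashes).values()) {
                for (int i = 0; i < 8; i++) {
                    byte next = (byte) (value >> (8 * i)); // little-endian long-to-bytes
                    result[i] = (byte) (result[i] * 37 ^ next);
                }
            }
            return XX_HASH.hash(result, 0, result.length, 0); // seed 0, as in the patch
        }

        public static void main(String[] args) {
            // Insertion order does not matter; only the sorted topic names do.
            System.out.println(combine(Map.of("foo", 123L, "bar", 456L)));
        }
    }

Because x * 37 ^ y is not commutative, the fold is order-sensitive by design: the same set of topic hashes under different topic names (and hence a different sort order) produces a different group hash.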