From ede6c90dfb5fa5faa7472541dde3bcc90cbd68d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dejan=20Stojadinovi=C4=87?= Date: Wed, 8 Oct 2025 10:16:23 +0200 Subject: [PATCH 1/8] KAFKA-19664 Support building with Java 25 (LTS release) (#20561) This patch updates the Apache Kafka project's build, test, and dependency configurations. - Java Version Update: The build and test processes have been upgraded from Java 24 to Java 25. - Scala Version Update: The Scala library version has been updated from 2.13.16 to 2.13.17. - Dependency Upgrades: Several dependencies have been updated to newer versions, including mockito (5.14.2 to 5.20.0), zinc (1.10.8 to 1.11.0), and scala-library/reflect (2.13.16 to 2.13.17). - Code and Configuration Changes: The patch modifies build.gradle to exclude certain Spotbugs tasks for Java 25 compatibility. It also changes the default signing algorithm in TestSslUtils.java from SHA1withRSA to SHA256withRSA, enhancing security. - Documentation: The README.md file has been updated to reflect the new Java 25 requirement. 
Reviewers: Ismael Juma , Liam Clarke-Hutchinson , Gaurav Narula , Chia-Ping Tsai --- .github/workflows/build.yml | 10 +++++----- .github/workflows/ci-complete.yml | 4 ++-- LICENSE-binary | 4 ++-- README.md | 2 +- bin/kafka-run-class.sh | 2 +- bin/windows/kafka-run-class.bat | 2 +- build.gradle | 7 +++++++ .../test/java/org/apache/kafka/test/TestSslUtils.java | 6 +++--- gradle.properties | 2 +- gradle/dependencies.gradle | 6 +++--- 10 files changed, 26 insertions(+), 19 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 47bb2cbc31d5d..59c2fabaaed25 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -127,7 +127,7 @@ jobs: - name: Setup Gradle uses: ./.github/actions/setup-gradle with: - java-version: 24 + java-version: 25 gradle-cache-read-only: ${{ !inputs.is-trunk }} gradle-cache-write-only: ${{ inputs.is-trunk }} develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }} @@ -181,7 +181,7 @@ jobs: fail-fast: false matrix: # If we change these, make sure to adjust ci-complete.yml - java: [ 24, 17 ] + java: [ 25, 17 ] run-flaky: [ true, false ] run-new: [ true, false ] exclude: @@ -270,7 +270,7 @@ jobs: python .github/scripts/junit.py \ --path build/junit-xml >> $GITHUB_STEP_SUMMARY - # This job downloads all the JUnit XML files and thread dumps from the JDK 24 test runs. + # This job downloads all the JUnit XML files and thread dumps from the JDK 25 test runs. # If any test job fails, we will not run this job. Also, if any thread dump artifacts # are present, this means there was a timeout in the tests and so we will not proceed # with catalog creation. 
@@ -288,7 +288,7 @@ jobs: - name: Download Thread Dumps uses: actions/download-artifact@v5 with: - pattern: junit-thread-dumps-24-* + pattern: junit-thread-dumps-25-* path: thread-dumps merge-multiple: true - name: Check For Thread Dump @@ -302,7 +302,7 @@ jobs: - name: Download JUnit XMLs uses: actions/download-artifact@v5 with: - pattern: junit-xml-24-* # Only look at JDK 24 tests for the test catalog + pattern: junit-xml-25-* # Only look at JDK 25 tests for the test catalog path: junit-xml merge-multiple: true - name: Collate Test Catalog diff --git a/.github/workflows/ci-complete.yml b/.github/workflows/ci-complete.yml index 4b7dec3cdf578..1dbd0871e9d1d 100644 --- a/.github/workflows/ci-complete.yml +++ b/.github/workflows/ci-complete.yml @@ -43,8 +43,8 @@ jobs: strategy: fail-fast: false matrix: - # Make sure these match build.yml - java: [ 24, 17 ] + # Make sure these match build.yml and also keep in mind that GitHub Actions build will always use this file from the trunk branch. + java: [ 25, 17 ] run-flaky: [ true, false ] run-new: [ true, false ] exclude: diff --git a/LICENSE-binary b/LICENSE-binary index c6078a48bca75..91c8865931ea2 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -248,9 +248,9 @@ License Version 2.0: - opentelemetry-proto-1.3.2-alpha - plexus-utils-3.5.1 - rocksdbjni-10.1.3 -- scala-library-2.13.16 +- scala-library-2.13.17 - scala-logging_2.13-3.9.5 -- scala-reflect-2.13.16 +- scala-reflect-2.13.17 - snappy-java-1.1.10.7 - snakeyaml-2.4 - swagger-annotations-2.2.25 diff --git a/README.md b/README.md index c6a9dc5432175..28da38abe1dab 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed. -We build and test Apache Kafka with 17 and 24. The `release` parameter in javac is set to `11` for the clients +We build and test Apache Kafka with 17 and 25. 
The `release` parameter in javac is set to `11` for the clients and streams modules, and `17` for the rest, ensuring compatibility with their respective minimum Java versions. Similarly, the `release` parameter in scalac is set to `11` for the streams modules and `17` for the rest. diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 0a5ecfae04e20..012f9c27f0aca 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -49,7 +49,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.16 + SCALA_VERSION=2.13.17 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index a73ae2b26f2d7..394269a4294e2 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.16 + set SCALA_VERSION=2.13.17 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/build.gradle b/build.gradle index 5b08bbf106ec7..dc3bf215ec88f 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,13 @@ ext { "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" ) + if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_25)) { + // Spotbugs is not compatible with Java 25+ so Gradle related tasks are disabled + // until version can be upgraded: https://github.com/spotbugs/spotbugs/issues/3564 + project.gradle.startParameter.excludedTaskNames.add("spotbugsMain") + project.gradle.startParameter.excludedTaskNames.add("spotbugsTest") + } + maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors() maxScalacThreads = project.hasProperty('maxScalacThreads') ? 
maxScalacThreads.toInteger() : Math.min(Runtime.runtime.availableProcessors(), 8) diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 889ebcbc607b8..7b4680a8b0446 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -110,7 +110,7 @@ public class TestSslUtils { * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB" * @param pair the KeyPair * @param days how many days from now the Certificate is valid for, or - for negative values - how many days before now - * @param algorithm the signing algorithm, eg "SHA1withRSA" + * @param algorithm the signing algorithm, eg "SHA256withRSA" * @return the self-signed certificate * @throws CertificateException thrown if a security error or an IO error occurred. */ @@ -131,7 +131,7 @@ public static X509Certificate generateCertificate(String dn, KeyPair pair, * CA. * @param parentKeyPair The key pair of the issuer. Leave null if you want to generate a root * CA. 
- * @param algorithm the signing algorithm, eg "SHA1withRSA" + * @param algorithm the signing algorithm, eg "SHA256withRSA" * @return the signed certificate * @throws CertificateException */ @@ -399,7 +399,7 @@ public static class CertificateBuilder { private byte[] subjectAltName; public CertificateBuilder() { - this(30, "SHA1withRSA"); + this(30, "SHA256withRSA"); } public CertificateBuilder(int days, String algorithm) { diff --git a/gradle.properties b/gradle.properties index 8855ff8362314..3a669bb57a3cf 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,7 +24,7 @@ group=org.apache.kafka # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml version=4.2.0-SNAPSHOT -scalaVersion=2.13.16 +scalaVersion=2.13.17 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger swaggerVersion=2.2.25 task=build diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9b1a42e53494a..b85dc87d78fac 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -23,7 +23,7 @@ ext { } // Add Scala version -def defaultScala213Version = '2.13.16' +def defaultScala213Version = '2.13.17' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.13') { versions["scala"] = defaultScala213Version @@ -110,7 +110,7 @@ versions += [ lz4: "1.8.0", mavenArtifact: "3.9.6", metrics: "2.2.0", - mockito: "5.14.2", + mockito: "5.20.0", opentelemetryProto: "1.3.2-alpha", protobuf: "3.25.5", // a dependency of opentelemetryProto pcollections: "4.0.2", @@ -125,7 +125,7 @@ versions += [ snappy: "1.1.10.7", spotbugs: "4.9.4", mockOAuth2Server: "2.2.1", - zinc: "1.10.8", + zinc: "1.11.0", // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid zstd: "1.5.6-10", From efc37732e57516371ca00a921349ae33eb164ab0 Mon Sep 
17 00:00:00 2001 From: Jhen-Yung Hsu Date: Wed, 8 Oct 2025 19:13:51 +0800 Subject: [PATCH 2/8] MINOR: Update all github actions to latest version (#20649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updates all GitHub Actions to their latest versions. ---- **Upgraded Actions:** * **Gradle setup**: * `gradle/actions/setup-gradle` **v4.4.4 → v5.0.0** * **Trivy security scanner**: * `aquasecurity/trivy-action` **v0.24.0 → v0.33.1** * **Docker build tools:** * `docker/setup-qemu-action` **v3.2.0 → v3.6.0** * `docker/setup-buildx-action` **v3.6.1 → v3.11.1** * `docker/login-action` **v3.3.0 → v3.6.0** * **GitHub utilities:** * `actions/github-script` **v7 → v8** * `actions/stale` **v9 → v10** Reviewers: Chia-Ping Tsai --- .github/actions/setup-gradle/action.yml | 2 +- .github/workflows/docker_build_and_test.yml | 2 +- .github/workflows/docker_official_image_build_and_test.yml | 2 +- .github/workflows/docker_promote.yml | 6 +++--- .github/workflows/docker_rc_release.yml | 6 +++--- .github/workflows/docker_scan.yml | 2 +- .github/workflows/pr-labels-cron.yml | 4 ++-- .github/workflows/stale.yml | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/actions/setup-gradle/action.yml b/.github/actions/setup-gradle/action.yml index 47a10469cb46a..57d363471dd29 100644 --- a/.github/actions/setup-gradle/action.yml +++ b/.github/actions/setup-gradle/action.yml @@ -42,7 +42,7 @@ runs: distribution: temurin java-version: ${{ inputs.java-version }} - name: Setup Gradle - uses: gradle/actions/setup-gradle@748248ddd2a24f49513d8f472f81c3a07d4d50e1 # v4.4.4 + uses: gradle/actions/setup-gradle@4d9f0ba0025fe599b4ebab900eb7f3a1d93ef4c2 # v5.0.0 env: GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true with: diff --git a/.github/workflows/docker_build_and_test.yml b/.github/workflows/docker_build_and_test.yml index 6a1b2f7de25f1..8358f10433a42 100644 --- a/.github/workflows/docker_build_and_test.yml +++ 
b/.github/workflows/docker_build_and_test.yml @@ -54,7 +54,7 @@ jobs: run: | python docker_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -u=$KAFKA_URL - name: Run CVE scan - uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0 + uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1 with: image-ref: 'kafka/test:test' format: 'table' diff --git a/.github/workflows/docker_official_image_build_and_test.yml b/.github/workflows/docker_official_image_build_and_test.yml index 1580ea1f744ba..1c67ef584720c 100644 --- a/.github/workflows/docker_official_image_build_and_test.yml +++ b/.github/workflows/docker_official_image_build_and_test.yml @@ -53,7 +53,7 @@ jobs: run: | python docker_official_image_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -v=$KAFKA_VERSION - name: Run CVE scan - uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0 + uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1 with: image-ref: 'kafka/test:test' format: 'table' diff --git a/.github/workflows/docker_promote.yml b/.github/workflows/docker_promote.yml index 3b9a6f1d4fb53..e6f8779de6976 100644 --- a/.github/workflows/docker_promote.yml +++ b/.github/workflows/docker_promote.yml @@ -31,11 +31,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Set up QEMU - uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0 + uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1 + uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 - name: Login to Docker Hub - uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0 + uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0 with: username: ${{ secrets.DOCKERHUB_USER }} password: ${{ 
secrets.DOCKERHUB_TOKEN }} diff --git a/.github/workflows/docker_rc_release.yml b/.github/workflows/docker_rc_release.yml index da851f4a43028..8ec489fb12b0b 100644 --- a/.github/workflows/docker_rc_release.yml +++ b/.github/workflows/docker_rc_release.yml @@ -47,11 +47,11 @@ jobs: python -m pip install --upgrade pip pip install -r docker/requirements.txt - name: Set up QEMU - uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0 + uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1 + uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 - name: Login to Docker Hub - uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0 + uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0 with: username: ${{ secrets.DOCKERHUB_USER }} password: ${{ secrets.DOCKERHUB_TOKEN }} diff --git a/.github/workflows/docker_scan.yml b/.github/workflows/docker_scan.yml index 55df9f65e4c08..ea8cc95a30346 100644 --- a/.github/workflows/docker_scan.yml +++ b/.github/workflows/docker_scan.yml @@ -29,7 +29,7 @@ jobs: supported_image_tag: ['latest', '3.9.1', '4.0.0', '4.1.0'] steps: - name: Run CVE scan - uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0 + uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1 if: always() with: image-ref: apache/kafka:${{ matrix.supported_image_tag }} diff --git a/.github/workflows/pr-labels-cron.yml b/.github/workflows/pr-labels-cron.yml index 5faaca72ed36b..420d80498baa6 100644 --- a/.github/workflows/pr-labels-cron.yml +++ b/.github/workflows/pr-labels-cron.yml @@ -35,7 +35,7 @@ jobs: env: GITHUB_CONTEXT: ${{ toJson(github) }} - name: Remove label - uses: actions/github-script@v7 + uses: actions/github-script@v8 continue-on-error: true with: github-token: ${{ 
secrets.GITHUB_TOKEN }} @@ -77,7 +77,7 @@ jobs: issues: write pull-requests: write steps: - - uses: actions/stale@v9 + - uses: actions/stale@v10 with: debug-only: ${{ inputs.dryRun || false }} operations-per-run: ${{ inputs.operationsPerRun || 500 }} diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 9382d4173e94c..74de2a967b59f 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -38,7 +38,7 @@ jobs: stale: runs-on: ubuntu-latest steps: - - uses: actions/stale@v9 + - uses: actions/stale@v10 with: debug-only: ${{ inputs.dryRun || false }} operations-per-run: ${{ inputs.operationsPerRun || 500 }} From 25de3209516f148686323f8b3cdb4e12f417a6c5 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Wed, 8 Oct 2025 15:16:32 +0200 Subject: [PATCH 3/8] KAFKA-19518 Remove the usage of KafkaMetricsGroup(Class klass) (#20399) the constructor is error-prone when migrating code, since metrics could get unintentionally changed. We should remove the constructor and use constant strings instead to avoid issues like KAFKA-17876, KAFKA-19150, and others. 
Reviewers: Ken Huang , Jhen-Yung Hsu , KuoChe , Chia-Ping Tsai --- .../main/scala/kafka/cluster/Partition.scala | 5 +++- .../TransactionMarkerChannelManager.scala | 6 +++-- .../src/main/scala/kafka/log/LogManager.scala | 6 +++-- .../scala/kafka/network/RequestChannel.scala | 5 +++- .../scala/kafka/network/SocketServer.scala | 15 ++++++----- .../kafka/server/AbstractFetcherManager.scala | 5 +++- .../kafka/server/AbstractFetcherThread.scala | 10 +++++-- .../scala/kafka/server/ControllerServer.scala | 5 +++- .../scala/kafka/server/DelayedFetch.scala | 5 +++- .../scala/kafka/server/DelayedProduce.scala | 5 +++- .../kafka/server/DelayedRemoteFetch.scala | 5 +++- .../scala/kafka/server/FetchSession.scala | 5 +++- .../kafka/server/KafkaRequestHandler.scala | 5 +++- .../scala/kafka/server/ReplicaManager.scala | 5 +++- .../unit/kafka/metrics/MetricsTest.scala | 6 ++++- .../kafka/network/ConnectionQuotasTest.scala | 5 +++- .../server/AbstractFetcherManagerTest.scala | 27 ++++++++++++++++--- .../kafka/server/ReplicaManagerTest.scala | 2 +- .../server/metrics/KafkaMetricsGroup.java | 4 --- .../log/RemoteStorageThreadPool.java | 1 + 20 files changed, 99 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 4c729f596063b..f47160a9119d8 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -116,7 +116,10 @@ class DelayedOperations(topicId: Option[Uuid], } object Partition { - private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.cluster" + private val metricsClassName = "Partition" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) def apply(topicIdPartition: TopicIdPartition, time: Time, diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 6c395feb5827f..227eb9881a297 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -166,8 +166,10 @@ class TransactionMarkerChannelManager( time: Time ) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time) with Logging { - - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.coordinator.transaction" + private val metricsClassName = "TransactionMarkerChannelManager" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: " diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index d3f64793685d2..bfee35061f82f 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -81,8 +81,10 @@ class LogManager(logDirs: Seq[File], cleanerFactory: (CleanerConfig, util.List[File], ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) => LogCleaner = (cleanerConfig, files, map, logDirFailureChannel, time) => new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time) ) extends Logging { - - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.log" + private val metricsClassName = "LogManager" + private val metricsGroup = new 
KafkaMetricsGroup(metricsPackage, metricsClassName) private val logCreationOrDeletionLock = new Object private val currentLogs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]() diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 473ab172a093e..a0fbc0452060a 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -346,7 +346,10 @@ class RequestChannel(val queueSize: Int, val metrics: RequestChannelMetrics) { import RequestChannel._ - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.network" + private val metricsClassName = "RequestChannel" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val processors = new ConcurrentHashMap[Int, Processor]() diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 884c00002c5b5..c14c5c665afde 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -78,8 +78,10 @@ class SocketServer( val socketFactory: ServerSocketFactory = ServerSocketFactory.INSTANCE, val connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = Seq.empty ) extends Logging with BrokerReconfigurable { - - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.network" + private val metricsClassName = "SocketServer" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private val 
maxQueuedRequests = config.queuedMaxRequests @@ -485,9 +487,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, memoryPool: MemoryPool, apiVersionManager: ApiVersionManager) extends Runnable with Logging { - - private val metricsGroup = new KafkaMetricsGroup(this.getClass) - val shouldRun = new AtomicBoolean(true) private val sendBufferSize = config.socketSendBufferBytes @@ -516,7 +515,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName( "AcceptorBlockedPercent", Map(ListenerMetricTag -> endPoint.listener).asJava) - private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) + private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) private var currentProcessorIndex = 0 private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() private val started = new AtomicBoolean() @@ -834,7 +833,9 @@ private[kafka] class Processor( threadName: String, connectionDisconnectListeners: Seq[ConnectionDisconnectListener] ) extends Runnable with Logging { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + private val metricsPackage = "kafka.network" + private val metricsClassName = "Processor" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val shouldRun: AtomicBoolean = new AtomicBoolean(true) private val started: AtomicBoolean = new AtomicBoolean() diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 42580250b5b2f..958c92ac1854b 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -30,7 +30,10 @@ import scala.jdk.OptionConverters._ 
abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int) extends Logging { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = this.getClass.getSimpleName + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) // map of (source broker_id, fetcher_id per source broker) => fetcher. // package private for test diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 8dd621d19509f..1e8841df0ca98 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -888,7 +888,10 @@ object FetcherMetrics { } class FetcherLagMetrics(metricId: ClientIdTopicPartition) { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "FetcherLagMetrics" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private[this] val lagVal = new AtomicLong(-1L) private[this] val tags = Map( @@ -927,7 +930,10 @@ class FetcherLagStats(metricId: ClientIdAndBroker) { } class FetcherStats(metricId: ClientIdAndBroker) { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "FetcherStats" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val tags: util.Map[String, String] = Map("clientId" 
-> metricId.clientId, "brokerHost" -> metricId.brokerHost, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index e41705ed3bae9..460e2969706a1 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -71,7 +71,10 @@ class ControllerServer( import kafka.server.Server._ - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "ControllerServer" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val config = sharedServer.controllerConfig val logContext = new LogContext(s"[ControllerServer id=${config.nodeId}] ") diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 74a3e2b1a2997..91480bb420edd 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -187,7 +187,10 @@ class DelayedFetch( } object DelayedFetchMetrics { - private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "DelayedFetchMetrics" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private val FetcherTypeKey = "fetcherType" val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava) val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava) diff --git 
a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 1d21ec78e4c63..523158fe5594b 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -135,7 +135,10 @@ class DelayedProduce(delayMs: Long, } object DelayedProduceMetrics { - private val metricsGroup = new KafkaMetricsGroup(DelayedProduceMetrics.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "DelayedProduceMetrics" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS) diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index cb14a14b3e902..03e6f8d230fd5 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -138,6 +138,9 @@ class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Voi } object DelayedRemoteFetchMetrics { - private val metricsGroup = new KafkaMetricsGroup(DelayedRemoteFetchMetrics.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "DelayedRemoteFetchMetrics" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val expiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS) } diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index 51db1fcb092fe..4bbb4c47e3fa0 100644 --- 
a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -803,7 +803,10 @@ class FetchSessionCacheShard(private val maxEntries: Int, } } object FetchSessionCache { - private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache]) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "FetchSessionCache" + private[server] val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private[server] val counter = new AtomicInteger(0) } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 815fe4966eb81..d4998cbb73488 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -201,7 +201,10 @@ class KafkaRequestHandlerPool( requestHandlerAvgIdleMetricName: String, nodeName: String = "broker" ) extends Logging { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "KafkaRequestHandlerPool" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) /* a meter to track the average free capacity of the request handlers */ diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ff70d7ae34aec..e01d264f53962 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -231,7 +231,10 @@ class ReplicaManager(val config: KafkaConfig, val directoryEventHandler: 
DirectoryEventHandler = DirectoryEventHandler.NOOP, val defaultActionQueue: ActionQueue = new DelayedActionQueue ) extends Logging { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + private val metricsPackage = "kafka.server" + private val metricsClassName = "ReplicaManager" + private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config) private val shareFetchPurgatoryName = "ShareFetch" private val delayedShareFetchTimer = new SystemTimer(shareFetchPurgatoryName) diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 834b8efe48db9..5d13fc97bfb16 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -153,7 +153,11 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val path = "C:\\windows-path\\kafka-logs" val tags = util.Map.of("dir", path) val expectedMBeanName = Set(tags.keySet().iterator().next(), ObjectName.quote(path)).mkString("=") - val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags) + + // Changing the package or class name may cause incompatibility with existing code and metrics configuration + val metricsPackage = "kafka.metrics" + val metricsClassName = "MetricsTest" + val metric = new KafkaMetricsGroup(metricsPackage, metricsClassName).metricName("test-metric", tags) assert(metric.getMBeanName.endsWith(expectedMBeanName)) } diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala index d9a64b4186fae..2d81c2a773bdf 100644 --- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala +++ 
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala @@ -88,8 +88,11 @@ class ConnectionQuotasTest { // Clean-up any metrics left around by previous tests TestUtils.clearYammerMetrics() + val metricsPackage = "kafka.network" + val metricsClassName = "ConnectionQuotasTest" + listeners.keys.foreach { name => - blockedPercentMeters.put(name, new KafkaMetricsGroup(this.getClass).newMeter( + blockedPercentMeters.put(name, new KafkaMetricsGroup(metricsPackage, metricsClassName).newMeter( s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, util.Map.of(ListenerMetricTag, name))) } // use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index d1d6e4a7810ad..0c5eb83e5e97f 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -1,7 +1,7 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. + * this work for additional information registryarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at @@ -21,10 +21,11 @@ import kafka.utils.TestUtils import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} import org.apache.kafka.common.message.FetchResponseData.PartitionData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{TopicPartition, Uuid} -import org.apache.kafka.server.common.OffsetAndEpoch +import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.ReplicaFetch @@ -355,4 +356,24 @@ class AbstractFetcherManagerTest { override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Optional[OffsetAndEpoch] = Optional.of(new OffsetAndEpoch(1, 0)) } + @Test + def testMetricsClassName(): Unit = { + val registry = KafkaYammerMetrics.defaultRegistry() + val config = mock(classOf[KafkaConfig]) + val replicaManager = mock(classOf[ReplicaManager]) + val quotaManager = mock(classOf[ReplicationQuotaManager]) + val brokerTopicStats = new BrokerTopicStats() + val directoryEventHandler = DirectoryEventHandler.NOOP + val metrics = new Metrics() + val time = new MockTime() + val metadataVersionSupplier = () => MetadataVersion.LATEST_PRODUCTION + val brokerEpochSupplier = () => 1L + + val _ = new ReplicaAlterLogDirsManager(config, replicaManager, quotaManager, brokerTopicStats, directoryEventHandler) + val _ = new ReplicaFetcherManager(config, replicaManager, metrics, time, quotaManager, metadataVersionSupplier, brokerEpochSupplier) + val existReplicaAlterLogDirsManager = registry.allMetrics.entrySet().stream().filter(metric => 
metric.getKey.getType == "ReplicaAlterLogDirsManager").findFirst() + val existReplicaFetcherManager = registry.allMetrics.entrySet().stream().filter(metric => metric.getKey.getType == "ReplicaFetcherManager").findFirst() + assertTrue(existReplicaAlterLogDirsManager.isPresent) + assertTrue(existReplicaFetcherManager.isPresent) + } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 2483a1f85c04f..eb5491812c109 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -629,7 +629,7 @@ class ReplicaManagerTest { def replicaManagerMetricValue(): Int = { KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case (metricName, _) => - metricName.getName == "ProducerIdCount" && metricName.getType == replicaManager.getClass.getSimpleName + metricName.getName == "ProducerIdCount" && metricName.getType == "ReplicaManager" }.head._2.asInstanceOf[Gauge[Int]].value } diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java index 5a83c0d94ad61..3a7045d7d5ebd 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java @@ -35,10 +35,6 @@ public class KafkaMetricsGroup { private final String pkg; private final String simpleName; - public KafkaMetricsGroup(Class klass) { - this(klass.getPackage() == null ? "" : klass.getPackage().getName(), klass.getSimpleName().replaceAll("\\$$", "")); - } - /** * This constructor allows caller to build metrics name with custom package and class name. 
This is useful to keep metrics * compatibility in migrating scala code, since the difference of either package or class name will impact the mbean name and diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java index 692afbddaf276..6b73a55a5ef0c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java @@ -35,6 +35,7 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class); @Deprecated(since = "4.2") // This metrics group is used to register deprecated metrics. It will be removed in Kafka 5.0 + // Changing the package or class name may cause incompatibility with existing code and metrics configuration private final KafkaMetricsGroup deprecatedLogMetricsGroup = new KafkaMetricsGroup("org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool"); private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool"); From d1b160b6956e19f07205e0019ff0d8ded34b898f Mon Sep 17 00:00:00 2001 From: Nikita Shupletsov Date: Wed, 8 Oct 2025 14:42:50 -0700 Subject: [PATCH 4/8] KAFKA-19510: clear pendingTasksToInit on tasks clear. (#20646) Clear pendingTasksToInit on tasks clear. It matters in situations when we are shutting down a thread in PARTITIONS_ASSIGNED state. In this case we may have locked some unassigned task directories (see TaskManager#tryToLockAllNonEmptyTaskDirectories). Then we may have gotten assigned to one or multiple of those tasks.
In this scenario, we will not release the locks for the unassigned task directories (see TaskManager#releaseLockedUnassignedTaskDirectories), because TaskManager#allTasks includes pendingTasksToInit, but it hasn't been cleared. Reviewers: Matthias J. Sax , Lucas Brutschy --- .../streams/processor/internals/Tasks.java | 11 +++++++++ .../processor/internals/TasksTest.java | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 92dd07ba974cf..76d63490683e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -258,6 +258,9 @@ private void removePartitionsForActiveTask(final TaskId taskId) { @Override public synchronized void clear() { + pendingTasksToInit.clear(); + pendingActiveTasksToCreate.clear(); + pendingStandbyTasksToCreate.clear(); activeTasksPerId.clear(); standbyTasksPerId.clear(); activeTasksPerPartition.clear(); @@ -346,4 +349,12 @@ public synchronized Map allTasksPerId() { public boolean contains(final TaskId taskId) { return getTask(taskId) != null; } + + Map> pendingActiveTasksToCreate() { + return pendingActiveTasksToCreate; + } + + Map> pendingStandbyTasksToCreate() { + return pendingStandbyTasksToCreate; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java index e2bd30c20a562..ec4d672f9c214 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java @@ -207,4 +207,27 @@ public void shouldClearFailedTask() { tasks.addTask(activeTask1); assertTrue(tasks.allNonFailedTasks().contains(activeTask1)); } + + 
@Test + public void shouldClearAllPendingTasks() { + final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0)) + .inState(State.CREATED).build(); + tasks.addPendingTasksToInit(Collections.singleton(task)); + final TaskId taskId1 = new TaskId(0, 0, "A"); + tasks.addPendingActiveTasksToCreate(mkMap( + mkEntry(taskId1, Set.of(TOPIC_PARTITION_A_0)) + )); + final TaskId taskId2 = new TaskId(0, 1, "A"); + tasks.addPendingStandbyTasksToCreate(mkMap( + mkEntry(taskId2, Set.of(TOPIC_PARTITION_A_0)) + )); + + assertTrue(tasks.pendingTasksToInit().contains(task)); + assertTrue(tasks.pendingActiveTasksToCreate().containsKey(taskId1)); + assertTrue(tasks.pendingStandbyTasksToCreate().containsKey(taskId2)); + tasks.clear(); + assertTrue(tasks.pendingTasksToInit().isEmpty()); + assertTrue(tasks.pendingActiveTasksToCreate().isEmpty()); + assertTrue(tasks.pendingStandbyTasksToCreate().isEmpty()); + } } From be3e40c45ac9a70833ce0d5735ad141a5bd24627 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 9 Oct 2025 08:13:20 +0200 Subject: [PATCH 5/8] HOTFIX: Fix log level in StateChangeLogger#trace (#20662) from: https://github.com/apache/kafka/pull/20637#discussion_r2414668452 Previously, the method used LOGGER.info() instead of LOGGER.trace(). This patch corrects the logging level used in the trace method of StateChangeLogger. 
Reviewers: Manikumar Reddy , TengYao Chi , Ken Huang , Chia-Ping Tsai Co-authored-by: Ubuntu --- .../main/java/org/apache/kafka/logger/StateChangeLogger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java index a8f7ed9cc9d38..a852fc30eb889 100644 --- a/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java +++ b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java @@ -33,7 +33,7 @@ public StateChangeLogger(int brokerId) { } public void trace(String message) { - LOGGER.info("{}{}", logIdent, message); + LOGGER.trace("{}{}", logIdent, message); } public void info(String message) { From 9d319283c12f79e353bd45c42c637c9517ac546e Mon Sep 17 00:00:00 2001 From: Xuan-Zhang Gong Date: Thu, 9 Oct 2025 20:17:27 +0800 Subject: [PATCH 6/8] MINOR: Fix the incorrect package name in the metrics and a typo (#20666) Fix the incorrect package name in metrics and revert the comment see: https://github.com/apache/kafka/pull/20399#discussion_r2415673042 https://github.com/apache/kafka/pull/20399#discussion_r2415633676 Reviewers: Ken Huang , Manikumar Reddy , Chia-Ping Tsai --- core/src/main/scala/kafka/network/SocketServer.scala | 2 +- .../scala/unit/kafka/server/AbstractFetcherManagerTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c14c5c665afde..306b633f6fa37 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -833,7 +833,7 @@ private[kafka] class Processor( threadName: String, connectionDisconnectListeners: Seq[ConnectionDisconnectListener] ) extends Runnable with Logging { - private val metricsPackage = "kafka.server" + private val metricsPackage = 
"kafka.network" private val metricsClassName = "Processor" private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index 0c5eb83e5e97f..5577dc9bd38fd 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -1,7 +1,7 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with - * this work for additional information registryarding copyright ownership. + * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at From cedf1c052e903967982891e05d95f4a6cd4c4faf Mon Sep 17 00:00:00 2001 From: Gaurav Narula Date: Thu, 9 Oct 2025 15:28:07 +0100 Subject: [PATCH 7/8] KAFKA-19776: Fix log values for previous leader id and epoch (#20671) Stores the existing values for both the fields in a local variable for logging. Reviewers: Omnia Ibrahim --- core/src/main/scala/kafka/cluster/Partition.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f47160a9119d8..53228873e5d90 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -855,6 +855,8 @@ class Partition(val topicPartition: TopicPartition, val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. 
Or it is possible to meet // the under min isr condition during the makeFollower process and emits the wrong metric. + val prevLeaderReplicaIdOpt = leaderReplicaIdOpt + val prevLeaderEpoch = leaderEpoch leaderReplicaIdOpt = Option(partitionRegistration.leader) leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = None @@ -877,7 +879,7 @@ class Partition(val topicPartition: TopicPartition, stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " + s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " + s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. " + - s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.") + s"Previous leader $prevLeaderReplicaIdOpt and previous leader epoch was $prevLeaderEpoch.") } else { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " + s"partition registration $partitionRegistration and isNew=$isNew since it is already a follower with leader epoch $leaderEpoch.") From 6ef10ec75d2ffceabaf643eb696986140c5b1bfe Mon Sep 17 00:00:00 2001 From: Nikita Shupletsov Date: Thu, 9 Oct 2025 10:56:03 -0700 Subject: [PATCH 8/8] MINOR: changed the condition to only check the test topic to reduce the flakiness of the test. (#20664) MINOR: changed the condition to only check the test topic to reduce the flakiness of the test. Reviewers: Matthias J. 
Sax , Lianet Magrans --- .../test/java/org/apache/kafka/tools/TopicCommandTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index c6f1073edb56c..57a8269f089bb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -1192,14 +1192,15 @@ public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(Cl // describe the topic and test if it's under-replicated String simpleDescribeOutput = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)); String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator()); - assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), + String testTopicNameLogLine = String.format("Topic: %s", testTopicName); + assertTrue(simpleDescribeOutputRows[0].startsWith(testTopicNameLogLine), "Unexpected describe output: " + simpleDescribeOutputRows[0]); assertEquals(2, simpleDescribeOutputRows.length, "Unexpected describe output length: " + simpleDescribeOutputRows.length); String underReplicatedOutput = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-replicated-partitions")); - assertEquals("", underReplicatedOutput, - String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); + assertFalse(underReplicatedOutput.contains(testTopicNameLogLine), + String.format("--under-replicated-partitions shouldn't contain '%s': '%s'", testTopicNameLogLine, underReplicatedOutput)); int maxRetries = 20; long pause = 100L;