Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup-gradle/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ runs:
distribution: temurin
java-version: ${{ inputs.java-version }}
- name: Setup Gradle
uses: gradle/actions/setup-gradle@748248ddd2a24f49513d8f472f81c3a07d4d50e1 # v4.4.4
uses: gradle/actions/setup-gradle@4d9f0ba0025fe599b4ebab900eb7f3a1d93ef4c2 # v5.0.0
env:
GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
with:
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ jobs:
- name: Setup Gradle
uses: ./.github/actions/setup-gradle
with:
java-version: 24
java-version: 17
gradle-cache-read-only: ${{ !inputs.is-trunk }}
gradle-cache-write-only: ${{ inputs.is-trunk }}
develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
Expand Down Expand Up @@ -181,7 +181,7 @@ jobs:
fail-fast: false
matrix:
# If we change these, make sure to adjust ci-complete.yml
java: [ 24, 17 ]
java: [ 25, 17 ]
run-flaky: [ true, false ]
run-new: [ true, false ]
exclude:
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
python .github/scripts/junit.py \
--path build/junit-xml >> $GITHUB_STEP_SUMMARY

# This job downloads all the JUnit XML files and thread dumps from the JDK 24 test runs.
# This job downloads all the JUnit XML files and thread dumps from the JDK 25 test runs.
# If any test job fails, we will not run this job. Also, if any thread dump artifacts
# are present, this means there was a timeout in the tests and so we will not proceed
# with catalog creation.
Expand All @@ -288,7 +288,7 @@ jobs:
- name: Download Thread Dumps
uses: actions/download-artifact@v5
with:
pattern: junit-thread-dumps-24-*
pattern: junit-thread-dumps-25-*
path: thread-dumps
merge-multiple: true
- name: Check For Thread Dump
Expand All @@ -302,7 +302,7 @@ jobs:
- name: Download JUnit XMLs
uses: actions/download-artifact@v5
with:
pattern: junit-xml-24-* # Only look at JDK 24 tests for the test catalog
pattern: junit-xml-25-* # Only look at JDK 25 tests for the test catalog
path: junit-xml
merge-multiple: true
- name: Collate Test Catalog
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci-complete.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ jobs:
strategy:
fail-fast: false
matrix:
# Make sure these match build.yml
java: [ 24, 17 ]
# Make sure these match build.yml and also keep in mind that GitHub Actions build will always use this file from the trunk branch.
java: [ 25, 17 ]
run-flaky: [ true, false ]
run-new: [ true, false ]
exclude:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker_build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
run: |
python docker_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -u=$KAFKA_URL
- name: Run CVE scan
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
with:
image-ref: 'kafka/test:test'
format: 'table'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
run: |
python docker_official_image_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -v=$KAFKA_VERSION
- name: Run CVE scan
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
with:
image-ref: 'kafka/test:test'
format: 'table'
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/docker_promote.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
- name: Login to Docker Hub
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
with:
username: ${{ secrets.DOCKERHUB_USER }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/docker_rc_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ jobs:
python -m pip install --upgrade pip
pip install -r docker/requirements.txt
- name: Set up QEMU
uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
- name: Login to Docker Hub
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
with:
username: ${{ secrets.DOCKERHUB_USER }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker_scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
supported_image_tag: ['latest', '3.9.1', '4.0.0', '4.1.0']
steps:
- name: Run CVE scan
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
if: always()
with:
image-ref: apache/kafka:${{ matrix.supported_image_tag }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pr-labels-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
env:
GITHUB_CONTEXT: ${{ toJson(github) }}
- name: Remove label
uses: actions/github-script@v7
uses: actions/github-script@v8
continue-on-error: true
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
Expand Down Expand Up @@ -77,7 +77,7 @@ jobs:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v9
- uses: actions/stale@v10
with:
debug-only: ${{ inputs.dryRun || false }}
operations-per-run: ${{ inputs.operationsPerRun || 500 }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v9
- uses: actions/stale@v10
with:
debug-only: ${{ inputs.dryRun || false }}
operations-per-run: ${{ inputs.operationsPerRun || 500 }}
Expand Down
4 changes: 2 additions & 2 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ License Version 2.0:
- opentelemetry-proto-1.3.2-alpha
- plexus-utils-3.5.1
- rocksdbjni-10.1.3
- scala-library-2.13.16
- scala-library-2.13.17
- scala-logging_2.13-3.9.5
- scala-reflect-2.13.16
- scala-reflect-2.13.17
- snappy-java-1.1.10.7
- snakeyaml-2.4
- swagger-annotations-2.2.25
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

We build and test Apache Kafka with 17 and 24. The `release` parameter in javac is set to `11` for the clients
We build and test Apache Kafka with 17 and 25. The `release` parameter in javac is set to `11` for the clients
and streams modules, and `17` for the rest, ensuring compatibility with their respective
minimum Java versions. Similarly, the `release` parameter in scalac is set to `11` for the streams modules and `17`
for the rest.
Expand Down
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.16
SCALA_VERSION=2.13.17
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
Expand Down
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.13.16
set SCALA_VERSION=2.13.17
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
Expand Down
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ ext {
"--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
)

if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_25)) {
// Spotbugs is not compatible with Java 25+ so Gradle related tasks are disabled
// until version can be upgraded: https://github.com/spotbugs/spotbugs/issues/3564
project.gradle.startParameter.excludedTaskNames.add("spotbugsMain")
project.gradle.startParameter.excludedTaskNames.add("spotbugsTest")
}

maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
Math.min(Runtime.runtime.availableProcessors(), 8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class TestSslUtils {
* @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
* @param pair the KeyPair
* @param days how many days from now the Certificate is valid for, or - for negative values - how many days before now
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @param algorithm the signing algorithm, eg "SHA256withRSA"
* @return the self-signed certificate
* @throws CertificateException thrown if a security error or an IO error occurred.
*/
Expand All @@ -131,7 +131,7 @@ public static X509Certificate generateCertificate(String dn, KeyPair pair,
* CA.
* @param parentKeyPair The key pair of the issuer. Leave null if you want to generate a root
* CA.
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @param algorithm the signing algorithm, eg "SHA256withRSA"
* @return the signed certificate
* @throws CertificateException
*/
Expand Down Expand Up @@ -399,7 +399,7 @@ public static class CertificateBuilder {
private byte[] subjectAltName;

public CertificateBuilder() {
this(30, "SHA1withRSA");
this(30, "SHA256withRSA");
}

public CertificateBuilder(int days, String algorithm) {
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ class DelayedOperations(topicId: Option[Uuid],
}

object Partition {
private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.cluster"
private val metricsClassName = "Partition"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

def apply(topicIdPartition: TopicIdPartition,
time: Time,
Expand Down Expand Up @@ -852,6 +855,8 @@ class Partition(val topicPartition: TopicPartition,
val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
// The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
// the under min isr condition during the makeFollower process and emits the wrong metric.
val prevLeaderReplicaIdOpt = leaderReplicaIdOpt
val prevLeaderEpoch = leaderEpoch
Comment on lines 855 to +859
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variables prevLeaderReplicaIdOpt and prevLeaderEpoch capture the current values before they are updated, but these values will be the same as the new values if this is not a new leader epoch. The logging message will be incorrect in cases where the leader hasn't changed.

Suggested change
val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
// The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
// the under min isr condition during the makeFollower process and emits the wrong metric.
val prevLeaderReplicaIdOpt = leaderReplicaIdOpt
val prevLeaderEpoch = leaderEpoch
val prevLeaderReplicaIdOpt = leaderReplicaIdOpt
val prevLeaderEpoch = leaderEpoch
val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
// The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
// the under min isr condition during the makeFollower process and emits the wrong metric.

Copilot uses AI. Check for mistakes.

leaderReplicaIdOpt = Option(partitionRegistration.leader)
leaderEpoch = partitionRegistration.leaderEpoch
leaderEpochStartOffsetOpt = None
Expand All @@ -874,7 +879,7 @@ class Partition(val topicPartition: TopicPartition,
stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " +
s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " +
s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. " +
s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.")
s"Previous leader $prevLeaderReplicaIdOpt and previous leader epoch was $prevLeaderEpoch.")
} else {
stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " +
s"partition registration $partitionRegistration and isNew=$isNew since it is already a follower with leader epoch $leaderEpoch.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ class TransactionMarkerChannelManager(
time: Time
) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time)
with Logging {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.coordinator.transaction"
private val metricsClassName = "TransactionMarkerChannelManager"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ class LogManager(logDirs: Seq[File],
cleanerFactory: (CleanerConfig, util.List[File], ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) => LogCleaner =
(cleanerConfig, files, map, logDirFailureChannel, time) => new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time)
) extends Logging {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.log"
private val metricsClassName = "LogManager"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

private val logCreationOrDeletionLock = new Object
private val currentLogs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ class RequestChannel(val queueSize: Int,
val metrics: RequestChannelMetrics) {
import RequestChannel._

private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.network"
private val metricsClassName = "RequestChannel"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ class SocketServer(
val socketFactory: ServerSocketFactory = ServerSocketFactory.INSTANCE,
val connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = Seq.empty
) extends Logging with BrokerReconfigurable {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.network"
private val metricsClassName = "SocketServer"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

private val maxQueuedRequests = config.queuedMaxRequests

Expand Down Expand Up @@ -485,9 +487,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
memoryPool: MemoryPool,
apiVersionManager: ApiVersionManager)
extends Runnable with Logging {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)

val shouldRun = new AtomicBoolean(true)

private val sendBufferSize = config.socketSendBufferBytes
Expand Down Expand Up @@ -516,7 +515,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listener).asJava)
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
private val started = new AtomicBoolean()
Expand Down Expand Up @@ -834,7 +833,9 @@ private[kafka] class Processor(
threadName: String,
connectionDisconnectListeners: Seq[ConnectionDisconnectListener]
) extends Runnable with Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val metricsPackage = "kafka.network"
private val metricsClassName = "Processor"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

val shouldRun: AtomicBoolean = new AtomicBoolean(true)
private val started: AtomicBoolean = new AtomicBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import scala.jdk.OptionConverters._

abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int)
extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = this.getClass.getSimpleName
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

// map of (source broker_id, fetcher_id per source broker) => fetcher.
// package private for test
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,10 @@ object FetcherMetrics {
}

class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = "FetcherLagMetrics"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

private[this] val lagVal = new AtomicLong(-1L)
private[this] val tags = Map(
Expand Down Expand Up @@ -927,7 +930,10 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}

class FetcherStats(metricId: ClientIdAndBroker) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = "FetcherStats"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
"brokerHost" -> metricId.brokerHost,
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ class ControllerServer(

import kafka.server.Server._

private val metricsGroup = new KafkaMetricsGroup(this.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = "ControllerServer"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

val config = sharedServer.controllerConfig
val logContext = new LogContext(s"[ControllerServer id=${config.nodeId}] ")
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ class DelayedFetch(
}

object DelayedFetchMetrics {
private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass)
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = "DelayedFetchMetrics"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
private val FetcherTypeKey = "fetcherType"
val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
Expand Down
Loading