From 6ddcfab578245268239767295b416b0ce8a0a358 Mon Sep 17 00:00:00 2001
From: Apoorv Mittal
Date: Tue, 10 Jun 2025 17:36:27 +0100
Subject: [PATCH 1/4] KAFKA-19389: Fix memory consumption for completed share
 fetch requests (#19928)

For share fetch requests, the fetch happens through the DelayedShareFetch
operation. Operations that have already completed still hold references to
the data being sent as the response. Because each operation is watched under
multiple keys, i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey,
an operation completed through one watched key is still referenced from the
other watched key. That memory can only be freed once the purge task of
DelayedOperationPurgatory removes the already completed operation from the
remaining keys. The purge task depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`, so if
that value is not smaller than the number of share fetch requests it takes to
consume the broker's entire memory, the broker can run out of memory. This
could also be mitigated by a lower fetch max bytes per request, but that value
is client dependent, so the broker cannot rely on it.

This PR triggers completion on both watched keys, so the DelayedShareFetch
operation is removed from both keys and the broker memory is freed as soon as
the share fetch response is sent.

#### Testing

Tested with LocalTieredStorage, where the broker went OOM after reading some
8040 messages before the fix, using the default configurations as mentioned in
the doc [here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
After the fix, consumption continues without any issue and the memory is
released immediately.

Reviewers: Jun Rao, Andrew Schofield
---
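Note: as an illustration of the leak described above, the toy model below is a
minimal sketch, not Kafka's actual DelayedOperationPurgatory API; the names
`WatchKeyLeakSketch`, `MiniPurgatory`, `DelayedOp` and the key strings are
invented for the example. It only shows why an operation watched under two
keys keeps its fetched data reachable until a completion check runs for every
key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WatchKeyLeakSketch {

    /** A delayed operation that holds a large response buffer once fetched. */
    static final class DelayedOp {
        final byte[] fetchedData = new byte[1024 * 1024]; // stands in for fetched records
        boolean completed;
    }

    /** Minimal purgatory model: one watcher list per key. */
    static final class MiniPurgatory {
        private final Map<String, List<DelayedOp>> watchers = new HashMap<>();

        void watch(DelayedOp op, String... keys) {
            for (String key : keys) {
                watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
            }
        }

        /** Completes and unlinks operations watched under this key only. */
        void checkAndComplete(String key) {
            List<DelayedOp> ops = watchers.getOrDefault(key, List.of());
            ops.forEach(op -> op.completed = true);
            watchers.remove(key); // other keys still reference the completed op
        }

        int danglingReferencesTo(DelayedOp op) {
            return (int) watchers.values().stream().filter(list -> list.contains(op)).count();
        }
    }

    public static void main(String[] args) {
        MiniPurgatory purgatory = new MiniPurgatory();
        DelayedOp shareFetch = new DelayedOp();
        purgatory.watch(shareFetch, "group-key", "partition-key");

        // Completing through one key leaves the operation (and its data) pinned by the
        // other key until a periodic purge runs.
        purgatory.checkAndComplete("group-key");
        System.out.println("completed=" + shareFetch.completed
                + ", dangling references=" + purgatory.danglingReferencesTo(shareFetch)); // 1

        // Triggering the check on every watched key frees the completed operation immediately.
        purgatory.checkAndComplete("partition-key");
        System.out.println("completed=" + shareFetch.completed
                + ", dangling references=" + purgatory.danglingReferencesTo(shareFetch)); // 0
    }
}
```

In the real code path below, `releasePartitionLocksAndAddToActionQueue` now
queues a completion check for the DelayedShareFetchGroupKey and for the new
DelayedShareFetchPartitionKey, which plays the role of the second
`checkAndComplete` call in the sketch.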
 .../kafka/server/share/DelayedShareFetch.java | 22 ++++++++++++++-----
 .../fetch/DelayedShareFetchPartitionKey.java  |  5 +++++
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index fce50613becf8..2d01fb959cb10 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
 import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
@@ -804,13 +805,22 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         }
         // Releasing the lock to move ahead with the next request in queue.
         releasePartitionLocks(topicIdPartitions);
-        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
-        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
-        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
-        // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
+            // If we have a fetch request completed for a share-partition, we release the locks for that partition,
+            // then we should check if there is a pending share fetch request for the share-partition and complete it.
+            // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+            // we directly call delayedShareFetchPurgatory.checkAndComplete.
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
+        }));
     }
 
     /**
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
index 584613cde17e4..c1e40975c2bce 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.share.fetch;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
@@ -27,6 +28,10 @@ public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
     private final Uuid topicId;
     private final int partition;
 
+    public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
+        this(topicIdPartition.topicId(), topicIdPartition.partition());
+    }
+
     public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
         this.topicId = topicId;
         this.partition = partition;

From 3c50e23f1f395b23a90a0c9ad6999618b8af8332 Mon Sep 17 00:00:00 2001
From: Gaurav Narula
Date: Tue, 10 Jun 2025 18:09:52 +0100
Subject: [PATCH 2/4] KAFKA-19221 Propagate IOException on LogSegment#close
 (#19607)

Log segment closure right-sizes the segment on disk along with the associated
index files. This is especially important for TimeIndexes, where a failure to
right-size may eventually cause log roll failures, leading to
under-replication and log cleaner failures.

This change uses `Utils.closeAll`, which propagates exceptions, resulting in
an "unclean" shutdown. That in turn causes the broker to attempt to recover
the log segment and the index on the next startup, thereby avoiding the
failures described above.

Reviewers: Omnia Ibrahim, Jun Rao, Chia-Ping Tsai
---
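Note: for illustration, the helper below is a simplified stand-in for the
close-everything-then-propagate behaviour this change relies on. It is not the
real `Utils.closeAll` implementation, and the class and resource names are
invented for the sketch. Unlike `closeQuietly`, the first `IOException` is
re-thrown after every resource has been given a chance to close, so a failure
to right-size an index surfaces to the caller.

```java
import java.io.Closeable;
import java.io.IOException;

final class CloseAllSketch {

    // Close every resource, remember the first failure, and re-throw it afterwards.
    static void closeAll(Closeable... closeables) throws IOException {
        IOException firstFailure = null;
        for (Closeable closeable : closeables) {
            try {
                closeable.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;              // keep closing the remaining resources
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;                    // unlike closeQuietly, surface the error
        }
    }

    public static void main(String[] args) {
        Closeable healthy = () -> System.out.println("closed cleanly");
        Closeable broken = () -> { throw new IOException("failed to right-size time index"); };
        try {
            closeAll(healthy, broken, healthy);    // all three close attempts still run
        } catch (IOException e) {
            // When LogSegment#close propagates such a failure, the shutdown is treated
            // as unclean and the segment plus its indexes are recovered on next startup.
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Because `LogSegment#close` now lets that exception propagate (see the diff
below), the clean shutdown file is never written and the broker recovers the
segment and its indexes on the next startup, which is what the new
integration test asserts.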
 .../kafka/common/record/FileRecords.java      |  4 ++
 .../server/LogManagerIntegrationTest.java     | 66 +++++++++++++++++++
 .../storage/internals/log/AbstractIndex.java  |  4 +-
 .../storage/internals/log/LogSegment.java     |  5 +-
 .../storage/internals/log/LogSegments.java    |  4 +-
 .../internals/log/TransactionIndex.java       |  2 +-
 .../internals/log/LogSegmentsTest.java        | 23 ++++++-
 .../internals/log/OffsetIndexTest.java        |  1 -
 8 files changed, 99 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 4ec7db604bcbe..2f5e2e50dde75 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -211,6 +211,10 @@ public void flush() throws IOException {
      * Close this record set
      */
     public void close() throws IOException {
+        if (!channel.isOpen()) {
+            return;
+        }
+
         flush();
         trim();
         channel.close();
diff --git a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
index 6a5e49825d741..15a57bbe22da0 100644
--- a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
+++ b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
@@ -34,9 +34,11 @@
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
 import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
 import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -57,6 +59,70 @@ public LogManagerIntegrationTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
+    @ClusterTest(types = {Type.KRAFT})
+    public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException {
+        try (Admin admin = cluster.admin()) {
+            admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get();
+        }
+        cluster.waitForTopic("foo", 1);
+
+        // Produce some data into the topic
+        Map<String, Object> producerConfigs = Map.of(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+        );
+
+        try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
+            producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
+            producer.flush();
+        }
+
+        var broker = cluster.brokers().get(0);
+
+        File timeIndexFile = broker.logManager()
+                .getLog(new TopicPartition("foo", 0), false).get()
+                .activeSegment()
+                .timeIndexFile();
+
+        // Set read only so that we throw an IOException on shutdown
+        assertTrue(timeIndexFile.exists());
+        assertTrue(timeIndexFile.setReadOnly());
+
+        broker.shutdown();
+
+        assertEquals(1, broker.config().logDirs().size());
+        String logDir = broker.config().logDirs().get(0);
+        CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir);
+        assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist");
+
+        // Ensure we have a corrupt index on broker shutdown
+        long maxIndexSize = broker.config().logIndexSizeMaxBytes();
+        long expectedIndexSize = 12 * (maxIndexSize / 12);
+        assertEquals(expectedIndexSize, timeIndexFile.length());
+
+        // Allow write permissions before startup
+        assertTrue(timeIndexFile.setWritable(true));
+
+        broker.startup();
+        // make sure there is no error during load logs
+        assertTrue(cluster.firstFatalException().isEmpty());
+        try (Admin admin = cluster.admin()) {
+            TestUtils.waitForCondition(() -> {
+                List<TopicPartitionInfo> partitionInfos = admin.describeTopics(List.of("foo"))
+                        .topicNameValues().get("foo").get().partitions();
+                return partitionInfos.get(0).leader().id() == 0;
+            }, "Partition does not have a leader assigned");
+        }
+
+        // Ensure that sanity check does not fail
+        broker.logManager()
+                .getLog(new TopicPartition("foo", 0), false).get()
+                .activeSegment()
+                .timeIndex()
+                .sanityCheck();
+    }
+
     @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
     public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index bf1bd802f5e0c..f2e7c9830bd9e 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -263,7 +263,9 @@ public boolean deleteIfExists() throws IOException {
     public void trimToValidSize() throws IOException {
         lock.lock();
         try {
-            resize(entrySize() * entries);
+            if (mmap != null) {
+                resize(entrySize() * entries);
+            }
         } finally {
             lock.unlock();
         }
     }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index cead243408763..9a16eaa4a184f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -751,10 +751,7 @@ public Optional findOffsetByTimestamp(long times
     public void close() throws IOException {
         if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
             Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true));
-        Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
-        Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
-        Utils.closeQuietly(log, "log", LOGGER);
-        Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
+        Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex);
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
index a79602d56d12e..4b89d7115a45d 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
 import java.io.File;
@@ -104,8 +105,7 @@ public void clear() {
      */
     @Override
     public void close() throws IOException {
-        for (LogSegment s : values())
-            s.close();
+        Utils.closeAll(values().toArray(new LogSegment[0]));
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 9e28c253a5e9b..076dfa0627cd8 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -109,7 +109,7 @@ public void reset() throws IOException {
 
     public void close() throws IOException {
         FileChannel channel = channelOrNull();
-        if (channel != null)
+        if (channel != null && channel.isOpen())
             channel.close();
         maybeChannel = Optional.empty();
     }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
index fcf02cc7b764d..4136a76995aec 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
@@ -38,7 +38,10 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class LogSegmentsTest {
@@ -47,7 +50,7 @@ public class LogSegmentsTest {
 
     /* create a segment with the given base offset */
     private static LogSegment createSegment(Long offset) throws IOException {
-        return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM);
+        return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM));
     }
 
     @BeforeEach
@@ -274,4 +277,22 @@ public void testUpdateDir() throws IOException {
         }
     }
 
+    @Test
+    public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException {
+        LogSegment seg1 = createSegment(0L);
+        LogSegment seg2 = createSegment(100L);
+        LogSegment seg3 = createSegment(200L);
+        LogSegments segments = new LogSegments(topicPartition);
+        segments.add(seg1);
+        segments.add(seg2);
+        segments.add(seg3);
+
+        doThrow(new IOException("Failure")).when(seg2).close();
+
+        assertThrows(IOException.class, segments::close, "Expected IOException to be thrown");
+        verify(seg1).close();
+        verify(seg2).close();
+        verify(seg3).close();
+    }
+
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
index 918e9dd409c09..ad7fa5908529c 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -225,7 +225,6 @@ public void forceUnmapTest() throws IOException {
         idx.forceUnmap();
         // mmap should be null after unmap causing lookup to throw a NPE
         assertThrows(NullPointerException.class, () -> idx.lookup(1));
-        assertThrows(NullPointerException.class, idx::close);
     }
 
     @Test

From b311ac7dd5bce649fd5bd83a948f95c8c468a9aa Mon Sep 17 00:00:00 2001
From: Manikumar Reddy
Date: Wed, 11 Jun 2025 12:57:01 +0530
Subject: [PATCH 3/4] Temporarily disable artifact publishing for the 4.1
 branch.

---
 .semaphore/semaphore.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index bf94b3fcf4e92..6b746c4592c60 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -33,7 +33,7 @@ global_job_config:
     - name: PUBLISH
       value: "true"
     - name: ENABLE_PUBLISH_ARTIFACTS
-      value: "true"
+      value: "false"
    - name: ENABLE_DOWNSTREAM_TRIGGER
       value: "true"
     - name: DOWNSTREAM_PROJECTS

From ddc30477a99c06d1c91f53bbf1230d32fadb98d5 Mon Sep 17 00:00:00 2001
From: Luke Chen
Date: Wed, 11 Jun 2025 15:23:04 +0800
Subject: [PATCH 4/4] KAFKA-19359: force bump commons-beanutils for
 CVE-2025-48734 (#19939)

Bump commons-beanutils for CVE-2025-48734. Since `commons-validator` hasn't
had a new release with a newer `commons-beanutils` version, we manually bump
it in Kafka.

Reviewers: Mickael Maison
---
 LICENSE-binary             | 4 ++--
 build.gradle               | 2 ++
 gradle/dependencies.gradle | 2 ++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index fc87fb913f879..b0640c1bca77c 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -206,11 +206,11 @@ This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 
 - caffeine-3.2.0
-- commons-beanutils-1.9.4
+- commons-beanutils-1.11.0
 - commons-collections-3.2.2
 - commons-digester-2.1
 - commons-lang3-3.12.0
-- commons-logging-1.3.2
+- commons-logging-1.3.5
 - commons-validator-1.9.0
 - hash4j-0.22.0
 - jackson-annotations-2.19.0
diff --git a/build.gradle b/build.gradle
index b4e9cbb801133..767395ad32d0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -193,6 +193,8 @@ allprojects {
         // ensure we have a single version in the classpath despite transitive dependencies
         libs.scalaLibrary,
         libs.scalaReflect,
+        // Workaround before `commons-validator` has new release. See KAFKA-19359.
+        libs.commonsBeanutils,
         libs.jacksonAnnotations
       )
     }
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cf519c4af8f07..cd6af7ee16880 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
   caffeine: "3.2.0",
   bndlib: "7.1.0",
   checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
+  commonsBeanutils: "1.11.0",
   commonsValidator: "1.9.0",
   classgraph: "4.8.179",
   gradle: "8.14.1",
@@ -147,6 +148,7 @@ libs += [
   bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
+  commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
   commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
   jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",