2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
@@ -33,7 +33,7 @@ global_job_config:
- name: PUBLISH
value: "true"
- name: ENABLE_PUBLISH_ARTIFACTS
value: "true"
value: "false"
- name: ENABLE_DOWNSTREAM_TRIGGER
value: "true"
- name: DOWNSTREAM_PROJECTS
4 changes: 2 additions & 2 deletions LICENSE-binary
@@ -206,11 +206,11 @@ This project bundles some components that are also licensed under the Apache
License Version 2.0:

- caffeine-3.2.0
- commons-beanutils-1.9.4
- commons-beanutils-1.11.0
- commons-collections-3.2.2
- commons-digester-2.1
- commons-lang3-3.12.0
- commons-logging-1.3.2
- commons-logging-1.3.5
- commons-validator-1.9.0
- hash4j-0.22.0
- jackson-annotations-2.19.0
2 changes: 2 additions & 0 deletions build.gradle
@@ -214,6 +214,8 @@ allprojects {
// ensure we have a single version in the classpath despite transitive dependencies
libs.scalaLibrary,
libs.scalaReflect,
// Workaround until `commons-validator` ships a new release. See KAFKA-19359.
libs.commonsBeanutils,
libs.jacksonAnnotations
)
}
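This constraint pins the transitive `commons-beanutils` pulled in by `commons-validator` to 1.11.0, matching the LICENSE-binary bump above, so only one version of the library lands on the classpath.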
@@ -211,6 +211,10 @@ public void flush() throws IOException {
* Close this record set
*/
public void close() throws IOException {
if (!channel.isOpen()) {
return;
}

flush();
trim();
channel.close();
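The early return makes this `close()` idempotent: once the channel is closed, a second call is a no-op instead of failing while flushing an already-closed channel. The same open-check guard is applied to the `channelOrNull()`-based `close()` further down. A minimal sketch of the pattern, using a hypothetical `ChannelBackedFile` class rather than the one in this diff:

    import java.io.IOException;
    import java.nio.channels.FileChannel;

    // Hypothetical illustration of the guard above: close() becomes a no-op
    // once the underlying channel has already been closed.
    class ChannelBackedFile implements AutoCloseable {
        private final FileChannel channel;

        ChannelBackedFile(FileChannel channel) {
            this.channel = channel;
        }

        @Override
        public void close() throws IOException {
            if (!channel.isOpen()) {
                return; // already closed: nothing left to flush
            }
            channel.force(true); // stand-in for the real flush()/trim() steps
            channel.close();
        }
    }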
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
caffeine: "3.2.0",
bndlib: "7.1.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsBeanutils: "1.11.0",
commonsValidator: "1.9.0",
classgraph: "4.8.179",
gradle: "8.14.1",
@@ -147,6 +148,7 @@ libs += [
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
@@ -34,9 +34,11 @@
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
import org.apache.kafka.test.TestUtils;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -57,6 +59,70 @@ public LogManagerIntegrationTest(ClusterInstance cluster) {
this.cluster = cluster;
}

@ClusterTest(types = {Type.KRAFT})
public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get();
}
cluster.waitForTopic("foo", 1);

// Produce some data into the topic
Map<String, Object> producerConfigs = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
);

try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
producer.flush();
}

var broker = cluster.brokers().get(0);

File timeIndexFile = broker.logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.activeSegment()
.timeIndexFile();

// Make the time index file read-only so that an IOException is thrown on shutdown
assertTrue(timeIndexFile.exists());
assertTrue(timeIndexFile.setReadOnly());

broker.shutdown();

assertEquals(1, broker.config().logDirs().size());
String logDir = broker.config().logDirs().get(0);
CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir);
assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist");

// Ensure we have a corrupt (untrimmed) index on broker shutdown: time index entries
// are 12 bytes each, and since trimming failed the file keeps its pre-allocated size,
// rounded down to a whole number of entries (e.g. 10485756 for the default 10 MiB limit)
long maxIndexSize = broker.config().logIndexSizeMaxBytes();
long expectedIndexSize = 12 * (maxIndexSize / 12);
assertEquals(expectedIndexSize, timeIndexFile.length());

// Restore write permissions before startup
assertTrue(timeIndexFile.setWritable(true));

broker.startup();
// make sure there are no errors during log loading
assertTrue(cluster.firstFatalException().isEmpty());
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(List.of("foo"))
.topicNameValues().get("foo").get().partitions();
return partitionInfos.get(0).leader().id() == 0;
}, "Partition does not have a leader assigned");
}

// Ensure that sanity check does not fail
broker.logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.activeSegment()
.timeIndex()
.sanityCheck();
}

@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {

@@ -263,7 +263,9 @@ public boolean deleteIfExists() throws IOException {
public void trimToValidSize() throws IOException {
lock.lock();
try {
resize(entrySize() * entries);
if (mmap != null) {
resize(entrySize() * entries);
}
} finally {
lock.unlock();
}
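After `forceUnmap()` the memory map is null, and `close()` previously reached `resize()` through `trimToValidSize()` and dereferenced it, throwing a NullPointerException. The null check fixes that, which is also why the `assertThrows(NullPointerException.class, idx::close)` assertion disappears from `forceUnmapTest` at the end of this diff.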
@@ -751,10 +751,7 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(long times
public void close() throws IOException {
if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true));
Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
Utils.closeQuietly(log, "log", LOGGER);
Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex);
}

/**
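Replacing the four `closeQuietly` calls with `Utils.closeAll` changes the failure semantics: every resource is still closed, but the first `IOException` is now propagated instead of logged and swallowed, so callers learn that the segment did not close cleanly. A minimal sketch of the close-all-then-rethrow pattern (an illustration of the semantics, not the actual `Utils` implementation):

    import java.io.Closeable;
    import java.io.IOException;

    // Close every resource even if one fails; rethrow the first failure at
    // the end, attaching any later failures as suppressed exceptions.
    static void closeAll(Closeable... closeables) throws IOException {
        IOException first = null;
        for (Closeable closeable : closeables) {
            try {
                if (closeable != null)
                    closeable.close();
            } catch (IOException e) {
                if (first == null)
                    first = e;
                else
                    first.addSuppressed(e);
            }
        }
        if (first != null)
            throw first;
    }

The `LogSegments.close()` change below applies the same helper across all segments, and the new test in `LogSegmentsTest` verifies that a failure while closing one segment does not prevent the others from closing.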
@@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

import java.io.Closeable;
import java.io.File;
@@ -104,8 +105,7 @@ public void clear() {
*/
@Override
public void close() throws IOException {
for (LogSegment s : values())
s.close();
Utils.closeAll(values().toArray(new LogSegment[0]));
}

/**
@@ -109,7 +109,7 @@ public void reset() throws IOException {

public void close() throws IOException {
FileChannel channel = channelOrNull();
if (channel != null)
if (channel != null && channel.isOpen())
channel.close();
maybeChannel = Optional.empty();
}
@@ -38,7 +38,10 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class LogSegmentsTest {
@@ -47,7 +50,7 @@ public class LogSegmentsTest {

/* create a spied segment with the given base offset, so individual methods can be stubbed */
private static LogSegment createSegment(Long offset) throws IOException {
return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM);
return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM));
}

@BeforeEach
@@ -274,4 +277,22 @@ public void testUpdateDir() throws IOException {
}
}

@Test
public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException {
LogSegment seg1 = createSegment(0L);
LogSegment seg2 = createSegment(100L);
LogSegment seg3 = createSegment(200L);
LogSegments segments = new LogSegments(topicPartition);
segments.add(seg1);
segments.add(seg2);
segments.add(seg3);

doThrow(new IOException("Failure")).when(seg2).close();

assertThrows(IOException.class, segments::close, "Expected IOException to be thrown");
verify(seg1).close();
verify(seg2).close();
verify(seg3).close();
}

}
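Because `createSegment` now returns Mockito spies, `doThrow` can stub `seg2.close()` while every other method keeps its real behavior; the `verify` calls then confirm that `close()` was attempted on all three segments even though the middle one threw.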
@@ -225,7 +225,6 @@ public void forceUnmapTest() throws IOException {
idx.forceUnmap();
// mmap should be null after unmap, causing lookup to throw an NPE
assertThrows(NullPointerException.class, () -> idx.lookup(1));
assertThrows(NullPointerException.class, idx::close);
}

@Test
Expand Down