Skip to content

KAFKA-18695: Remove quorum=kraft and kip932 from all integration tests #19633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.time.Duration;
Expand Down Expand Up @@ -137,9 +136,8 @@ public void close() throws Exception {
if (adminClient != null) adminClient.close();
}

@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Expand Down Expand Up @@ -224,9 +222,8 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) {
);
}

@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberCanBeElected(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Expand Down Expand Up @@ -300,9 +297,8 @@ public void testElrMemberCanBeElected(String quorum) throws ExecutionException,
}
}

@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Expand Down Expand Up @@ -361,9 +357,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throw
/*
This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed.
*/
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Expand Down
108 changes: 44 additions & 64 deletions core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
import org.junit.jupiter.api.{BeforeEach, Tag, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import org.junit.jupiter.params.provider.CsvSource

import java.util
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -73,9 +73,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithValidRetentionTime(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
Expand All @@ -85,9 +84,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithValidRetentionSize(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
Expand All @@ -97,9 +95,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInheritedLocalRetentionTime(): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -109,9 +106,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInheritedLocalRetentionSize(): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -121,9 +117,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInvalidRetentionTime(): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -133,9 +128,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInvalidRetentionSize(): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -145,9 +139,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateCompactedRemoteStorage(quorum: String): Unit = {
@Test
def testCreateCompactedRemoteStorage(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
Expand All @@ -158,8 +151,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {

// `remote.log.delete.on.disable` and `remote.log.copy.disable` only works in KRaft mode.
@ParameterizedTest
@CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", "kraft,false,false"))
def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
@CsvSource(Array("true,true", "true,false", "false,true", "false,false"))
def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString)
topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
Expand All @@ -169,9 +162,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

// `remote.log.delete.on.disable` only works in KRaft mode.
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: String): Unit = {
@Test
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgMs = "When `remote.log.copy.disable` is set to true, the `local.retention.ms` and `retention.ms` " +
Expand Down Expand Up @@ -235,9 +227,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
admin.incrementalAlterConfigs(configs).all().get()
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: String): Unit = {
@Test
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the `local.retention.bytes` and `retention.bytes` " +
Expand Down Expand Up @@ -300,9 +291,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
admin.incrementalAlterConfigs(configs).all().get()
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
@Test
def testEnableRemoteLogOnExistingTopicTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
Expand All @@ -318,9 +308,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(quorum: String): Unit = {
@Test
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(): Unit = {
val admin = createAdminClient()

val topicConfigWithRemoteStorage = new Properties()
Expand All @@ -342,9 +331,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
assertTrue(errorMessage.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionTimeTest(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithValidRetentionTimeTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -363,9 +351,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionSizeTest(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithValidRetentionSizeTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -384,9 +371,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionTime(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithInheritedLocalRetentionTime(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -404,9 +390,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get())
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionSize(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithInheritedLocalRetentionSize(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -425,9 +410,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

// The remote storage config validation on controller level only works in KRaft
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithDisablingRemoteStorage(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithDisablingRemoteStorage(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -446,9 +430,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -473,9 +456,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(newProps)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicDeletion(quorum: String): Unit = {
@Test
def testTopicDeletion(): Unit = {
MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
val numPartitions = 2
val topicConfig = new Properties()
Expand All @@ -492,9 +474,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"Remote log segments should be deleted only once by the leader")
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = {
@Test
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

Expand All @@ -510,9 +491,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
faultHandler.setIgnore(true)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = {
@Test
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ import org.apache.kafka.server.policy.AlterConfigPolicy
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -79,9 +77,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
props.put(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[Policy])
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testValidAlterConfigs(quorum: String): Unit = {
@Test
def testValidAlterConfigs(): Unit = {
client = Admin.create(createConfig)
// Create topics
val topic1 = "describe-alter-configs-topic-1"
Expand All @@ -100,16 +97,14 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2, maxMessageBytes, retentionMs)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigs(quorum: String): Unit = {
@Test
def testInvalidAlterConfigs(): Unit = {
client = Admin.create(createConfig)
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = Admin.create(createConfig)

// Create topics
Expand Down
Loading