Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -649,88 +649,115 @@ public void testMinIsrUpdateWithElr() throws Throwable {

// Unfence all brokers and create a topic foo (min ISR 2)
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));

// Fence brokers
TestUtils.waitForCondition(() -> {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
// Heartbeat pumper
final java.util.concurrent.ScheduledExecutorService hbExec =
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
final java.util.concurrent.atomic.AtomicBoolean keepOnly =
new java.util.concurrent.atomic.AtomicBoolean(false);
final long periodMs = Math.max(50L, sessionTimeoutMillis / 3);

hbExec.scheduleAtFixedRate(() -> {
try {
if (keepOnly.get()) {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
} else {
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
}
return true;
}, sessionTimeoutMillis * 30,
"Fencing of brokers did not process within expected time"
);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS);

// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
try {
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));

// Before fencing wait, switch pumper to only keep brokersToKeepUnfenced alive
keepOnly.set(true);

// Fence brokers
TestUtils.waitForCondition(() -> {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
return true;
}, sessionTimeoutMillis * 30,
"Fencing of brokers did not process within expected time"
);

// At this point only the brokers we want to fence (broker 2, 3) should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);

// Verify the isr and elr for the topic partition
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// At this point only the brokers we want to fence (broker 2, 3) should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);

// The ELR set is not determined but the size is 1.
assertEquals(1, partition.elr.length, partition.toString());
// Verify the isr and elr for the topic partition
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());

// First, decrease the min ISR config to 1. This should clear the ELR fields.
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
// The ELR set is not determined but the size is 1.
assertEquals(1, partition.elr.length, partition.toString());

partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// First, decrease the min ISR config to 1. This should clear the ELR fields.
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));

// Second, let's try update config on cluster level with the other topic.
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());
partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());

result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
// Second, let's try update config on cluster level with the other topic.
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());

partition = active.replicationControl().getPartition(topicIdBar, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));

partition = active.replicationControl().getPartition(topicIdBar, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
} finally {
hbExec.shutdownNow();
}
}
}

Expand Down