Skip to content

Commit

Permalink
KREST-4591 add topic recreate logic to cluster test harness (confluen…
Browse files Browse the repository at this point in the history
…tinc#978)

* KREST-4591 add topic recreate logic to cluster test harness

Most of the test flakes in this class are due to failures in the setup and the topics not being created.

Unfortunately I can't reproduce this locally, but I have added retry logic that allows more time and then makes another attempt at creating the environment.

Also add a test retry for a flaky test.

* KREST-4591 address review comments

* KREST-4591 Look again at topic recreate retry section

* KREST-4591 address review nits
  • Loading branch information
ehumber authored Jun 7, 2022
1 parent f233508 commit dc77403
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
public abstract class ClusterTestHarness {

private static final Logger log = LoggerFactory.getLogger(ClusterTestHarness.class);
private static final long HALF_SECOND_MILLIS = 500L;

public static final int DEFAULT_NUM_BROKERS = 1;

Expand Down Expand Up @@ -171,6 +172,15 @@ public Properties overrideSchemaRegistryProps(Properties props) {
/**
 * Brings the test environment up before each test.
 *
 * <p>Logs the start of setup, hands the actual work to {@code setupMethod()}, and logs completion
 * once that call has returned normally.
 *
 * @throws Exception if {@code setupMethod()} fails
 */
@BeforeEach
public void setUp() throws Exception {
  final String harnessName = getClass().getSimpleName();
  log.info("Starting setup of {}", harnessName);
  setupMethod();
  log.info("Completed setup of {}", harnessName);
}

// Calling setUp() from within this class dispatches to the subclass's setUp(), which includes the
// createTopic calls, and that in turn causes an infinite loop on topic creation.
// The environment start-up is therefore pulled out into this separate method so it can be
// called on its own, without re-running the subclass setup.
private void setupMethod() throws Exception {
zookeeper = new EmbeddedZookeeper();
zkConnect = String.format("127.0.0.1:%d", zookeeper.port());
// start brokers concurrently
Expand Down Expand Up @@ -248,6 +258,7 @@ private void startRest() throws Exception {
}

private void stopRest() throws Exception {
restProperties.clear();
if (restApp != null) {
restApp.stop();
restApp.getMetrics().close();
Expand Down Expand Up @@ -333,6 +344,18 @@ protected Properties getBrokerProperties(int i) {
// Shuts the test environment down after each test: stops the REST application via stopRest(),
// then releases everything else via tearDownMethod(). Throws whatever either step throws.
public void tearDown() throws Exception {
  log.info("Starting teardown of {}", getClass().getSimpleName());
  try {
    stopRest();
  } finally {
    // Always run the rest of the teardown, even when stopping REST fails; otherwise the
    // remaining services (schema registry, ZooKeeper, ...) would be left running and leak into
    // the next test.
    tearDownMethod();
  }
  log.info("Completed teardown of {}", getClass().getSimpleName());
}

private void tearDownMethod() throws Exception {

restProperties.clear();

schemaRegProperties.clear();
if (schemaRegApp != null) {
schemaRegApp.stop();
}

if (schemaRegServer != null) {
schemaRegServer.stop();
Expand All @@ -348,7 +371,6 @@ public void tearDown() throws Exception {
}

zookeeper.shutdown();
log.info("Completed teardown of {}", getClass().getSimpleName());
}

protected Invocation.Builder request(String path) {
Expand Down Expand Up @@ -443,24 +465,111 @@ protected final Set<String> getTopicNames() {
try {
return result.names().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(String.format("Failed to create topic: %s", e.getMessage()));
throw new RuntimeException(String.format("Failed to get topic: %s", e.getMessage()));
}
}

/**
 * Creates a topic using the admin properties supplied by {@code restConfig}.
 *
 * @param topicName name of the topic to create
 * @param numPartitions number of partitions for the new topic
 * @param replicationFactor replication factor for the new topic
 */
protected final void createTopic(String topicName, int numPartitions, short replicationFactor) {
  // Delegate exactly once. The bootstrap servers are filled in by createTopicCall, so they do
  // not need to be set here, and a second delegation would try to create the same topic twice.
  createTopic(topicName, numPartitions, replicationFactor, restConfig.getAdminProperties());
}

/**
 * Creates a topic with an explicit partition count and replication factor, using the supplied
 * admin client properties.
 *
 * @param topicName name of the topic to create
 * @param numPartitions number of partitions for the new topic
 * @param replicationFactor replication factor for the new topic
 * @param properties admin client configuration passed on to the general overload
 */
protected final void createTopic(
    String topicName, int numPartitions, short replicationFactor, Properties properties) {
  // No AdminClient is created here: the general overload builds its own client, so one created
  // in this method would never be used or closed.
  createTopic(
      topicName,
      Optional.of(numPartitions),
      Optional.of(replicationFactor),
      Optional.empty(),
      properties);
}

/**
 * Creates a topic and, if the first attempt fails, retries once.
 *
 * <p>When the first attempt throws, the recovery path depends on the topics that currently exist:
 *
 * <ul>
 *   <li>no topics at all: the whole environment is torn down and set up again, then creation is
 *       retried;
 *   <li>other topics exist but not this one: creation is retried against the current
 *       environment, because a restart would lose the existing topics;
 *   <li>this topic already exists: the exception is logged and ignored.
 * </ul>
 *
 * <p>After any of these paths the test is failed if the topic is still not listed.
 *
 * @param topicName name of the topic to create
 * @param numPartitions partition count, used when {@code replicasAssignments} is empty
 * @param replicationFactor replication factor, used when {@code replicasAssignments} is empty
 * @param replicasAssignments explicit replica assignment; takes precedence when present
 * @param properties admin client configuration
 */
protected final void createTopic(
    String topicName,
    Optional<Integer> numPartitions,
    Optional<Short> replicationFactor,
    Optional<Map<Integer, List<Integer>>> replicasAssignments,
    Properties properties) {

  CreateTopicsResult result =
      createTopicCall(
          topicName, numPartitions, replicationFactor, replicasAssignments, properties);

  try {
    result.all().get();
  } catch (InterruptedException | ExecutionException e) {
    pause();
    Set<String> topicNames = getTopicNames();
    if (topicNames.isEmpty()) { // Can restart because no topics exist yet
      log.warn("Restarting the environment as topic creation failed the first time");
      try {
        tearDownMethod();
        pause();
        setupMethod();
        pause();
      } catch (Exception tearDownException) {
        fail(String.format("Failed to create topic: %s", tearDownException.getMessage()));
      }

      result =
          createTopicCall(
              topicName, numPartitions, replicationFactor, replicasAssignments, properties);
      getTopicCreateFutures(result);
    } else if (!topicNames.contains(topicName)) {
      // The topic we are trying to make isn't the first topic to be created, so the topic list
      // isn't empty. We can't easily restart the environment as we would lose the existing
      // topics. While we could store them and recreate them all, for now just wait a bit and
      // have another go with the current topic.
      log.warn("Topic creation failed the first time round, trying again.");
      result =
          createTopicCall(
              topicName, numPartitions, replicationFactor, replicasAssignments, properties);
      pause(); // It's struggling at this point, give it a little time
      getTopicCreateFutures(result);
    } else {
      // An exception was thrown, but the topic was created anyway.
      log.warn(
          "Exception thrown but topic {} has been created, carrying on: {}",
          topicName,
          e.getMessage());
    }

    // Re-read the topic list so both the check and the failure message reflect the state after
    // the retry rather than the snapshot taken before it.
    Set<String> topicNamesAfterRetry = getTopicNames();
    if (!topicNamesAfterRetry.contains(topicName)) {
      fail(String.format("Failed to create topic after retry: %s", topicNamesAfterRetry));
    }
  }
}

/**
 * Issues a single create-topic request and returns its result handle.
 *
 * <p>The bootstrap servers entry of {@code properties} is overwritten with {@code brokerList}
 * before the client is created; note that this mutates the caller's {@code Properties} object.
 *
 * @param topicName name of the topic to create
 * @param numPartitions partition count, used when {@code replicasAssignments} is empty
 * @param replicationFactor replication factor, used when {@code replicasAssignments} is empty
 * @param replicasAssignments explicit replica assignment; takes precedence when present
 * @param properties admin client configuration
 * @return the result of the create-topics request
 */
private CreateTopicsResult createTopicCall(
    String topicName,
    Optional<Integer> numPartitions,
    Optional<Short> replicationFactor,
    Optional<Map<Integer, List<Integer>>> replicasAssignments,
    Properties properties) {
  properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

  NewTopic newTopic =
      replicasAssignments
          .map(assignments -> new NewTopic(topicName, assignments))
          .orElseGet(() -> new NewTopic(topicName, numPartitions, replicationFactor));

  // Close the client instead of leaking one per call. Per the Kafka Admin#close documentation,
  // closing lets operations already in flight complete first, so the returned result's futures
  // are settled by the time this method returns.
  try (AdminClient adminClient = AdminClient.create(properties)) {
    return adminClient.createTopics(Collections.singletonList(newTopic));
  }
}

/**
 * Sleeps for {@code HALF_SECOND_MILLIS} to give the environment time to settle.
 *
 * <p>If the thread is interrupted while sleeping, the method returns early and restores the
 * interrupt flag so callers further up the stack can still observe the interruption.
 */
private void pause() {
  try {
    Thread.sleep(HALF_SECOND_MILLIS);
  } catch (InterruptedException interrupted) {
    // Do not swallow the interruption: Thread.sleep clears the flag when it throws.
    Thread.currentThread().interrupt();
  }
}

private void getTopicCreateFutures(CreateTopicsResult result) {
try {
result.all().get();
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -491,19 +600,12 @@ protected final void setTopicConfig(String topicName, String configName, String

/**
 * Creates a topic with an explicit replica assignment, using the admin properties supplied by
 * {@code restConfig}.
 *
 * @param topicName name of the topic to create
 * @param replicasAssignments replica assignment map handed to the new topic
 */
protected final void createTopic(
    String topicName, Map<Integer, List<Integer>> replicasAssignments) {
  // Delegate exactly once to the general overload, which owns client creation and the retry
  // logic. Creating the topic here as well would issue a second, conflicting create request and
  // leave an unclosed admin client behind.
  createTopic(
      topicName,
      Optional.empty(),
      Optional.empty(),
      Optional.of(replicasAssignments),
      restConfig.getAdminProperties());
}

protected final void alterPartitionReassignment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,16 @@ public void getTopic_existingClusterExistingTopic_returnsTopic() {
.setAuthorizedOperations(emptySet())
.build());

Response response =
request("/v3/clusters/" + clusterId + "/topics/" + TOPIC_1)
.accept(MediaType.APPLICATION_JSON)
.get();
assertEquals(Status.OK.getStatusCode(), response.getStatus());

GetTopicResponse actual = response.readEntity(GetTopicResponse.class);
assertEquals(expected, actual);
testWithRetry(
() -> {
Response response =
request("/v3/clusters/" + clusterId + "/topics/" + TOPIC_1)
.accept(MediaType.APPLICATION_JSON)
.get();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
GetTopicResponse actual = response.readEntity(GetTopicResponse.class);
assertEquals(expected, actual);
});
}

@Test
Expand Down Expand Up @@ -678,15 +680,23 @@ public void createAndDelete_nonExisting_returnsNotFoundCreatedAndNotFound() {
.build()))
.build());

Response existingGetTopicConfigResponse =
request("/v3/clusters/" + clusterId + "/topics/" + topicName + "/configs/cleanup.policy")
.accept(MediaType.APPLICATION_JSON)
.get();
assertEquals(Status.OK.getStatusCode(), existingGetTopicConfigResponse.getStatus());
testWithRetry(
() -> {
Response existingGetTopicConfigResponse =
request(
"/v3/clusters/"
+ clusterId
+ "/topics/"
+ topicName
+ "/configs/cleanup.policy")
.accept(MediaType.APPLICATION_JSON)
.get();
assertEquals(Status.OK.getStatusCode(), existingGetTopicConfigResponse.getStatus());

GetTopicConfigResponse actualGetTopicConfigResponse =
existingGetTopicConfigResponse.readEntity(GetTopicConfigResponse.class);
assertEquals(expectedExistingGetTopicConfigResponse, actualGetTopicConfigResponse);
GetTopicConfigResponse actualGetTopicConfigResponse =
existingGetTopicConfigResponse.readEntity(GetTopicConfigResponse.class);
assertEquals(expectedExistingGetTopicConfigResponse, actualGetTopicConfigResponse);
});

Response deleteTopicResponse =
request("/v3/clusters/" + clusterId + "/topics/" + topicName)
Expand Down

0 comments on commit dc77403

Please sign in to comment.