Skip to content

Commit

Permalink
Polish "Group Kafka back-off properties"
Browse files Browse the repository at this point in the history
  • Loading branch information
wilkinsona committed Jul 11, 2024
1 parent 14c9893 commit 8707399
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1548,28 +1548,6 @@ public static class Topic {
*/
private int attempts = 3;

/**
* Canonical backoff period. Used as an initial value in the exponential case,
* and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);

/**
* Multiplier to use for generating the next backoff delay.
*/
private double multiplier = 0.0;

/**
* Maximum wait between retries. If less than the delay then the default of 30
* seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;

/**
* Whether to have the backoff delays.
*/
private boolean randomBackOff = false;

public boolean isEnabled() {
return this.enabled;
}
Expand Down Expand Up @@ -1620,8 +1598,7 @@ public void setMaxDelay(Duration maxDelay) {
getBackoff().setMaxDelay(maxDelay);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random",
since = "3.4.0")
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public boolean isRandomBackOff() {
return getBackoff().isRandom();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,22 @@ void retryTopicConfigurationIsNotEnabledByDefault() {

@Test
void retryTopicConfigurationWithExponentialBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.backoff.delay=100ms",
"spring.kafka.retry.topic.backoff.multiplier=2", "spring.kafka.retry.topic.backoff.max-delay=300ms")
.run((context) -> {
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
assertThat(configuration.getDestinationTopicProperties()).hasSize(5)
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"),
tuple(300L, "-retry-2"), tuple(0L, "-dlt"));
});
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithExponentialBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
Expand Down Expand Up @@ -471,6 +487,18 @@ void retryTopicConfigurationWithDefaultProperties() {

@Test
void retryTopicConfigurationWithFixedBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=2s")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 2000L, 0L)));
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithFixedBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s")
Expand All @@ -482,6 +510,18 @@ void retryTopicConfigurationWithFixedBackOff() {

@Test
void retryTopicConfigurationWithNoBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=0")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 0L, 0L)));
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithNoBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
Expand Down

0 comments on commit 8707399

Please sign in to comment.