Skip to content

Commit

Permalink
Merge pull request spring-projects#41335 from travisriegler
Browse files Browse the repository at this point in the history
* spring-projectsgh-41335:
  Polish "Group Kafka back-off properties"
  Group Kafka back-off properties

Closes spring-projectsgh-41335
  • Loading branch information
wilkinsona committed Jul 11, 2024
2 parents d07fe47 + 8707399 commit 0a3e799
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.ssl.SslBundles;
Expand Down Expand Up @@ -186,7 +186,7 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?>
.useSingleTopicForSameIntervals()
.suffixTopicsWithIndexValues()
.doNotAutoCreateRetryTopics();
setBackOffPolicy(builder, retryTopic);
setBackOffPolicy(builder, retryTopic.getBackoff());
return builder.create(kafkaTemplate);
}

Expand Down Expand Up @@ -214,15 +214,15 @@ private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
}
}

private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) {
long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0;
if (delay > 0) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
map.from(delay).to(backOffPolicy::delay);
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier);
map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random);
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.convert.DurationUnit;
Expand Down Expand Up @@ -1547,28 +1548,6 @@ public static class Topic {
*/
private int attempts = 3;

/**
* Canonical backoff period. Used as an initial value in the exponential case,
* and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);

/**
* Multiplier to use for generating the next backoff delay.
*/
private double multiplier = 0.0;

/**
* Maximum wait between retries. If less than the delay then the default of 30
* seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;

/**
* Whether to have the backoff delays.
*/
private boolean randomBackOff = false;

public boolean isEnabled() {
return this.enabled;
}
Expand All @@ -1585,36 +1564,113 @@ public void setAttempts(int attempts) {
this.attempts = attempts;
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public Duration getDelay() {
return this.delay;
return getBackoff().getDelay();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setDelay(Duration delay) {
this.delay = delay;
getBackoff().setDelay(delay);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier",
since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public double getMultiplier() {
return this.multiplier;
return getBackoff().getMultiplier();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setMultiplier(double multiplier) {
this.multiplier = multiplier;
getBackoff().setMultiplier(multiplier);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public Duration getMaxDelay() {
return this.maxDelay;
return getBackoff().getMaxDelay();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
getBackoff().setMaxDelay(maxDelay);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public boolean isRandomBackOff() {
return this.randomBackOff;
return getBackoff().isRandom();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setRandomBackOff(boolean randomBackOff) {
this.randomBackOff = randomBackOff;
getBackoff().setRandom(randomBackOff);
}

private final Backoff backoff = new Backoff();

public Backoff getBackoff() {
return this.backoff;
}

public static class Backoff {

/**
* Canonical backoff period. Used as an initial value in the exponential
* case, and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);

/**
* Multiplier to use for generating the next backoff delay.
*/
private double multiplier = 0.0;

/**
* Maximum wait between retries. If less than the delay then the default
* of 30 seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;

/**
* Whether to have the backoff delays.
*/
private boolean random = false;

public Duration getDelay() {
return this.delay;
}

public void setDelay(Duration delay) {
this.delay = delay;
}

public double getMultiplier() {
return this.multiplier;
}

public void setMultiplier(double multiplier) {
this.multiplier = multiplier;
}

public Duration getMaxDelay() {
return this.maxDelay;
}

public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
}

public boolean isRandom() {
return this.random;
}

public void setRandom(boolean random) {
this.random = random;
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,22 @@ void retryTopicConfigurationIsNotEnabledByDefault() {

@Test
void retryTopicConfigurationWithExponentialBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.backoff.delay=100ms",
"spring.kafka.retry.topic.backoff.multiplier=2", "spring.kafka.retry.topic.backoff.max-delay=300ms")
.run((context) -> {
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
assertThat(configuration.getDestinationTopicProperties()).hasSize(5)
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"),
tuple(300L, "-retry-2"), tuple(0L, "-dlt"));
});
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithExponentialBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
Expand Down Expand Up @@ -471,6 +487,18 @@ void retryTopicConfigurationWithDefaultProperties() {

@Test
void retryTopicConfigurationWithFixedBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=2s")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 2000L, 0L)));
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithFixedBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s")
Expand All @@ -482,6 +510,18 @@ void retryTopicConfigurationWithFixedBackOff() {

@Test
void retryTopicConfigurationWithNoBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=0")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 0L, 0L)));
}

@Test
@Deprecated(since = "3.4.0", forRemoval = true)
void retryTopicConfigurationWithNoBackOffUsingDeprecatedProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
Expand Down

0 comments on commit 0a3e799

Please sign in to comment.