diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 18e02adb854a..d2944a6a6942 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 the original author or authors. + * Copyright 2012-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.ssl.SslBundles; @@ -186,7 +186,7 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate .useSingleTopicForSameIntervals() .suffixTopicsWithIndexValues() .doNotAutoCreateRetryTopics(); - setBackOffPolicy(builder, retryTopic); + setBackOffPolicy(builder, retryTopic.getBackoff()); return builder.create(kafkaTemplate); } @@ -214,15 +214,15 @@ private void applyKafkaConnectionDetailsForAdmin(Map properties, } } - private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { - long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; + private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) { + long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0; if (delay > 0) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder(); map.from(delay).to(backOffPolicy::delay); - map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); - map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier); - map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random); + map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); + map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier); + map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random); builder.customBackoff((SleepingBackOffPolicy) backOffPolicy.build()); } else { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index b7c5a5ea9d48..2a849fcda83c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.convert.DurationUnit; @@ -1547,28 +1548,6 @@ public static class Topic { */ private int attempts = 3; - /** - * Canonical backoff period. Used as an initial value in the exponential case, - * and as a minimum value in the uniform case. - */ - private Duration delay = Duration.ofSeconds(1); - - /** - * Multiplier to use for generating the next backoff delay. - */ - private double multiplier = 0.0; - - /** - * Maximum wait between retries. If less than the delay then the default of 30 - * seconds is applied. - */ - private Duration maxDelay = Duration.ZERO; - - /** - * Whether to have the backoff delays. - */ - private boolean randomBackOff = false; - public boolean isEnabled() { return this.enabled; } @@ -1585,36 +1564,113 @@ public void setAttempts(int attempts) { this.attempts = attempts; } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public Duration getDelay() { - return this.delay; + return getBackoff().getDelay(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setDelay(Duration delay) { - this.delay = delay; + getBackoff().setDelay(delay); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier", + since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public double getMultiplier() { - return this.multiplier; + return getBackoff().getMultiplier(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setMultiplier(double multiplier) { - this.multiplier = multiplier; + getBackoff().setMultiplier(multiplier); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public Duration getMaxDelay() { - return this.maxDelay; + return getBackoff().getMaxDelay(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setMaxDelay(Duration maxDelay) { - this.maxDelay = maxDelay; + getBackoff().setMaxDelay(maxDelay); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random", since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public boolean isRandomBackOff() { - return this.randomBackOff; + return getBackoff().isRandom(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setRandomBackOff(boolean randomBackOff) { - this.randomBackOff = randomBackOff; + getBackoff().setRandom(randomBackOff); + } + + private final Backoff backoff = new Backoff(); + + public Backoff getBackoff() { + return this.backoff; + } + + public static class Backoff { + + /** + * Canonical backoff period. Used as an initial value in the exponential + * case, and as a minimum value in the uniform case. + */ + private Duration delay = Duration.ofSeconds(1); + + /** + * Multiplier to use for generating the next backoff delay. + */ + private double multiplier = 0.0; + + /** + * Maximum wait between retries. If less than the delay then the default + * of 30 seconds is applied. + */ + private Duration maxDelay = Duration.ZERO; + + /** + * Whether to have the backoff delays. + */ + private boolean random = false; + + public Duration getDelay() { + return this.delay; + } + + public void setDelay(Duration delay) { + this.delay = delay; + } + + public double getMultiplier() { + return this.multiplier; + } + + public void setMultiplier(double multiplier) { + this.multiplier = multiplier; + } + + public Duration getMaxDelay() { + return this.maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } + + public boolean isRandom() { + return this.random; + } + + public void setRandom(boolean random) { + this.random = random; + } + } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 0c3d0cced728..8e34ee8da56f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -442,6 +442,22 @@ void retryTopicConfigurationIsNotEnabledByDefault() { @Test void retryTopicConfigurationWithExponentialBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.backoff.delay=100ms", + "spring.kafka.retry.topic.backoff.multiplier=2", "spring.kafka.retry.topic.backoff.max-delay=300ms") + .run((context) -> { + RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).hasSize(5) + .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) + .containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"), + tuple(300L, "-retry-2"), tuple(0L, "-dlt")); + }); + } + + @Test + @Deprecated(since = "3.4.0", forRemoval = true) + void retryTopicConfigurationWithExponentialBackOffUsingDeprecatedProperties() { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", @@ -471,6 +487,18 @@ void retryTopicConfigurationWithDefaultProperties() { @Test void retryTopicConfigurationWithFixedBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=2s") + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay) + .containsExactly(0L, 2000L, 0L))); + } + + @Test + @Deprecated(since = "3.4.0", forRemoval = true) + void retryTopicConfigurationWithFixedBackOffUsingDeprecatedProperties() { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s") @@ -482,6 +510,18 @@ void retryTopicConfigurationWithFixedBackOff() { @Test void retryTopicConfigurationWithNoBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=0") + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay) + .containsExactly(0L, 0L, 0L))); + } + + @Test + @Deprecated(since = "3.4.0", forRemoval = true) + void retryTopicConfigurationWithNoBackOffUsingDeprecatedProperties() { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")