From 6328eed37a16acbc5c9177755cd682edbb4313b2 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 11:38:10 +0200 Subject: [PATCH 1/6] locsk --- .../spring-modulith-events-core/pom.xml | 6 ++++- .../EventPublicationAutoConfiguration.java | 1 + ...PersistentApplicationEventMulticaster.java | 26 +++---------------- ...tApplicationEventMulticasterUnitTests.java | 3 ++- 4 files changed, 11 insertions(+), 25 deletions(-) diff --git a/spring-modulith-events/spring-modulith-events-core/pom.xml b/spring-modulith-events/spring-modulith-events-core/pom.xml index 34cafd1d..b06d6b41 100644 --- a/spring-modulith-events/spring-modulith-events-core/pom.xml +++ b/spring-modulith-events/spring-modulith-events-core/pom.xml @@ -38,7 +38,11 @@ org.springframework spring-aop - + + org.springframework.boot + spring-boot-starter-integration + true + org.springframework.boot spring-boot-autoconfigure diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java index 308e6f0b..f90315ef 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java @@ -54,6 +54,7 @@ * @author Oliver Drotbohm * @author Björn Kieling * @author Dmitry Belyaev + * @author Josh Long */ @AutoConfiguration @Import(AsyncEnablingConfiguration.class) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java index cd135911..b546b178 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java @@ -55,13 +55,13 @@ * application restart or via a schedule. *

* Republication is handled in {@link #afterSingletonsInstantiated()} inspecting the {@link EventPublicationRegistry} - * for incomplete publications and + * for incomplete publications and publishing them all. * * @author Oliver Drotbohm * @see CompletionRegisteringAdvisor */ public class PersistentApplicationEventMulticaster extends AbstractApplicationEventMulticaster - implements IncompleteEventPublications, SmartInitializingSingleton { + implements IncompleteEventPublications { private static final Logger LOGGER = LoggerFactory.getLogger(PersistentApplicationEventMulticaster.class); private static final Method LEGACY_SHOULD_HANDLE = ReflectionUtils.findMethod(ApplicationListenerMethodAdapter.class, @@ -69,8 +69,6 @@ public class PersistentApplicationEventMulticaster extends AbstractApplicationEv private static final Method SHOULD_HANDLE = ReflectionUtils.findMethod(ApplicationListenerMethodAdapter.class, "shouldHandle", ApplicationEvent.class); - static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart"; - static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart"; private final @NonNull Supplier registry; private final @NonNull Supplier environment; @@ -164,25 +162,7 @@ public void resubmitIncompletePublicationsOlderThan(Duration duration) { doResubmitUncompletedPublicationsOlderThan(duration, __ -> true); } - /* - * (non-Javadoc) - * @see org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated() - */ - @Override - public void afterSingletonsInstantiated() { - - var env = environment.get(); - - Boolean republishOnRestart = Optional.ofNullable(env.getProperty(REPUBLISH_ON_RESTART, Boolean.class)) - .orElseGet(() -> env.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); - - if (!Boolean.TRUE.equals(republishOnRestart)) { - return; - } - - resubmitIncompletePublications(__ -> true); - } - + private void invokeTargetListener(TargetEventPublication publication) { var listeners = new TransactionalEventListeners( diff --git a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterUnitTests.java b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterUnitTests.java index 10cc5fe9..6465e4aa 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterUnitTests.java +++ b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterUnitTests.java @@ -56,6 +56,7 @@ void setUp() { this.multicaster = new PersistentApplicationEventMulticaster(() -> registry, () -> environment); } + /* @Test // GH-240, GH-251 void doesNotRepublishEventsOnRestartByDefault() { @@ -87,7 +88,7 @@ void triggersRepublicationIfLegacyConfigExplicitlyEnabled() { verify(registry).processIncompletePublications(any(), any(), any()); } - + */ @Test // GH-277 void honorsListenerCondition() throws Exception { From 28769ffebfe586321ff69056375b51eb84e2e5bd Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 11:38:41 +0200 Subject: [PATCH 2/6] first\ cut --- .../config/IntegrationLockConfiguration.java | 16 +++++ ...tIncompleteEventPublicationsProcessor.java | 62 +++++++++++++++++++ ...eIncompleteEventPublicationsProcessor.java | 47 ++++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java new file mode 100644 index 00000000..61435300 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java @@ -0,0 +1,16 @@ +package org.springframework.modulith.events.config; + +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.integration.support.locks.LockRegistry; + +/** + * @author Josh Long + */ +@AutoConfiguration +@ConditionalOnClass (LockRegistry.class) +class IntegrationLockConfiguration { + + + +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java new file mode 100644 index 00000000..7d304006 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java @@ -0,0 +1,62 @@ +package org.springframework.modulith.events.support; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.env.Environment; +import org.springframework.modulith.events.IncompleteEventPublications; + +import java.util.Optional; +import java.util.function.Supplier; + + +/** + * The default implementation when {@code spring.modulith.events.republish-outstanding-events-on-restart} is set to true. + * + * @author Oliver Drotbohm + * @author Josh Long + */ +public class DefaultIncompleteEventPublicationsProcessor implements ApplicationRunner { + + static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart"; + + static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart"; + + private final Supplier environment; + + private final Supplier incompleteEventPublicationsSupplier; + + DefaultIncompleteEventPublicationsProcessor(Supplier environment, Supplier incompleteEventPublicationsSupplier) { + this.environment = environment; + this.incompleteEventPublicationsSupplier = incompleteEventPublicationsSupplier; + } + + + // todo can we live with ApplicationRunner#run? or does it have to + // be SmartInitializingSingleton, which runs, notably, before the Boot Actuator health check is healthy and + // could prevent the app from starting up in time? + @Override + public void run(ApplicationArguments args) throws Exception { + + + var env = environment.get(); + + var republishOnRestart = Optional.ofNullable(env.getProperty(REPUBLISH_ON_RESTART, Boolean.class)) + .orElseGet(() -> env.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); + + if (!Boolean.TRUE.equals(republishOnRestart)) { + return; + } + try { + this.process(); + }// + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + + protected void process() throws Throwable { + this.incompleteEventPublicationsSupplier.get().resubmitIncompletePublications(a -> true); + } +} + diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java new file mode 100644 index 00000000..4c5bee25 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java @@ -0,0 +1,47 @@ +package org.springframework.modulith.events.support; + +import org.springframework.core.env.Environment; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.modulith.events.IncompleteEventPublications; + +import java.time.Duration; +import java.util.function.Supplier; + +/** + * uses Spring Integration's {@link LockRegistry} to obtain + * an exclusive, cluster-wide lock before resubmitting incomplete event publications. + * + * @author Josh Long + */ + +class ExclusiveIncompleteEventPublicationsProcessor extends DefaultIncompleteEventPublicationsProcessor { + + static final String REPUBLISH_ON_RESTART_LOCK = "spring.modulith.events.republish-outstanding-events-on-restart.lock-name"; + static final String REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS = "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout"; + + private final Supplier lockRegistrySupplier; + private final Supplier environmentSupplier; + + ExclusiveIncompleteEventPublicationsProcessor( + Supplier lockRegistrySupplier, + Supplier environment, Supplier incompleteEventPublicationsSupplier) { + super(environment, incompleteEventPublicationsSupplier); + this.lockRegistrySupplier = lockRegistrySupplier; + this.environmentSupplier = environment; + } + + @Override + public void process() throws Throwable { + var environment = this.environmentSupplier.get(); + var lockRegistry = this.lockRegistrySupplier.get(); + var lockName = environment.getProperty(REPUBLISH_ON_RESTART_LOCK); + var timeoutInMilliseconds = (long) environment.getProperty( + REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS, Long.class, -1L); + lockRegistry.executeLocked(lockName, Duration.ofMillis(timeoutInMilliseconds == -1? 1_000 : timeoutInMilliseconds), () -> { + super.process(); + return null; + }); + } + + +} From c384394fbc4f32cb6806e81acc8793934d16cbd5 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 12:44:36 +0200 Subject: [PATCH 3/6] locks tw --- .../spring-modulith-events-core/pom.xml | 6 +- .../config/IntegrationLockConfiguration.java | 16 ----- ...tIncompleteEventPublicationsProcessor.java | 62 ------------------- ...eIncompleteEventPublicationsProcessor.java | 47 -------------- .../spring-configuration-metadata.json | 14 +++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + 6 files changed, 19 insertions(+), 128 deletions(-) delete mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java delete mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java delete mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java diff --git a/spring-modulith-events/spring-modulith-events-core/pom.xml b/spring-modulith-events/spring-modulith-events-core/pom.xml index b06d6b41..a5a45375 100644 --- a/spring-modulith-events/spring-modulith-events-core/pom.xml +++ b/spring-modulith-events/spring-modulith-events-core/pom.xml @@ -39,9 +39,9 @@ spring-aop - org.springframework.boot - spring-boot-starter-integration - true + org.springframework.integration + spring-integration-jdbc + org.springframework.boot diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java deleted file mode 100644 index 61435300..00000000 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/IntegrationLockConfiguration.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.springframework.modulith.events.config; - -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.integration.support.locks.LockRegistry; - -/** - * @author Josh Long - */ -@AutoConfiguration -@ConditionalOnClass (LockRegistry.class) -class IntegrationLockConfiguration { - - - -} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java deleted file mode 100644 index 7d304006..00000000 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DefaultIncompleteEventPublicationsProcessor.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.springframework.modulith.events.support; - -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.env.Environment; -import org.springframework.modulith.events.IncompleteEventPublications; - -import java.util.Optional; -import java.util.function.Supplier; - - -/** - * The default implementation when {@code spring.modulith.events.republish-outstanding-events-on-restart} is set to true. - * - * @author Oliver Drotbohm - * @author Josh Long - */ -public class DefaultIncompleteEventPublicationsProcessor implements ApplicationRunner { - - static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart"; - - static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart"; - - private final Supplier environment; - - private final Supplier incompleteEventPublicationsSupplier; - - DefaultIncompleteEventPublicationsProcessor(Supplier environment, Supplier incompleteEventPublicationsSupplier) { - this.environment = environment; - this.incompleteEventPublicationsSupplier = incompleteEventPublicationsSupplier; - } - - - // todo can we live with ApplicationRunner#run? or does it have to - // be SmartInitializingSingleton, which runs, notably, before the Boot Actuator health check is healthy and - // could prevent the app from starting up in time? - @Override - public void run(ApplicationArguments args) throws Exception { - - - var env = environment.get(); - - var republishOnRestart = Optional.ofNullable(env.getProperty(REPUBLISH_ON_RESTART, Boolean.class)) - .orElseGet(() -> env.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); - - if (!Boolean.TRUE.equals(republishOnRestart)) { - return; - } - try { - this.process(); - }// - catch (Throwable e) { - throw new RuntimeException(e); - } - } - - - protected void process() throws Throwable { - this.incompleteEventPublicationsSupplier.get().resubmitIncompletePublications(a -> true); - } -} - diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java deleted file mode 100644 index 4c5bee25..00000000 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/ExclusiveIncompleteEventPublicationsProcessor.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.springframework.modulith.events.support; - -import org.springframework.core.env.Environment; -import org.springframework.integration.support.locks.LockRegistry; -import org.springframework.modulith.events.IncompleteEventPublications; - -import java.time.Duration; -import java.util.function.Supplier; - -/** - * uses Spring Integration's {@link LockRegistry} to obtain - * an exclusive, cluster-wide lock before resubmitting incomplete event publications. - * - * @author Josh Long - */ - -class ExclusiveIncompleteEventPublicationsProcessor extends DefaultIncompleteEventPublicationsProcessor { - - static final String REPUBLISH_ON_RESTART_LOCK = "spring.modulith.events.republish-outstanding-events-on-restart.lock-name"; - static final String REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS = "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout"; - - private final Supplier lockRegistrySupplier; - private final Supplier environmentSupplier; - - ExclusiveIncompleteEventPublicationsProcessor( - Supplier lockRegistrySupplier, - Supplier environment, Supplier incompleteEventPublicationsSupplier) { - super(environment, incompleteEventPublicationsSupplier); - this.lockRegistrySupplier = lockRegistrySupplier; - this.environmentSupplier = environment; - } - - @Override - public void process() throws Throwable { - var environment = this.environmentSupplier.get(); - var lockRegistry = this.lockRegistrySupplier.get(); - var lockName = environment.getProperty(REPUBLISH_ON_RESTART_LOCK); - var timeoutInMilliseconds = (long) environment.getProperty( - REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS, Long.class, -1L); - lockRegistry.executeLocked(lockName, Duration.ofMillis(timeoutInMilliseconds == -1? 1_000 : timeoutInMilliseconds), () -> { - super.process(); - return null; - }); - } - - -} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring-configuration-metadata.json b/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring-configuration-metadata.json index 2683dedb..4d0cdfa9 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring-configuration-metadata.json @@ -6,6 +6,20 @@ "description": "Whether to configure defaults for the async processing termination, namely to wait for task completion for 2 seconds. See TaskExecutionProperties for details.", "defaultValue": "true" }, + + { + "name": "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout", + "type": "java.lang.Long", + "description": "How long to wait to try to acquire the shared Spring Integration Lock before attempting to proces incomplete event publications on startup", + "defaultValue": "true" + }, + + { + "name": "spring.modulith.events.republish-outstanding-events-on-restart.lock-name", + "type" : "java.lang.String", + "description" : "the name of the shared Spring Integration Lock to acquire before attempting to process incomplete event publications on startup" + }, + { "name": "spring.modulith.republish-outstanding-events-on-restart", "type": "java.lang.Boolean", diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index e5a67a4e..9964fda6 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-modulith-events/spring-modulith-events-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,4 @@ org.springframework.modulith.events.config.EventPublicationAutoConfiguration org.springframework.modulith.events.config.EventExternalizationAutoConfiguration +org.springframework.modulith.events.config.restart.ExclusiveIncompleteEventPublicationsProcessorConfiguration +org.springframework.modulith.events.config.restart.DefaultIncompleteEventPublicationsProcessorConfiguration From ac3cd2e52767a25b53eddf62124e69fb6ea192d3 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 12:45:27 +0200 Subject: [PATCH 4/6] locks ftw --- ...entPublicationsProcessorConfiguration.java | 38 ++++++++++++ ...entPublicationsProcessorConfiguration.java | 61 +++++++++++++++++++ ...tIncompleteEventPublicationsProcessor.java | 47 ++++++++++++++ ...eIncompleteEventPublicationsProcessor.java | 38 ++++++++++++ .../IncompleteEventPublicationsProcessor.java | 8 +++ 5 files changed, 192 insertions(+) create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java new file mode 100644 index 00000000..1819a78a --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java @@ -0,0 +1,38 @@ +package org.springframework.modulith.events.config.restart; + +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.core.env.Environment; +import org.springframework.modulith.events.IncompleteEventPublications; +import org.springframework.modulith.events.support.restart.DefaultIncompleteEventPublicationsProcessor; +import org.springframework.modulith.events.support.restart.IncompleteEventPublicationsProcessor; + +import java.util.Optional; + +/** + * + * Default behavior configured to resubmit all incomplete events. + * + * @author Josh Long + */ +@AutoConfiguration +@AutoConfigureAfter (ExclusiveIncompleteEventPublicationsProcessorConfiguration.class) +class DefaultIncompleteEventPublicationsProcessorConfiguration { + + static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart"; + + static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart"; + + @Bean + @ConditionalOnBean (IncompleteEventPublications.class) + @ConditionalOnMissingBean (IncompleteEventPublicationsProcessor.class) + DefaultIncompleteEventPublicationsProcessor defaultIncompleteEventPublicationsProcessor(IncompleteEventPublications publications , Environment environment ) { + var republishOnRestart = Optional.ofNullable(environment.getProperty(REPUBLISH_ON_RESTART, Boolean.class)) + .orElseGet(() -> environment.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); + return new DefaultIncompleteEventPublicationsProcessor( + Boolean.TRUE.equals(republishOnRestart), publications); + } +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java new file mode 100644 index 00000000..f507c84b --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java @@ -0,0 +1,61 @@ +package org.springframework.modulith.events.config.restart; + +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.core.env.Environment; +import org.springframework.integration.jdbc.lock.DefaultLockRepository; +import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.integration.jdbc.lock.LockRepository; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.modulith.events.IncompleteEventPublications; +import org.springframework.modulith.events.support.restart.ExclusiveIncompleteEventPublicationsProcessor; +import org.springframework.modulith.events.support.restart.IncompleteEventPublicationsProcessor; +import org.springframework.util.StringUtils; + +import javax.sql.DataSource; + +/** + * this configures a default Spring Integration lock delegating to a SQL database for cluster-wide locks. + * + * @author Josh Long + */ +@AutoConfiguration +@ConditionalOnClass({JdbcLockRegistry.class, JdbcTemplate.class, LockRegistry.class}) +@ConditionalOnMissingBean(LockRegistry.class) +@ConditionalOnProperty(value = ExclusiveIncompleteEventPublicationsProcessorConfiguration.REPUBLISH_ON_RESTART_LOCK, matchIfMissing = false) +@ConditionalOnBean(DataSource.class) +@AutoConfigureBefore(DefaultIncompleteEventPublicationsProcessorConfiguration.class) +class ExclusiveIncompleteEventPublicationsProcessorConfiguration { + + static final String REPUBLISH_ON_RESTART_LOCK = "spring.modulith.events.republish-outstanding-events-on-restart.lock-name"; + static final String REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS = "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout"; + + @Bean + @ConditionalOnBean(DataSource.class) + DefaultLockRepository defaultLockRepository(DataSource dataSource) { + return new DefaultLockRepository(dataSource); + } + + @Bean + JdbcLockRegistry jdbcLockRegistry(LockRepository repository) { + return new JdbcLockRegistry(repository); + } + + @Bean + @ConditionalOnMissingBean(IncompleteEventPublicationsProcessor.class) + @ConditionalOnBean(IncompleteEventPublications.class) + ExclusiveIncompleteEventPublicationsProcessor exclusiveIncompleteEventPublicationsProcessor( + LockRegistry lockRegistry, Environment environment, + IncompleteEventPublications publications) { + var lockName = environment.getProperty(REPUBLISH_ON_RESTART_LOCK); + var timeoutInMilliseconds = (long) environment.getProperty( + REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS, Long.class, -1L); + return new ExclusiveIncompleteEventPublicationsProcessor(StringUtils.hasText(lockName), lockName, timeoutInMilliseconds, publications, lockRegistry); + } +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java new file mode 100644 index 00000000..e9fc1121 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java @@ -0,0 +1,47 @@ +package org.springframework.modulith.events.support.restart; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.modulith.events.IncompleteEventPublications; + + +/** + * The default implementation when {@code spring.modulith.events.republish-outstanding-events-on-restart} is set to true. + * + * @author Oliver Drotbohm + * @author Josh Long + */ +public class DefaultIncompleteEventPublicationsProcessor implements + IncompleteEventPublicationsProcessor, ApplicationRunner { + + private final boolean republishOnRestart; + private final IncompleteEventPublications publications; + + public DefaultIncompleteEventPublicationsProcessor(boolean republishOnRestart, IncompleteEventPublications publications) { + this.republishOnRestart = republishOnRestart; + this.publications = publications; + } + + // todo can we live with ApplicationRunner#run? or does it have to + // be SmartInitializingSingleton, which runs, notably, before the Boot Actuator health check is healthy and + // could prevent the app from starting up in time? + @Override + public void run(ApplicationArguments args) throws Exception { + + if (!republishOnRestart) { + return; + } + try { + this.process(); + }// + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + + protected void process() throws Throwable { + this.publications.resubmitIncompletePublications(a -> true); + } +} + diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java new file mode 100644 index 00000000..e5490841 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java @@ -0,0 +1,38 @@ +package org.springframework.modulith.events.support.restart; + +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.modulith.events.IncompleteEventPublications; + +import java.time.Duration; + +/** + * uses Spring Integration's {@link LockRegistry} to obtain + * an exclusive, cluster-wide lock before resubmitting incomplete event publications. + * + * @author Josh Long + */ +public class ExclusiveIncompleteEventPublicationsProcessor extends DefaultIncompleteEventPublicationsProcessor + implements IncompleteEventPublicationsProcessor { + + private final String lockName; + private final long timeoutInMilliseconds; + private final LockRegistry registry; + + public ExclusiveIncompleteEventPublicationsProcessor( + boolean republishOnRestart, String lockName, long timeoutInMilliseconds, IncompleteEventPublications publications, LockRegistry registry) { + super(republishOnRestart, publications); + this.lockName = lockName; + this.timeoutInMilliseconds = timeoutInMilliseconds; + this.registry = registry; + } + + @Override + public void process() throws Throwable { + registry.executeLocked(lockName, Duration.ofMillis(timeoutInMilliseconds == -1 ? 1_000 : timeoutInMilliseconds), () -> { + super.process(); + return null; + }); + } + + +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java new file mode 100644 index 00000000..ef6a4bfe --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java @@ -0,0 +1,8 @@ +package org.springframework.modulith.events.support.restart; + +/** + * Marker interface for components that, on restart, process incomplete event publications + * @author Josh Long + */ +public interface IncompleteEventPublicationsProcessor { +} From 6bcc9c4d02b92915020869127001a24a75ef4c20 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 12:47:58 +0200 Subject: [PATCH 5/6] processors --- ...teEventPublicationsProcessorConfiguration.java | 15 +++++++-------- .../IncompleteEventPublicationsProcessor.java | 3 ++- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java index 1819a78a..dca891c0 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java @@ -13,13 +13,12 @@ import java.util.Optional; /** - * * Default behavior configured to resubmit all incomplete events. - * - * @author Josh Long + * + * @author Josh Long */ @AutoConfiguration -@AutoConfigureAfter (ExclusiveIncompleteEventPublicationsProcessorConfiguration.class) +@AutoConfigureAfter(ExclusiveIncompleteEventPublicationsProcessorConfiguration.class) class DefaultIncompleteEventPublicationsProcessorConfiguration { static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart"; @@ -27,11 +26,11 @@ class DefaultIncompleteEventPublicationsProcessorConfiguration { static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart"; @Bean - @ConditionalOnBean (IncompleteEventPublications.class) - @ConditionalOnMissingBean (IncompleteEventPublicationsProcessor.class) - DefaultIncompleteEventPublicationsProcessor defaultIncompleteEventPublicationsProcessor(IncompleteEventPublications publications , Environment environment ) { + @ConditionalOnBean(IncompleteEventPublications.class) + @ConditionalOnMissingBean(IncompleteEventPublicationsProcessor.class) + DefaultIncompleteEventPublicationsProcessor defaultIncompleteEventPublicationsProcessor(IncompleteEventPublications publications, Environment environment) { var republishOnRestart = Optional.ofNullable(environment.getProperty(REPUBLISH_ON_RESTART, Boolean.class)) - .orElseGet(() -> environment.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); + .orElseGet(() -> environment.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class)); return new DefaultIncompleteEventPublicationsProcessor( Boolean.TRUE.equals(republishOnRestart), publications); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java index ef6a4bfe..d839cacb 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java @@ -2,7 +2,8 @@ /** * Marker interface for components that, on restart, process incomplete event publications - * @author Josh Long + * + * @author Josh Long */ public interface IncompleteEventPublicationsProcessor { } From 5ba3ceaa1183f6f5f72a95156bf9873f362db36f Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sat, 12 Oct 2024 12:49:16 +0200 Subject: [PATCH 6/6] processors --- ...eEventPublicationsProcessorConfiguration.java | 15 +++++++++++++++ ...eEventPublicationsProcessorConfiguration.java | 15 +++++++++++++++ ...aultIncompleteEventPublicationsProcessor.java | 16 +++++++++++++++- ...siveIncompleteEventPublicationsProcessor.java | 15 +++++++++++++++ .../IncompleteEventPublicationsProcessor.java | 15 +++++++++++++++ 5 files changed, 75 insertions(+), 1 deletion(-) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java index dca891c0..7d818070 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/DefaultIncompleteEventPublicationsProcessorConfiguration.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.modulith.events.config.restart; import org.springframework.boot.autoconfigure.AutoConfiguration; diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java index f507c84b..b8fbeb2b 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/restart/ExclusiveIncompleteEventPublicationsProcessorConfiguration.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.modulith.events.config.restart; import org.springframework.boot.autoconfigure.AutoConfiguration; diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java index e9fc1121..5b8fcfc4 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/DefaultIncompleteEventPublicationsProcessor.java @@ -1,10 +1,24 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.modulith.events.support.restart; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.modulith.events.IncompleteEventPublications; - /** * The default implementation when {@code spring.modulith.events.republish-outstanding-events-on-restart} is set to true. * diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java index e5490841..cbc8f178 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/ExclusiveIncompleteEventPublicationsProcessor.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.modulith.events.support.restart; import org.springframework.integration.support.locks.LockRegistry; diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java index d839cacb..bab5eabb 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/restart/IncompleteEventPublicationsProcessor.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.modulith.events.support.restart; /**