From dc3ae53661ab8eee179dbedb679ac7bf11ba4d3a Mon Sep 17 00:00:00 2001 From: Marco Amann Date: Thu, 25 Aug 2022 15:22:50 +0200 Subject: [PATCH 01/44] Added description of command line arguments valid for all commands --- .../command-line-interface.md | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/axon-server/administration/admin-configuration/command-line-interface.md b/axon-server/administration/admin-configuration/command-line-interface.md index ecebe6cd..da0e7a7e 100644 --- a/axon-server/administration/admin-configuration/command-line-interface.md +++ b/axon-server/administration/admin-configuration/command-line-interface.md @@ -10,7 +10,7 @@ A quick summary of the various commands is depicted below. Each command has a sp Area (Server Edition) - Command-Line Options + Command name Description @@ -254,6 +254,43 @@ axonserver-cli.jar -S - The option -S with the url to the Axon Server is optional, if it is omitted it defaults to [http://localhost:8024](http://localhost:8024/).‌ While for Axon Server SE, the URL for the Axon Server SE will be the single running node, for Axon Server EE, the URL should be pointing to any node serving the _\_admin_ context within an Axon Server EE cluster. +The `` valid for all commands, are: `-S`, `-s`, `-i`, `-o`. +Their effect is described in the table below. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Option - ShortOption - LongDescription
`-S``server`Server to send command to (default http://localhost:8024)
`-s``https`Use HTTPS (SSL,TLS) to connect to the server, rather than HTTP.
`-i``insecure-ssl`Do not check the certificate when connecting using HTTPS.
`-o``output`Output format (txt,json)
+ +For options specific to individual commands, see the descriptions of the commands below. + ### Access control When running Axon Server with access control enabled, executing commands remotely requires an access token. This needs to be provided with the -t option. When you run a command on the Axon Server node itself from the directory where Axon Server was started, you don't have to provide a token.‌ From 6cad9addb70e3728db90f8fb51de953a6cc6bcb4 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Fri, 9 Sep 2022 14:19:51 +0200 Subject: [PATCH 02/44] Update docs with (expected) changes. --- extensions/kafka.md | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/extensions/kafka.md b/extensions/kafka.md index cbbf9f87..b5f06e7e 100644 --- a/extensions/kafka.md +++ b/extensions/kafka.md @@ -60,7 +60,7 @@ public class KafkaEventPublicationConfiguration { } ``` -The second infrastructure component to introduce is the `KafkaPublisher`, which has a hard requirement on the `ProducerFactory`. Additionally, this would be the place to define the Kafka topic upon which Axon event messages will be published. Note that the `KafkaPublisher` needs to be `shutDown` properly, to ensure all `Producer` instances are properly closed. +The second infrastructure component to introduce is the `KafkaPublisher`, which has a hard requirement on the `ProducerFactory`. Additionally, this would be the place to define the Kafka topics upon which Axon event messages will be published. You can set a function from event to `Optional`. You can use this to only publish certain events, or put different events to different topics. Its not uncommon for Kafka topics to only contain one type of message. Note that the `KafkaPublisher` needs to be `shutDown` properly, to ensure all `Producer` instances are properly closed. ```java public class KafkaEventPublicationConfiguration { @@ -71,7 +71,7 @@ public class KafkaEventPublicationConfiguration { KafkaMessageConverter kafkaMessageConverter, int publisherAckTimeout) { return KafkaPublisher.builder() - .topic(topic) // Defaults to "Axon.Events" + .topicResolver(m -> Optional.of(topic)) // Defaults to "Axon.Events" for all events .producerFactory(producerFactory) // Hard requirement .messageConverter(kafkaMessageConverter) // Defaults to a "DefaultKafkaMessageConverter" .publisherAckTimeout(publisherAckTimeout) // Defaults to "1000" milliseconds; only used for "WAIT_FOR_ACK" mode @@ -250,7 +250,7 @@ public class KafkaEventConsumptionConfiguration { } ``` -Note that as with any tracking event processor, the progress on the event stream is stored in a `TrackingToken`. Using the `StreamableKafkaMessageSource` means a `KafkaTrackingToken` containing topic-partition to offset pairs is stored in the `TokenStore`. +Note that as with any tracking event processor, the progress on the event stream is stored in a `TrackingToken`. Using the `StreamableKafkaMessageSource` means a `KafkaTrackingToken` containing topic-partition to offset pairs is stored in the `TokenStore`. If no other `TokenStore` is provided, and autoconfiguration is used a `KafkaTokenStore` will be used, instead of an `InMemoryTokenStore`. The `KafkaTokenStore` by default uses the `__axon_token_store_updates` topic. This should be a compacted topic, which should be created and configured automatically. ## Customizing event message format @@ -278,9 +278,9 @@ public class KafkaMessageConversationConfiguration { BiFunction headerValueMapper, EventUpcasterChain upcasterChain) { return DefaultKafkaMessageConverter.builder() - .serializer(serializer) // Hard requirement - .sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy" - .upcasterChain(upcasterChain) // Defaults to empty upcaster chain + .serializer(serializer) // Hard requirement + .sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy" + .upcasterChain(upcasterChain) // Defaults to empty upcaster chain .headerValueMapper(headerValueMapper) // Defaults to "HeaderUtils#byteMapper()" .build(); } @@ -288,7 +288,7 @@ public class KafkaMessageConversationConfiguration { } ``` -Make sure to use an identical `KafkaMessageConverter` on both the producing and consuming end, as otherwise exception upon deserialization should be expected. +Make sure to use an identical `KafkaMessageConverter` on both the producing and consuming end, as otherwise exception upon deserialization should be expected. A `CloudEventKafkaMessageConverter` is also available using the [Cloud Events](https://cloudevents.io/) spec. ## Configuration in Spring Boot @@ -298,7 +298,11 @@ This extension can be added as a Spring Boot starter dependency to your project * A `DefaultKafkaMessageConverter` using the configured `eventSerializer` \(which defaults to `XStreamSerializer`\). - Uses a `String` for the keys and a `byte[]` for the record's values + Uses a `String` for the keys and a `byte[]` for the record's values. + + When the property `axon.kafka.message-converter-mode` is set to `cloud_event` a `CloudEventKafkaMessageConverter` will be used instead. This will use `String` for the keys and `CloudEvent`. + + For each the matching Kafka (de)serializers will also be set as default. **Producer Components:** From 408fc5665b6756e248ed1623e418a9b505c78633 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Mon, 12 Sep 2022 09:51:45 +0200 Subject: [PATCH 03/44] Update extensions/kafka.md Co-authored-by: Steven van Beelen --- extensions/kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/kafka.md b/extensions/kafka.md index b5f06e7e..8c86344e 100644 --- a/extensions/kafka.md +++ b/extensions/kafka.md @@ -250,7 +250,7 @@ public class KafkaEventConsumptionConfiguration { } ``` -Note that as with any tracking event processor, the progress on the event stream is stored in a `TrackingToken`. Using the `StreamableKafkaMessageSource` means a `KafkaTrackingToken` containing topic-partition to offset pairs is stored in the `TokenStore`. If no other `TokenStore` is provided, and autoconfiguration is used a `KafkaTokenStore` will be used, instead of an `InMemoryTokenStore`. The `KafkaTokenStore` by default uses the `__axon_token_store_updates` topic. This should be a compacted topic, which should be created and configured automatically. +Note that as with any tracking event processor, the progress on the event stream is stored in a `TrackingToken`. Using the `StreamableKafkaMessageSource` means a `KafkaTrackingToken` containing topic-partition to offset pairs is stored in the `TokenStore`. If no other `TokenStore` is provided, and auto-configuration is used, a `KafkaTokenStore` will be set instead of an `InMemoryTokenStore`. The `KafkaTokenStore` by default uses the `__axon_token_store_updates` topic. This should be a compacted topic, which should be created and configured automatically. ## Customizing event message format From 103f25c33061ca84616d26d705f5486dacc23286 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Mon, 12 Sep 2022 09:56:51 +0200 Subject: [PATCH 04/44] Update docs with (expected) changes. --- extensions/kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/kafka.md b/extensions/kafka.md index 8c86344e..278760ef 100644 --- a/extensions/kafka.md +++ b/extensions/kafka.md @@ -296,7 +296,7 @@ This extension can be added as a Spring Boot starter dependency to your project **Generic Components:** -* A `DefaultKafkaMessageConverter` using the configured `eventSerializer` \(which defaults to `XStreamSerializer`\). +* A `DefaultKafkaMessageConverter` using the configured `eventSerializer` \(which defaults to `XStreamSerializer`\), which is used by default to convert between Axon Event messages and Kafka records. Uses a `String` for the keys and a `byte[]` for the record's values. From ad76572003f7f5d2fa68e90464358dae9f2c188e Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 29 Aug 2022 13:45:59 +0200 Subject: [PATCH 05/44] Deadletter documentation added --- .../events/event-processors/README.md | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 2f7143b2..2d35dee1 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -255,6 +255,79 @@ public interface ErrorHandler { Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception. +### Dead-letter Queue + +When event processor transactions end up in an exception, following events are not handled by that event processor even thought they could be successfully handled. The event processor is stuck until the issue is fixed. +To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. +When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database table. + +A`JpaSequencedDeadLetterQueue` configuration example: +```java +@Configuration +public class DeadLetterQueueExampleConfig { + + public static final String PROCESSING_GROUP = "deadLetterProcessor"; + + @Bean + public ConfigurerModule configure() { + return configurer -> + configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( + PROCESSING_GROUP, + configuration -> JpaSequencedDeadLetterQueue.builder() + .processingGroup( + PROCESSING_GROUP) + .transactionManager(configuration.getComponent( + TransactionManager.class)) + .entityManagerProvider( + configuration.getComponent( + EntityManagerProvider.class)) + .serializer( + configuration.serializer()) + .build())); + } +} +``` +After fixing the issue the events can be handled again by using the `process` function, in this case all events of type ErrorEvent: + +```java +public class DeadletterProcessor{ + + public void repairErrorEvents() { + eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) + .ifPresent(p -> p.process(deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent)); + } +} +``` + +You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be skipped: + +```java +@Configuration +public class CustomDeadLetterPolicy{ + @Autowired + public void configure(EventProcessingConfigurer configurer) { + configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> + (letter, cause) -> { + if (cause instanceof NullPointerException) { + // It's pointless.. + return Decisions.doNotEnqueue(); + } + final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); + if (letter.message().getPayload() instanceof ErrorEvent) { + // Important, always retry + return Decisions.enqueue(cause); + } + if(retries < 10) { + // Let's continue and increase retries! + return Decisions.enqueue(cause, deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); + } + // Exhausted retries + return Decisions.doNotEnqueue(); + }); + } +} + +``` ## General processor configuration Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event Processors allow configuration for other components too. @@ -342,3 +415,4 @@ Secondly, you can adjust the desired `RollbackConfiguration` per Event Processor It is the `RollbackConfiguration` that decide when a [Unit of Work](../../messaging-concepts/unit-of-work.md) should rollback the transaction. The default `RollbackConfiguration` is to rollback on any type of `Throwable`; the [Unit of Work](../../messaging-concepts/unit-of-work.md) page describes the other options you can choose. To adjust the default behaviour, the `registerRollbackConfiguration(String, Function)` function should be invoked on the `EventProcessingConfigurer`. + From e143023ebed8faff205946d992d2a5e9c46d8b73 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Tue, 30 Aug 2022 09:26:24 +0200 Subject: [PATCH 06/44] Added remark about idem potency --- axon-framework/events/event-processors/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 2d35dee1..91fd310e 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -328,6 +328,9 @@ public class CustomDeadLetterPolicy{ } ``` + +One important note, when implementing event handlers it is important to make them idempotent and with the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. + ## General processor configuration Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event Processors allow configuration for other components too. From ac0e8b9b7d2b911690c32d55bd39f33ad31f6154 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Tue, 30 Aug 2022 09:28:02 +0200 Subject: [PATCH 07/44] Rephrased sentence --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 91fd310e..bfdc34a0 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -329,7 +329,7 @@ public class CustomDeadLetterPolicy{ ``` -One important note, when implementing event handlers it is important to make them idempotent and with the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. +One important note, when implementing event handlers, make them idempotent and with the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. ## General processor configuration From 93aa49e6e3f0bbf3518bb5051a91a4ae126c6679 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Tue, 30 Aug 2022 09:29:28 +0200 Subject: [PATCH 08/44] Rephrased sentence --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index bfdc34a0..5c0c6c30 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -329,7 +329,7 @@ public class CustomDeadLetterPolicy{ ``` -One important note, when implementing event handlers, make them idempotent and with the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. +One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. ## General processor configuration From e99595b56888c3db8e9ff5fe29390233d6e80976 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Wed, 31 Aug 2022 17:08:32 +0200 Subject: [PATCH 09/44] Review comments --- .../events/event-processors/README.md | 438 ++++++++++++------ 1 file changed, 308 insertions(+), 130 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 5c0c6c30..56a48f4a 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -3,7 +3,9 @@ [Event handlers](../event-handlers.md) define the business logic to be performed when an event is received. _Event Processors_ are the components that take care of the technical aspects of that processing. They start a [unit of work](../../messaging-concepts/unit-of-work.md) and possibly a transaction. -However, they also ensure that [correlation data](../../monitoring-and-metrics.md#correlation-data-a-idcorrelation-dataa) can be correctly attached to all messages created during event processing, among other non-functional requirements. +However, they also ensure +that [correlation data](../../monitoring-and-metrics.md#correlation-data-a-idcorrelation-dataa) can be correctly +attached to all messages created during event processing, among other non-functional requirements. The image below depicts a representation of the organization of Event Processors and Event Handlers: @@ -12,16 +14,20 @@ The image below depicts a representation of the organization of Event Processors Axon has a layered approach towards organizing event handlers. First, an event handler is positioned in a _Processing Group_. Each event handler, or "Event Handling Component," will only ever belong to a single Processing Group. -The Processing Group provides a level of configurable non-functional requirements, like [error handling](#processing-group-listener-invocation-error-handler) and the [sequencing policy](streaming.md#sequential-processing). +The Processing Group provides a level of configurable non-functional requirements, +like [error handling](#processing-group-listener-invocation-error-handler) and +the [sequencing policy](streaming.md#sequential-processing). The Event Processors, in turn, is in charge of the Processing Group. An Event Processor will control 1 to N Processing Groups, although there will be a one-to-one mapping in most cases. Similar to the Event Handling Component, a Processing Group will belong to a single Event Processor. -This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a more fine-grained degree of [error handling](#event-processor-error-handler). +This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a +more fine-grained degree of [error handling](#event-processor-error-handler). Event Processors come in roughly two forms: [Subscribing](subscribing.md) and [Streaming](streaming.md). -Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. +Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing +mechanism. Streaming Event Processors, on the other hand, pull their messages from a source using a thread that it manages itself. For more specifics on either type, consult their respective sections [here](subscribing.md) and [here](streaming.md). @@ -36,10 +42,13 @@ Two processors with the same name are considered as two instances of the same pr All event handlers are attached to a processor whose name by default is the package name of the event handler's class. Furthermore, the default processor implementation used by Axon is the [Tracking Event Processor](streaming.md). -The (default) event processor used can be adjusted, as is shown in the [subscribing](subscribing.md#configuring) and [streaming](streaming.md#configuring) sections. +The (default) event processor used can be adjusted, as is shown in the [subscribing](subscribing.md#configuring) +and [streaming](streaming.md#configuring) sections. -Event handlers, or Event Handling Components, come in roughly two flavors: "regular" \(singleton, stateless\) event handlers and [sagas](../../sagas/README.md). -[This](../event-handlers.md#registering-event-handlers) section describes the process to register an event handler, whereas [this](../../sagas/implementation.md#configuring-a-saga) page describes the saga registration process. +Event handlers, or Event Handling Components, come in roughly two flavors: "regular" \(singleton, stateless\) event +handlers and [sagas](../../sagas/README.md). +[This](../event-handlers.md#registering-event-handlers) section describes the process to register an event handler, +whereas [this](../../sagas/implementation.md#configuring-a-saga) page describes the saga registration process. Now let us consider that the following event handlers have been registered: @@ -52,8 +61,10 @@ Without any intervention, this will trigger the creation of two processors, name 1. `org.axonframework.example.eventhandling` with two handlers called `MyHandler` and `MyOtherHandler` 2. `org.axonframework.example.eventhandling.module` with the single handler `ModuleHandler` -Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the Processing Group is recommended. -The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using the `ProcessingGroup` annotation. +Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the +Processing Group is recommended. +The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using +the `ProcessingGroup` annotation. This annotation resembles the Processing Group level discussed in the [introduction](README.md#event-processors). The `ProcessingGroup` annotation requires the insertion of a name and can only be set on the class. @@ -66,7 +77,7 @@ class MyHandler { } @ProcessingGroup("my-handlers") -class MyOtherHandler{ +class MyOtherHandler { // ... } @@ -81,44 +92,61 @@ Using the `ProcessingGroup` annotation as depicted, we again construct two proce 1. `my-handlers` with two handlers called `MyHandler` and `MyOtherHandler` 2. `module-handlers` with the single handler `ModuleHandler` -If more control is required to group Event Handling Components, we recommend consulting the [assignment rules](#event-handler-assignment-rules) section. +If more control is required to group Event Handling Components, we recommend consulting +the [assignment rules](#event-handler-assignment-rules) section. ### Event Handler Assignment Rules -The Configuration API allows you to configure other strategies for assigning event handling classes to processors or assigning specific handler instances to particular processors. -We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group to Event Processor. +The Configuration API allows you to configure other strategies for assigning event handling classes to processors or +assigning specific handler instances to particular processors. +We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group +to Event Processor. Below is an exhaustive list of all the assignment rules the `EventProcessingConfigurer` exposes: **Event Handler to Processing Group** * `byDefaultAssignTo(String)` - defines the default Processing Group name to assign an event handler to. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. -* `byDefaultAssignHandlerInstancesTo(Function)` - defines a lambda invoked to assign an event handling instance to a desired Processing Group by returning that group's name. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. -* `byDefaultAssignHandlerTypesTo(Function, String>)` - defines a lambda invoked to assign an event handler type to a desired Processing Group by returning that group's name. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. -* `assignHandlerInstancesMatching(String, Predicate)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not + present. +* `byDefaultAssignHandlerInstancesTo(Function)` - defines a lambda invoked to assign an event handling + instance to a desired Processing Group by returning that group's name. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not + present. +* `byDefaultAssignHandlerTypesTo(Function, String>)` - defines a lambda invoked to assign an event handler type + to a desired Processing Group by returning that group's name. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not + present. +* `assignHandlerInstancesMatching(String, Predicate)` - assigns event handlers to the given Processing Group + name based on a predicate ingesting an event handling instance. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerTypesMatching(String, Predicate>)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. +* `assignHandlerTypesMatching(String, Predicate>)` - assigns event handlers to the given Processing Group name + based on a predicate ingesting an event handler type. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerInstancesMatching(String, int, Predicate)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. +* `assignHandlerInstancesMatching(String, int, Predicate)` - assigns event handlers to the given Processing + Group name based on a predicate ingesting an event handling instance. Uses the given priority to decide on rule-ordering. The higher the priority value, the more important the rule is. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerTypesMatching(String, int, Predicate>)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. +* `assignHandlerTypesMatching(String, int, Predicate>)` - assigns event handlers to the given Processing Group + name based on a predicate ingesting an event handler type. Uses the given priority to decide on rule-ordering. The higher the priority, the more important the rule is. If an instance matches several criteria, the outcome is _undefined_. **Processing Group to Event Processor** -* `assignProcessingGroup(String, String)` - defines a given Processing Group name that belongs to the given Event Processor's name. -* `assignProcessingGroup(Function)` - defines a lambda invoked to assign a Processing Group name to the desired Event Processor by returning that processor's name. +* `assignProcessingGroup(String, String)` - defines a given Processing Group name that belongs to the given Event + Processor's name. +* `assignProcessingGroup(Function)` - defines a lambda invoked to assign a Processing Group name to the + desired Event Processor by returning that processor's name. ### Ordering Event Handlers within a processor -To order event handlers within an Event Processor, the order in which event handlers are registered (as described in the [Registering Event Handlers](../event-handlers.md#registering-event-handlers) section) is guiding. -Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their insertion ordering in the Configuration API. +To order event handlers within an Event Processor, the order in which event handlers are registered (as described in +the [Registering Event Handlers](../event-handlers.md#registering-event-handlers) section) is guiding. +Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their +insertion ordering in the Configuration API. -If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the `@Order` annotation. +If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering +by adding the `@Order` annotation. This annotation is placed on the event handler class name, containing an `integer` value to specify the ordering. Note that it is **not possible** to order event handlers belonging to different Event Processors. @@ -126,12 +154,15 @@ Each Event Processor acts as an isolated component without any intervention from > **Ordering Considerations** > -> Although we can place an order among event handlers within an Event Processor, separation of event handlers is recommended. -> -> Placing an overall ordering on event handlers means those components are inclined to interact with one another, introducing a form of coupling. +> Although we can place an order among event handlers within an Event Processor, separation of event handlers is +> recommended. +> +> Placing an overall ordering on event handlers means those components are inclined to interact with one another, +> introducing a form of coupling. > Due to this, the event handling process will become complex to manage (e.g., for new team members). -> Furthermore, embracing an ordering approach might lead to place _all_ event handlers in a global ordering, decreasing processing speeds in general. -> +> Furthermore, embracing an ordering approach might lead to place _all_ event handlers in a global ordering, decreasing +> processing speeds in general. +> > In all, you are free to use an ordering, but we recommend using it sparingly. ## Error Handling @@ -139,62 +170,79 @@ Each Event Processor acts as an isolated component without any intervention from Errors are inevitable in any application. Depending on where they happen, you may want to respond differently. -By default, exceptions raised by event handlers are caught in the [Processing Group layer](#processing-group-listener-invocation-error-handler), logged, and processing continues with the following events. -When an exception is thrown when a processor is trying to commit a transaction, update a [token](streaming.md#token-store), or in any other part of the process, the exception will be propagated. +By default, exceptions raised by event handlers are caught in +the [Processing Group layer](#processing-group-listener-invocation-error-handler), logged, and processing continues with +the following events. +When an exception is thrown when a processor is trying to commit a transaction, update +a [token](streaming.md#token-store), or in any other part of the process, the exception will be propagated. -In the case of a [Streaming Event Processor](streaming.md#error-mode), this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval \(starting at 1 second, up to max 60 seconds\). -A [Subscribing Event Processor](subscribing.md#error-mode) will report a publication error to the component that provided the event. +In the case of a [Streaming Event Processor](streaming.md#error-mode), this means the processor will go into error mode, +releasing any tokens and retrying at an incremental interval \(starting at 1 second, up to max 60 seconds\). +A [Subscribing Event Processor](subscribing.md#error-mode) will report a publication error to the component that +provided the event. -To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with exceptions: +To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with +exceptions: ### Processing Group - Listener Invocation Error Handler -The component dealing with exceptions thrown from an event handling method is called the `ListenerInvocationErrorHandler`. -By default, these exceptions are logged (with the `LoggingErrorHandler` implementation), and processing continues with the next handler or message. +The component dealing with exceptions thrown from an event handling method is called +the `ListenerInvocationErrorHandler`. +By default, these exceptions are logged (with the `LoggingErrorHandler` implementation), and processing continues with +the next handler or message. The default `ListenerInvocationErrorHandler` used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group: {% tabs %} {% tab title="Axon Configuration API" %} + ```java public class AxonConfig { + // ... public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */) // ... or for a specific processing group: - .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */); + .registerListenerInvocationErrorHandler("my-processing-group", + conf -> /* create listener error handler */); } } ``` + {% endtab %} {% tab title="Spring Boot AutoConfiguration" %} + ```java @Configuration public class AxonConfig { + // ... @Autowired public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */) // ... or for a specific processing group: - .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */); + .registerListenerInvocationErrorHandler("my-processing-group", + conf -> /* create listener error handler */); } } ``` + {% endtab %} {% endtabs %} -It is easy to implement custom error handling behavior. -The error handling method to implement provides the exception, the event that was handled, and a reference to the handler that was handling the message: +It is easy to implement custom error handling behavior. +The error handling method to implement provides the exception, the event that was handled, and a reference to the +handler that was handling the message: ```java public interface ListenerInvocationErrorHandler { - void onError(Exception exception, - EventMessage event, + void onError(Exception exception, + EventMessage event, EventMessageHandler eventHandler) throws Exception; } ``` @@ -208,14 +256,17 @@ Exceptions occurring outside an event handler's scope, or have bubbled up from t The default error handler is the `PropagatingErrorHandler`, which will rethrow any exceptions it catches. How the Event Processor deals with a rethrown exception differ per implementation. -The behaviour for the Subscribing- and the Streaming Event Processor can respectively be found [here](subscribing.md#error-mode) and [here](streaming.md#error-mode). +The behaviour for the Subscribing- and the Streaming Event Processor can respectively be +found [here](subscribing.md#error-mode) and [here](streaming.md#error-mode). We can configure a default `ErrorHandler` for all Event Processors or an `ErrorHandler` for specific processors: {% tabs %} {% tab title="Axon Configuration API" %} + ```java public class AxonConfig { + // ... public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... @@ -225,12 +276,15 @@ public class AxonConfig { } } ``` + {% endtab %} {% tab title="Spring Boot AutoConfiguration" %} + ```java @Configuration public class AxonConfig { + // ... @Autowired public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { @@ -241,6 +295,7 @@ public class AxonConfig { } } ``` + {% endtab %} {% endtabs %} @@ -253,88 +308,185 @@ public interface ErrorHandler { } ``` -Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception. +Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform +dead-letter-queue delivery, or rethrow the exception. ### Dead-letter Queue -When event processor transactions end up in an exception, following events are not handled by that event processor even thought they could be successfully handled. The event processor is stuck until the issue is fixed. -To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. -When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database table. +When event processor transactions end up in an exception, following events are not handled by that event processor even +though they could be successfully handled. The event processor is stuck until the issue is fixed. +To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. A +dead-letter queue can not be shared between event processors so there should one queue per processor. +The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the dead=letters are gone after a restart. +To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. +When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database +table. A`JpaSequencedDeadLetterQueue` configuration example: + ```java @Configuration public class DeadLetterQueueExampleConfig { - public static final String PROCESSING_GROUP = "deadLetterProcessor"; - - @Bean - public ConfigurerModule configure() { - return configurer -> - configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( - PROCESSING_GROUP, - configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup( - PROCESSING_GROUP) - .transactionManager(configuration.getComponent( - TransactionManager.class)) - .entityManagerProvider( - configuration.getComponent( - EntityManagerProvider.class)) - .serializer( - configuration.serializer()) - .build())); - } + public static final String PROCESSING_GROUP = "deadLetterProcessor"; + + @Autowired + public ConfigurerModule configure() { + return configurer -> + configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( + PROCESSING_GROUP, + configuration -> JpaSequencedDeadLetterQueue.builder() + .processingGroup( + PROCESSING_GROUP) + .transactionManager(configuration.getComponent( + TransactionManager.class)) + .entityManagerProvider( + configuration.getComponent( + EntityManagerProvider.class)) + .serializer( + configuration.serializer()) + .maxSequences(256) + .maxSequenceSize(256) + .build())); + } } ``` -After fixing the issue the events can be handled again by using the `process` function, in this case all events of type ErrorEvent: + +You can set the maximum amount of sequences that are saved (defaults to 128) and the maximum amount of dead-letters in a +sequence (also defaults to 128). If these thresholds are exceeded an exception will be thrown and the event processor +will stop processing. +The sequence id is determined by the sequencing policy. By default, the sequence identifier is the aggregate id. + +After fixing the issue the events can be handled again by using the `process` function, in this case it will process the +first matching event of type ErrorEvent: + +{% tabs %} +{% tab title="Process first dead-letter in the queue of type ErrorEvent " %} ```java -public class DeadletterProcessor{ - - public void repairErrorEvents() { - eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) - .ifPresent(p -> p.process(deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent)); - } +public class DeadletterProcessor { + + private final EventProcessingConfiguration eventProcessingConfiguration; + + public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { + this.eventProcessingConfiguration = eventProcessingConfiguration; + } + + public void repairErrorEvent() { + eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) + .ifPresent(p -> p.process(deadLetter -> deadLetter.message() + .getPayload() instanceof ErrorEvent)); + } } ``` -You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be skipped: +{% endtab %} + +{% tab title="Process any dead-letter in the queue" %} + +```java +public class DeadletterProcessor { + + private final EventProcessingConfiguration eventProcessingConfiguration; + + public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { + this.eventProcessingConfiguration = eventProcessingConfiguration; + } + + public void repairAnyEvent() { + eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) + .ifPresent(SequencedDeadLetterProcessor::processAny); + } +} +``` + +{% endtab %} + +{% tab title="Process all dead-letters in the queue" %} + +```java +public class DeadletterProcessor { + + private final EventProcessingConfiguration eventProcessingConfiguration; + + public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { + this.eventProcessingConfiguration = eventProcessingConfiguration; + } + + public void repairAllEvents() { + Optional>>>> deadLetters = eventProcessingConfiguration.deadLetterQueue( + PROCESSING_GROUP).map(SequencedDeadLetterQueue::deadLetters); + + deadLetters.ifPresent(sequences -> { + // Iterate over all sequences + for (Iterable>> sequence : sequences) { + // Iterate over all dead-letters belonging to the same sequence + for (DeadLetter> deadLetterInSequence : sequence) { + eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP).ifPresent( + eventMessageSequencedDeadLetterProcessor -> { + eventMessageSequencedDeadLetterProcessor.process( + deadLetter -> deadLetter.message().getIdentifier() + .equals(deadLetterInSequence.message() + .getIdentifier())); + }); + } + } + }); + } +} +``` + +{% endtab %} +{% endtabs %} + +Also add process any() + +You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be +skipped. This policy is not only called for initial failures but also when dead-lettered events are processed +unsuccessfully again. ```java @Configuration -public class CustomDeadLetterPolicy{ - @Autowired - public void configure(EventProcessingConfigurer configurer) { - configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> - (letter, cause) -> { - if (cause instanceof NullPointerException) { - // It's pointless.. - return Decisions.doNotEnqueue(); - } - final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); - if (letter.message().getPayload() instanceof ErrorEvent) { - // Important, always retry - return Decisions.enqueue(cause); - } - if(retries < 10) { - // Let's continue and increase retries! - return Decisions.enqueue(cause, deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); - } - // Exhausted retries - return Decisions.doNotEnqueue(); - }); - } +public class CustomDeadLetterPolicy { + + @Autowired + public void configure(EventProcessingConfigurer configurer) { + configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> + (letter, cause) -> { + if (cause instanceof NullPointerException) { + // It's pointless.. + return Decisions.doNotEnqueue(); + } + final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); + if (letter.message().getPayload() instanceof ErrorEvent) { + // Important, always retry + return Decisions.enqueue(cause); + } + if (retries < 10) { + // Let's continue and increase retries! + return Decisions.enqueue(cause, + deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); + } + // Exhausted retries + return Decisions.doNotEnqueue(); + }); + } } ``` -One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. +One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard +requirement. +Event processors using the dead-letter queue will not roll back the transaction on an error. That means that if you do +multiple actions in the same handler and a subsequent action fails, any earlier action is not rolled back by the +transaction. That is why idempotency is important. ## General processor configuration -Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event Processors allow configuration for other components too. -For [Subscribing](subscribing.md#configuring) and [Streaming](streaming.md#configuring) Event Processor specific options, their respective sections should be checked. +Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event +Processors allow configuration for other components too. +For [Subscribing](subscribing.md#configuring) and [Streaming](streaming.md#configuring) Event Processor specific +options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor. ### Event Processor Builders @@ -348,74 +500,100 @@ To that end, we can configure a custom `EventProcessorBuilder`: interface EventProcessorBuilder { // Note: the `EventHandlerInvoker` is the component which holds the event handling functions. - EventProcessor build(String name, - Configuration configuration, + EventProcessor build(String name, + Configuration configuration, EventHandlerInvoker eventHandlerInvoker); } ``` -The `EventProcessorBuilder` functional interface provides the event processor's name, the `Configuration` and the `EventHandlerInvoker`, and requires returning an `EventProcessor` instance. -Note that any Axon component that an Event Processor requires (e.g., an `EventStore`) is retrievable from the `Configuration`. +The `EventProcessorBuilder` functional interface provides the event processor's name, the `Configuration` and +the `EventHandlerInvoker`, and requires returning an `EventProcessor` instance. +Note that any Axon component that an Event Processor requires (e.g., an `EventStore`) is retrievable from +the `Configuration`. The `EventProcessingConfigurer` provides two methods to configure an `EventProcessorBuilder`: -1. `registerEventProcessorFactory(EventProcessorBuilder)` - allows you to define a default factory method that creates event processors for which no explicit factories are defined -2. `registerEventProcessor(String, EventProcessorBuilder)` - defines the factory method to use to create a processor with given `name` +1. `registerEventProcessorFactory(EventProcessorBuilder)` - allows you to define a default factory method that creates + event processors for which no explicit factories are defined +2. `registerEventProcessor(String, EventProcessorBuilder)` - defines the factory method to use to create a processor + with given `name` ### Event Handler Interceptors -Since the Event Processor is the invoker of event handling methods, it is a spot to configure [Message Handler Interceptors](../../messaging-concepts/message-intercepting.md) too. -Since Event Processors are dedicated to event handling, the `MessageHandlerInterceptor` is required to deal with an `EventMessage`. -Differently put, an [EventHandlerInterceptor](../../messaging-concepts/message-intercepting.md#event-handler-interceptors) can be registered to Event Processors. +Since the Event Processor is the invoker of event handling methods, it is a spot to +configure [Message Handler Interceptors](../../messaging-concepts/message-intercepting.md) too. +Since Event Processors are dedicated to event handling, the `MessageHandlerInterceptor` is required to deal with +an `EventMessage`. +Differently put, +an [EventHandlerInterceptor](../../messaging-concepts/message-intercepting.md#event-handler-interceptors) can be +registered to Event Processors. The `EventProcessingConfigurer` provides two methods to configure `MessageHandlerInterceptor` instances: -- `registerDefaultHandlerInterceptor(BiFunction>>)` - registers a default `MessageHandlerInterceptor` that will be configured on every Event Processor instance -- `registerHandlerInterceptor(String, Function>>)` - registers a `MessageHandlerInterceptor` that will be configured for the Event Processor matching the given `String` +- `registerDefaultHandlerInterceptor(BiFunction>>)` + - registers a default `MessageHandlerInterceptor` that will be configured on every Event Processor instance +- `registerHandlerInterceptor(String, Function>>)` - + registers a `MessageHandlerInterceptor` that will be configured for the Event Processor matching the given `String` ### Message Monitors Any Event Processor instance provides the means to contain a Message Monitor. -Message Monitors (discussed in more detail [here](../../monitoring-and-metrics.md#metrics-a-idmetricsa)) allow for monitoring the flow of messages throughout an Axon application. -For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions. +Message Monitors (discussed in more detail [here](../../monitoring-and-metrics.md#metrics-a-idmetricsa)) allow for +monitoring the flow of messages throughout an Axon application. +For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards +the event handling functions. The `EventProcessingConfigurer` provides two approaches towards configuring a `MessageMonitor`: -- `registerMessageMonitor(String, Function>>)` - registers the given `MessageMonitor` to the Event Processor matching the given `String` -- `registerMessageMonitorFactory(String, MessageMonitorFactory)` - registers the given `MessageMonitorFactory` to construct a `MessageMonitor` for the Event Processor matching the given `String` +- `registerMessageMonitor(String, Function>>)` - registers the + given `MessageMonitor` to the Event Processor matching the given `String` +- `registerMessageMonitorFactory(String, MessageMonitorFactory)` - registers the given `MessageMonitorFactory` to + construct a `MessageMonitor` for the Event Processor matching the given `String` -The `MessageMonitorFactory` provides a more fine-grained approach, used throughout the Configuration API, to construct a `MessageMonitor`: +The `MessageMonitorFactory` provides a more fine-grained approach, used throughout the Configuration API, to construct +a `MessageMonitor`: ```java @FunctionalInterface public interface MessageMonitorFactory { - - MessageMonitor> create(Configuration configuration, - Class componentType, + + MessageMonitor> create(Configuration configuration, + Class componentType, String componentName); } ``` We can use the `Configuration` to retrieve the required dependencies to construct the `MessageMonitor`. The type and name reflect which infrastructure component the factory constructs a monitor for. -Whenever you use the `MessageMonitorFactory` to construct a `MessageMonitor` for an Event Processor, the factory expects the `componentType` to be an `EventProcessor` implementation. +Whenever you use the `MessageMonitorFactory` to construct a `MessageMonitor` for an Event Processor, the factory expects +the `componentType` to be an `EventProcessor` implementation. The `componentName`, on the other hand, would resemble the name of the Event Processor. ### Transaction Management -As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration options. +As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration +options. Note that in the majority of the scenarios, the defaults will suffice. This section simply serves to show these options to allow adjustment if the application requires it. The first of these is the `TransactionManager`. -Axon uses the `TransactionManager` to attach a transaction to every [Unit of Work](../../messaging-concepts/unit-of-work.md). -Within a Spring environment, the `TransactionManager` defaults to a `SpringTransactionManager`, which uses Spring's `PlatformTransactionManager` under the hood. -In non Spring environments, it would be wise to build a `TransactionManager` implement if transaction management is required, of course. +Axon uses the `TransactionManager` to attach a transaction to +every [Unit of Work](../../messaging-concepts/unit-of-work.md). +Within a Spring environment, the `TransactionManager` defaults to a `SpringTransactionManager`, which uses +Spring's `PlatformTransactionManager` under the hood. +In non Spring environments, it would be wise to build a `TransactionManager` implement if transaction management is +required, of course. Such an implementation only requires the definition of the `TransactionManager#startTransaction()` method. -To adjust the transaction manager for an Event Processor, the `registerTransactionManager(String, Function)` on the `EventProcessingConfigurer` should be used. +To adjust the transaction manager for an Event Processor, +the `registerTransactionManager(String, Function)` on the `EventProcessingConfigurer` +should be used. Secondly, you can adjust the desired `RollbackConfiguration` per Event Processor. -It is the `RollbackConfiguration` that decide when a [Unit of Work](../../messaging-concepts/unit-of-work.md) should rollback the transaction. -The default `RollbackConfiguration` is to rollback on any type of `Throwable`; the [Unit of Work](../../messaging-concepts/unit-of-work.md) page describes the other options you can choose. -To adjust the default behaviour, the `registerRollbackConfiguration(String, Function)` function should be invoked on the `EventProcessingConfigurer`. +It is the `RollbackConfiguration` that decide when a [Unit of Work](../../messaging-concepts/unit-of-work.md) should +rollback the transaction. +The default `RollbackConfiguration` is to rollback on any type of `Throwable`; +the [Unit of Work](../../messaging-concepts/unit-of-work.md) page describes the other options you can choose. +To adjust the default behaviour, +the `registerRollbackConfiguration(String, Function)` function should be invoked +on the `EventProcessingConfigurer`. From ae06307385d972e5e71afb8d32350fa5689a3c4e Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Thu, 1 Sep 2022 09:34:51 +0200 Subject: [PATCH 10/44] Added non Spring configuration example --- .../events/event-processors/README.md | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 56a48f4a..e480f1cb 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -317,13 +317,42 @@ When event processor transactions end up in an exception, following events are n though they could be successfully handled. The event processor is stuck until the issue is fixed. To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. A dead-letter queue can not be shared between event processors so there should one queue per processor. -The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the dead=letters are gone after a restart. +The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the dead-letters are gone after a restart. To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database table. A`JpaSequencedDeadLetterQueue` configuration example: +{% tabs %} +{% tab title="Axon Configuration API" %} +```java +public class DeadLetterQueueExampleConfig { + public static final String PROCESSING_GROUP = "deadLetterProcessor"; + + public ConfigurerModule configure() { + return configurer -> + configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( + PROCESSING_GROUP, + configuration -> JpaSequencedDeadLetterQueue.builder() + .processingGroup( + PROCESSING_GROUP) + .transactionManager(configuration.getComponent( + TransactionManager.class)) + .entityManagerProvider( + configuration.getComponent( + EntityManagerProvider.class)) + .serializer( + configuration.serializer()) + .maxSequences(256) + .maxSequenceSize(256) + .build())); + } +} +``` +{% endtab %} + +{% tab title="Spring Boot AutoConfiguration" %} ```java @Configuration public class DeadLetterQueueExampleConfig { @@ -351,7 +380,9 @@ public class DeadLetterQueueExampleConfig { } } ``` +{% endtab %} +{% endtabs %} You can set the maximum amount of sequences that are saved (defaults to 128) and the maximum amount of dead-letters in a sequence (also defaults to 128). If these thresholds are exceeded an exception will be thrown and the event processor will stop processing. @@ -439,8 +470,6 @@ public class DeadletterProcessor { {% endtab %} {% endtabs %} -Also add process any() - You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be skipped. This policy is not only called for initial failures but also when dead-lettered events are processed unsuccessfully again. @@ -475,8 +504,7 @@ public class CustomDeadLetterPolicy { ``` -One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard -requirement. +One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard requirement. Event processors using the dead-letter queue will not roll back the transaction on an error. That means that if you do multiple actions in the same handler and a subsequent action fails, any earlier action is not rolled back by the transaction. That is why idempotency is important. From 1a0a57a3d278501a7b905dfd88baba783f60dd55 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:40:15 +0200 Subject: [PATCH 11/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index e480f1cb..67295933 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -315,8 +315,11 @@ dead-letter-queue delivery, or rethrow the exception. When event processor transactions end up in an exception, following events are not handled by that event processor even though they could be successfully handled. The event processor is stuck until the issue is fixed. -To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. A -dead-letter queue can not be shared between event processors so there should one queue per processor. +To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. + +Note that a dead-letter queue *can not* be shared between event processors. +Hence, every processor you want to enable this for should receive a unique dead-letter queue instance. + The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the dead-letters are gone after a restart. To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database From b5a3c98f78a67dc58ea2dde04e6dd5d97da8e529 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:43:45 +0200 Subject: [PATCH 12/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 67295933..45cdb607 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -338,15 +338,10 @@ public class DeadLetterQueueExampleConfig { configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( PROCESSING_GROUP, configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup( - PROCESSING_GROUP) - .transactionManager(configuration.getComponent( - TransactionManager.class)) - .entityManagerProvider( - configuration.getComponent( - EntityManagerProvider.class)) - .serializer( - configuration.serializer()) + .processingGroup(PROCESSING_GROUP) + .transactionManager(configuration.getComponent(TransactionManager.class)) + .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .serializer(configuration.serializer()) .maxSequences(256) .maxSequenceSize(256) .build())); From aebe42e447111a5c7199fcc3c3ae57351fba3db9 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:43:56 +0200 Subject: [PATCH 13/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 45cdb607..90b2576d 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -363,15 +363,10 @@ public class DeadLetterQueueExampleConfig { configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( PROCESSING_GROUP, configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup( - PROCESSING_GROUP) - .transactionManager(configuration.getComponent( - TransactionManager.class)) - .entityManagerProvider( - configuration.getComponent( - EntityManagerProvider.class)) - .serializer( - configuration.serializer()) + .processingGroup(PROCESSING_GROUP) + .transactionManager(configuration.getComponent(TransactionManager.class)) + .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .serializer(configuration.serializer()) .maxSequences(256) .maxSequenceSize(256) .build())); From 4ac3e4d37d87acd2dc46edcea5d1c285e774bd12 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:44:23 +0200 Subject: [PATCH 14/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 90b2576d..8ec8093b 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -324,7 +324,7 @@ The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database table. - +#### Configuring a sequenced Dead-Letter Queue A`JpaSequencedDeadLetterQueue` configuration example: {% tabs %} {% tab title="Axon Configuration API" %} From a86feb6d2499cdc9703531be0c91a45359627876 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:44:49 +0200 Subject: [PATCH 15/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 8ec8093b..04f4ce37 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -376,8 +376,8 @@ public class DeadLetterQueueExampleConfig { {% endtab %} {% endtabs %} -You can set the maximum amount of sequences that are saved (defaults to 128) and the maximum amount of dead-letters in a -sequence (also defaults to 128). If these thresholds are exceeded an exception will be thrown and the event processor +You can set the maximum amount of sequences that are saved (defaults to 1024) and the maximum amount of dead-letters in a +sequence (also defaults to 1024). If these thresholds are exceeded a `DeadLetterQueueOverflowException` will be thrown and the event processor will stop processing. The sequence id is determined by the sequencing policy. By default, the sequence identifier is the aggregate id. From 73f17b9eae81e9bb8250b3ac214352a1dd386626 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:45:00 +0200 Subject: [PATCH 16/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 04f4ce37..f10fcd40 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -380,7 +380,7 @@ You can set the maximum amount of sequences that are saved (defaults to 1024) an sequence (also defaults to 1024). If these thresholds are exceeded a `DeadLetterQueueOverflowException` will be thrown and the event processor will stop processing. The sequence id is determined by the sequencing policy. By default, the sequence identifier is the aggregate id. - +#### Processing Dead-Letter Sequences After fixing the issue the events can be handled again by using the `process` function, in this case it will process the first matching event of type ErrorEvent: From 18c097746636bbe4689353bd9ad8c566b51dcb77 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 10:45:08 +0200 Subject: [PATCH 17/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Steven van Beelen --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index f10fcd40..09ba70d8 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -462,7 +462,7 @@ public class DeadletterProcessor { {% endtab %} {% endtabs %} - +#### Dead-Letter Enqueue Policy You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be skipped. This policy is not only called for initial failures but also when dead-lettered events are processed unsuccessfully again. From a30300fbfbf438a5c401c22eb98f96b091904446 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 12:11:01 +0200 Subject: [PATCH 18/44] Added a section for deadletter attributes --- .../events/event-processors/README.md | 50 +++++++++++++++---- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 09ba70d8..8e0fb448 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -315,7 +315,7 @@ dead-letter-queue delivery, or rethrow the exception. When event processor transactions end up in an exception, following events are not handled by that event processor even though they could be successfully handled. The event processor is stuck until the issue is fixed. -To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. +To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. Note that a dead-letter queue *can not* be shared between event processors. Hence, every processor you want to enable this for should receive a unique dead-letter queue instance. @@ -324,23 +324,28 @@ The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database table. + #### Configuring a sequenced Dead-Letter Queue + A`JpaSequencedDeadLetterQueue` configuration example: {% tabs %} {% tab title="Axon Configuration API" %} + ```java public class DeadLetterQueueExampleConfig { public static final String PROCESSING_GROUP = "deadLetterProcessor"; - + public ConfigurerModule configure() { return configurer -> configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( PROCESSING_GROUP, configuration -> JpaSequencedDeadLetterQueue.builder() .processingGroup(PROCESSING_GROUP) - .transactionManager(configuration.getComponent(TransactionManager.class)) - .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .transactionManager(configuration.getComponent( + TransactionManager.class)) + .entityManagerProvider(configuration.getComponent( + EntityManagerProvider.class)) .serializer(configuration.serializer()) .maxSequences(256) .maxSequenceSize(256) @@ -348,9 +353,11 @@ public class DeadLetterQueueExampleConfig { } } ``` + {% endtab %} {% tab title="Spring Boot AutoConfiguration" %} + ```java @Configuration public class DeadLetterQueueExampleConfig { @@ -364,8 +371,10 @@ public class DeadLetterQueueExampleConfig { PROCESSING_GROUP, configuration -> JpaSequencedDeadLetterQueue.builder() .processingGroup(PROCESSING_GROUP) - .transactionManager(configuration.getComponent(TransactionManager.class)) - .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .transactionManager(configuration.getComponent( + TransactionManager.class)) + .entityManagerProvider(configuration.getComponent( + EntityManagerProvider.class)) .serializer(configuration.serializer()) .maxSequences(256) .maxSequenceSize(256) @@ -373,14 +382,19 @@ public class DeadLetterQueueExampleConfig { } } ``` + {% endtab %} {% endtabs %} -You can set the maximum amount of sequences that are saved (defaults to 1024) and the maximum amount of dead-letters in a -sequence (also defaults to 1024). If these thresholds are exceeded a `DeadLetterQueueOverflowException` will be thrown and the event processor +You can set the maximum amount of sequences that are saved (defaults to 1024) and the maximum amount of dead-letters in +a +sequence (also defaults to 1024). If these thresholds are exceeded a `DeadLetterQueueOverflowException` will be thrown +and the event processor will stop processing. The sequence id is determined by the sequencing policy. By default, the sequence identifier is the aggregate id. + #### Processing Dead-Letter Sequences + After fixing the issue the events can be handled again by using the `process` function, in this case it will process the first matching event of type ErrorEvent: @@ -462,7 +476,24 @@ public class DeadletterProcessor { {% endtab %} {% endtabs %} + +#### Deadletter attributes + +A dead-letter contains the following attributes: + +| attribute | type | description | +|-----------------|------------|--------------------------------------------------------------------------------------------------------------------------------------| +| message | M | The complete failed message | +| cause | String | The cause for the message to be dead-lettered | +| enqueuedAt | Instant | The moment in time when the message was entered in a dead-letter queue | +| lastTouched | Instant | The moment in time when this letter was last touched. Will equal the enqueuedAt value if this letter is enqueued for the first time. | +| diagnostics | MetaData | The diagnostic MetaData concerning this letter | +| markTouched | Deadletter | Construct a copy of this DeadLetter, replacing the lastTouched with the current time | +| withCause | Deadletter | Construct a copy of this DeadLetter, replacing the cause with the given requeueCause | +| withDiagnostics | Deadletter | Construct a copy of this DeadLetter, replacing the diagnostics with the given diagnostics | + #### Dead-Letter Enqueue Policy + You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be skipped. This policy is not only called for initial failures but also when dead-lettered events are processed unsuccessfully again. @@ -497,7 +528,8 @@ public class CustomDeadLetterPolicy { ``` -One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard requirement. +One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard +requirement. Event processors using the dead-letter queue will not roll back the transaction on an error. That means that if you do multiple actions in the same handler and a subsequent action fails, any earlier action is not rolled back by the transaction. That is why idempotency is important. From c1d4125ca782c898687a9ac038cb9be2de584fb6 Mon Sep 17 00:00:00 2001 From: YvonneCeelie Date: Mon, 5 Sep 2022 13:03:43 +0200 Subject: [PATCH 19/44] Fixed tupos --- axon-framework/events/event-processors/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 8e0fb448..b72c559d 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -488,9 +488,9 @@ A dead-letter contains the following attributes: | enqueuedAt | Instant | The moment in time when the message was entered in a dead-letter queue | | lastTouched | Instant | The moment in time when this letter was last touched. Will equal the enqueuedAt value if this letter is enqueued for the first time. | | diagnostics | MetaData | The diagnostic MetaData concerning this letter | -| markTouched | Deadletter | Construct a copy of this DeadLetter, replacing the lastTouched with the current time | -| withCause | Deadletter | Construct a copy of this DeadLetter, replacing the cause with the given requeueCause | -| withDiagnostics | Deadletter | Construct a copy of this DeadLetter, replacing the diagnostics with the given diagnostics | +| markTouched | DeadLetter | Construct a copy of this DeadLetter, replacing the lastTouched with the current time | +| withCause | DeadLetter | Construct a copy of this DeadLetter, replacing the cause with the given requeueCause | +| withDiagnostics | DeadLetter | Construct a copy of this DeadLetter, replacing the diagnostics with the given diagnostics | #### Dead-Letter Enqueue Policy From 1c348589e41f5e8d26d52c38d669dda7b254ae2a Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Thu, 8 Sep 2022 16:06:45 +0200 Subject: [PATCH 20/44] Revert "Review comments" which introduced indentation adjustments This reverts commit 4fe8f42b --- .../events/event-processors/README.md | 311 ++++++------------ 1 file changed, 107 insertions(+), 204 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index b72c559d..4343f0c3 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -3,9 +3,7 @@ [Event handlers](../event-handlers.md) define the business logic to be performed when an event is received. _Event Processors_ are the components that take care of the technical aspects of that processing. They start a [unit of work](../../messaging-concepts/unit-of-work.md) and possibly a transaction. -However, they also ensure -that [correlation data](../../monitoring-and-metrics.md#correlation-data-a-idcorrelation-dataa) can be correctly -attached to all messages created during event processing, among other non-functional requirements. +However, they also ensure that [correlation data](../../monitoring-and-metrics.md#correlation-data-a-idcorrelation-dataa) can be correctly attached to all messages created during event processing, among other non-functional requirements. The image below depicts a representation of the organization of Event Processors and Event Handlers: @@ -14,20 +12,16 @@ The image below depicts a representation of the organization of Event Processors Axon has a layered approach towards organizing event handlers. First, an event handler is positioned in a _Processing Group_. Each event handler, or "Event Handling Component," will only ever belong to a single Processing Group. -The Processing Group provides a level of configurable non-functional requirements, -like [error handling](#processing-group-listener-invocation-error-handler) and -the [sequencing policy](streaming.md#sequential-processing). +The Processing Group provides a level of configurable non-functional requirements, like [error handling](#processing-group-listener-invocation-error-handler) and the [sequencing policy](streaming.md#sequential-processing). The Event Processors, in turn, is in charge of the Processing Group. An Event Processor will control 1 to N Processing Groups, although there will be a one-to-one mapping in most cases. Similar to the Event Handling Component, a Processing Group will belong to a single Event Processor. -This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a -more fine-grained degree of [error handling](#event-processor-error-handler). +This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a more fine-grained degree of [error handling](#event-processor-error-handler). Event Processors come in roughly two forms: [Subscribing](subscribing.md) and [Streaming](streaming.md). -Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing -mechanism. +Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. Streaming Event Processors, on the other hand, pull their messages from a source using a thread that it manages itself. For more specifics on either type, consult their respective sections [here](subscribing.md) and [here](streaming.md). @@ -42,13 +36,10 @@ Two processors with the same name are considered as two instances of the same pr All event handlers are attached to a processor whose name by default is the package name of the event handler's class. Furthermore, the default processor implementation used by Axon is the [Tracking Event Processor](streaming.md). -The (default) event processor used can be adjusted, as is shown in the [subscribing](subscribing.md#configuring) -and [streaming](streaming.md#configuring) sections. +The (default) event processor used can be adjusted, as is shown in the [subscribing](subscribing.md#configuring) and [streaming](streaming.md#configuring) sections. -Event handlers, or Event Handling Components, come in roughly two flavors: "regular" \(singleton, stateless\) event -handlers and [sagas](../../sagas/README.md). -[This](../event-handlers.md#registering-event-handlers) section describes the process to register an event handler, -whereas [this](../../sagas/implementation.md#configuring-a-saga) page describes the saga registration process. +Event handlers, or Event Handling Components, come in roughly two flavors: "regular" \(singleton, stateless\) event handlers and [sagas](../../sagas/README.md). +[This](../event-handlers.md#registering-event-handlers) section describes the process to register an event handler, whereas [this](../../sagas/implementation.md#configuring-a-saga) page describes the saga registration process. Now let us consider that the following event handlers have been registered: @@ -61,10 +52,8 @@ Without any intervention, this will trigger the creation of two processors, name 1. `org.axonframework.example.eventhandling` with two handlers called `MyHandler` and `MyOtherHandler` 2. `org.axonframework.example.eventhandling.module` with the single handler `ModuleHandler` -Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the -Processing Group is recommended. -The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using -the `ProcessingGroup` annotation. +Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the Processing Group is recommended. +The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using the `ProcessingGroup` annotation. This annotation resembles the Processing Group level discussed in the [introduction](README.md#event-processors). The `ProcessingGroup` annotation requires the insertion of a name and can only be set on the class. @@ -77,7 +66,7 @@ class MyHandler { } @ProcessingGroup("my-handlers") -class MyOtherHandler { +class MyOtherHandler{ // ... } @@ -92,61 +81,44 @@ Using the `ProcessingGroup` annotation as depicted, we again construct two proce 1. `my-handlers` with two handlers called `MyHandler` and `MyOtherHandler` 2. `module-handlers` with the single handler `ModuleHandler` -If more control is required to group Event Handling Components, we recommend consulting -the [assignment rules](#event-handler-assignment-rules) section. +If more control is required to group Event Handling Components, we recommend consulting the [assignment rules](#event-handler-assignment-rules) section. ### Event Handler Assignment Rules -The Configuration API allows you to configure other strategies for assigning event handling classes to processors or -assigning specific handler instances to particular processors. -We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group -to Event Processor. +The Configuration API allows you to configure other strategies for assigning event handling classes to processors or assigning specific handler instances to particular processors. +We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group to Event Processor. Below is an exhaustive list of all the assignment rules the `EventProcessingConfigurer` exposes: **Event Handler to Processing Group** * `byDefaultAssignTo(String)` - defines the default Processing Group name to assign an event handler to. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not - present. -* `byDefaultAssignHandlerInstancesTo(Function)` - defines a lambda invoked to assign an event handling - instance to a desired Processing Group by returning that group's name. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not - present. -* `byDefaultAssignHandlerTypesTo(Function, String>)` - defines a lambda invoked to assign an event handler type - to a desired Processing Group by returning that group's name. - It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not - present. -* `assignHandlerInstancesMatching(String, Predicate)` - assigns event handlers to the given Processing Group - name based on a predicate ingesting an event handling instance. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. +* `byDefaultAssignHandlerInstancesTo(Function)` - defines a lambda invoked to assign an event handling instance to a desired Processing Group by returning that group's name. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. +* `byDefaultAssignHandlerTypesTo(Function, String>)` - defines a lambda invoked to assign an event handler type to a desired Processing Group by returning that group's name. + It will only be taken into account if there are no more specifics rules and if the `ProcessingGroup` annotation is not present. +* `assignHandlerInstancesMatching(String, Predicate)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerTypesMatching(String, Predicate>)` - assigns event handlers to the given Processing Group name - based on a predicate ingesting an event handler type. +* `assignHandlerTypesMatching(String, Predicate>)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerInstancesMatching(String, int, Predicate)` - assigns event handlers to the given Processing - Group name based on a predicate ingesting an event handling instance. +* `assignHandlerInstancesMatching(String, int, Predicate)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance. Uses the given priority to decide on rule-ordering. The higher the priority value, the more important the rule is. If an instance matches several criteria, the outcome is _undefined_. -* `assignHandlerTypesMatching(String, int, Predicate>)` - assigns event handlers to the given Processing Group - name based on a predicate ingesting an event handler type. +* `assignHandlerTypesMatching(String, int, Predicate>)` - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type. Uses the given priority to decide on rule-ordering. The higher the priority, the more important the rule is. If an instance matches several criteria, the outcome is _undefined_. **Processing Group to Event Processor** -* `assignProcessingGroup(String, String)` - defines a given Processing Group name that belongs to the given Event - Processor's name. -* `assignProcessingGroup(Function)` - defines a lambda invoked to assign a Processing Group name to the - desired Event Processor by returning that processor's name. +* `assignProcessingGroup(String, String)` - defines a given Processing Group name that belongs to the given Event Processor's name. +* `assignProcessingGroup(Function)` - defines a lambda invoked to assign a Processing Group name to the desired Event Processor by returning that processor's name. ### Ordering Event Handlers within a processor -To order event handlers within an Event Processor, the order in which event handlers are registered (as described in -the [Registering Event Handlers](../event-handlers.md#registering-event-handlers) section) is guiding. -Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their -insertion ordering in the Configuration API. +To order event handlers within an Event Processor, the order in which event handlers are registered (as described in the [Registering Event Handlers](../event-handlers.md#registering-event-handlers) section) is guiding. +Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their insertion ordering in the Configuration API. -If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering -by adding the `@Order` annotation. +If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the `@Order` annotation. This annotation is placed on the event handler class name, containing an `integer` value to specify the ordering. Note that it is **not possible** to order event handlers belonging to different Event Processors. @@ -154,15 +126,12 @@ Each Event Processor acts as an isolated component without any intervention from > **Ordering Considerations** > -> Although we can place an order among event handlers within an Event Processor, separation of event handlers is -> recommended. -> -> Placing an overall ordering on event handlers means those components are inclined to interact with one another, -> introducing a form of coupling. +> Although we can place an order among event handlers within an Event Processor, separation of event handlers is recommended. +> +> Placing an overall ordering on event handlers means those components are inclined to interact with one another, introducing a form of coupling. > Due to this, the event handling process will become complex to manage (e.g., for new team members). -> Furthermore, embracing an ordering approach might lead to place _all_ event handlers in a global ordering, decreasing -> processing speeds in general. -> +> Furthermore, embracing an ordering approach might lead to place _all_ event handlers in a global ordering, decreasing processing speeds in general. +> > In all, you are free to use an ordering, but we recommend using it sparingly. ## Error Handling @@ -170,79 +139,62 @@ Each Event Processor acts as an isolated component without any intervention from Errors are inevitable in any application. Depending on where they happen, you may want to respond differently. -By default, exceptions raised by event handlers are caught in -the [Processing Group layer](#processing-group-listener-invocation-error-handler), logged, and processing continues with -the following events. -When an exception is thrown when a processor is trying to commit a transaction, update -a [token](streaming.md#token-store), or in any other part of the process, the exception will be propagated. +By default, exceptions raised by event handlers are caught in the [Processing Group layer](#processing-group-listener-invocation-error-handler), logged, and processing continues with the following events. +When an exception is thrown when a processor is trying to commit a transaction, update a [token](streaming.md#token-store), or in any other part of the process, the exception will be propagated. -In the case of a [Streaming Event Processor](streaming.md#error-mode), this means the processor will go into error mode, -releasing any tokens and retrying at an incremental interval \(starting at 1 second, up to max 60 seconds\). -A [Subscribing Event Processor](subscribing.md#error-mode) will report a publication error to the component that -provided the event. +In the case of a [Streaming Event Processor](streaming.md#error-mode), this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval \(starting at 1 second, up to max 60 seconds\). +A [Subscribing Event Processor](subscribing.md#error-mode) will report a publication error to the component that provided the event. -To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with -exceptions: +To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with exceptions: ### Processing Group - Listener Invocation Error Handler -The component dealing with exceptions thrown from an event handling method is called -the `ListenerInvocationErrorHandler`. -By default, these exceptions are logged (with the `LoggingErrorHandler` implementation), and processing continues with -the next handler or message. +The component dealing with exceptions thrown from an event handling method is called the `ListenerInvocationErrorHandler`. +By default, these exceptions are logged (with the `LoggingErrorHandler` implementation), and processing continues with the next handler or message. The default `ListenerInvocationErrorHandler` used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group: {% tabs %} {% tab title="Axon Configuration API" %} - ```java public class AxonConfig { - // ... public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */) // ... or for a specific processing group: - .registerListenerInvocationErrorHandler("my-processing-group", - conf -> /* create listener error handler */); + .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */); } } ``` - {% endtab %} {% tab title="Spring Boot AutoConfiguration" %} - ```java @Configuration public class AxonConfig { - // ... @Autowired public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */) // ... or for a specific processing group: - .registerListenerInvocationErrorHandler("my-processing-group", - conf -> /* create listener error handler */); + .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */); } } ``` - {% endtab %} {% endtabs %} -It is easy to implement custom error handling behavior. -The error handling method to implement provides the exception, the event that was handled, and a reference to the -handler that was handling the message: +It is easy to implement custom error handling behavior. +The error handling method to implement provides the exception, the event that was handled, and a reference to the handler that was handling the message: ```java public interface ListenerInvocationErrorHandler { - void onError(Exception exception, - EventMessage event, + void onError(Exception exception, + EventMessage event, EventMessageHandler eventHandler) throws Exception; } ``` @@ -256,17 +208,14 @@ Exceptions occurring outside an event handler's scope, or have bubbled up from t The default error handler is the `PropagatingErrorHandler`, which will rethrow any exceptions it catches. How the Event Processor deals with a rethrown exception differ per implementation. -The behaviour for the Subscribing- and the Streaming Event Processor can respectively be -found [here](subscribing.md#error-mode) and [here](streaming.md#error-mode). +The behaviour for the Subscribing- and the Streaming Event Processor can respectively be found [here](subscribing.md#error-mode) and [here](streaming.md#error-mode). We can configure a default `ErrorHandler` for all Event Processors or an `ErrorHandler` for specific processors: {% tabs %} {% tab title="Axon Configuration API" %} - ```java public class AxonConfig { - // ... public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { // To configure a default ... @@ -276,15 +225,12 @@ public class AxonConfig { } } ``` - {% endtab %} {% tab title="Spring Boot AutoConfiguration" %} - ```java @Configuration public class AxonConfig { - // ... @Autowired public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { @@ -295,7 +241,6 @@ public class AxonConfig { } } ``` - {% endtab %} {% endtabs %} @@ -308,8 +253,7 @@ public interface ErrorHandler { } ``` -Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform -dead-letter-queue delivery, or rethrow the exception. +Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception. ### Dead-letter Queue @@ -362,7 +306,7 @@ public class DeadLetterQueueExampleConfig { @Configuration public class DeadLetterQueueExampleConfig { - public static final String PROCESSING_GROUP = "deadLetterProcessor"; + public static final String PROCESSING_GROUP = "deadLetterProcessor"; @Autowired public ConfigurerModule configure() { @@ -402,19 +346,12 @@ first matching event of type ErrorEvent: {% tab title="Process first dead-letter in the queue of type ErrorEvent " %} ```java -public class DeadletterProcessor { - - private final EventProcessingConfiguration eventProcessingConfiguration; - - public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { - this.eventProcessingConfiguration = eventProcessingConfiguration; - } - - public void repairErrorEvent() { - eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) - .ifPresent(p -> p.process(deadLetter -> deadLetter.message() - .getPayload() instanceof ErrorEvent)); - } +public class DeadletterProcessor{ + + public void repairErrorEvents() { + eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) + .ifPresent(p -> p.process(deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent)); + } } ``` @@ -500,46 +437,38 @@ unsuccessfully again. ```java @Configuration -public class CustomDeadLetterPolicy { - - @Autowired - public void configure(EventProcessingConfigurer configurer) { - configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> - (letter, cause) -> { - if (cause instanceof NullPointerException) { - // It's pointless.. - return Decisions.doNotEnqueue(); - } - final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); - if (letter.message().getPayload() instanceof ErrorEvent) { - // Important, always retry - return Decisions.enqueue(cause); - } - if (retries < 10) { - // Let's continue and increase retries! - return Decisions.enqueue(cause, - deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); - } - // Exhausted retries - return Decisions.doNotEnqueue(); - }); - } +public class CustomDeadLetterPolicy{ + @Autowired + public void configure(EventProcessingConfigurer configurer) { + configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> + (letter, cause) -> { + if (cause instanceof NullPointerException) { + // It's pointless.. + return Decisions.doNotEnqueue(); + } + final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); + if (letter.message().getPayload() instanceof ErrorEvent) { + // Important, always retry + return Decisions.enqueue(cause); + } + if(retries < 10) { + // Let's continue and increase retries! + return Decisions.enqueue(cause, deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); + } + // Exhausted retries + return Decisions.doNotEnqueue(); + }); + } } ``` -One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this is a hard -requirement. -Event processors using the dead-letter queue will not roll back the transaction on an error. That means that if you do -multiple actions in the same handler and a subsequent action fails, any earlier action is not rolled back by the -transaction. That is why idempotency is important. +One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. ## General processor configuration -Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event -Processors allow configuration for other components too. -For [Subscribing](subscribing.md#configuring) and [Streaming](streaming.md#configuring) Event Processor specific -options, their respective sections should be checked. +Alongside [handler assignment](#assigning-handlers-to-processors) and [error handling](#error-handling), Event Processors allow configuration for other components too. +For [Subscribing](subscribing.md#configuring) and [Streaming](streaming.md#configuring) Event Processor specific options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor. ### Event Processor Builders @@ -553,100 +482,74 @@ To that end, we can configure a custom `EventProcessorBuilder`: interface EventProcessorBuilder { // Note: the `EventHandlerInvoker` is the component which holds the event handling functions. - EventProcessor build(String name, - Configuration configuration, + EventProcessor build(String name, + Configuration configuration, EventHandlerInvoker eventHandlerInvoker); } ``` -The `EventProcessorBuilder` functional interface provides the event processor's name, the `Configuration` and -the `EventHandlerInvoker`, and requires returning an `EventProcessor` instance. -Note that any Axon component that an Event Processor requires (e.g., an `EventStore`) is retrievable from -the `Configuration`. +The `EventProcessorBuilder` functional interface provides the event processor's name, the `Configuration` and the `EventHandlerInvoker`, and requires returning an `EventProcessor` instance. +Note that any Axon component that an Event Processor requires (e.g., an `EventStore`) is retrievable from the `Configuration`. The `EventProcessingConfigurer` provides two methods to configure an `EventProcessorBuilder`: -1. `registerEventProcessorFactory(EventProcessorBuilder)` - allows you to define a default factory method that creates - event processors for which no explicit factories are defined -2. `registerEventProcessor(String, EventProcessorBuilder)` - defines the factory method to use to create a processor - with given `name` +1. `registerEventProcessorFactory(EventProcessorBuilder)` - allows you to define a default factory method that creates event processors for which no explicit factories are defined +2. `registerEventProcessor(String, EventProcessorBuilder)` - defines the factory method to use to create a processor with given `name` ### Event Handler Interceptors -Since the Event Processor is the invoker of event handling methods, it is a spot to -configure [Message Handler Interceptors](../../messaging-concepts/message-intercepting.md) too. -Since Event Processors are dedicated to event handling, the `MessageHandlerInterceptor` is required to deal with -an `EventMessage`. -Differently put, -an [EventHandlerInterceptor](../../messaging-concepts/message-intercepting.md#event-handler-interceptors) can be -registered to Event Processors. +Since the Event Processor is the invoker of event handling methods, it is a spot to configure [Message Handler Interceptors](../../messaging-concepts/message-intercepting.md) too. +Since Event Processors are dedicated to event handling, the `MessageHandlerInterceptor` is required to deal with an `EventMessage`. +Differently put, an [EventHandlerInterceptor](../../messaging-concepts/message-intercepting.md#event-handler-interceptors) can be registered to Event Processors. The `EventProcessingConfigurer` provides two methods to configure `MessageHandlerInterceptor` instances: -- `registerDefaultHandlerInterceptor(BiFunction>>)` - - registers a default `MessageHandlerInterceptor` that will be configured on every Event Processor instance -- `registerHandlerInterceptor(String, Function>>)` - - registers a `MessageHandlerInterceptor` that will be configured for the Event Processor matching the given `String` +- `registerDefaultHandlerInterceptor(BiFunction>>)` - registers a default `MessageHandlerInterceptor` that will be configured on every Event Processor instance +- `registerHandlerInterceptor(String, Function>>)` - registers a `MessageHandlerInterceptor` that will be configured for the Event Processor matching the given `String` ### Message Monitors Any Event Processor instance provides the means to contain a Message Monitor. -Message Monitors (discussed in more detail [here](../../monitoring-and-metrics.md#metrics-a-idmetricsa)) allow for -monitoring the flow of messages throughout an Axon application. -For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards -the event handling functions. +Message Monitors (discussed in more detail [here](../../monitoring-and-metrics.md#metrics-a-idmetricsa)) allow for monitoring the flow of messages throughout an Axon application. +For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions. The `EventProcessingConfigurer` provides two approaches towards configuring a `MessageMonitor`: -- `registerMessageMonitor(String, Function>>)` - registers the - given `MessageMonitor` to the Event Processor matching the given `String` -- `registerMessageMonitorFactory(String, MessageMonitorFactory)` - registers the given `MessageMonitorFactory` to - construct a `MessageMonitor` for the Event Processor matching the given `String` +- `registerMessageMonitor(String, Function>>)` - registers the given `MessageMonitor` to the Event Processor matching the given `String` +- `registerMessageMonitorFactory(String, MessageMonitorFactory)` - registers the given `MessageMonitorFactory` to construct a `MessageMonitor` for the Event Processor matching the given `String` -The `MessageMonitorFactory` provides a more fine-grained approach, used throughout the Configuration API, to construct -a `MessageMonitor`: +The `MessageMonitorFactory` provides a more fine-grained approach, used throughout the Configuration API, to construct a `MessageMonitor`: ```java @FunctionalInterface public interface MessageMonitorFactory { - - MessageMonitor> create(Configuration configuration, - Class componentType, + + MessageMonitor> create(Configuration configuration, + Class componentType, String componentName); } ``` We can use the `Configuration` to retrieve the required dependencies to construct the `MessageMonitor`. The type and name reflect which infrastructure component the factory constructs a monitor for. -Whenever you use the `MessageMonitorFactory` to construct a `MessageMonitor` for an Event Processor, the factory expects -the `componentType` to be an `EventProcessor` implementation. +Whenever you use the `MessageMonitorFactory` to construct a `MessageMonitor` for an Event Processor, the factory expects the `componentType` to be an `EventProcessor` implementation. The `componentName`, on the other hand, would resemble the name of the Event Processor. ### Transaction Management -As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration -options. +As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration options. Note that in the majority of the scenarios, the defaults will suffice. This section simply serves to show these options to allow adjustment if the application requires it. The first of these is the `TransactionManager`. -Axon uses the `TransactionManager` to attach a transaction to -every [Unit of Work](../../messaging-concepts/unit-of-work.md). -Within a Spring environment, the `TransactionManager` defaults to a `SpringTransactionManager`, which uses -Spring's `PlatformTransactionManager` under the hood. -In non Spring environments, it would be wise to build a `TransactionManager` implement if transaction management is -required, of course. +Axon uses the `TransactionManager` to attach a transaction to every [Unit of Work](../../messaging-concepts/unit-of-work.md). +Within a Spring environment, the `TransactionManager` defaults to a `SpringTransactionManager`, which uses Spring's `PlatformTransactionManager` under the hood. +In non Spring environments, it would be wise to build a `TransactionManager` implement if transaction management is required, of course. Such an implementation only requires the definition of the `TransactionManager#startTransaction()` method. -To adjust the transaction manager for an Event Processor, -the `registerTransactionManager(String, Function)` on the `EventProcessingConfigurer` -should be used. +To adjust the transaction manager for an Event Processor, the `registerTransactionManager(String, Function)` on the `EventProcessingConfigurer` should be used. Secondly, you can adjust the desired `RollbackConfiguration` per Event Processor. -It is the `RollbackConfiguration` that decide when a [Unit of Work](../../messaging-concepts/unit-of-work.md) should -rollback the transaction. -The default `RollbackConfiguration` is to rollback on any type of `Throwable`; -the [Unit of Work](../../messaging-concepts/unit-of-work.md) page describes the other options you can choose. -To adjust the default behaviour, -the `registerRollbackConfiguration(String, Function)` function should be invoked -on the `EventProcessingConfigurer`. +It is the `RollbackConfiguration` that decide when a [Unit of Work](../../messaging-concepts/unit-of-work.md) should rollback the transaction. +The default `RollbackConfiguration` is to rollback on any type of `Throwable`; the [Unit of Work](../../messaging-concepts/unit-of-work.md) page describes the other options you can choose. +To adjust the default behaviour, the `registerRollbackConfiguration(String, Function)` function should be invoked on the `EventProcessingConfigurer`. From 6bb860f148609518079ce3e1f70389b6cc9e17d2 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 11:18:37 +0200 Subject: [PATCH 21/44] Adjust DLQ intro section Adjust DLQ intro section by: * adding a small introduction, * replacing event processor mentions for processing group, * add details on maintaining a dead letter sequence #281 --- .../events/event-processors/README.md | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 4343f0c3..404cf751 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -257,17 +257,35 @@ Based on the provided `ErrorContext` object, you can decide to ignore the error, ### Dead-letter Queue -When event processor transactions end up in an exception, following events are not handled by that event processor even -though they could be successfully handled. The event processor is stuck until the issue is fixed. -To skip and save the events that are failing, you can configure a dead-letter queue for an event processor. - -Note that a dead-letter queue *can not* be shared between event processors. -Hence, every processor you want to enable this for should receive a unique dead-letter queue instance. - -The `InMemorySequencedDeadLetterQueue` can be used for testing purposes but the dead-letters are gone after a restart. -To persist dead-letters the `JpaSequencedDeadLetterQueue` should be used. -When using the `JpaSequencedDeadLetterQueue` the dead-lettered events are stored in the `dead_letter_entry` database -table. +Although configuring a [Listener Invocation Error Handler](#processing-group---listener-invocation-error-handler) +and [Error Handler](#event-processor---error-handler) +helps with dealing with event handling exceptions, you still end up in an event handling stop. +When you log and proceed, you will likely need to resolve the predicament and [replay](streaming.md#replaying-events) +past events. +If you instead propagate the exceptions, the event processor will stall entirely. + +Although this behavior is sufficient on many occasions, sometimes it is beneficial if we can unblock event handling by parking the problematic event. +To that end, you can configure a dead-letter queue for a [processing group](#event-processors). + +An essential concept of Axon Frameworks event processors is the maintenance of event ordering, also when you configure [parallel processing](streaming.md#parallel-processing). +A perfect example when this is a requirement is the need to handle aggregate events in publishing order. +Simply dead lettering one failed event would cause subsequent events in the sequence to react to stale state. + +It is thus of utmost importance that a dead-letter queue for events enqueues an event and any following events in the sequence. +To that end, the supported dead-letter queue is a so-called `SequencedDeadLetterQueue`. + +Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence. +It does so by maintaining a sequence identifier for each event, determined by the [sequencing policy](/axon-framework/events/event-processors/streaming.md#sequential-processing). + +Note that you *cannot* share a dead-letter queue between different processing groups. +Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance. + +We currently provide the following dead-letter queue implementations: +* `InMemorySequencedDeadLetterQueue` - In-memory variant of the dead-letter queue. + Useful for testing purposes, but as it does not persist dead letters, it is unsuited for most production environments. +* `JpaSequencedDeadLetterQueue` - JPA variant of the dead-letter queue. + It constructs a `dead_letter_entry` table where it persists failed-events in. + The JPA dead-letter queue is a suitable option for production environments by persisting the dead letters. #### Configuring a sequenced Dead-Letter Queue From 17e138769b40fdefe01ba0f34352a64d04df22b0 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 11:39:17 +0200 Subject: [PATCH 22/44] Adjust DLQ configuration section Adjust DLQ configuration section by: * adjusting the indentation, * slightly adjusting the samples, * replacing mentions of event processor with processing group #281 --- .../events/event-processors/README.md | 86 ++++++++----------- 1 file changed, 38 insertions(+), 48 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 404cf751..8425fcc5 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -289,71 +289,61 @@ We currently provide the following dead-letter queue implementations: #### Configuring a sequenced Dead-Letter Queue -A`JpaSequencedDeadLetterQueue` configuration example: +A `JpaSequencedDeadLetterQueue` configuration example: + {% tabs %} {% tab title="Axon Configuration API" %} - ```java public class DeadLetterQueueExampleConfig { - - public static final String PROCESSING_GROUP = "deadLetterProcessor"; - - public ConfigurerModule configure() { - return configurer -> - configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( - PROCESSING_GROUP, - configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup(PROCESSING_GROUP) - .transactionManager(configuration.getComponent( - TransactionManager.class)) - .entityManagerProvider(configuration.getComponent( - EntityManagerProvider.class)) - .serializer(configuration.serializer()) - .maxSequences(256) - .maxSequenceSize(256) - .build())); + + public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) { + return configurer -> configurer.eventProcessing( + eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( + processingGroup, + configuration -> JpaSequencedDeadLetterQueue.builder() + .processingGroup(processingGroup) + .maxSequences(256) + .maxSequenceSize(256) + .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .transactionManager(configuration.getComponent(TransactionManager.class)) + .serializer(configuration.serializer()) + .build() + ) + ); } } ``` - {% endtab %} - {% tab title="Spring Boot AutoConfiguration" %} - ```java @Configuration public class DeadLetterQueueExampleConfig { - - public static final String PROCESSING_GROUP = "deadLetterProcessor"; - - @Autowired - public ConfigurerModule configure() { - return configurer -> - configurer.eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( - PROCESSING_GROUP, - configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup(PROCESSING_GROUP) - .transactionManager(configuration.getComponent( - TransactionManager.class)) - .entityManagerProvider(configuration.getComponent( - EntityManagerProvider.class)) - .serializer(configuration.serializer()) - .maxSequences(256) - .maxSequenceSize(256) - .build())); + + @Autowired + public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) { + return configurer -> configurer.eventProcessing( + eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( + processingGroup, + configuration -> JpaSequencedDeadLetterQueue.builder() + .processingGroup(processingGroup) + .maxSequences(256) + .maxSequenceSize(256) + .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) + .transactionManager(configuration.getComponent(TransactionManager.class)) + .serializer(configuration.serializer()) + .build() + ) + ); } } ``` - {% endtab %} - {% endtabs %} -You can set the maximum amount of sequences that are saved (defaults to 1024) and the maximum amount of dead-letters in -a -sequence (also defaults to 1024). If these thresholds are exceeded a `DeadLetterQueueOverflowException` will be thrown -and the event processor -will stop processing. -The sequence id is determined by the sequencing policy. By default, the sequence identifier is the aggregate id. + +You can set the maximum number of saved sequences (defaults to 1024) and the maximum number of dead letters in a sequence (also defaults to 1024). +If either of these thresholds is exceeded, the queue will throw a `DeadLetterQueueOverflowException`. +This exception means the processing group will stop processing new events altogether. +Thus, the processing group moves back to the behavior described at the start of the [Error Handling](#error-handling) section. #### Processing Dead-Letter Sequences From c8c16e2e512d8c45ae1cf54d3418cfb36ca29499 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 12:25:25 +0200 Subject: [PATCH 23/44] Adjust DLQ processing section Adjust DLQ processing section by: * expanding the intro section by explaining the SequencedDeadLetterProcessor and its methods, * adjust the sample titles * adjusting the indentation, * replacing repair for retry in the samples as there is no guarantee the letter is not enqueued again, * replacing mentions of event processor with processing group #281 --- .../events/event-processors/README.md | 119 ++++++++++-------- 1 file changed, 64 insertions(+), 55 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 8425fcc5..6bce9667 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -347,78 +347,87 @@ Thus, the processing group moves back to the behavior described at the start of #### Processing Dead-Letter Sequences -After fixing the issue the events can be handled again by using the `process` function, in this case it will process the -first matching event of type ErrorEvent: +Once you resolve the problem that led to dead lettering events, we can start processing the dead letters. +We recommend using the `SequencedDeadLetterProcessor` for this, as it processes an entire dead-letter _sequence_ instead of single dead-letter entries. +It will thus ensure the event order is maintained during the retry. -{% tabs %} -{% tab title="Process first dead-letter in the queue of type ErrorEvent " %} +The `SequencedDeadLetterProcessor` provides two operations to process dead letters: -```java -public class DeadletterProcessor{ - - public void repairErrorEvents() { - eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) - .ifPresent(p -> p.process(deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent)); - } -} -``` +1. `boolean processAny()` - Process the oldest dead-letter sequence. + Returns `true` if it processes a sequence successfully. +2. `boolean process(Predicate>)` - Process the oldest dead-letter sequence matching the predicate. + Note that the predicate only filters based on a sequence's *first* entry. + Returns `true` if it processes a sequence successfully. -{% endtab %} +If the processing of a dead letter fails, the event will be offered to the dead-letter queue again. +How the dead-lettering process reacts to this depends on the [enqueue policy](#dead-letter-enqueue-policy). -{% tab title="Process any dead-letter in the queue" %} +You can retrieve the `SequencedDeadLetterProcessor` from the `EventProcessingConfiguration`. +Below are a couple of examples of how to process dead-letter sequences: +{% tabs %} +{% tab title="Process the oldest dead-letter sequence matching `ErrorEvent`" %} ```java public class DeadletterProcessor { - - private final EventProcessingConfiguration eventProcessingConfiguration; - - public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { - this.eventProcessingConfiguration = eventProcessingConfiguration; - } - - public void repairAnyEvent() { - eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP) - .ifPresent(SequencedDeadLetterProcessor::processAny); + + private EventProcessingConfiguration config; + + public void retryErrorEventSequence(String processingGroup) { + config.sequencedDeadLetterProcessor(processingGroup) + .ifPresent(letterProcessor -> letterProcessor.process( + deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent + )); } } ``` - {% endtab %} - -{% tab title="Process all dead-letters in the queue" %} - +{% tab title="Process the oldest dead-letter sequence in the queue" %} ```java public class DeadletterProcessor { - - private final EventProcessingConfiguration eventProcessingConfiguration; - - public DeadletterProcessor(EventProcessingConfiguration eventProcessingConfiguration) { - this.eventProcessingConfiguration = eventProcessingConfiguration; + + private EventProcessingConfiguration config; + + public void retryAnySequence(String processingGroup) { + config.sequencedDeadLetterProcessor(processingGroup) + .ifPresent(SequencedDeadLetterProcessor::processAny); } - - public void repairAllEvents() { - Optional>>>> deadLetters = eventProcessingConfiguration.deadLetterQueue( - PROCESSING_GROUP).map(SequencedDeadLetterQueue::deadLetters); - - deadLetters.ifPresent(sequences -> { - // Iterate over all sequences - for (Iterable>> sequence : sequences) { - // Iterate over all dead-letters belonging to the same sequence - for (DeadLetter> deadLetterInSequence : sequence) { - eventProcessingConfiguration.sequencedDeadLetterProcessor(PROCESSING_GROUP).ifPresent( - eventMessageSequencedDeadLetterProcessor -> { - eventMessageSequencedDeadLetterProcessor.process( - deadLetter -> deadLetter.message().getIdentifier() - .equals(deadLetterInSequence.message() - .getIdentifier())); - }); - } - } - }); +} +``` +{% endtab %} +{% tab title="Process all dead-letter sequences in the queue" %} +```java +public class DeadletterProcessor { + + private EventProcessingConfiguration config; + + public void retryAllSequences(String processingGroup) { + Optional>> optionalLetterProcessor = + config.sequencedDeadLetterProcessor(processingGroup); + if (!optionalLetterProcessor.isPresent()) { + return; + } + SequencedDeadLetterProcessor> letterProcessor = optionalLetterProcessor.get(); + + // Retrieve all the dead lettered event sequences: + Iterable>>> deadLetterSequences = + config.deadLetterQueue(processingGroup) + .map(SequencedDeadLetterQueue::deadLetters) + .orElseThrow(() -> new IllegalArgumentException("No such Processing Group")); + + // Iterate over all sequences: + for (Iterable>> sequence : deadLetterSequences) { + Iterator>> sequenceIterator = sequence.iterator(); + String firstLetterId = sequenceIterator.next() + .message() + .getIdentifier(); + + // SequencedDeadLetterProcessor#process automatically retries an entire sequence. + // Hence, we only need to filter on the first entry of the sequence: + letterProcessor.process(deadLetter -> deadLetter.message().getIdentifier().equals(firstLetterId)); + } } } ``` - {% endtab %} {% endtabs %} From 2ffdecd0f1462d66c8887c56327bed1129eabd7d Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 12:28:24 +0200 Subject: [PATCH 24/44] Adjust DLQ attributes section Adjust DLQ attributes section by: * fix the section title, * use code sections, * remove markTouched, withCause and withDiagnostics, * change message type to EventMessage * change cause type to Optional, * explain why the cause may be empty, * explain that diagnostics are filled through an EnqueuePolicy #281 --- .../events/event-processors/README.md | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 6bce9667..ebbe8553 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -255,7 +255,7 @@ public interface ErrorHandler { Based on the provided `ErrorContext` object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception. -### Dead-letter Queue +### Dead-Letter Queue Although configuring a [Listener Invocation Error Handler](#processing-group---listener-invocation-error-handler) and [Error Handler](#event-processor---error-handler) @@ -431,20 +431,17 @@ public class DeadletterProcessor { {% endtab %} {% endtabs %} -#### Deadletter attributes +#### Dead-Letter attributes -A dead-letter contains the following attributes: +A dead letter contains the following attributes: -| attribute | type | description | -|-----------------|------------|--------------------------------------------------------------------------------------------------------------------------------------| -| message | M | The complete failed message | -| cause | String | The cause for the message to be dead-lettered | -| enqueuedAt | Instant | The moment in time when the message was entered in a dead-letter queue | -| lastTouched | Instant | The moment in time when this letter was last touched. Will equal the enqueuedAt value if this letter is enqueued for the first time. | -| diagnostics | MetaData | The diagnostic MetaData concerning this letter | -| markTouched | DeadLetter | Construct a copy of this DeadLetter, replacing the lastTouched with the current time | -| withCause | DeadLetter | Construct a copy of this DeadLetter, replacing the cause with the given requeueCause | -| withDiagnostics | DeadLetter | Construct a copy of this DeadLetter, replacing the diagnostics with the given diagnostics | +| attribute | type | description | +|-----------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------| +| `message` | `EventMessage` | The complete failed event. | +| `cause` | `Optional` | The cause for the message to be dead lettered. Empty if the letter is enqueued because it is part of a sequence. | +| `enqueuedAt` | `Instant` | The moment in time when the event was enqueued in a dead-letter queue. | +| `lastTouched` | `Instant` | The moment in time when this letter was last touched. Will equal the `enqueuedAt` value if this letter is enqueued for the first time. | +| `diagnostics` | `MetaData` | The diagnostic `MetaData` concerning this letter. Filled through the [enqueue policy](#dead-letter-enqueue-policy). | #### Dead-Letter Enqueue Policy From 43ed042c5b3538744f76f2c1715a2e1330528dd8 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 13:37:39 +0200 Subject: [PATCH 25/44] Adjust DLQ policy section Adjust DLQ policy section by: * introduce desire to not enqueue at all times, * introduce EnqueuePolicy and EnqueueDecision, * explain when the policy kicks in, * explain scenario when to configure a custom policy, * separate the policy creation and configuration into two sample sections * use several Decisions operations to showcase it. #281 --- .../events/event-processors/README.md | 91 +++++++++++++------ 1 file changed, 64 insertions(+), 27 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index ebbe8553..82249baf 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -445,39 +445,76 @@ A dead letter contains the following attributes: #### Dead-Letter Enqueue Policy -You can implement a custom dead-letter policy to exclude some events from the dead-letter queue, these events will be -skipped. This policy is not only called for initial failures but also when dead-lettered events are processed -unsuccessfully again. +By default, when you configure a dead-letter queue and event handling fails, the event is dead-lettered. +However, not all event failures should result in new entries in the dead-letter queue. +Similarly, when [letter processing](#processing-dead-letter-sequences) fails, you might want to reconsider whether you want to enqueue the letter again. + +To that end, you can configure a so-called `EnqueuePolicy`. +The enqueue policy ingests a `DeadLetter` and a cause (`Throwable`) and returns an `EnqueueDecision`. +The `EnqueueDecision`, in turn, describes if the framework should enqueue the dead letter. + +You can customize the dead-letter policy to exclude some events when handling fails. +As a consequence, these events will be skipped. +Note that Axon Framework invokes the policy on initial event handling *and* on [dead-letter processing](#processing-dead-letter-sequences). + +Reevaluating the policy after processing failed may be essential to ensure a dead letter isn't stuck in the queue forever. +To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy. +See the sample `EnqueuePolicy` below for this: ```java -@Configuration -public class CustomDeadLetterPolicy{ - @Autowired - public void configure(EventProcessingConfigurer configurer) { - configurer.registerDeadLetterPolicy(PROCESSING_GROUP, configuration -> - (letter, cause) -> { - if (cause instanceof NullPointerException) { - // It's pointless.. - return Decisions.doNotEnqueue(); - } - final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); - if (letter.message().getPayload() instanceof ErrorEvent) { - // Important, always retry - return Decisions.enqueue(cause); - } - if(retries < 10) { - // Let's continue and increase retries! - return Decisions.enqueue(cause, deadLetter -> deadLetter.diagnostics().and("retries", retries + 1)); - } - // Exhausted retries - return Decisions.doNotEnqueue(); - }); - } +public class CustomEnqueuePolicy implements EnqueuePolicy> { + + @Override + public EnqueueDecision> decide(DeadLetter> letter, Throwable cause) { + if (cause instanceof NullPointerException) { + // It's pointless: + return Decisions.doNotEnqueue(); + } + + final int retries = (int) letter.diagnostics().getOrDefault("retries", -1); + if (letter.message().getPayload() instanceof ErrorEvent) { + // Important and new entry: + return Decisions.enqueue(cause); + } + if (retries < 10) { + // Let's continue and increase retries: + return Decisions.requeue(cause, l -> l.diagnostics().and("retries", retries + 1)); + } + + // Exhausted all retries: + return Decisions.evict(); + } } +``` + +The `Decisions` utility class provides the most reasonable decisions, but you are free to construct your own `EnqueueDecision` when necessary. +See the following example for configuring our custom policy: +{% tabs %} +{% tab title="Axon Configuration API" %} +```java +public class EnqueuePolicyConfigurer { + + public void configureEnqueuePolicy(EventProcessingConfigurer configurer, String processingGroup) { + configurer.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy()); + } +} ``` +{% endtab %} +{% tab title="Spring Boot AutoConfiguration" %} +```java +@Configuration +public class EnqueuePolicyConfigurer { -One important note, when implementing event handlers, make them idempotent. With the dead-letter queue this becomes a hard requirement. The principle of exactly once delivery is not guaranteed and at-least-once is the reality to cope with. + @Bean + public ConfigurerModule configureEnqueuePolicy(String processingGroup) { + return configurer -> configurer.eventProcessing() + .registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy()); + } +} +``` +{% endtab %} +{% endtabs %} ## General processor configuration From 89b0a54c8cc0f0bfa64462a1513ef63af74b3354 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 13:49:08 +0200 Subject: [PATCH 26/44] Adjust DLQ idempotency section Adjust DLQ idempotency section by: * moving it to right after the intro section, * making it a separate subsection for clarity, * expanding on why this is strongly recommended #281 --- axon-framework/events/event-processors/README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 82249baf..756502d4 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -287,6 +287,17 @@ We currently provide the following dead-letter queue implementations: It constructs a `dead_letter_entry` table where it persists failed-events in. The JPA dead-letter queue is a suitable option for production environments by persisting the dead letters. +#### Dead-Letter Queues and Idempotency + +Before configuring a `SequencedDeadLetterQueue` it is vital to validate whether your event handling functions are idempotent. +As a processing group consists of several Event Handling Components (as explained in the intro of this chapter), some handlers may succeed in event handling while others will not. +As a configured dead-letter queue does not stall event handling, a failure in one Event Handling Component does not cause a rollback for other event handlers. +Furthermore, as the dead-letter support is on the processing group level, [dead-letter processing](#processing-dead-letter-sequences) will invoke *all* event handlers for that event within the processing group. + +Thus, if your event handlers are not idempotent, processing letters may result in undesired side effects. +Hence, we strongly recommend making your event handlers idempotent when using the dead-letter queue. +The principle of exactly-once delivery is no longer guaranteed; at-least-once delivery is the reality to cope with. + #### Configuring a sequenced Dead-Letter Queue A `JpaSequencedDeadLetterQueue` configuration example: From ad70236705d72159db03b061ca30b45afe0f5dac Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 14:10:21 +0200 Subject: [PATCH 27/44] Strengthen point the InMem is not production ready Strengthen point that the in-memory DLQ is not production ready #281 --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 756502d4..b0f4416d 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -282,7 +282,7 @@ Hence, each processing group you want to enable this behavior for should receive We currently provide the following dead-letter queue implementations: * `InMemorySequencedDeadLetterQueue` - In-memory variant of the dead-letter queue. - Useful for testing purposes, but as it does not persist dead letters, it is unsuited for most production environments. + Useful for testing purposes, but as it does not persist dead letters, it is unsuited for production environments. * `JpaSequencedDeadLetterQueue` - JPA variant of the dead-letter queue. It constructs a `dead_letter_entry` table where it persists failed-events in. The JPA dead-letter queue is a suitable option for production environments by persisting the dead letters. From 7e9cd1131a9da4289461e27230a2b98022070747 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:34:16 +0200 Subject: [PATCH 28/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index b0f4416d..7130a37c 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -259,7 +259,7 @@ Based on the provided `ErrorContext` object, you can decide to ignore the error, Although configuring a [Listener Invocation Error Handler](#processing-group---listener-invocation-error-handler) and [Error Handler](#event-processor---error-handler) -helps with dealing with event handling exceptions, you still end up in an event handling stop. +helps you to deal with exceptions when processing events, you still end up in an event handling stop. When you log and proceed, you will likely need to resolve the predicament and [replay](streaming.md#replaying-events) past events. If you instead propagate the exceptions, the event processor will stall entirely. From 110b9475f7cb75236fd2dbd2ce049a01bffae0ce Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:35:21 +0200 Subject: [PATCH 29/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 7130a37c..97840dd7 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -260,7 +260,7 @@ Based on the provided `ErrorContext` object, you can decide to ignore the error, Although configuring a [Listener Invocation Error Handler](#processing-group---listener-invocation-error-handler) and [Error Handler](#event-processor---error-handler) helps you to deal with exceptions when processing events, you still end up in an event handling stop. -When you log and proceed, you will likely need to resolve the predicament and [replay](streaming.md#replaying-events) +When you only log the error and allow processing to proceed, you will most likely end up with missing data until you fix the predicament and [replay](streaming.md#replaying-events) past events. If you instead propagate the exceptions, the event processor will stall entirely. From 171be4afba670689066f184411c83045ca50409a Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:35:31 +0200 Subject: [PATCH 30/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 97840dd7..9c3ba1c9 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -262,7 +262,7 @@ and [Error Handler](#event-processor---error-handler) helps you to deal with exceptions when processing events, you still end up in an event handling stop. When you only log the error and allow processing to proceed, you will most likely end up with missing data until you fix the predicament and [replay](streaming.md#replaying-events) past events. -If you instead propagate the exceptions, the event processor will stall entirely. +If you instead propagate the exception so the event processor keeps retrying, the event processor will stall entirely when the cause is consistent. Although this behavior is sufficient on many occasions, sometimes it is beneficial if we can unblock event handling by parking the problematic event. To that end, you can configure a dead-letter queue for a [processing group](#event-processors). From dca166c5b24cf3a5e45f6f1980ae3258702260eb Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:35:44 +0200 Subject: [PATCH 31/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 9c3ba1c9..0456fbf6 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -267,7 +267,7 @@ If you instead propagate the exception so the event processor keeps retrying, th Although this behavior is sufficient on many occasions, sometimes it is beneficial if we can unblock event handling by parking the problematic event. To that end, you can configure a dead-letter queue for a [processing group](#event-processors). -An essential concept of Axon Frameworks event processors is the maintenance of event ordering, also when you configure [parallel processing](streaming.md#parallel-processing). +An essential concept of Axon Frameworks event processors is the maintenance of event ordering, even when you configure [parallel processing](streaming.md#parallel-processing). A perfect example when this is a requirement is the need to handle aggregate events in publishing order. Simply dead lettering one failed event would cause subsequent events in the sequence to react to stale state. From 56d667f7843644cc2befce732f3c4d14ad121193 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:36:03 +0200 Subject: [PATCH 32/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 0456fbf6..ecafeb77 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -269,7 +269,7 @@ To that end, you can configure a dead-letter queue for a [processing group](#eve An essential concept of Axon Frameworks event processors is the maintenance of event ordering, even when you configure [parallel processing](streaming.md#parallel-processing). A perfect example when this is a requirement is the need to handle aggregate events in publishing order. -Simply dead lettering one failed event would cause subsequent events in the sequence to react to stale state. +Simply dead lettering one failed event would cause subsequent events in the sequence to be applied to inconsistent state. It is thus of utmost importance that a dead-letter queue for events enqueues an event and any following events in the sequence. To that end, the supported dead-letter queue is a so-called `SequencedDeadLetterQueue`. From 145a26144d7762e910de11cba59c4f1710855daa Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:36:19 +0200 Subject: [PATCH 33/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index ecafeb77..bd7913b1 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -268,7 +268,7 @@ Although this behavior is sufficient on many occasions, sometimes it is benefici To that end, you can configure a dead-letter queue for a [processing group](#event-processors). An essential concept of Axon Frameworks event processors is the maintenance of event ordering, even when you configure [parallel processing](streaming.md#parallel-processing). -A perfect example when this is a requirement is the need to handle aggregate events in publishing order. +A perfect example when this is a requirement is the need to handle events of the same aggregate in the order it was published. Simply dead lettering one failed event would cause subsequent events in the sequence to be applied to inconsistent state. It is thus of utmost importance that a dead-letter queue for events enqueues an event and any following events in the sequence. From bcbc63bc49472c0b6af83f6611d2cf3882e7f39f Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:36:31 +0200 Subject: [PATCH 34/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index bd7913b1..057fb364 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -448,7 +448,7 @@ A dead letter contains the following attributes: | attribute | type | description | |-----------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------| -| `message` | `EventMessage` | The complete failed event. | +| `message` | `EventMessage` | The message containing the failed event. | | `cause` | `Optional` | The cause for the message to be dead lettered. Empty if the letter is enqueued because it is part of a sequence. | | `enqueuedAt` | `Instant` | The moment in time when the event was enqueued in a dead-letter queue. | | `lastTouched` | `Instant` | The moment in time when this letter was last touched. Will equal the `enqueuedAt` value if this letter is enqueued for the first time. | From 825ee8ac4c7cea0c7d43068bb4b29568fa41335f Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:36:51 +0200 Subject: [PATCH 35/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 057fb364..c5d3e95e 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -359,7 +359,7 @@ Thus, the processing group moves back to the behavior described at the start of #### Processing Dead-Letter Sequences Once you resolve the problem that led to dead lettering events, we can start processing the dead letters. -We recommend using the `SequencedDeadLetterProcessor` for this, as it processes an entire dead-letter _sequence_ instead of single dead-letter entries. +We recommend using the `SequencedDeadLetterProcessor` interface for this, which can be retrieved from the `EventProcessingConfiguration` for each processing group with a dead letter queue, as it processes an entire dead-letter _sequence_ instead of single dead-letter entries. It will thus ensure the event order is maintained during the retry. The `SequencedDeadLetterProcessor` provides two operations to process dead letters: From 8fff7113c78e6b8e5c4295ff663df7e2a7dfe70f Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:37:16 +0200 Subject: [PATCH 36/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index c5d3e95e..b86ffb2f 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -457,7 +457,7 @@ A dead letter contains the following attributes: #### Dead-Letter Enqueue Policy By default, when you configure a dead-letter queue and event handling fails, the event is dead-lettered. -However, not all event failures should result in new entries in the dead-letter queue. +However, you might not want all event failures to result in being dead-lettered. Similarly, when [letter processing](#processing-dead-letter-sequences) fails, you might want to reconsider whether you want to enqueue the letter again. To that end, you can configure a so-called `EnqueuePolicy`. From 6f548b0f748c5979fbbb921d16c1f4b8bf641502 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:37:26 +0200 Subject: [PATCH 37/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index b86ffb2f..cf2dd532 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -462,7 +462,7 @@ Similarly, when [letter processing](#processing-dead-letter-sequences) fails, yo To that end, you can configure a so-called `EnqueuePolicy`. The enqueue policy ingests a `DeadLetter` and a cause (`Throwable`) and returns an `EnqueueDecision`. -The `EnqueueDecision`, in turn, describes if the framework should enqueue the dead letter. +The `EnqueueDecision`, in turn, describes if the framework should enqueue the dead letter or ignore it. You can customize the dead-letter policy to exclude some events when handling fails. As a consequence, these events will be skipped. From 2d2db38ae83398724bc43502172127f8cddcb831 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:38:35 +0200 Subject: [PATCH 38/44] Update axon-framework/events/event-processors/README.md Co-authored-by: Mitchell Herrijgers --- axon-framework/events/event-processors/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index cf2dd532..212ff3f6 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -469,7 +469,7 @@ As a consequence, these events will be skipped. Note that Axon Framework invokes the policy on initial event handling *and* on [dead-letter processing](#processing-dead-letter-sequences). Reevaluating the policy after processing failed may be essential to ensure a dead letter isn't stuck in the queue forever. -To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy. +To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy, for example to add a number of retries to it and base your decision on that. See the sample `EnqueuePolicy` below for this: ```java From 79f16af8e7179f43b21ed3da2af014c1fb86cb28 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 9 Sep 2022 15:46:20 +0200 Subject: [PATCH 39/44] Process review comments Process review comments by adjusting and further fine-tuning description of: - the importance of sequencing dead letter, - where to find the SequencedDeadLetterProcessor, - the description of the message attribute, - the enqueue decision intro description, - a better example on diagnostics usage. #281 --- .../events/event-processors/README.md | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 212ff3f6..b11e56b4 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -268,7 +268,7 @@ Although this behavior is sufficient on many occasions, sometimes it is benefici To that end, you can configure a dead-letter queue for a [processing group](#event-processors). An essential concept of Axon Frameworks event processors is the maintenance of event ordering, even when you configure [parallel processing](streaming.md#parallel-processing). -A perfect example when this is a requirement is the need to handle events of the same aggregate in the order it was published. +A perfect example when this is a requirement is the need to handle events of the same aggregate in their publishing order. Simply dead lettering one failed event would cause subsequent events in the sequence to be applied to inconsistent state. It is thus of utmost importance that a dead-letter queue for events enqueues an event and any following events in the sequence. @@ -359,7 +359,7 @@ Thus, the processing group moves back to the behavior described at the start of #### Processing Dead-Letter Sequences Once you resolve the problem that led to dead lettering events, we can start processing the dead letters. -We recommend using the `SequencedDeadLetterProcessor` interface for this, which can be retrieved from the `EventProcessingConfiguration` for each processing group with a dead letter queue, as it processes an entire dead-letter _sequence_ instead of single dead-letter entries. +We recommend using the `SequencedDeadLetterProcessor` interface for this, as it processes an entire dead-letter _sequence_ instead of single dead-letter entries. It will thus ensure the event order is maintained during the retry. The `SequencedDeadLetterProcessor` provides two operations to process dead letters: @@ -373,7 +373,7 @@ The `SequencedDeadLetterProcessor` provides two operations to process dead lette If the processing of a dead letter fails, the event will be offered to the dead-letter queue again. How the dead-lettering process reacts to this depends on the [enqueue policy](#dead-letter-enqueue-policy). -You can retrieve the `SequencedDeadLetterProcessor` from the `EventProcessingConfiguration`. +You can retrieve a `SequencedDeadLetterProcessor` from the `EventProcessingConfiguration` based on a processing group name *if* you have configured a dead-letter queue for this processing group. Below are a couple of examples of how to process dead-letter sequences: {% tabs %} @@ -446,30 +446,31 @@ public class DeadletterProcessor { A dead letter contains the following attributes: -| attribute | type | description | -|-----------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------| -| `message` | `EventMessage` | The message containing the failed event. | -| `cause` | `Optional` | The cause for the message to be dead lettered. Empty if the letter is enqueued because it is part of a sequence. | -| `enqueuedAt` | `Instant` | The moment in time when the event was enqueued in a dead-letter queue. | -| `lastTouched` | `Instant` | The moment in time when this letter was last touched. Will equal the `enqueuedAt` value if this letter is enqueued for the first time. | -| `diagnostics` | `MetaData` | The diagnostic `MetaData` concerning this letter. Filled through the [enqueue policy](#dead-letter-enqueue-policy). | +| attribute | type | description | +|------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------| +| `message` | `EventMessage` | The `EventMessage` for which handling failed. The message contains your event, among other `Message` properties. | +| `cause` | `Optional` | The cause for the message to be dead lettered. Empty if the letter is enqueued because it is part of a sequence. | +| `enqueuedAt` | `Instant` | The moment in time when the event was enqueued in a dead-letter queue. | +| `lastTouched` | `Instant` | The moment in time when this letter was last touched. Will equal the `enqueuedAt` value if this letter is enqueued for the first time. | +| `diagnostics` | `MetaData` | The diagnostic `MetaData` concerning this letter. Filled through the [enqueue policy](#dead-letter-enqueue-policy). | #### Dead-Letter Enqueue Policy By default, when you configure a dead-letter queue and event handling fails, the event is dead-lettered. -However, you might not want all event failures to result in being dead-lettered. +However, you might not want all event failures to result in dead-lettered entries. Similarly, when [letter processing](#processing-dead-letter-sequences) fails, you might want to reconsider whether you want to enqueue the letter again. To that end, you can configure a so-called `EnqueuePolicy`. The enqueue policy ingests a `DeadLetter` and a cause (`Throwable`) and returns an `EnqueueDecision`. -The `EnqueueDecision`, in turn, describes if the framework should enqueue the dead letter or ignore it. +The `EnqueueDecision`, in turn, describes if the framework should or should not enqueue the dead letter. You can customize the dead-letter policy to exclude some events when handling fails. As a consequence, these events will be skipped. Note that Axon Framework invokes the policy on initial event handling *and* on [dead-letter processing](#processing-dead-letter-sequences). Reevaluating the policy after processing failed may be essential to ensure a dead letter isn't stuck in the queue forever. -To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy, for example to add a number of retries to it and base your decision on that. +To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy. +For example to add a number of retries to the dead letter to base your decision on. See the sample `EnqueuePolicy` below for this: ```java From 367dd28d99fb8b2d7f873c14086efd47418bab0b Mon Sep 17 00:00:00 2001 From: Stefan <91stefan@gmail.com> Date: Mon, 17 Oct 2022 13:48:16 +0200 Subject: [PATCH 40/44] multitenancy documentation --- extensions/multitenancy.md | 138 +++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 extensions/multitenancy.md diff --git a/extensions/multitenancy.md b/extensions/multitenancy.md new file mode 100644 index 00000000..d5bfc951 --- /dev/null +++ b/extensions/multitenancy.md @@ -0,0 +1,138 @@ +#Multitenancy Extension + +Axon Framework Multitenancy Extension provides your application ability to serve multiple tenants (event-stores) at once. +Multi-tenancy is important in cloud computing and this extension will provide ability to connect to tenant dynamically, physical separate tenants data, scale tenants independently... + + +### Requirements + +Following requirements needs to be meet for extension to work: +- Use **Spring Framework** together with **Axon Framework 4.6+** +- Use **Axon Server EE 4.6+** or Axon Cloud as event store (*) +- This is not hard requirement but if you wish to enable multitenancy on projection side, only JPA is supported out-of-the box + +> ** Axon Cloud ** +> +> Axon Cloud works only with static tenant configuration. + +### Configuration + +Minimal configuration is needed to get extension up and running. +Choose to use **either** static or dynamic tenant configuration. + +#### Static tenants configuration + +If you have predefined list of contexts that your application should connect to, set following property: +`axon.axonserver.contexts=tenant-context-1,tenant-context-2,tenant-context-3` + +#### Dynamic tenants configuration + +If you plan to create tenants in runtime, you can define a predicate which will tell application to which contexts to connect to once they appear in runtime: + +```java + @Bean +public TenantConnectPredicate tenantFilterPredicate() { + return context -> context.tenantId().startsWith("tenant-"); + } +``` + +### Route message to specific tenant + +#### Using meta-data + +By default, to route message to specific tenant you need to tag initial message that enters your system with metadata. +This is done with meta-data helper, and to route message to specific tenant you should set tenant name to metadata with key `TenantConfiguration.TENANT_CORRELATION_KEY`. + +```java +message.andMetaData(Collections.singletonMap(TENANT_CORRELATION_KEY, "tenant-context-1") +``` + +Metadata needs to be added only to initial message that enters your system. Any message that is produced by consequence of initial message will have this metadata copied automatically using to `CorrelationProvider`. + +#### Custom tenant resolver + +If you wish to define custom tenant resolver set following property: + +`axon.multi-tenancy.use-metadata-helper=false` + +Then define custom tenant resolver bean. For example following bean can use message payload to route message to specific tenant: + +```java + @Bean + public TargetTenantResolver> customTargetTenantResolver() { + return (message, tenants) -> + TenantDescriptor.tenantWithId( + ((TenantTaggedMessage) message.getPayload()).getTenantName() + ); + } +``` + +### Multi-tenant projections + +If you wish to use distinct database to store projections and token store for each tenant, configure following bean: + +```java + @Bean + public Function tenantDataSourceResolver() { + return tenant -> { + DataSourceProperties properties = new DataSourceProperties(); + properties.setUrl("jdbc:postgresql://localhost:5432/"+tenant.tenantId()); + properties.setDriverClassName("org.postgresql.Driver"); + properties.setUsername("postgres"); + properties.setPassword("postgres"); + return properties; + }; + } +``` + +Note that this works by using JPA multi-tenancy support, that means only SQL Databases are supported out of the box. +If you wish to implement multi-tenancy for a different type of databases (e.g. NoSQL) make sure that your projection database supports multi-tenancy. +While in transaction you may find out which tenant owns transaction by calling:` TenantWrappedTransactionManager.getCurrentTenant()`. + +> **Pre initialize schema** +> +> Schema migration tools like Liquibase or Flyway usually won't be able to initialise schemas for dynamically created data sources. +> Any datasource that will use needs to have pre-initialized schema. + +#### Resetting projections + +Resetting projections works a bit different because there are multiple instances of "same" event processor group (one per each tenant). + +Reset specific tenant event processor group: + +```java + TrackingEventProcessor trackingEventProcessor = configuration.eventProcessingConfiguration() + .eventProcessor("com.demo.query-ep@tenant-context-1", + TrackingEventProcessor.class) + .get(); +``` + +Convention for naming event processor is: `{even processor name}@{tenant name}` + +Access all tenant event processors by retrieving `MultiTenantEventProcessor` only. +`MultiTenantEventProcessor` acts as a proxy Event Processor that references all tenant event processors. + +### Supported multi-tenant components + +Currently, supported multi-tenants components are as follows: + +- MultiTenantCommandBus +- MultiTenantEventProcessor +- MultiTenantEventStore +- MultiTenantQueryBus +- MultiTenantQueryUpdateEmitter +- MultiTenantEventProcessorControlService +- MultiTenantDataSourceManager + +Not yet supported multi-tenants components are: + +- MultitenantDeadlineManager +- MultitenantEventScheduler + + +### Disable extension + +By default, extension is automatically enabled if found on class path. +If you wish to disable extension without removing extension use following property + +`axon.multi-tenancy.enabled=false` From ae04f69291f6faae765b416ece8ad51207ed0831 Mon Sep 17 00:00:00 2001 From: Stefan Date: Tue, 25 Oct 2022 15:02:37 +0200 Subject: [PATCH 41/44] Apply suggestions from code review Co-authored-by: Steven van Beelen --- extensions/multitenancy.md | 126 +++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 62 deletions(-) diff --git a/extensions/multitenancy.md b/extensions/multitenancy.md index d5bfc951..b0f376b6 100644 --- a/extensions/multitenancy.md +++ b/extensions/multitenancy.md @@ -1,120 +1,122 @@ -#Multitenancy Extension - -Axon Framework Multitenancy Extension provides your application ability to serve multiple tenants (event-stores) at once. -Multi-tenancy is important in cloud computing and this extension will provide ability to connect to tenant dynamically, physical separate tenants data, scale tenants independently... +# Multitenancy Extension +The Axon Framework Multitenancy Extension provides your application with the ability to serve multiple tenants (event-stores) at once. +Multi-tenancy is important in cloud computing, as this extension provides the ability to connect tenants dynamically, physical separate tenant-data, and scale tenants independently. ### Requirements -Following requirements needs to be meet for extension to work: -- Use **Spring Framework** together with **Axon Framework 4.6+** -- Use **Axon Server EE 4.6+** or Axon Cloud as event store (*) -- This is not hard requirement but if you wish to enable multitenancy on projection side, only JPA is supported out-of-the box +The following requirements need to be met for the extension to work out-of-the-box: +- Use **Spring Framework** together with **Axon Framework 4.6+**. +- Use **Axon Server EE 4.6+** or Axon Cloud as event store (*). +- This is not hard requirement, but if you wish to enable multi-tenancy for your projections, note that only JPA is supported out-of-the box. -> ** Axon Cloud ** +> ** Axon Cloud and the Multi-Tenancy extension ** > > Axon Cloud works only with static tenant configuration. ### Configuration -Minimal configuration is needed to get extension up and running. -Choose to use **either** static or dynamic tenant configuration. +A minimal configuration is needed to get this extension up and running. +Please choose **either** the static or the dynamic tenant configuration. #### Static tenants configuration -If you have predefined list of contexts that your application should connect to, set following property: +If you have a predefined list of tenants that your application should connect to, set following property: `axon.axonserver.contexts=tenant-context-1,tenant-context-2,tenant-context-3` #### Dynamic tenants configuration -If you plan to create tenants in runtime, you can define a predicate which will tell application to which contexts to connect to once they appear in runtime: +If you plan to create tenants during runtime, you can define a predicate that will tell the application to which tenant-contexts to connect to once they appear: ```java - @Bean +@Bean public TenantConnectPredicate tenantFilterPredicate() { - return context -> context.tenantId().startsWith("tenant-"); - } -``` + return context -> context.tenantId().startsWith("tenant-"); +} -### Route message to specific tenant +### Route Messages to specific tenants #### Using meta-data -By default, to route message to specific tenant you need to tag initial message that enters your system with metadata. -This is done with meta-data helper, and to route message to specific tenant you should set tenant name to metadata with key `TenantConfiguration.TENANT_CORRELATION_KEY`. +By default, to route any `Message` to a specific tenant, you need to tag the initial message that enters your system with metadata. +This is done with a meta-data helper function, which should add the tenant name with key `TenantConfiguration.TENANT_CORRELATION_KEY`. ```java message.andMetaData(Collections.singletonMap(TENANT_CORRELATION_KEY, "tenant-context-1") ``` -Metadata needs to be added only to initial message that enters your system. Any message that is produced by consequence of initial message will have this metadata copied automatically using to `CorrelationProvider`. +Note that you only need to add metadata to the initial message entering your system. +Any message produced as a consequence of the initial message will have this metadata copied automatically using a `CorrelationDataProvider`. #### Custom tenant resolver -If you wish to define custom tenant resolver set following property: +If you wish to define a custom tenant resolver, please set following property: `axon.multi-tenancy.use-metadata-helper=false` -Then define custom tenant resolver bean. For example following bean can use message payload to route message to specific tenant: +Then define the custom tenant resolver bean. +The following example can use the message payload to route a message to specific tenant: ```java - @Bean - public TargetTenantResolver> customTargetTenantResolver() { - return (message, tenants) -> - TenantDescriptor.tenantWithId( - ((TenantTaggedMessage) message.getPayload()).getTenantName() - ); - } +@Bean +public TargetTenantResolver> customTargetTenantResolver() { + return (message, tenants) -> + TenantDescriptor.tenantWithId( + ((TenantTaggedMessage) message.getPayload()).getTenantName() + ); +} ``` ### Multi-tenant projections -If you wish to use distinct database to store projections and token store for each tenant, configure following bean: +If you wish to use distinct tenant-databases to store projections and tokens, please configure the following: ```java - @Bean - public Function tenantDataSourceResolver() { - return tenant -> { - DataSourceProperties properties = new DataSourceProperties(); - properties.setUrl("jdbc:postgresql://localhost:5432/"+tenant.tenantId()); - properties.setDriverClassName("org.postgresql.Driver"); - properties.setUsername("postgres"); - properties.setPassword("postgres"); - return properties; - }; - } +@Bean +public Function tenantDataSourceResolver() { + return tenant -> { + DataSourceProperties properties = new DataSourceProperties(); + properties.setUrl("jdbc:postgresql://localhost:5432/"+tenant.tenantId()); + properties.setDriverClassName("org.postgresql.Driver"); + properties.setUsername("postgres"); + properties.setPassword("postgres"); + return properties; + }; +} ``` -Note that this works by using JPA multi-tenancy support, that means only SQL Databases are supported out of the box. -If you wish to implement multi-tenancy for a different type of databases (e.g. NoSQL) make sure that your projection database supports multi-tenancy. -While in transaction you may find out which tenant owns transaction by calling:` TenantWrappedTransactionManager.getCurrentTenant()`. +Note that this works by using the JPA multi-tenancy support provided in this extension. +That means that currently only SQL Databases are supported out of the box. + +If you wish to implement multi-tenancy for a different type of databases (e.g. NoSQL) make sure that your projection database supports multi-tenancy, too. +When doing so, you can find it which tenants own the transaction by invoking `TenantWrappedTransactionManager.getCurrentTenant()`. -> **Pre initialize schema** +> **Pre-initialized schema** > -> Schema migration tools like Liquibase or Flyway usually won't be able to initialise schemas for dynamically created data sources. -> Any datasource that will use needs to have pre-initialized schema. +> Schema migration tools like Liquibase or Flyway usually won't be able to initialize schemas for dynamically created data sources. +> Hence, any data source that you use needs to have a the schema pre-initialized. #### Resetting projections -Resetting projections works a bit different because there are multiple instances of "same" event processor group (one per each tenant). +Resetting projections works a bit different, because there are multiple instances of the "same" event processor. +Namely, one per tenant. -Reset specific tenant event processor group: +Regard the following sample to reset an Event Processor for a specific tenant: ```java - TrackingEventProcessor trackingEventProcessor = configuration.eventProcessingConfiguration() - .eventProcessor("com.demo.query-ep@tenant-context-1", - TrackingEventProcessor.class) - .get(); +TrackingEventProcessor trackingEventProcessor = configuration.eventProcessingConfiguration() + .eventProcessor("com.demo.query-ep@tenant-context-1", TrackingEventProcessor.class) + .get(); ``` -Convention for naming event processor is: `{even processor name}@{tenant name}` +Note that the convention for naming tenant-specific event processor is `{even processor name}@{tenant name}`. -Access all tenant event processors by retrieving `MultiTenantEventProcessor` only. -`MultiTenantEventProcessor` acts as a proxy Event Processor that references all tenant event processors. +If you need to access all tenant event processors in one go, you can retrieve the `MultiTenantEventProcessor` for a specific processing name. +The `MultiTenantEventProcessor` acts as a proxy event processor referencing all tenant-specific event processors. ### Supported multi-tenant components -Currently, supported multi-tenants components are as follows: +Currently, the following infrastructure components support multi-tenancy: - MultiTenantCommandBus - MultiTenantEventProcessor @@ -124,15 +126,15 @@ Currently, supported multi-tenants components are as follows: - MultiTenantEventProcessorControlService - MultiTenantDataSourceManager -Not yet supported multi-tenants components are: +The following components are not yet supported: - MultitenantDeadlineManager - MultitenantEventScheduler -### Disable extension +### Disabling this Extension -By default, extension is automatically enabled if found on class path. -If you wish to disable extension without removing extension use following property +By default, this extension is enabled if found on class path when utilizing Spring Boot. +If you wish to disable the extension without removing the dependency, you can set the following property to `false`: `axon.multi-tenancy.enabled=false` From e9c65e4f746bc9e33bd5ca07c3785e9b6ce9edf1 Mon Sep 17 00:00:00 2001 From: Stefan <91stefan@gmail.com> Date: Tue, 25 Oct 2022 15:51:44 +0200 Subject: [PATCH 42/44] multitenancy documentation - pr comments --- extensions/multitenancy.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/extensions/multitenancy.md b/extensions/multitenancy.md index b0f376b6..59dc2f0e 100644 --- a/extensions/multitenancy.md +++ b/extensions/multitenancy.md @@ -33,9 +33,14 @@ If you plan to create tenants during runtime, you can define a predicate that wi public TenantConnectPredicate tenantFilterPredicate() { return context -> context.tenantId().startsWith("tenant-"); } +``` ### Route Messages to specific tenants +Backbone of multitenancy is ability to route message to specific tenant. +This extension offers you meta-data based routing which is ready to be used with minimal configuration. +Also, one may wish to define stronger contract and include tenant information in message payload, which is also possible by defining custom tenant resolver. + #### Using meta-data By default, to route any `Message` to a specific tenant, you need to tag the initial message that enters your system with metadata. @@ -50,7 +55,7 @@ Any message produced as a consequence of the initial message will have this meta #### Custom tenant resolver -If you wish to define a custom tenant resolver, please set following property: +If you wish to define a custom tenant resolver, set following property: `axon.multi-tenancy.use-metadata-helper=false` @@ -62,11 +67,14 @@ The following example can use the message payload to route a message to specific public TargetTenantResolver> customTargetTenantResolver() { return (message, tenants) -> TenantDescriptor.tenantWithId( - ((TenantTaggedMessage) message.getPayload()).getTenantName() + ((TenantAwareMessage) message.getPayload()).getTenantName() ); } ``` +In example above, all messages should implement custom `TenantAwareMessage` interface that exposes tenant name. +Then we can use this interface to extract tenant name from the payload and define our tenant resolver. + ### Multi-tenant projections If you wish to use distinct tenant-databases to store projections and tokens, please configure the following: From c3cc200aa301ad9519dfb5240653b43a54278aad Mon Sep 17 00:00:00 2001 From: Stefan <91stefan@gmail.com> Date: Tue, 25 Oct 2022 16:06:16 +0200 Subject: [PATCH 43/44] multitenancy documentation - pr comments --- extensions/multitenancy.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/extensions/multitenancy.md b/extensions/multitenancy.md index 59dc2f0e..3029c7a6 100644 --- a/extensions/multitenancy.md +++ b/extensions/multitenancy.md @@ -5,14 +5,14 @@ Multi-tenancy is important in cloud computing, as this extension provides the ab ### Requirements -The following requirements need to be met for the extension to work out-of-the-box: -- Use **Spring Framework** together with **Axon Framework 4.6+**. -- Use **Axon Server EE 4.6+** or Axon Cloud as event store (*). -- This is not hard requirement, but if you wish to enable multi-tenancy for your projections, note that only JPA is supported out-of-the box. +- Currently, It's possible to configure extension using **Axon Framework 4.6+** together with **Spring Framework**. +- Minimal configuration and out-of-the box solution is available only for **Axon Server EE 4.6+** or Axon Cloud (*). +- Any other custom user solutions should implement [own factory beans for components and tenant provider](https://github.com/AxonFramework/extension-multitenancy/blob/main/multitenancy-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/multitenancy/autoconfig/MultiTenancyAxonServerAutoConfiguration.java) +- If you wish to enable multi-tenancy for your projections and token store, note that only JPA is supported out-of-the box. -> ** Axon Cloud and the Multi-Tenancy extension ** +> **Axon Cloud and the Multi-Tenancy extension** > -> Axon Cloud works only with static tenant configuration. +> Currently, Axon Cloud works only with static tenant configuration. ### Configuration From dd4a2723ead3566130c06b55984d16a8d30a94ba Mon Sep 17 00:00:00 2001 From: Stefan <91stefan@gmail.com> Date: Tue, 25 Oct 2022 16:08:30 +0200 Subject: [PATCH 44/44] multitenancy documentation - pr comments --- SUMMARY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/SUMMARY.md b/SUMMARY.md index f01a3361..c1ae9b1a 100644 --- a/SUMMARY.md +++ b/SUMMARY.md @@ -169,6 +169,7 @@ * [Reactor Gateways](extensions/reactor/reactive-gateways/reactive-gateways.md) * [Spring Cloud](extensions/spring-cloud.md) * [Tracing](extensions/tracing.md) +* [Multitenancy](extensions/multitenancy.md) ## Appendices