Skip to content

Commit 0d0511c

Browse files
authored
fix(retries): local retries
fix(retries): local retries
2 parents 4293a6e + 5ed1930 commit 0d0511c

File tree

4 files changed

+24
-7
lines changed

4 files changed

+24
-7
lines changed

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import reactor.rabbitmq.AcknowledgableDelivery;
1818
import reactor.rabbitmq.ConsumeOptions;
1919
import reactor.rabbitmq.Receiver;
20+
import reactor.util.retry.Retry;
2021

2122
import java.time.Duration;
2223
import java.time.Instant;
@@ -69,6 +70,10 @@ private static long resolveRetries(boolean useDLQRetries, long maxRetries) {
6970
return useDLQRetries && maxRetries == -1 ? DEFAULT_RETRIES_DLQ : maxRetries;
7071
}
7172

73+
private boolean hasLocalRetries() {
74+
return !useDLQRetries && maxRetries != -1;
75+
}
76+
7277
protected Mono<Void> setUpBindings(TopologyCreator creator) {
7378
return Mono.empty();
7479
}
@@ -109,9 +114,12 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
109114
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
110115
final Message message = RabbitMessage.fromDelivery(msj);
111116

112-
return defer(() -> handler.apply(message))
113-
.transform(enrichPostProcess(message))
114-
.doOnSuccess(o -> logExecution(executorPath, initTime, true))
117+
Mono<Object> flow = defer(() -> handler.apply(message))
118+
.transform(enrichPostProcess(message));
119+
if (hasLocalRetries()) {
120+
flow = flow.retryWhen(Retry.fixedDelay(maxRetries, retryDelay));
121+
}
122+
return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true))
115123
.subscribeOn(scheduler).thenReturn(msj);
116124
} catch (Exception e) {
117125
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
@@ -194,16 +202,16 @@ private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Th
194202
final boolean redeliver = msj.getEnvelope().isRedeliver();
195203
reportErrorMetric(msj, init);
196204
sendErrorToCustomReporter(err, rabbitMessage, redeliver || retryNumber > 0);
197-
if (maxRetries != -1 && retryNumber >= maxRetries) {
205+
if (hasLocalRetries() || retryNumber >= maxRetries) { // Discard
198206
logError(err, msj, FallbackStrategy.DEFINITIVE_DISCARD);
199207
return discardNotifier
200208
.notifyDiscard(rabbitMessage)
201209
.doOnSuccess(_a -> msj.ack()).thenReturn(msj);
202-
} else if (useDLQRetries) {
210+
} else if (useDLQRetries) { // DLQ retries
203211
logError(err, msj, FallbackStrategy.RETRY_DLQ);
204212
msj.nack(false);
205213
return Mono.just(msj);
206-
} else {
214+
} else { // infinity fast retries
207215
logError(err, msj, FallbackStrategy.FAST_RETRY);
208216
return Mono.just(msj).delayElement(retryDelay).doOnNext(m -> m.nack(true));
209217
}

docs/docs/reactive-commons/10-wildcards.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ The next code will help you to avoid unexpected behaviors, which indicates you t
9999
}
100100
```
101101

102+
## Possible issues
103+
104+
Unprocessed events will be notified with an event, so please handle it properly and ensure you don't have wildcards that match all events, because it can cause unexpected behaviors.
105+
106+
For example if you have a handler for `my.event.#` and you start listening to `my.event`, it will match all events that start with `my.event`, so it will match `my.event.any` if your handler has dlq retries or has local retries, when your handler fails the retry limit, an event with name `my.event.any.dlq` will be sent.
107+
108+
And if you have the same binding `my.event.#` the event will be handled by the handler, so you will have two different event structures for the same handler, and it can cause unexpected behaviors.
109+
102110
## Example
103111

104112
You can see a real example at [samples/async/async-receiver-responder](https://github.com/reactive-commons/reactive-commons-java/tree/master/samples/async/async-receiver-responder)

samples/async/async-receiver-responder/src/main/java/sample/HandlersConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class HandlersConfig {
2525
public HandlerRegistry handlerRegistrySubs(UseCase useCase) {
2626
animalEventEventHandler = event -> {
2727
log.info(event);
28-
return Mono.empty();
28+
return Mono.error(new RuntimeException("Not implemented"));
2929
};
3030
return HandlerRegistry.register()
3131
.listenEvent(Constants.MEMBER_REMOVED, useCase::removeMember, RemovedMemberEvent.class)

samples/async/async-receiver-responder/src/main/resources/application.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ app:
77
async:
88
createTopology: true
99
listenReplies: false
10+
maxRetries: 3
1011

0 commit comments

Comments
 (0)