Skip to content

Commit 02a4460

Browse files
authored
test(retries): local retries
test(retries): local retries
2 parents 0d0511c + 08a742c commit 02a4460

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,14 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
117117
Mono<Object> flow = defer(() -> handler.apply(message))
118118
.transform(enrichPostProcess(message));
119119
if (hasLocalRetries()) {
120-
flow = flow.retryWhen(Retry.fixedDelay(maxRetries, retryDelay));
120+
flow = flow.retryWhen(Retry.fixedDelay(maxRetries, retryDelay))
121+
.onErrorMap(err -> {
122+
if (err.getMessage() != null && err.getMessage().contains("Retries exhausted")) {
123+
log.warning(err.getMessage());
124+
return err.getCause();
125+
}
126+
return err;
127+
});
121128
}
122129
return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true))
123130
.subscribeOn(scheduler).thenReturn(msj);

0 commit comments

Comments
 (0)