Skip to content

Commit 2c34bfc

Browse files
author
Daniel Bustamante Ospina
committed
Remove usage of operator onErrorContinue for internal error/retry handling due to some cases of inconsistent behaviour.
1 parent af8e398 commit 2c34bfc

File tree

1 file changed

+10
-22
lines changed

1 file changed

+10
-22
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/GenericMessageListener.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,25 @@ public void startListener() {
7171

7272
setUpBindings(messageListener.getTopologyCreator()).thenMany(
7373
receiver.consumeManualAck(queueName, consumeOptions)
74-
.transform(this::consumeFaultTolerant)
75-
.transform(this::outerFailureProtection))
74+
.transform(this::consumeFaultTolerant))
7675
.subscribe();
7776
}
7877

7978

8079
private Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj) {
81-
String executorPath = getExecutorPath(msj);
82-
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
83-
final Message message = RabbitMessage.fromDelivery(msj);
80+
try {
81+
String executorPath = getExecutorPath(msj);
82+
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
83+
final Message message = RabbitMessage.fromDelivery(msj);
8484

85-
return defer(() -> handler.apply(message))
85+
return defer(() -> handler.apply(message))
8686
.transform(enrichPostProcess(message))
8787
.transform(logExecution(executorPath))
8888
.subscribeOn(scheduler).thenReturn(msj);
89+
} catch (Exception e) {
90+
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
91+
return Mono.error(e);
92+
}
8993
}
9094

9195
private Function<Mono<Object>, Mono<Object>> logExecution(String executorPath) {
@@ -102,22 +106,6 @@ private Function<Mono<Object>, Mono<Object>> logExecution(String executorPath) {
102106
};
103107
}
104108

105-
private <T> Flux<T> outerFailureProtection(Flux<T> messageFlux) {
106-
return messageFlux.onErrorContinue(t -> true, (throwable, elem) -> {
107-
if (elem instanceof AcknowledgableDelivery) {
108-
try {
109-
String messageID = ((AcknowledgableDelivery) elem).getProperties().getMessageId();
110-
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", messageID));
111-
requeueOrAck((AcknowledgableDelivery) elem, throwable).subscribe();
112-
} catch (Exception e) {
113-
log.log(Level.SEVERE, "Error returning message in failure!", e);
114-
}
115-
} else {
116-
throw new RuntimeException(throwable);
117-
}
118-
});
119-
}
120-
121109
private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> messageFlux) {
122110
return messageFlux.flatMap(msj ->
123111
handle(msj)

0 commit comments

Comments
 (0)