Skip to content

Commit ea17d84

Browse files
author
Daniel Bustamante Ospina
committed
Implement new and refined parametrized Retry Strategy
1 parent 4aff71c commit ea17d84

File tree

9 files changed

+191
-42
lines changed

9 files changed

+191
-42
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,32 @@ public class MessageListenersConfig {
4444
@Value("${app.async.direct.exchange:directMessages}")
4545
private String directMessagesExchangeName;
4646

47+
@Value("${app.async.maxRetries:10}")
48+
private long maxRetries;
49+
50+
@Value("${app.async.retryDelay:1000}")
51+
private int retryDelay;
52+
53+
@Value("${app.async.withDLQRetry:false}")
54+
private boolean withDLQRetry;
55+
4756
@Bean //TODO: move to own config (QueryListenerConfig)
4857
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter, ReactiveMessageListener receiver) throws Exception {
49-
final ApplicationEventListener listener = new ApplicationEventListener(receiver, appName + ".subsEvents", resolver, domainEventsExchangeName, messageConverter);
58+
final ApplicationEventListener listener = new ApplicationEventListener(receiver, appName + ".subsEvents", resolver, domainEventsExchangeName, messageConverter, withDLQRetry, maxRetries, retryDelay);
5059
listener.startListener();
5160
return listener;
5261
}
5362

5463
@Bean //TODO: move to own config (QueryListenerConfig)
5564
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver, ReactiveMessageSender sender, ReactiveMessageListener rlistener) throws Exception {
56-
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener, appName+".query", resolver, sender, directMessagesExchangeName, converter, "globalReply");
65+
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener, appName+".query", resolver, sender, directMessagesExchangeName, converter, "globalReply", withDLQRetry, maxRetries, retryDelay);
5766
listener.startListener();
5867
return listener;
5968
}
6069

6170
@Bean
6271
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener, HandlerResolver resolver, MessageConverter converter){
63-
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, directMessagesExchangeName, converter);
72+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, directMessagesExchangeName, converter, withDLQRetry, maxRetries, retryDelay);
6473
commandListener.startListener();
6574
return commandListener;
6675
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.reactivecommons.async.impl;
2+
3+
public enum FallbackStrategy {
4+
FAST_RETRY("ATTENTION!! Fast retry message to same Queue: %s"),
5+
DEFINITIVE_DISCARD("ATTENTION!! DEFINITIVE DISCARD!! of the message: %s"),
6+
RETRY_DLQ("ATTENTION!! Sending message to Retry DLQ: %s");
7+
8+
public final String message;
9+
FallbackStrategy(String message){
10+
this.message = message;
11+
}
12+
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
import java.io.IOException;
1414
import java.time.Duration;
15+
import java.util.HashMap;
16+
import java.util.Map;
1517
import java.util.logging.Level;
1618

1719
@Log
@@ -46,6 +48,25 @@ public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification binding) {
4648
.onErrorMap(TopologyDefException::new);
4749
}
4850

51+
public Mono<AMQP.Queue.DeclareOk> declareDLQ(String originQueue, String retryTarget, int retryTime){
52+
final Map<String, Object> args = new HashMap<>();
53+
args.put("x-dead-letter-exchange", retryTarget);
54+
args.put("x-message-ttl", retryTime);
55+
QueueSpecification specification = QueueSpecification.queue(originQueue + ".DLQ")
56+
.durable(true)
57+
.arguments(args);
58+
return declare(specification);
59+
}
60+
61+
public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, String dlqExchange){
62+
final Map<String, Object> args = new HashMap<>();
63+
args.put("x-dead-letter-exchange", dlqExchange);
64+
QueueSpecification specification = QueueSpecification.queue(name)
65+
.durable(true)
66+
.arguments(args);
67+
return declare(specification);
68+
}
69+
4970
public static class TopologyDefException extends RuntimeException {
5071
public TopologyDefException(Throwable cause) {
5172
super(cause);

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationCommandListener.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,37 @@ public class ApplicationCommandListener extends GenericMessageListener {
2525
private final MessageConverter messageConverter;
2626
private final HandlerResolver resolver;
2727
private final String directExchange;
28+
private final boolean withDLQRetry;
29+
private final int retryDelay;
2830

29-
30-
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter) {
31-
super(queueName, listener);
31+
//TODO: change large constructor parameters number
32+
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay) {
33+
super(queueName, listener, withDLQRetry, maxRetries);
34+
this.retryDelay = retryDelay;
35+
this.withDLQRetry = withDLQRetry;
3236
this.resolver = resolver;
3337
this.directExchange = directExchange;
3438
this.messageConverter = messageConverter;
3539
}
3640

3741
protected Mono<Void> setUpBindings(TopologyCreator creator) {
38-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
39-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
40-
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
41-
return declareExchange.then(declareQueue).then(binding).then();
42+
if (withDLQRetry) {
43+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
44+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(directExchange+".DLQ").durable(true).type("direct"));
45+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, directExchange+".DLQ");
46+
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay);
47+
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
48+
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange+".DLQ", queueName, queueName + ".DLQ"));
49+
return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then();
50+
} else {
51+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
52+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
53+
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
54+
return declareExchange.then(declareQueue).then(binding).then();
55+
}
4256
}
4357

58+
4459
@Override
4560
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
4661
final RegisteredCommandHandler<Object> handler = resolver.getCommandHandler(executorPath);

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationEventListener.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,35 @@ public class ApplicationEventListener extends GenericMessageListener {
2929
private final MessageConverter messageConverter;
3030
private final HandlerResolver resolver;
3131
private final String eventsExchange;
32+
private final boolean withDLQRetry;
33+
private final int retryDelay;
3234

33-
34-
public ApplicationEventListener(ReactiveMessageListener receiver, String queueName, HandlerResolver resolver, String eventsExchange, MessageConverter messageConverter) {
35-
super(queueName, receiver);
35+
public ApplicationEventListener(ReactiveMessageListener receiver, String queueName, HandlerResolver resolver, String eventsExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay) {
36+
super(queueName, receiver, withDLQRetry, maxRetries);
37+
this.retryDelay = retryDelay;
38+
this.withDLQRetry = withDLQRetry;
3639
this.resolver = resolver;
3740
this.eventsExchange = eventsExchange;
3841
this.messageConverter = messageConverter;
3942
}
4043

4144
protected Mono<Void> setUpBindings(TopologyCreator creator) {
42-
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners())
43-
.flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
44-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic"));
45-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
46-
return declareExchange.then(declareQueue).thenMany(bindings).then();
45+
if (withDLQRetry) {
46+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic"));
47+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsExchange+".DLQ").durable(true).type("topic"));
48+
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, eventsExchange, retryDelay);
49+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsExchange+".DLQ");
50+
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
51+
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(eventsExchange+".DLQ", "#", queueName + ".DLQ"));
52+
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany(bindings).then(bindingDLQ).then();
53+
} else {
54+
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners())
55+
.flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
56+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic"));
57+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
58+
return declareExchange.then(declareQueue).thenMany(bindings).then();
59+
}
60+
4761
}
4862

4963
@Override

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationQueryListener.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ public class ApplicationQueryListener extends GenericMessageListener {
3636
private final ReactiveMessageSender sender;
3737
private final String replyExchange;
3838
private final String directExchange;
39+
private final boolean withDLQRetry;
40+
private final int retryDelay;
3941

4042

41-
public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, ReactiveMessageSender sender, String directExchange, MessageConverter converter, String replyExchange) {
42-
super(queueName, listener);
43+
public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, ReactiveMessageSender sender, String directExchange, MessageConverter converter, String replyExchange, boolean withDLQRetry, long maxRetries, int retryDelay) {
44+
super(queueName, listener, withDLQRetry, maxRetries);
45+
this.retryDelay = retryDelay;
46+
this.withDLQRetry = withDLQRetry;
4347
this.converter = converter;
4448
this.handlerResolver = resolver;
4549
this.sender = sender;
@@ -61,10 +65,20 @@ protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath)
6165
}
6266

6367
protected Mono<Void> setUpBindings(TopologyCreator creator) {
64-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
65-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
66-
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
67-
return declareExchange.then(declareQueue).then(binding).then();
68+
if (withDLQRetry) {
69+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
70+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(directExchange+".DLQ").durable(true).type("direct"));
71+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, directExchange+".DLQ");
72+
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay);
73+
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
74+
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange+".DLQ", queueName, queueName + ".DLQ"));
75+
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).then(binding).then(bindingDLQ).then();
76+
} else {
77+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
78+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue(queueName).durable(true));
79+
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
80+
return declareExchange.then(declareQueue).then(binding).then();
81+
}
6882
}
6983

7084
@Override

0 commit comments

Comments
 (0)