Skip to content

Commit 63d6502

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #53 from reactive-commons/feat/discard-timeout-queries
Discard timeout queries
2 parents f3b2f6e + 0483262 commit 63d6502

File tree

8 files changed

+203
-44
lines changed

8 files changed

+203
-44
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/commons/Headers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public final class Headers {
77
public static final String COMPLETION_ONLY_SIGNAL = "x-empty-completion";
88
public static final String SERVED_QUERY_ID = "x-serveQuery-id";
99
public static final String SOURCE_APPLICATION = "sourceApplication";
10-
public static final String SIGNAL_TYPE = "x-signal-type";
10+
public static final String REPLY_TIMEOUT_MILLIS = "x-reply-timeout-millis";
1111

1212

1313
private Headers() {

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public ApplicationQueryListener queryListener(MessageConverter converter, Handle
3232
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
3333
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
3434
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
35-
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier, errorReporter);
35+
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(),
36+
asyncProps.getDirect().isDiscardTimeoutQueries(), discardNotifier, errorReporter);
3637

3738
listener.startListener();
3839

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/DirectProps.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ public class DirectProps {
1313

1414
private Optional<Integer> maxLengthBytes = Optional.empty();
1515

16+
private boolean discardTimeoutQueries = false;
17+
1618
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import org.reactivecommons.async.api.AsyncQuery;
55
import org.reactivecommons.async.api.DirectAsyncGateway;
66
import org.reactivecommons.async.api.From;
7-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
87
import org.reactivecommons.async.commons.config.BrokerConfig;
98
import org.reactivecommons.async.commons.converters.MessageConverter;
109
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
10+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1111
import reactor.core.publisher.Flux;
1212
import reactor.core.publisher.Mono;
1313
import reactor.rabbitmq.OutboundMessageResult;
14-
import static org.reactivecommons.async.commons.Headers.*;
14+
1515
import java.time.Duration;
1616
import java.util.Collections;
1717
import java.util.HashMap;
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.TimeoutException;
2121

2222
import static java.lang.Boolean.TRUE;
23+
import static org.reactivecommons.async.commons.Headers.*;
2324
import static reactor.core.publisher.Mono.fromCallable;
2425

2526
public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
@@ -69,8 +70,10 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
6970
headers.put(REPLY_ID, config.getRoutingKey());
7071
headers.put(SERVED_QUERY_ID, query.getResource());
7172
headers.put(CORRELATION_ID, correlationID);
73+
headers.put(REPLY_TIMEOUT_MILLIS, replyTimeout.toMillis());
7274

73-
return sender.sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries).then(replyHolder);
75+
return sender.sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries)
76+
.then(replyHolder);
7477
}
7578

7679
@Override

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
import reactor.rabbitmq.BindingSpecification;
1818
import reactor.rabbitmq.ExchangeSpecification;
1919

20+
import java.time.Duration;
21+
import java.time.Instant;
2022
import java.util.HashMap;
2123
import java.util.Optional;
2224
import java.util.function.Function;
25+
import java.util.logging.Level;
2326

2427
import static java.util.Optional.ofNullable;
2528
import static org.reactivecommons.async.commons.Headers.*;
@@ -37,12 +40,14 @@ public class ApplicationQueryListener extends GenericMessageListener {
3740
private final boolean withDLQRetry;
3841
private final int retryDelay;
3942
private final Optional<Integer> maxLengthBytes;
43+
private final boolean discardTimeoutQueries;
4044

4145

4246
public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver,
4347
ReactiveMessageSender sender, String directExchange, MessageConverter converter,
4448
String replyExchange, boolean withDLQRetry, long maxRetries, int retryDelay,
45-
Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
49+
Optional<Integer> maxLengthBytes, boolean discardTimeoutQueries,
50+
DiscardNotifier discardNotifier, CustomReporter errorReporter) {
4651
super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "query", errorReporter);
4752
this.retryDelay = retryDelay;
4853
this.withDLQRetry = withDLQRetry;
@@ -52,6 +57,7 @@ public ApplicationQueryListener(ReactiveMessageListener listener, String queueNa
5257
this.replyExchange = replyExchange;
5358
this.directExchange = directExchange;
5459
this.maxLengthBytes = maxLengthBytes;
60+
this.discardTimeoutQueries = discardTimeoutQueries;
5561
}
5662

5763
@Override
@@ -69,11 +75,11 @@ protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath)
6975
protected Mono<Void> setUpBindings(TopologyCreator creator) {
7076
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
7177
if (withDLQRetry) {
72-
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(directExchange+".DLQ").durable(true).type("direct"));
73-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, directExchange+".DLQ", maxLengthBytes);
78+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(directExchange + ".DLQ").durable(true).type("direct"));
79+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, directExchange + ".DLQ", maxLengthBytes);
7480
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay, maxLengthBytes);
7581
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
76-
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange+".DLQ", queueName, queueName + ".DLQ"));
82+
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange + ".DLQ", queueName, queueName + ".DLQ"));
7783
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).then(binding).then(bindingDLQ).then();
7884
} else {
7985
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, maxLengthBytes);
@@ -82,6 +88,45 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
8288
}
8389
}
8490

91+
@Override
92+
protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instant initTime) {
93+
AMQP.BasicProperties messageProperties = msj.getProperties();
94+
95+
boolean messageDoesNotContainTimeoutMetadata = messageProperties.getTimestamp() == null ||
96+
!messageProperties.getHeaders().containsKey(REPLY_TIMEOUT_MILLIS);
97+
98+
if (messageDoesNotContainTimeoutMetadata || !discardTimeoutQueries) {
99+
return super.handle(msj, initTime);
100+
}
101+
102+
return handleWithTimeout(msj, initTime, messageProperties);
103+
}
104+
105+
private Mono<AcknowledgableDelivery> handleWithTimeout(AcknowledgableDelivery msj,
106+
Instant initTime,
107+
AMQP.BasicProperties messageProperties) {
108+
long messageTimestamp = msj.getProperties().getTimestamp().getTime();
109+
long replyTimeoutMillis = (int) messageProperties.getHeaders().get(REPLY_TIMEOUT_MILLIS);
110+
long millisUntilTimeout = (messageTimestamp + replyTimeoutMillis) - currentTimestamp().toEpochMilli();
111+
String executorPath = getExecutorPath(msj);
112+
113+
if (millisUntilTimeout > 0) {
114+
return super.handle(msj, initTime)
115+
.timeout(Duration.ofMillis(millisUntilTimeout), buildTimeOutFallback(executorPath));
116+
}
117+
118+
return buildTimeOutFallback(executorPath);
119+
}
120+
121+
private Instant currentTimestamp() {
122+
return Instant.now();
123+
}
124+
125+
private Mono<AcknowledgableDelivery> buildTimeOutFallback(String executorPath) {
126+
return Mono.fromRunnable(() -> log.log(Level.WARNING, String.format("query with path %s discarded by timeout",
127+
executorPath)));
128+
}
129+
85130
@Override
86131
protected String getExecutorPath(AcknowledgableDelivery msj) {
87132
return msj.getProperties().getHeaders().get(SERVED_QUERY_ID).toString();

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,28 +80,27 @@ public void startListener() {
8080
consumeOptions.qos(messageListener.getPrefetchCount());
8181

8282
this.messageFlux = setUpBindings(messageListener.getTopologyCreator()).thenMany(
83-
receiver.consumeManualAck(queueName, consumeOptions)
84-
.transform(this::consumeFaultTolerant));
83+
receiver.consumeManualAck(queueName, consumeOptions)
84+
.transform(this::consumeFaultTolerant));
8585

8686
onTerminate();
8787
}
8888

8989
private void onTerminate() {
9090
messageFlux.doOnTerminate(this::onTerminate)
91-
.subscribe(new LoggerSubscriber<>(getClass().getName()));
91+
.subscribe(new LoggerSubscriber<>(getClass().getName()));
9292
}
9393

94-
95-
private Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instant initTime) {
94+
protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instant initTime) {
9695
try {
9796
final String executorPath = getExecutorPath(msj);
9897
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
9998
final Message message = RabbitMessage.fromDelivery(msj);
10099

101100
return defer(() -> handler.apply(message))
102-
.transform(enrichPostProcess(message))
103-
.doOnSuccess(o -> logExecution(executorPath, initTime, true))
104-
.subscribeOn(scheduler).thenReturn(msj);
101+
.transform(enrichPostProcess(message))
102+
.doOnSuccess(o -> logExecution(executorPath, initTime, true))
103+
.subscribeOn(scheduler).thenReturn(msj);
105104
} catch (Exception e) {
106105
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
107106
return Mono.error(e);
@@ -114,7 +113,7 @@ private void logExecution(String executorPath, Instant initTime, boolean success
114113
final long timeElapsed = Duration.between(initTime, afterExecutionTime).toMillis();
115114
doLogExecution(executorPath, timeElapsed);
116115
customReporter.reportMetric(objectType, executorPath, timeElapsed, success);
117-
}catch (Exception e){
116+
} catch (Exception e) {
118117
log.log(Level.WARNING, "Unable to send execution metrics!", e);
119118
}
120119

@@ -124,23 +123,23 @@ private void reportErrorMetric(AcknowledgableDelivery msj, Instant initTime) {
124123
String executorPath;
125124
try {
126125
executorPath = getExecutorPath(msj);
127-
}catch (Exception e){
126+
} catch (Exception e) {
128127
executorPath = "unknown";
129128
}
130129
logExecution(executorPath, initTime, false);
131130
}
132131

133132
private void doLogExecution(String executorPath, long timeElapsed) {
134133
log.log(Level.FINE, String.format("%s with path %s handled, took %d ms",
135-
objectType, executorPath, timeElapsed));
134+
objectType, executorPath, timeElapsed));
136135
}
137136

138137
private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> messageFlux) {
139138
return messageFlux.flatMap(msj -> {
140139
final Instant init = Instant.now();
141140
return handle(msj, init)
142-
.doOnSuccess(AcknowledgableDelivery::ack)
143-
.onErrorResume(err -> requeueOrAck(msj, err, init));
141+
.doOnSuccess(AcknowledgableDelivery::ack)
142+
.onErrorResume(err -> requeueOrAck(msj, err, init));
144143
}, messageListener.getMaxConcurrency());
145144
}
146145

@@ -200,13 +199,13 @@ private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Th
200199
}
201200
}
202201

203-
private void sendErrorToCustomReporter(final Throwable err, final Message message, final boolean redelivered){
202+
private void sendErrorToCustomReporter(final Throwable err, final Message message, final boolean redelivered) {
204203
try {
205204
customReporter.reportError(err, message, parseMessageForReporter(message), redelivered)
206-
.subscribeOn(errorReporterScheduler)
207-
.doOnError(t -> log.log(Level.WARNING, "Error sending error to external reporter", t))
208-
.subscribe();
209-
}catch (Throwable t){
205+
.subscribeOn(errorReporterScheduler)
206+
.doOnError(t -> log.log(Level.WARNING, "Error sending error to external reporter", t))
207+
.subscribe();
208+
} catch (Throwable t) {
210209
log.log(Level.WARNING, "Error in scheduler when sending error to external reporter", t);
211210
}
212211
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListenerErrorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ protected GenericMessageListener createMessageListener(HandlerResolver handlerRe
4444
class StubGenericMessageListener extends ApplicationQueryListener {
4545

4646
public StubGenericMessageListener(HandlerResolver handlerResolver) {
47-
super(reactiveMessageListener, "queueName", handlerResolver, mock(ReactiveMessageSender.class), "exchange", messageConverter, "exchange", true, 10L, 100, Optional.of(1), discardNotifier, errorReporter);
47+
super(reactiveMessageListener, "queueName", handlerResolver, mock(ReactiveMessageSender.class),
48+
"exchange", messageConverter, "exchange", true, 10L,
49+
100, Optional.of(1), true, discardNotifier, errorReporter);
4850
}
4951

5052
}

0 commit comments

Comments
 (0)