Skip to content

Commit 0b12eb9

Browse files
committed
request reply delegate
1 parent 63bb4c9 commit 0b12eb9

File tree

13 files changed

+189
-104
lines changed

13 files changed

+189
-104
lines changed
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.reactivecommons.async.api;
22

3+
import lombok.Data;
4+
5+
@Data
36
public class From {
4-
//String getCorrelationID();
5-
//String getReplyID();
6-
///...
7+
private String replyID;
8+
private String correlationID;
79
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,87 +19,86 @@
1919
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2020
public class HandlerRegistry {
2121

22-
public static HandlerRegistry register(){
23-
return new HandlerRegistry();
24-
}
25-
26-
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
22+
private final List<RegisteredQueryHandler> handlers = new CopyOnWriteArrayList<>();
2723
private final List<RegisteredEventListener> eventListeners = new CopyOnWriteArrayList<>();
2824
private final List<RegisteredCommandHandler> commandHandlers = new CopyOnWriteArrayList<>();
2925
private final List<RegisteredEventListener> eventNotificationListener = new CopyOnWriteArrayList<>();
3026

31-
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
32-
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
27+
public static HandlerRegistry register() {
28+
return new HandlerRegistry();
29+
}
30+
31+
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass) {
32+
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
3333
return this;
3434
}
3535

36-
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler){
37-
eventListeners.add(new RegisteredEventListener<>(eventName, handler, inferGenericParameterType(handler)));
36+
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler) {
37+
eventListeners.add(new RegisteredEventListener<>(eventName, handler, inferGenericParameterType(handler)));
3838
return this;
3939
}
4040

41-
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
42-
eventNotificationListener.add(new RegisteredEventListener<>(eventName, fn, eventClass));
41+
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> fn, Class<T> eventClass) {
42+
eventNotificationListener.add(new RegisteredEventListener<>(eventName, fn, eventClass));
4343
return this;
4444
}
4545

46-
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass){
47-
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
46+
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass) {
47+
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
4848
return this;
4949
}
5050

51-
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn){
52-
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, inferGenericParameterType(fn)));
51+
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn) {
52+
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, inferGenericParameterType(fn)));
5353
return this;
5454
}
5555

56-
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler){
56+
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler) {
5757
return serveQuery(resource, handler, inferGenericParameterType(handler));
5858
}
5959

60-
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass){
60+
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
6161
handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass));
6262
return this;
6363
}
6464

65-
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandlerDelegate<R> handler, Class<R> queryClass){
66-
//handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass));
65+
public <R> HandlerRegistry serveQuery(String resource, QueryHandlerDelegate<R> handler, Class<R> queryClass) {
66+
handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass));
6767
return this;
6868
}
6969

7070

7171
@SuppressWarnings("unchecked")
72-
private <T, R> Class<R> inferGenericParameterType(QueryHandler<T, R> handler){
73-
try{
72+
private <T, R> Class<R> inferGenericParameterType(QueryHandler<T, R> handler) {
73+
try {
7474
ParameterizedType genericSuperclass = (ParameterizedType) handler.getClass().getGenericInterfaces()[0];
75-
return (Class<R>) genericSuperclass.getActualTypeArguments()[1];
76-
}catch (Exception e){
77-
throw new RuntimeException("Fail to infer generic Query class, please use serveQuery(path, handler, class) instead");
75+
return (Class<R>) genericSuperclass.getActualTypeArguments()[1];
76+
} catch (Exception e) {
77+
throw new RuntimeException("Fail to infer generic Query class, please use serveQuery(path, handler, " +
78+
"class) instead");
7879
}
7980
}
8081

8182
@SuppressWarnings("unchecked")
82-
private <T> Class<T> inferGenericParameterType(CommandHandler<T> handler){
83-
try{
83+
private <T> Class<T> inferGenericParameterType(CommandHandler<T> handler) {
84+
try {
8485
ParameterizedType genericSuperclass = (ParameterizedType) handler.getClass().getGenericInterfaces()[0];
85-
return (Class<T>) genericSuperclass.getActualTypeArguments()[0];
86-
}catch (Exception e){
87-
throw new RuntimeException("Fail to infer generic Command class, please use handleCommand(path, handler, class) instead");
86+
return (Class<T>) genericSuperclass.getActualTypeArguments()[0];
87+
} catch (Exception e) {
88+
throw new RuntimeException("Fail to infer generic Command class, please use handleCommand(path, handler, " +
89+
"class) instead");
8890
}
8991
}
9092

91-
private <T> Class<T> inferGenericParameterType(EventHandler<T> handler){
92-
try{
93+
private <T> Class<T> inferGenericParameterType(EventHandler<T> handler) {
94+
try {
9395
ParameterizedType genericSuperclass = (ParameterizedType) handler.getClass().getGenericInterfaces()[0];
94-
return (Class<T>) genericSuperclass.getActualTypeArguments()[0];
95-
}catch (Exception e){
96-
throw new RuntimeException("Fail to infer generic Query class, please use listenEvent(eventName, handler, class) instead");
96+
return (Class<T>) genericSuperclass.getActualTypeArguments()[0];
97+
} catch (Exception e) {
98+
throw new RuntimeException("Fail to infer generic Query class, please use listenEvent(eventName, handler," +
99+
" class) instead");
97100
}
98101
}
99-
100-
101-
102-
103102
}
104103

105104

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
public interface QueryExecutor<R, M> {
6+
Mono<R> execute(M rawMessage);
7+
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredQueryHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
import lombok.Getter;
44
import lombok.RequiredArgsConstructor;
5-
import org.reactivecommons.async.api.handlers.QueryHandler;
65

76
@RequiredArgsConstructor
87
@Getter
9-
public class RegisteredQueryHandler<T, R> {
8+
public class RegisteredQueryHandler<R> {
109
private final String path;
11-
private final QueryHandler<T, R> handler;
10+
private final Object handler;
1211
private final Class<R> queryClass;
1312
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/HandlerResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ public class HandlerResolver {
1717
private final Map<String , RegisteredEventListener> eventNotificationListeners;
1818

1919
@SuppressWarnings("unchecked")
20-
public <T, R> RegisteredQueryHandler<T, R> getQueryHandler(String path) {
21-
return (RegisteredQueryHandler<T, R>) queryHandlers.get(path);
20+
public <T> RegisteredQueryHandler<T> getQueryHandler(String path) {
21+
return (RegisteredQueryHandler<T>) queryHandlers.get(path);
2222
}
2323

2424
@SuppressWarnings("unchecked")
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.reactivecommons.async.impl;
2+
3+
4+
import org.reactivecommons.async.api.From;
5+
import org.reactivecommons.async.api.handlers.QueryExecutor;
6+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
7+
import org.reactivecommons.async.impl.communications.Message;
8+
import reactor.core.publisher.Mono;
9+
10+
import java.util.function.Function;
11+
12+
import static org.reactivecommons.async.impl.Headers.CORRELATION_ID;
13+
import static org.reactivecommons.async.impl.Headers.REPLY_ID;
14+
15+
public class QueryDelegateExecutor<M> implements QueryExecutor<Void, Message> {
16+
private final QueryHandlerDelegate<M> queryHandler;
17+
private final Function<Message, M> converter;
18+
19+
public QueryDelegateExecutor(QueryHandlerDelegate<M> queryHandler, Function<Message, M> converter) {
20+
this.queryHandler = queryHandler;
21+
this.converter = converter;
22+
}
23+
24+
public Mono<Void> execute(Message rawMessage) {
25+
From from = new From();
26+
from.setCorrelationID(rawMessage.getProperties().getHeaders().get(CORRELATION_ID).toString());
27+
from.setReplyID(rawMessage.getProperties().getHeaders().get(REPLY_ID).toString());
28+
return queryHandler.handle(from, converter.apply(rawMessage));
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package org.reactivecommons.async.impl;
22

33

4+
import org.reactivecommons.async.api.handlers.QueryExecutor;
45
import org.reactivecommons.async.api.handlers.QueryHandler;
56
import org.reactivecommons.async.impl.communications.Message;
67
import reactor.core.publisher.Mono;
78

89
import java.util.function.Function;
910

10-
public class QueryExecutor<C, R> {
11+
public class QueryDirectExecutor<C, R> implements QueryExecutor<R, Message> {
1112
private final QueryHandler<R, C> queryHandler;
1213
private final Function<Message, C> converter;
1314

14-
public QueryExecutor(QueryHandler<R, C> queryHandler, Function<Message, C> converter) {
15+
public QueryDirectExecutor(QueryHandler<R, C> queryHandler, Function<Message, C> converter) {
1516
this.queryHandler = queryHandler;
1617
this.converter = converter;
1718
}
1819

19-
public Mono<R> execute(Message rawMessage){
20+
public Mono<R> execute(Message rawMessage) {
2021
return queryHandler.handle(converter.apply(rawMessage));
2122
}
2223
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/RabbitDirectAsyncGateway.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.reactivecommons.api.domain.Command;
44
import org.reactivecommons.async.api.AsyncQuery;
55
import org.reactivecommons.async.api.DirectAsyncGateway;
6+
import org.reactivecommons.async.api.From;
67
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
78
import org.reactivecommons.async.impl.config.BrokerConfig;
89
import org.reactivecommons.async.impl.converters.MessageConverter;
@@ -17,6 +18,7 @@
1718
import java.util.Map;
1819
import java.util.UUID;
1920

21+
import static java.lang.Boolean.TRUE;
2022
import static org.reactivecommons.async.impl.Headers.*;
2123
import static reactor.core.publisher.Mono.fromCallable;
2224

@@ -32,7 +34,8 @@ public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
3234
private final Duration replyTimeout;
3335

3436

35-
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange, MessageConverter converter) {
37+
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender,
38+
String exchange, MessageConverter converter) {
3639
this.config = config;
3740
this.router = router;
3841
this.sender = sender;
@@ -58,8 +61,8 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
5861
final String correlationID = UUID.randomUUID().toString().replaceAll("-", "");
5962

6063
final Mono<R> replyHolder = router.register(correlationID)
61-
.timeout(replyTimeout)
62-
.flatMap(s -> fromCallable(() -> converter.readValue(s, type)));
64+
.timeout(replyTimeout)
65+
.flatMap(s -> fromCallable(() -> converter.readValue(s, type)));
6366

6467
Map<String, Object> headers = new HashMap<>();
6568
headers.put(REPLY_ID, config.getRoutingKey());
@@ -69,4 +72,16 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
6972
return sender.sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries).then(replyHolder);
7073
}
7174

75+
@Override
76+
public <T> Mono<Void> reply(T response, From from) {
77+
final HashMap<String, Object> headers = new HashMap<>();
78+
headers.put(CORRELATION_ID, from.getCorrelationID());
79+
80+
if (response == null) {
81+
headers.put(Headers.COMPLETION_ONLY_SIGNAL, TRUE.toString());
82+
}
83+
84+
return sender.sendNoConfirm(response, "globalReply", from.getReplyID(), headers, false);
85+
}
86+
7287
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationQueryListener.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
import com.rabbitmq.client.AMQP;
44
import lombok.extern.java.Log;
5+
import org.reactivecommons.async.api.handlers.QueryExecutor;
6+
import org.reactivecommons.async.api.handlers.QueryHandler;
7+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
58
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
6-
import org.reactivecommons.async.impl.DiscardNotifier;
7-
import org.reactivecommons.async.impl.HandlerResolver;
8-
import org.reactivecommons.async.impl.Headers;
9-
import org.reactivecommons.async.impl.QueryExecutor;
9+
import org.reactivecommons.async.impl.*;
1010
import org.reactivecommons.async.impl.communications.Message;
1111
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1212
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1313
import org.reactivecommons.async.impl.communications.TopologyCreator;
1414
import org.reactivecommons.async.impl.converters.MessageConverter;
1515
import reactor.core.publisher.Mono;
16+
import reactor.core.publisher.Signal;
1617
import reactor.rabbitmq.AcknowledgableDelivery;
1718
import reactor.rabbitmq.BindingSpecification;
1819
import reactor.rabbitmq.ExchangeSpecification;
19-
import reactor.rabbitmq.QueueSpecification;
2020

2121
import java.util.HashMap;
2222
import java.util.Optional;
@@ -41,7 +41,10 @@ public class ApplicationQueryListener extends GenericMessageListener {
4141
private final Optional<Integer> maxLengthBytes;
4242

4343

44-
public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, ReactiveMessageSender sender, String directExchange, MessageConverter converter, String replyExchange, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier) {
44+
public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver,
45+
ReactiveMessageSender sender, String directExchange, MessageConverter converter,
46+
String replyExchange, boolean withDLQRetry, long maxRetries, int retryDelay,
47+
Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier) {
4548
super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "query");
4649
this.retryDelay = retryDelay;
4750
this.withDLQRetry = withDLQRetry;
@@ -53,19 +56,29 @@ public ApplicationQueryListener(ReactiveMessageListener listener, String queueNa
5356
this.maxLengthBytes = maxLengthBytes;
5457
}
5558

56-
5759
@Override
5860
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
59-
final RegisteredQueryHandler<Object, Object> handler1 = handlerResolver.getQueryHandler(executorPath);
60-
if (handler1 == null) {
61+
final RegisteredQueryHandler<Object> handler = handlerResolver.getQueryHandler(executorPath);
62+
if (handler == null) {
6163
return message -> Mono.error(new RuntimeException("Handler Not registered for Query: " + executorPath));
6264
}
63-
final Class<?> handlerClass = (Class<?>) handler1.getQueryClass();
65+
final Class<?> handlerClass = handler.getQueryClass();
6466
Function<Message, Object> messageConverter = msj -> converter.readAsyncQuery(msj, handlerClass).getQueryData();
65-
final QueryExecutor executor = new QueryExecutor(handler1.getHandler(), messageConverter);
67+
final QueryExecutor<Object, Object> executor = buildQueryExecutor(handler.getHandler(), messageConverter);
6668
return executor::execute;
6769
}
6870

71+
@SuppressWarnings({"rawtypes", "unchecked"})
72+
private QueryExecutor<Object, Object> buildQueryExecutor(Object handler, Function<Message, Object> converter) {
73+
QueryExecutor executor;
74+
if (handler instanceof QueryHandler) {
75+
executor = new QueryDirectExecutor<Object, Object>((QueryHandler) handler, converter);
76+
} else {
77+
executor = new QueryDelegateExecutor<Object>((QueryHandlerDelegate) handler, converter);
78+
}
79+
return executor;
80+
}
81+
6982
protected Mono<Void> setUpBindings(TopologyCreator creator) {
7083
if (withDLQRetry) {
7184
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
@@ -94,6 +107,9 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
94107
if (signal.isOnError()) {
95108
return Mono.error(ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new));
96109
}
110+
if (signal.isOnComplete()) {
111+
return Mono.empty();
112+
}
97113

98114
final String replyID = msg.getProperties().getHeaders().get(REPLY_ID).toString();
99115
final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString();
@@ -104,7 +120,7 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
104120
headers.put(Headers.COMPLETION_ONLY_SIGNAL, TRUE.toString());
105121
}
106122

107-
return sender.sendNoConfirm(signal.get(),replyExchange, replyID, headers, false);
123+
return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
108124
});
109125
}
110126
}

0 commit comments

Comments
 (0)