Skip to content

Commit 72f1897

Browse files
committed
create query delegate communication, create some unit tests
1 parent 0b12eb9 commit 72f1897

File tree

14 files changed

+301
-86
lines changed

14 files changed

+301
-86
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1111
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1212
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
13+
import reactor.core.publisher.Mono;
1314

1415
import java.lang.reflect.ParameterizedType;
1516
import java.util.List;
@@ -58,11 +59,11 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
5859
}
5960

6061
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
61-
handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass));
62+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
6263
return this;
6364
}
6465

65-
public <R> HandlerRegistry serveQuery(String resource, QueryHandlerDelegate<R> handler, Class<R> queryClass) {
66+
public <R> HandlerRegistry serveQuery(String resource, QueryHandlerDelegate<Void, R> handler, Class<R> queryClass) {
6667
handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass));
6768
return this;
6869
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/QueryExecutor.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/QueryHandlerDelegate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
import org.reactivecommons.async.api.From;
44
import reactor.core.publisher.Mono;
55

6-
public interface QueryHandlerDelegate<M> {
7-
Mono<Void> handle(From from, M message);
6+
public interface QueryHandlerDelegate<T, M> {
7+
Mono<T> handle(From from, M message);
88
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredQueryHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
import lombok.Getter;
44
import lombok.RequiredArgsConstructor;
5+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
56

67
@RequiredArgsConstructor
78
@Getter
8-
public class RegisteredQueryHandler<R> {
9+
public class RegisteredQueryHandler<T, C> {
910
private final String path;
10-
private final Object handler;
11-
private final Class<R> queryClass;
11+
private final QueryHandlerDelegate<T, C> handler;
12+
private final Class<C> queryClass;
1213
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
11
package org.reactivecommons.async.api;
22

3-
import static org.assertj.core.api.Assertions.*;
4-
import static org.mockito.Mockito.*;
5-
63
import lombok.Data;
74
import org.junit.Test;
85
import org.reactivecommons.api.domain.Command;
96
import org.reactivecommons.api.domain.DomainEvent;
107
import org.reactivecommons.async.api.handlers.CommandHandler;
118
import org.reactivecommons.async.api.handlers.EventHandler;
129
import org.reactivecommons.async.api.handlers.QueryHandler;
10+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
1311
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1412
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1513
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
1614
import reactor.core.publisher.Mono;
1715

16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.mockito.Mockito.mock;
18+
1819
public class HandlerRegistryTest {
1920

2021
private HandlerRegistry registry = HandlerRegistry.register();
2122
private String name = "some.event";
2223

2324
@Test
24-
public void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed(){
25+
public void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2526
SomeEventHandler eventHandler = new SomeEventHandler();
2627

2728
registry.listenEvent(name, eventHandler);
2829

2930
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
30-
assertThat(registered).extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
31-
.containsExactly(name, SomeDataClass.class, eventHandler);
31+
assertThat(registered).extracting(RegisteredEventListener::getPath,
32+
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
33+
.containsExactly(name, SomeDataClass.class, eventHandler);
3234
}).hasSize(1);
3335
}
3436

@@ -39,8 +41,9 @@ public void listenEvent() {
3941
registry.listenEvent(name, handler, SomeDataClass.class);
4042

4143
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
42-
assertThat(registered).extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
43-
.containsExactly(name, SomeDataClass.class, handler);
44+
assertThat(registered).extracting(RegisteredEventListener::getPath,
45+
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
46+
.containsExactly(name, SomeDataClass.class, handler);
4447
}).hasSize(1);
4548
}
4649

@@ -51,8 +54,9 @@ public void handleCommandWithTypeInference() {
5154
registry.handleCommand(name, handler);
5255

5356
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
54-
assertThat(registered).extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
55-
.containsExactly(name, SomeDataClass.class, handler);
57+
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
58+
RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
59+
.containsExactly(name, SomeDataClass.class, handler);
5660
}).hasSize(1);
5761
}
5862

@@ -76,8 +80,9 @@ public void handleCommandWithLambda() {
7680
registry.handleCommand(name, (Command<SomeDataClass> message) -> Mono.empty(), SomeDataClass.class);
7781

7882
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
79-
assertThat(registered).extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass)
80-
.containsExactly(name, SomeDataClass.class);
83+
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
84+
RegisteredCommandHandler::getInputClass)
85+
.containsExactly(name, SomeDataClass.class);
8186
}).hasSize(1);
8287
}
8388

@@ -87,7 +92,7 @@ public void serveQueryWithLambda() {
8792
registry.serveQuery(name, message -> Mono.empty(), SomeDataClass.class);
8893
assertThat(registry.getHandlers()).anySatisfy(registered -> {
8994
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
90-
.containsExactly(name, SomeDataClass.class);
95+
.containsExactly(name, SomeDataClass.class);
9196
}).hasSize(1);
9297
}
9398

@@ -96,11 +101,38 @@ public void serveQueryWithTypeInference() {
96101
QueryHandler<SomeDataClass, SomeDataClass> handler = new SomeQueryHandler();
97102
registry.serveQuery(name, handler);
98103
assertThat(registry.getHandlers()).anySatisfy(registered -> {
99-
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass, RegisteredQueryHandler::getHandler)
100-
.containsExactly(name, SomeDataClass.class, handler);
104+
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
105+
.containsExactly(name, SomeDataClass.class);
106+
assertThat(registered).extracting(RegisteredQueryHandler::getHandler).isInstanceOf(QueryHandlerDelegate.class);
107+
}).hasSize(1);
108+
}
109+
110+
@Test
111+
public void serveQueryDelegate() {
112+
QueryHandlerDelegate<Void, SomeDataClass> handler = new SomeQueryHandlerDelegate();
113+
registry.serveQuery(name, handler, SomeDataClass.class);
114+
assertThat(registry.getHandlers()).anySatisfy(registered -> {
115+
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
116+
.containsExactly(name, SomeDataClass.class);
117+
}).hasSize(1);
118+
}
119+
120+
@Test
121+
public void serveQueryDelegateWithLambda() {
122+
registry.serveQuery(name, (from, message) -> Mono.empty(), SomeDataClass.class);
123+
assertThat(registry.getHandlers()).anySatisfy(registered -> {
124+
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
125+
.containsExactly(name, SomeDataClass.class);
101126
}).hasSize(1);
102127
}
103128

129+
private static class SomeQueryHandlerDelegate implements QueryHandlerDelegate<Void, SomeDataClass> {
130+
@Override
131+
public Mono<Void> handle(From from, SomeDataClass message) {
132+
return Mono.empty();
133+
}
134+
}
135+
104136
private static class SomeEventHandler implements EventHandler<SomeDataClass> {
105137
@Override
106138
public Mono<Void> handle(DomainEvent<SomeDataClass> message) {
@@ -129,4 +161,4 @@ private static class SomeDataClass {
129161
private Integer someProp2;
130162
}
131163

132-
}
164+
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/HandlerResolver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ public class HandlerResolver {
1414
private final Map<String, RegisteredQueryHandler> queryHandlers;
1515
private final Map<String, RegisteredEventListener> eventListeners;
1616
private final Map<String, RegisteredCommandHandler> commandHandlers;
17-
private final Map<String , RegisteredEventListener> eventNotificationListeners;
17+
private final Map<String, RegisteredEventListener> eventNotificationListeners;
1818

1919
@SuppressWarnings("unchecked")
20-
public <T> RegisteredQueryHandler<T> getQueryHandler(String path) {
21-
return (RegisteredQueryHandler<T>) queryHandlers.get(path);
20+
public <T, M> RegisteredQueryHandler<T, M> getQueryHandler(String path) {
21+
return (RegisteredQueryHandler<T, M>) queryHandlers.get(path);
2222
}
2323

2424
@SuppressWarnings("unchecked")

async/async-commons/src/main/java/org/reactivecommons/async/impl/QueryDirectExecutor.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

async/async-commons/src/main/java/org/reactivecommons/async/impl/QueryDelegateExecutor.java renamed to async/async-commons/src/main/java/org/reactivecommons/async/impl/QueryExecutor.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33

44
import org.reactivecommons.async.api.From;
5-
import org.reactivecommons.async.api.handlers.QueryExecutor;
65
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
76
import org.reactivecommons.async.impl.communications.Message;
87
import reactor.core.publisher.Mono;
@@ -12,19 +11,19 @@
1211
import static org.reactivecommons.async.impl.Headers.CORRELATION_ID;
1312
import static org.reactivecommons.async.impl.Headers.REPLY_ID;
1413

15-
public class QueryDelegateExecutor<M> implements QueryExecutor<Void, Message> {
16-
private final QueryHandlerDelegate<M> queryHandler;
14+
public class QueryExecutor<T, M> {
15+
private final QueryHandlerDelegate<T, M> queryHandler;
1716
private final Function<Message, M> converter;
1817

19-
public QueryDelegateExecutor(QueryHandlerDelegate<M> queryHandler, Function<Message, M> converter) {
18+
public QueryExecutor(QueryHandlerDelegate<T, M> queryHandler, Function<Message, M> converter) {
2019
this.queryHandler = queryHandler;
2120
this.converter = converter;
2221
}
2322

24-
public Mono<Void> execute(Message rawMessage) {
23+
public Mono<T> execute(Message rawMessage) {
2524
From from = new From();
26-
from.setCorrelationID(rawMessage.getProperties().getHeaders().get(CORRELATION_ID).toString());
27-
from.setReplyID(rawMessage.getProperties().getHeaders().get(REPLY_ID).toString());
25+
from.setCorrelationID(rawMessage.getProperties().getHeaders().getOrDefault(CORRELATION_ID, "").toString());
26+
from.setReplyID(rawMessage.getProperties().getHeaders().getOrDefault(REPLY_ID, "").toString());
2827
return queryHandler.handle(from, converter.apply(rawMessage));
2928
}
3029
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationQueryListener.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@
22

33
import com.rabbitmq.client.AMQP;
44
import lombok.extern.java.Log;
5-
import org.reactivecommons.async.api.handlers.QueryExecutor;
6-
import org.reactivecommons.async.api.handlers.QueryHandler;
7-
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
85
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
9-
import org.reactivecommons.async.impl.*;
6+
import org.reactivecommons.async.impl.DiscardNotifier;
7+
import org.reactivecommons.async.impl.HandlerResolver;
8+
import org.reactivecommons.async.impl.Headers;
9+
import org.reactivecommons.async.impl.QueryExecutor;
1010
import org.reactivecommons.async.impl.communications.Message;
1111
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1212
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1313
import org.reactivecommons.async.impl.communications.TopologyCreator;
1414
import org.reactivecommons.async.impl.converters.MessageConverter;
1515
import reactor.core.publisher.Mono;
16-
import reactor.core.publisher.Signal;
1716
import reactor.rabbitmq.AcknowledgableDelivery;
1817
import reactor.rabbitmq.BindingSpecification;
1918
import reactor.rabbitmq.ExchangeSpecification;
@@ -58,27 +57,16 @@ public ApplicationQueryListener(ReactiveMessageListener listener, String queueNa
5857

5958
@Override
6059
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
61-
final RegisteredQueryHandler<Object> handler = handlerResolver.getQueryHandler(executorPath);
60+
final RegisteredQueryHandler<Object, Object> handler = handlerResolver.getQueryHandler(executorPath);
6261
if (handler == null) {
6362
return message -> Mono.error(new RuntimeException("Handler Not registered for Query: " + executorPath));
6463
}
6564
final Class<?> handlerClass = handler.getQueryClass();
6665
Function<Message, Object> messageConverter = msj -> converter.readAsyncQuery(msj, handlerClass).getQueryData();
67-
final QueryExecutor<Object, Object> executor = buildQueryExecutor(handler.getHandler(), messageConverter);
66+
final QueryExecutor<Object, Object> executor = new QueryExecutor<>(handler.getHandler(), messageConverter);
6867
return executor::execute;
6968
}
7069

71-
@SuppressWarnings({"rawtypes", "unchecked"})
72-
private QueryExecutor<Object, Object> buildQueryExecutor(Object handler, Function<Message, Object> converter) {
73-
QueryExecutor executor;
74-
if (handler instanceof QueryHandler) {
75-
executor = new QueryDirectExecutor<Object, Object>((QueryHandler) handler, converter);
76-
} else {
77-
executor = new QueryDelegateExecutor<Object>((QueryHandlerDelegate) handler, converter);
78-
}
79-
return executor;
80-
}
81-
8270
protected Mono<Void> setUpBindings(TopologyCreator creator) {
8371
if (withDLQRetry) {
8472
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
@@ -116,10 +104,6 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
116104
final HashMap<String, Object> headers = new HashMap<>();
117105
headers.put(CORRELATION_ID, correlationID);
118106

119-
if (!signal.hasValue()) {
120-
headers.put(Headers.COMPLETION_ONLY_SIGNAL, TRUE.toString());
121-
}
122-
123107
return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
124108
});
125109
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.reactivecommons.async.helpers;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
import java.util.Date;
7+
8+
@RequiredArgsConstructor
9+
@Getter
10+
public class SampleClass {
11+
private final String id;
12+
private final String name;
13+
private final Date date;
14+
}

0 commit comments

Comments
 (0)