Skip to content

Commit feb1540

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #63 from reactive-commons/feature/refactor-dynamimc-event-handlers
Event listeners unification
2 parents 2cd9396 + 14deca2 commit feb1540

File tree

12 files changed

+91
-116
lines changed

12 files changed

+91
-116
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DynamicRegistry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.reactivecommons.async.api.handlers.EventHandler;
44
import org.reactivecommons.async.api.handlers.QueryHandler;
5+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
56
import reactor.core.publisher.Mono;
67

78
public interface DynamicRegistry {
@@ -11,6 +12,8 @@ public interface DynamicRegistry {
1112

1213
<T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass);
1314

15+
<R> void serveQuery(String resource, QueryHandlerDelegate<Void, R> handler, Class<R> queryClass);
16+
1417
Mono<Void> startListeningEvent(String eventName);
1518

1619
Mono<Void> stopListeningEvent(String eventName);

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ public class HandlerRegistry {
2121

2222
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
2323
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
24-
private final List<RegisteredEventListener<?>> dynamicEventsHandlers = new CopyOnWriteArrayList<>();
25-
2624
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2725
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2826

@@ -44,11 +42,27 @@ public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandle
4442
return this;
4543
}
4644

45+
/**
46+
* @param eventNamePattern
47+
* @param handler
48+
* @param eventClass
49+
* @param <T>
50+
* @return HandlerRegistry
51+
* Avoid use this deprecated method please use listenEvent
52+
*/
53+
@Deprecated
4754
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler, Class<T> eventClass) {
48-
dynamicEventsHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass));
49-
return this;
55+
return listenEvent(eventNamePattern, handler, eventClass);
5056
}
5157

58+
/**
59+
* @param eventNamePattern
60+
* @param handler
61+
* @param <T>
62+
* @return HandlerRegistry
63+
* Avoid use this deprecated method please use listenEvent
64+
*/
65+
@Deprecated
5266
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler) {
5367
return handleDynamicEvents(eventNamePattern, handler, inferGenericParameterType(handler));
5468
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,60 +18,54 @@
1818
import static org.mockito.Mockito.mock;
1919

2020
class HandlerRegistryTest {
21-
22-
private HandlerRegistry registry = HandlerRegistry.register();
23-
private String name = "some.event";
21+
private final HandlerRegistry registry = HandlerRegistry.register();
22+
private final String name = "some.event";
2423

2524
@Test
2625
void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2726
SomeEventHandler eventHandler = new SomeEventHandler();
2827

2928
registry.listenEvent(name, eventHandler);
3029

31-
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
32-
assertThat(registered).extracting(RegisteredEventListener::getPath,
33-
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
34-
.containsExactly(name, SomeDataClass.class, eventHandler);
35-
}).hasSize(1);
30+
assertThat(registry.getEventListeners())
31+
.anySatisfy(registered -> assertThat(registered)
32+
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
33+
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
3634
}
3735

3836
@Test
39-
void shouldRegisterDynamicEventsHandlerWithTypeInference() {
37+
void shouldRegisterPatternEventHandlerWithTypeInference() {
4038
SomeEventHandler eventHandler = new SomeEventHandler();
4139

4240
String eventNamePattern = "a.*";
4341

44-
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler);
42+
HandlerRegistry resultRegistry = registry.listenEvent(eventNamePattern, eventHandler);
4543
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
4644
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
4745

48-
assertThat(registry.getDynamicEventsHandlers())
49-
.anySatisfy(registeredEventListener -> {
50-
assertThat(registeredEventListener)
51-
.usingRecursiveComparison()
52-
.isEqualTo(expectedRegisteredEventListener);
53-
});
46+
assertThat(registry.getEventListeners())
47+
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
48+
.usingRecursiveComparison()
49+
.isEqualTo(expectedRegisteredEventListener));
5450

5551
assertThat(resultRegistry)
5652
.isSameAs(registry);
5753
}
5854

5955
@Test
60-
void shouldRegisterDynamicEventsHandler() {
56+
void shouldRegisterPatternEventHandler() {
6157
SomeEventHandler eventHandler = new SomeEventHandler();
6258

6359
String eventNamePattern = "a.*";
6460

65-
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler, SomeDataClass.class);
61+
HandlerRegistry resultRegistry = registry.listenEvent(eventNamePattern, eventHandler, SomeDataClass.class);
6662
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
6763
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
6864

69-
assertThat(registry.getDynamicEventsHandlers())
70-
.anySatisfy(registeredEventListener -> {
71-
assertThat(registeredEventListener)
72-
.usingRecursiveComparison()
73-
.isEqualTo(expectedRegisteredEventListener);
74-
});
65+
assertThat(registry.getEventListeners())
66+
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
67+
.usingRecursiveComparison()
68+
.isEqualTo(expectedRegisteredEventListener));
7569

7670
assertThat(resultRegistry)
7771
.isSameAs(registry);
@@ -80,8 +74,8 @@ void shouldRegisterDynamicEventsHandler() {
8074
@Test
8175
void shouldRegisterNotificationEventListener() {
8276
registry.listenNotificationEvent(name, message -> Mono.empty(), SomeDataClass.class);
83-
assertThat(registry.getEventNotificationListener()).anySatisfy(listener ->
84-
assertThat(listener.getPath()).isEqualTo(name));
77+
assertThat(registry.getEventNotificationListener())
78+
.anySatisfy(listener -> assertThat(listener.getPath()).isEqualTo(name));
8579
}
8680

8781
@Test
@@ -90,11 +84,10 @@ public void listenEvent() {
9084
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
9185
registry.listenEvent(name, handler, SomeDataClass.class);
9286

93-
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
94-
assertThat(registered).extracting(RegisteredEventListener::getPath,
95-
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
96-
.containsExactly(name, SomeDataClass.class, handler);
97-
}).hasSize(1);
87+
assertThat(registry.getEventListeners())
88+
.anySatisfy(registered -> assertThat(registered)
89+
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
90+
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);
9891
}
9992

10093
@Test
@@ -103,11 +96,10 @@ void handleCommandWithTypeInference() {
10396

10497
registry.handleCommand(name, handler);
10598

106-
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
107-
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
108-
RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
109-
.containsExactly(name, SomeDataClass.class, handler);
110-
}).hasSize(1);
99+
assertThat(registry.getCommandHandlers())
100+
.anySatisfy(registered -> assertThat(registered)
101+
.extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
102+
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);
111103
}
112104

113105
@Test
@@ -135,21 +127,20 @@ void handleQueryWithoutTypeShouldFail() {
135127
void handleCommandWithLambda() {
136128
registry.handleCommand(name, (Command<SomeDataClass> message) -> Mono.empty(), SomeDataClass.class);
137129

138-
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
139-
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
140-
RegisteredCommandHandler::getInputClass)
141-
.containsExactly(name, SomeDataClass.class);
142-
}).hasSize(1);
130+
assertThat(registry.getCommandHandlers())
131+
.anySatisfy(registered -> assertThat(registered)
132+
.extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass)
133+
.containsExactly(name, SomeDataClass.class)).hasSize(1);
143134
}
144135

145136

146137
@Test
147138
void serveQueryWithLambda() {
148139
registry.serveQuery(name, message -> Mono.empty(), SomeDataClass.class);
149-
assertThat(registry.getHandlers()).anySatisfy(registered -> {
150-
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
151-
.containsExactly(name, SomeDataClass.class);
152-
}).hasSize(1);
140+
assertThat(registry.getHandlers())
141+
.anySatisfy(registered -> assertThat(registered)
142+
.extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
143+
.containsExactly(name, SomeDataClass.class)).hasSize(1);
153144
}
154145

155146
@Test

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,6 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
204204
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
205205
ConcurrentHashMap::putAll);
206206

207-
final ConcurrentMap<String, RegisteredEventListener<?>> dynamicEventHandlers = registries
208-
.values().stream()
209-
.flatMap(r -> r.getDynamicEventsHandlers().stream())
210-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
211-
ConcurrentHashMap::putAll);
212-
213207
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registries
214208
.values().stream()
215209
.flatMap(r -> r.getCommandHandlers().stream())
@@ -223,8 +217,7 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
223217
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
224218
ConcurrentHashMap::putAll);
225219

226-
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener,
227-
dynamicEventHandlers, commandHandlers) {
220+
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener, commandHandlers) {
228221
@Override
229222
@SuppressWarnings("unchecked")
230223
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/DynamicRegistryImp.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.reactivecommons.async.api.DynamicRegistry;
66
import org.reactivecommons.async.api.handlers.EventHandler;
77
import org.reactivecommons.async.api.handlers.QueryHandler;
8+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
89
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
910
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
1011
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
@@ -33,6 +34,11 @@ public <T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class
3334
resolver.addQueryHandler(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
3435
}
3536

37+
@Override
38+
public <R> void serveQuery(String resource, QueryHandlerDelegate<Void, R> handler, Class<R> queryClass) {
39+
resolver.addQueryHandler(new RegisteredQueryHandler<>(resource, handler, queryClass));
40+
}
41+
3642
@Override
3743
public Mono<Void> startListeningEvent(String eventName) {
3844
return topologyCreator.bind(buildBindingSpecification(eventName))

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/HandlerResolver.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import org.reactivecommons.async.commons.utils.matcher.Matcher;
1010

1111
import java.util.Collection;
12-
import java.util.HashSet;
1312
import java.util.Map;
14-
import java.util.Set;
1513
import java.util.function.Function;
1614

1715
@Log
@@ -21,7 +19,6 @@ public class HandlerResolver {
2119
private final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers;
2220
private final Map<String, RegisteredEventListener<?>> eventListeners;
2321
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
24-
private final Map<String, RegisteredEventListener<?>> dynamicEventsHandlers;
2522
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
2623
private final Matcher matcher = new KeyMatcher();
2724

@@ -39,13 +36,12 @@ public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
3936

4037
@SuppressWarnings("unchecked")
4138
public <T> RegisteredEventListener<T> getEventListener(String path) {
42-
return (RegisteredEventListener<T>) eventListeners.get(path);
39+
if (eventListeners.containsKey(path)) {
40+
return (RegisteredEventListener<T>) eventListeners.get(path);
41+
}
42+
return (RegisteredEventListener<T>) getMatchHandler(eventListeners).apply(path);
4343
}
4444

45-
@SuppressWarnings("unchecked")
46-
public <T> RegisteredEventListener<T> getDynamicEventsHandler(String path) {
47-
return (RegisteredEventListener<T>) dynamicEventsHandlers.get(path);
48-
}
4945

5046
public Collection<RegisteredEventListener<?>> getNotificationListeners() {
5147
return eventNotificationListeners.values();
@@ -61,16 +57,6 @@ public Collection<RegisteredEventListener<?>> getEventListeners() {
6157
return eventListeners.values();
6258
}
6359

64-
public Set<String> getToListenEventNames() {
65-
Set<String> toListenEventNames = new HashSet<>(eventListeners.size() +
66-
dynamicEventsHandlers.size());
67-
68-
toListenEventNames.addAll(eventListeners.keySet());
69-
toListenEventNames.addAll(dynamicEventsHandlers.keySet());
70-
71-
return toListenEventNames;
72-
}
73-
7460
void addEventListener(RegisteredEventListener<?> listener) {
7561
eventListeners.put(listener.getPath(), listener);
7662
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
import com.rabbitmq.client.AMQP;
44
import lombok.extern.java.Log;
55
import org.reactivecommons.api.domain.DomainEvent;
6-
import org.reactivecommons.async.commons.communications.Message;
7-
import org.reactivecommons.async.commons.converters.MessageConverter;
8-
import org.reactivecommons.async.commons.DiscardNotifier;
96
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
7+
import org.reactivecommons.async.commons.DiscardNotifier;
108
import org.reactivecommons.async.commons.EventExecutor;
11-
import org.reactivecommons.async.rabbit.HandlerResolver;
12-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
13-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
9+
import org.reactivecommons.async.commons.communications.Message;
10+
import org.reactivecommons.async.commons.converters.MessageConverter;
1411
import org.reactivecommons.async.commons.ext.CustomReporter;
1512
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
1613
import org.reactivecommons.async.commons.utils.matcher.Matcher;
14+
import org.reactivecommons.async.rabbit.HandlerResolver;
15+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
16+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1717
import reactor.core.publisher.Flux;
1818
import reactor.core.publisher.Mono;
1919
import reactor.rabbitmq.AcknowledgableDelivery;
@@ -83,8 +83,7 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
8383

8484
@Override
8585
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
86-
final String matchedKey = keyMatcher.match(resolver.getToListenEventNames(), executorPath);
87-
final RegisteredEventListener<Object> handler = getEventListener(matchedKey);
86+
final RegisteredEventListener<Object> handler = resolver.getEventListener(executorPath);
8887

8988
final Class<Object> eventClass = handler.getInputClass();
9089
Function<Message, DomainEvent<Object>> converter = msj -> messageConverter.readDomainEvent(msj, eventClass);
@@ -96,16 +95,6 @@ protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath)
9695
.cast(Object.class);
9796
}
9897

99-
private RegisteredEventListener<Object> getEventListener(String matchedKey) {
100-
RegisteredEventListener<Object> eventListener = resolver.getEventListener(matchedKey);
101-
102-
if (eventListener == null) {
103-
return resolver.getDynamicEventsHandler(matchedKey);
104-
}
105-
106-
return eventListener;
107-
}
108-
10998
protected String getExecutorPath(AcknowledgableDelivery msj) {
11099
return msj.getEnvelope().getRoutingKey();
111100
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/DynamicRegistryImpTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ void setUp() {
4646
Map<String, RegisteredCommandHandler<?>> commandHandlers = new ConcurrentHashMap<>();
4747
Map<String, RegisteredEventListener<?>> eventListeners = new ConcurrentHashMap<>();
4848
Map<String, RegisteredEventListener<?>> notificationEventListeners = new ConcurrentHashMap<>();
49-
Map<String, RegisteredEventListener<?>> dynamicEventsHandlers = new ConcurrentHashMap<>();
5049
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
51-
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners,
52-
dynamicEventsHandlers, commandHandlers);
50+
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners, commandHandlers);
5351
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
5452
}
5553

@@ -153,7 +151,7 @@ void serveQueryShouldAddHandler() {
153151
.verifyComplete();
154152
}
155153

156-
private void setupMock(){
154+
private void setupMock() {
157155
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
158156
when(props.getEventsQueue()).thenReturn("events.queue");
159157
}

0 commit comments

Comments
 (0)