Skip to content

Commit 7b534fc

Browse files
committed
event listener unificcation
1 parent 2cd9396 commit 7b534fc

File tree

9 files changed

+81
-115
lines changed

9 files changed

+81
-115
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ public class HandlerRegistry {
2121

2222
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
2323
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
24-
private final List<RegisteredEventListener<?>> dynamicEventsHandlers = new CopyOnWriteArrayList<>();
25-
2624
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2725
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2826

@@ -44,11 +42,27 @@ public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandle
4442
return this;
4543
}
4644

45+
/**
46+
* @param eventNamePattern
47+
* @param handler
48+
* @param eventClass
49+
* @param <T>
50+
* @return HandlerRegistry
51+
* Avoid use this deprecated method please use listenEvent
52+
*/
53+
@Deprecated
4754
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler, Class<T> eventClass) {
48-
dynamicEventsHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass));
49-
return this;
55+
return listenEvent(eventNamePattern, handler, eventClass);
5056
}
5157

58+
/**
59+
* @param eventNamePattern
60+
* @param handler
61+
* @param <T>
62+
* @return HandlerRegistry
63+
* Avoid use this deprecated method please use listenEvent
64+
*/
65+
@Deprecated
5266
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler) {
5367
return handleDynamicEvents(eventNamePattern, handler, inferGenericParameterType(handler));
5468
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,60 +18,54 @@
1818
import static org.mockito.Mockito.mock;
1919

2020
class HandlerRegistryTest {
21-
22-
private HandlerRegistry registry = HandlerRegistry.register();
23-
private String name = "some.event";
21+
private final HandlerRegistry registry = HandlerRegistry.register();
22+
private final String name = "some.event";
2423

2524
@Test
2625
void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2726
SomeEventHandler eventHandler = new SomeEventHandler();
2827

2928
registry.listenEvent(name, eventHandler);
3029

31-
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
32-
assertThat(registered).extracting(RegisteredEventListener::getPath,
33-
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
34-
.containsExactly(name, SomeDataClass.class, eventHandler);
35-
}).hasSize(1);
30+
assertThat(registry.getEventListeners())
31+
.anySatisfy(registered -> assertThat(registered)
32+
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
33+
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
3634
}
3735

3836
@Test
39-
void shouldRegisterDynamicEventsHandlerWithTypeInference() {
37+
void shouldRegisterPatternEventHandlerWithTypeInference() {
4038
SomeEventHandler eventHandler = new SomeEventHandler();
4139

4240
String eventNamePattern = "a.*";
4341

44-
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler);
42+
HandlerRegistry resultRegistry = registry.listenEvent(eventNamePattern, eventHandler);
4543
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
4644
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
4745

48-
assertThat(registry.getDynamicEventsHandlers())
49-
.anySatisfy(registeredEventListener -> {
50-
assertThat(registeredEventListener)
51-
.usingRecursiveComparison()
52-
.isEqualTo(expectedRegisteredEventListener);
53-
});
46+
assertThat(registry.getEventListeners())
47+
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
48+
.usingRecursiveComparison()
49+
.isEqualTo(expectedRegisteredEventListener));
5450

5551
assertThat(resultRegistry)
5652
.isSameAs(registry);
5753
}
5854

5955
@Test
60-
void shouldRegisterDynamicEventsHandler() {
56+
void shouldRegisterPatternEventHandler() {
6157
SomeEventHandler eventHandler = new SomeEventHandler();
6258

6359
String eventNamePattern = "a.*";
6460

65-
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler, SomeDataClass.class);
61+
HandlerRegistry resultRegistry = registry.listenEvent(eventNamePattern, eventHandler, SomeDataClass.class);
6662
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
6763
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
6864

69-
assertThat(registry.getDynamicEventsHandlers())
70-
.anySatisfy(registeredEventListener -> {
71-
assertThat(registeredEventListener)
72-
.usingRecursiveComparison()
73-
.isEqualTo(expectedRegisteredEventListener);
74-
});
65+
assertThat(registry.getEventListeners())
66+
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
67+
.usingRecursiveComparison()
68+
.isEqualTo(expectedRegisteredEventListener));
7569

7670
assertThat(resultRegistry)
7771
.isSameAs(registry);
@@ -80,8 +74,8 @@ void shouldRegisterDynamicEventsHandler() {
8074
@Test
8175
void shouldRegisterNotificationEventListener() {
8276
registry.listenNotificationEvent(name, message -> Mono.empty(), SomeDataClass.class);
83-
assertThat(registry.getEventNotificationListener()).anySatisfy(listener ->
84-
assertThat(listener.getPath()).isEqualTo(name));
77+
assertThat(registry.getEventNotificationListener())
78+
.anySatisfy(listener -> assertThat(listener.getPath()).isEqualTo(name));
8579
}
8680

8781
@Test
@@ -90,11 +84,10 @@ public void listenEvent() {
9084
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
9185
registry.listenEvent(name, handler, SomeDataClass.class);
9286

93-
assertThat(registry.getEventListeners()).anySatisfy(registered -> {
94-
assertThat(registered).extracting(RegisteredEventListener::getPath,
95-
RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
96-
.containsExactly(name, SomeDataClass.class, handler);
97-
}).hasSize(1);
87+
assertThat(registry.getEventListeners())
88+
.anySatisfy(registered -> assertThat(registered)
89+
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
90+
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);
9891
}
9992

10093
@Test
@@ -103,11 +96,10 @@ void handleCommandWithTypeInference() {
10396

10497
registry.handleCommand(name, handler);
10598

106-
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
107-
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
108-
RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
109-
.containsExactly(name, SomeDataClass.class, handler);
110-
}).hasSize(1);
99+
assertThat(registry.getCommandHandlers())
100+
.anySatisfy(registered -> assertThat(registered)
101+
.extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler)
102+
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);
111103
}
112104

113105
@Test
@@ -135,21 +127,20 @@ void handleQueryWithoutTypeShouldFail() {
135127
void handleCommandWithLambda() {
136128
registry.handleCommand(name, (Command<SomeDataClass> message) -> Mono.empty(), SomeDataClass.class);
137129

138-
assertThat(registry.getCommandHandlers()).anySatisfy(registered -> {
139-
assertThat(registered).extracting(RegisteredCommandHandler::getPath,
140-
RegisteredCommandHandler::getInputClass)
141-
.containsExactly(name, SomeDataClass.class);
142-
}).hasSize(1);
130+
assertThat(registry.getCommandHandlers())
131+
.anySatisfy(registered -> assertThat(registered)
132+
.extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass)
133+
.containsExactly(name, SomeDataClass.class)).hasSize(1);
143134
}
144135

145136

146137
@Test
147138
void serveQueryWithLambda() {
148139
registry.serveQuery(name, message -> Mono.empty(), SomeDataClass.class);
149-
assertThat(registry.getHandlers()).anySatisfy(registered -> {
150-
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
151-
.containsExactly(name, SomeDataClass.class);
152-
}).hasSize(1);
140+
assertThat(registry.getHandlers())
141+
.anySatisfy(registered -> assertThat(registered)
142+
.extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
143+
.containsExactly(name, SomeDataClass.class)).hasSize(1);
153144
}
154145

155146
@Test

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,6 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
204204
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
205205
ConcurrentHashMap::putAll);
206206

207-
final ConcurrentMap<String, RegisteredEventListener<?>> dynamicEventHandlers = registries
208-
.values().stream()
209-
.flatMap(r -> r.getDynamicEventsHandlers().stream())
210-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
211-
ConcurrentHashMap::putAll);
212-
213207
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registries
214208
.values().stream()
215209
.flatMap(r -> r.getCommandHandlers().stream())
@@ -223,8 +217,7 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
223217
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
224218
ConcurrentHashMap::putAll);
225219

226-
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener,
227-
dynamicEventHandlers, commandHandlers) {
220+
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener, commandHandlers) {
228221
@Override
229222
@SuppressWarnings("unchecked")
230223
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/HandlerResolver.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import org.reactivecommons.async.commons.utils.matcher.Matcher;
1010

1111
import java.util.Collection;
12-
import java.util.HashSet;
1312
import java.util.Map;
14-
import java.util.Set;
1513
import java.util.function.Function;
1614

1715
@Log
@@ -21,7 +19,6 @@ public class HandlerResolver {
2119
private final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers;
2220
private final Map<String, RegisteredEventListener<?>> eventListeners;
2321
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
24-
private final Map<String, RegisteredEventListener<?>> dynamicEventsHandlers;
2522
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
2623
private final Matcher matcher = new KeyMatcher();
2724

@@ -39,13 +36,12 @@ public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
3936

4037
@SuppressWarnings("unchecked")
4138
public <T> RegisteredEventListener<T> getEventListener(String path) {
42-
return (RegisteredEventListener<T>) eventListeners.get(path);
39+
if (eventListeners.containsKey(path)) {
40+
return (RegisteredEventListener<T>) eventListeners.get(path);
41+
}
42+
return (RegisteredEventListener<T>) getMatchHandler(eventListeners).apply(path);
4343
}
4444

45-
@SuppressWarnings("unchecked")
46-
public <T> RegisteredEventListener<T> getDynamicEventsHandler(String path) {
47-
return (RegisteredEventListener<T>) dynamicEventsHandlers.get(path);
48-
}
4945

5046
public Collection<RegisteredEventListener<?>> getNotificationListeners() {
5147
return eventNotificationListeners.values();
@@ -61,16 +57,6 @@ public Collection<RegisteredEventListener<?>> getEventListeners() {
6157
return eventListeners.values();
6258
}
6359

64-
public Set<String> getToListenEventNames() {
65-
Set<String> toListenEventNames = new HashSet<>(eventListeners.size() +
66-
dynamicEventsHandlers.size());
67-
68-
toListenEventNames.addAll(eventListeners.keySet());
69-
toListenEventNames.addAll(dynamicEventsHandlers.keySet());
70-
71-
return toListenEventNames;
72-
}
73-
7460
void addEventListener(RegisteredEventListener<?> listener) {
7561
eventListeners.put(listener.getPath(), listener);
7662
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
import com.rabbitmq.client.AMQP;
44
import lombok.extern.java.Log;
55
import org.reactivecommons.api.domain.DomainEvent;
6-
import org.reactivecommons.async.commons.communications.Message;
7-
import org.reactivecommons.async.commons.converters.MessageConverter;
8-
import org.reactivecommons.async.commons.DiscardNotifier;
96
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
7+
import org.reactivecommons.async.commons.DiscardNotifier;
108
import org.reactivecommons.async.commons.EventExecutor;
11-
import org.reactivecommons.async.rabbit.HandlerResolver;
12-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
13-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
9+
import org.reactivecommons.async.commons.communications.Message;
10+
import org.reactivecommons.async.commons.converters.MessageConverter;
1411
import org.reactivecommons.async.commons.ext.CustomReporter;
1512
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
1613
import org.reactivecommons.async.commons.utils.matcher.Matcher;
14+
import org.reactivecommons.async.rabbit.HandlerResolver;
15+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
16+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1717
import reactor.core.publisher.Flux;
1818
import reactor.core.publisher.Mono;
1919
import reactor.rabbitmq.AcknowledgableDelivery;
@@ -83,8 +83,7 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
8383

8484
@Override
8585
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
86-
final String matchedKey = keyMatcher.match(resolver.getToListenEventNames(), executorPath);
87-
final RegisteredEventListener<Object> handler = getEventListener(matchedKey);
86+
final RegisteredEventListener<Object> handler = resolver.getEventListener(executorPath);
8887

8988
final Class<Object> eventClass = handler.getInputClass();
9089
Function<Message, DomainEvent<Object>> converter = msj -> messageConverter.readDomainEvent(msj, eventClass);
@@ -96,16 +95,6 @@ protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath)
9695
.cast(Object.class);
9796
}
9897

99-
private RegisteredEventListener<Object> getEventListener(String matchedKey) {
100-
RegisteredEventListener<Object> eventListener = resolver.getEventListener(matchedKey);
101-
102-
if (eventListener == null) {
103-
return resolver.getDynamicEventsHandler(matchedKey);
104-
}
105-
106-
return eventListener;
107-
}
108-
10998
protected String getExecutorPath(AcknowledgableDelivery msj) {
11099
return msj.getEnvelope().getRoutingKey();
111100
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/DynamicRegistryImpTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ void setUp() {
4646
Map<String, RegisteredCommandHandler<?>> commandHandlers = new ConcurrentHashMap<>();
4747
Map<String, RegisteredEventListener<?>> eventListeners = new ConcurrentHashMap<>();
4848
Map<String, RegisteredEventListener<?>> notificationEventListeners = new ConcurrentHashMap<>();
49-
Map<String, RegisteredEventListener<?>> dynamicEventsHandlers = new ConcurrentHashMap<>();
5049
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
51-
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners,
52-
dynamicEventsHandlers, commandHandlers);
50+
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners, commandHandlers);
5351
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
5452
}
5553

@@ -153,7 +151,7 @@ void serveQueryShouldAddHandler() {
153151
.verifyComplete();
154152
}
155153

156-
private void setupMock(){
154+
private void setupMock() {
157155
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
158156
when(props.getEventsQueue()).thenReturn("events.queue");
159157
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,30 @@
1717
import org.reactivecommons.async.api.HandlerRegistry;
1818
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1919
import org.reactivecommons.async.commons.DiscardNotifier;
20+
import org.reactivecommons.async.commons.converters.MessageConverter;
21+
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
22+
import org.reactivecommons.async.commons.ext.CustomReporter;
2023
import org.reactivecommons.async.rabbit.HandlerResolver;
2124
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
2225
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
23-
import org.reactivecommons.async.commons.converters.MessageConverter;
24-
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
2526
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
26-
import org.reactivecommons.async.commons.ext.CustomReporter;
2727
import org.reactivecommons.async.utils.TestUtils;
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
30-
import reactor.rabbitmq.*;
30+
import reactor.rabbitmq.AcknowledgableDelivery;
31+
import reactor.rabbitmq.BindingSpecification;
32+
import reactor.rabbitmq.ExchangeSpecification;
33+
import reactor.rabbitmq.Receiver;
3134

3235
import java.math.BigInteger;
3336
import java.time.Duration;
3437
import java.util.*;
3538
import java.util.concurrent.*;
36-
import java.util.concurrent.atomic.AtomicBoolean;
3739
import java.util.stream.Collectors;
3840
import java.util.stream.IntStream;
3941

4042
import static org.mockito.ArgumentMatchers.any;
4143
import static org.mockito.Mockito.mock;
42-
import static org.mockito.Mockito.when;
43-
import static reactor.core.publisher.Flux.defer;
4444
import static reactor.core.publisher.Flux.range;
4545
import static reactor.core.publisher.Mono.just;
4646

@@ -271,7 +271,7 @@ private HandlerResolver createHandlerResolver(final HandlerRegistry initialRegis
271271
final HandlerRegistry registry = range(0, 20).reduce(initialRegistry, (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
272272
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registry.getCommandHandlers().stream()
273273
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
274-
return new HandlerResolver(null, null, null, null, commandHandlers) {
274+
return new HandlerResolver(null, null, null, commandHandlers) {
275275
@Override
276276
@SuppressWarnings("unchecked")
277277
public RegisteredCommandHandler<Object> getCommandHandler(String path) {

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void setUp() {
8080
QueryHandler<String, SampleClass> handler = (message) -> just("OK");
8181
handlers.put("queryDirect", new RegisteredQueryHandler<>("queryDirect",
8282
(from, message) -> handler.handle(message), SampleClass.class));
83-
HandlerResolver resolver = new HandlerResolver(handlers, null, null, null, null);
83+
HandlerResolver resolver = new HandlerResolver(handlers, null, null, null);
8484
applicationQueryListener = new ApplicationQueryListener(reactiveMessageListener, "queue", resolver, sender,
8585
"directExchange", messageConverter, "replyExchange", false,
8686
1, 100, maxLengthBytes, true, discardNotifier, errorReporter);

0 commit comments

Comments
 (0)