Skip to content

Commit d36197c

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #65 from reactive-commons/fix/dynamic-event-binding
fix event bindings for dynamic events
2 parents e34e691 + 9bbc473 commit d36197c

File tree

11 files changed

+113
-36
lines changed

11 files changed

+113
-36
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
public class HandlerRegistry {
2121

2222
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
23+
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
2324
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
2425
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2526
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
@@ -42,27 +43,11 @@ public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandle
4243
return this;
4344
}
4445

45-
/**
46-
* @param eventNamePattern
47-
* @param handler
48-
* @param eventClass
49-
* @param <T>
50-
* @return HandlerRegistry
51-
* Avoid use this deprecated method please use listenEvent
52-
*/
53-
@Deprecated
5446
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler, Class<T> eventClass) {
55-
return listenEvent(eventNamePattern, handler, eventClass);
47+
dynamicEventHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass));
48+
return this;
5649
}
5750

58-
/**
59-
* @param eventNamePattern
60-
* @param handler
61-
* @param <T>
62-
* @return HandlerRegistry
63-
* Avoid use this deprecated method please use listenEvent
64-
*/
65-
@Deprecated
6651
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler) {
6752
return handleDynamicEvents(eventNamePattern, handler, inferGenericParameterType(handler));
6853
}

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
import org.reactivecommons.api.domain.Command;
88
import org.reactivecommons.api.domain.DomainEvent;
99
import org.reactivecommons.api.domain.DomainEventBus;
10-
import org.reactivecommons.async.api.*;
10+
import org.reactivecommons.async.api.AsyncQuery;
11+
import org.reactivecommons.async.api.DefaultCommandHandler;
12+
import org.reactivecommons.async.api.DefaultQueryHandler;
13+
import org.reactivecommons.async.api.DynamicRegistry;
14+
import org.reactivecommons.async.api.HandlerRegistry;
1115
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1216
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1317
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -38,7 +42,15 @@
3842
import org.springframework.context.annotation.Configuration;
3943
import org.springframework.context.annotation.Import;
4044
import reactor.core.publisher.Mono;
41-
import reactor.rabbitmq.*;
45+
import reactor.rabbitmq.ChannelPool;
46+
import reactor.rabbitmq.ChannelPoolFactory;
47+
import reactor.rabbitmq.ChannelPoolOptions;
48+
import reactor.rabbitmq.RabbitFlux;
49+
import reactor.rabbitmq.Receiver;
50+
import reactor.rabbitmq.ReceiverOptions;
51+
import reactor.rabbitmq.Sender;
52+
import reactor.rabbitmq.SenderOptions;
53+
import reactor.rabbitmq.Utils;
4254
import reactor.util.retry.Retry;
4355

4456
import java.security.KeyManagementException;
@@ -48,6 +60,7 @@
4860
import java.util.concurrent.ConcurrentHashMap;
4961
import java.util.concurrent.ConcurrentMap;
5062
import java.util.logging.Level;
63+
import java.util.stream.Stream;
5164

5265
import static reactor.rabbitmq.ExchangeSpecification.exchange;
5366

@@ -198,12 +211,19 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
198211
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
199212
ConcurrentHashMap::putAll);
200213

201-
final ConcurrentMap<String, RegisteredEventListener<?>> eventListeners = registries
214+
final ConcurrentMap<String, RegisteredEventListener<?>> eventsToBind = registries
202215
.values().stream()
203216
.flatMap(r -> r.getEventListeners().stream())
204217
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
205218
ConcurrentHashMap::putAll);
206219

220+
// event handlers and dynamic handlers
221+
final ConcurrentMap<String, RegisteredEventListener<?>> eventHandlers = registries
222+
.values().stream()
223+
.flatMap(r -> Stream.concat(r.getEventListeners().stream(), r.getDynamicEventHandlers().stream()))
224+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
225+
ConcurrentHashMap::putAll);
226+
207227
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registries
208228
.values().stream()
209229
.flatMap(r -> r.getCommandHandlers().stream())
@@ -217,7 +237,7 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
217237
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
218238
ConcurrentHashMap::putAll);
219239

220-
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener, commandHandlers) {
240+
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) {
221241
@Override
222242
@SuppressWarnings("unchecked")
223243
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/HandlerResolver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class HandlerResolver {
1818

1919
private final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers;
2020
private final Map<String, RegisteredEventListener<?>> eventListeners;
21+
private final Map<String, RegisteredEventListener<?>> eventsToBind;
2122
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
2223
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
2324
private final Matcher matcher = new KeyMatcher();
@@ -53,8 +54,9 @@ public <T> RegisteredEventListener<T> getNotificationListener(String path) {
5354
.computeIfAbsent(path, getMatchHandler(eventNotificationListeners));
5455
}
5556

57+
// Returns only the listenEvent not the handleDynamicEvents
5658
public Collection<RegisteredEventListener<?>> getEventListeners() {
57-
return eventListeners.values();
59+
return eventsToBind.values();
5860
}
5961

6062
void addEventListener(RegisteredEventListener<?> listener) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import org.reactivecommons.async.commons.communications.Message;
1010
import org.reactivecommons.async.commons.converters.MessageConverter;
1111
import org.reactivecommons.async.commons.ext.CustomReporter;
12-
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
13-
import org.reactivecommons.async.commons.utils.matcher.Matcher;
1412
import org.reactivecommons.async.rabbit.HandlerResolver;
1513
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1614
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
@@ -35,7 +33,6 @@ public class ApplicationEventListener extends GenericMessageListener {
3533
private final boolean withDLQRetry;
3634
private final int retryDelay;
3735
private final Optional<Integer> maxLengthBytes;
38-
private final Matcher keyMatcher;
3936
private final String appName;
4037

4138

@@ -57,7 +54,6 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
5754
this.eventsExchange = eventsExchange;
5855
this.messageConverter = messageConverter;
5956
this.maxLengthBytes = maxLengthBytes;
60-
this.keyMatcher = new KeyMatcher();
6157
this.appName = appName;
6258
}
6359

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/DynamicRegistryImpTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ class DynamicRegistryImpTest {
4545
void setUp() {
4646
Map<String, RegisteredCommandHandler<?>> commandHandlers = new ConcurrentHashMap<>();
4747
Map<String, RegisteredEventListener<?>> eventListeners = new ConcurrentHashMap<>();
48+
Map<String, RegisteredEventListener<?>> eventsToBind = new ConcurrentHashMap<>();
4849
Map<String, RegisteredEventListener<?>> notificationEventListeners = new ConcurrentHashMap<>();
4950
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
50-
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners, commandHandlers);
51+
resolver = new HandlerResolver(queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers);
5152
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
5253
}
5354

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.reactivecommons.async.rabbit;
2+
3+
import org.assertj.core.api.Assertions;
4+
import org.junit.jupiter.api.BeforeEach;
5+
import org.junit.jupiter.api.Test;
6+
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
7+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
8+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
9+
import reactor.core.publisher.Mono;
10+
11+
import java.util.Collection;
12+
import java.util.Map;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
15+
class HandlerResolverTest {
16+
private HandlerResolver resolver;
17+
18+
@BeforeEach
19+
void setup() {
20+
Map<String, RegisteredCommandHandler<?>> commandHandlers = new ConcurrentHashMap<>();
21+
Map<String, RegisteredEventListener<?>> eventListeners = new ConcurrentHashMap<>();
22+
eventListeners.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class));
23+
eventListeners.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class));
24+
eventListeners.put("some.*", new RegisteredEventListener<>("some.*", message -> Mono.empty(), String.class));
25+
Map<String, RegisteredEventListener<?>> eventsToBind = new ConcurrentHashMap<>();
26+
eventsToBind.put("event.name", new RegisteredEventListener<>("event.name", message -> Mono.empty(), String.class));
27+
eventsToBind.put("event.name2", new RegisteredEventListener<>("event.name2", message -> Mono.empty(), String.class));
28+
Map<String, RegisteredEventListener<?>> notificationEventListeners = new ConcurrentHashMap<>();
29+
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
30+
resolver = new HandlerResolver(queryHandlers, eventListeners, eventsToBind, notificationEventListeners, commandHandlers);
31+
}
32+
33+
@Test
34+
void shouldGetOnlyTheBindingEvents() {
35+
// Act
36+
Collection<RegisteredEventListener<?>> eventListener = resolver.getEventListeners();
37+
// Assert
38+
Assertions.assertThat(eventListener.size()).isEqualTo(2);
39+
}
40+
41+
@Test
42+
void shouldMatchForAWildcardEvent() {
43+
// Act
44+
RegisteredEventListener<Object> eventListener = resolver.getEventListener("some.sample");
45+
// Assert
46+
Assertions.assertThat(eventListener.getPath()).isEqualTo("some.*");
47+
}
48+
49+
@Test
50+
void shouldMatchForAnExactEvent() {
51+
// Act
52+
RegisteredEventListener<Object> eventListener = resolver.getEventListener("event.name");
53+
// Assert
54+
Assertions.assertThat(eventListener.getPath()).isEqualTo("event.name");
55+
}
56+
57+
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ private HandlerResolver createHandlerResolver(final HandlerRegistry initialRegis
271271
final HandlerRegistry registry = range(0, 20).reduce(initialRegistry, (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
272272
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registry.getCommandHandlers().stream()
273273
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
274-
return new HandlerResolver(null, null, null, commandHandlers) {
274+
return new HandlerResolver(null, null, null, null, commandHandlers) {
275275
@Override
276276
@SuppressWarnings("unchecked")
277277
public RegisteredCommandHandler<Object> getCommandHandler(String path) {

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void setUp() {
8080
QueryHandler<String, SampleClass> handler = (message) -> just("OK");
8181
handlers.put("queryDirect", new RegisteredQueryHandler<>("queryDirect",
8282
(from, message) -> handler.handle(message), SampleClass.class));
83-
HandlerResolver resolver = new HandlerResolver(handlers, null, null, null);
83+
HandlerResolver resolver = new HandlerResolver(handlers, null,null, null, null);
8484
applicationQueryListener = new ApplicationQueryListener(reactiveMessageListener, "queue", resolver, sender,
8585
"directExchange", messageConverter, "replyExchange", false,
8686
1, 100, maxLengthBytes, true, discardNotifier, errorReporter);

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,18 @@
2424
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
2525
import org.reactivecommons.async.utils.TestUtils;
2626
import reactor.core.publisher.Flux;
27-
import reactor.rabbitmq.*;
28-
29-
import java.util.*;
27+
import reactor.rabbitmq.AcknowledgableDelivery;
28+
import reactor.rabbitmq.BindingSpecification;
29+
import reactor.rabbitmq.ConsumeOptions;
30+
import reactor.rabbitmq.ExchangeSpecification;
31+
import reactor.rabbitmq.Receiver;
32+
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.Random;
3039
import java.util.concurrent.ConcurrentHashMap;
3140
import java.util.concurrent.Semaphore;
3241
import java.util.concurrent.TimeUnit;
@@ -38,7 +47,9 @@
3847
import static java.util.stream.Collectors.toMap;
3948
import static org.assertj.core.api.Assertions.assertThat;
4049
import static org.mockito.ArgumentMatchers.any;
41-
import static org.mockito.Mockito.*;
50+
import static org.mockito.Mockito.mock;
51+
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.when;
4253
import static reactor.core.publisher.Mono.empty;
4354
import static reactor.core.publisher.Mono.just;
4455

@@ -120,13 +131,15 @@ protected <T> Flux<AcknowledgableDelivery> createSource(Function<T, String> rout
120131
protected abstract GenericMessageListener createMessageListener(final HandlerResolver handlerResolver);
121132

122133
private HandlerResolver createHandlerResolver(final HandlerRegistry registry) {
123-
final Map<String, RegisteredEventListener<?>> eventHandlers = registry.getEventListeners().stream().collect(toMap(RegisteredEventListener::getPath, identity()));
134+
final Map<String, RegisteredEventListener<?>> eventHandlers = Stream.concat(registry.getDynamicEventHandlers().stream(), registry.getEventListeners().stream()).collect(toMap(RegisteredEventListener::getPath, identity()));
135+
final Map<String, RegisteredEventListener<?>> eventsToBind = registry.getEventListeners().stream().collect(toMap(RegisteredEventListener::getPath, identity()));
124136
final Map<String, RegisteredEventListener<?>> notificationHandlers = registry.getEventNotificationListener().stream().collect(toMap(RegisteredEventListener::getPath, identity()));
125137
final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = registry.getHandlers().stream().collect(toMap(RegisteredQueryHandler::getPath, identity()));
126138
final Map<String, RegisteredCommandHandler<?>> commandHandlers = registry.getCommandHandlers().stream().collect(toMap(RegisteredCommandHandler::getPath, identity()));
127139
return new HandlerResolver(
128140
new ConcurrentHashMap<>(queryHandlers),
129141
new ConcurrentHashMap<>(eventHandlers),
142+
new ConcurrentHashMap<>(eventsToBind),
130143
new ConcurrentHashMap<>(notificationHandlers),
131144
new ConcurrentHashMap<>(commandHandlers));
132145
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=1.0.3
1+
version=1.0.4
22
springBootVersion=2.4.2
33
gradleVersionsVersion=0.36.0
44
toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,domain-events-api,async-rabbit

0 commit comments

Comments
 (0)