Skip to content

Commit f50d371

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #39 from kwah07/feature/notifications
Adds event notification listener
2 parents c69f7ef + 1edf834 commit f50d371

File tree

11 files changed

+216
-24
lines changed

11 files changed

+216
-24
lines changed

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,31 @@ Don't forget to add the implementation dependency to the main spring boot module
128128
}
129129
```
130130

131+
### Domain Event-Listener
132+
Reactive commons has four types of listeners implemented, available to be registered in the application via the **HandlerRegistry**, each of them is designed to tackle
133+
common requirements for listeners in event based applications and abstracts the behavior of event flow in every situation (Varying for example in retrying strategy, dead letter events, sources and so on).
134+
135+
The available event listeners are:
136+
- Domain Event Listener
137+
- Query Event Listener
138+
- Command Listener
139+
- Notification Listener
140+
141+
Example Code:
142+
```java
143+
public HandlerRegistry notificationEvents() {
144+
return HandlerRegistry.register()
145+
.listenNotificationEvent("gatewayRouteAdded", message -> {
146+
System.out.println("Refreshing instance");
147+
return Mono.empty();
148+
},GatewayEvent.class);
149+
}
150+
```
151+
152+
The above code shows how to handle a notification event (Notification event: an event that should be handled by
153+
every running instance of a microservice, e.g: notify to every instance that a configuration setting has changed
154+
and has to do a hot reload from a persistent source of that data).
155+
131156
### Request-Reply
132157
Example Code:
133158

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public static HandlerRegistry register(){
2525
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2626
private final List<RegisteredEventListener> eventListeners = new CopyOnWriteArrayList<>();
2727
private final List<RegisteredCommandHandler> commandHandlers = new CopyOnWriteArrayList<>();
28+
private final List<RegisteredEventListener> eventNotificationListener = new CopyOnWriteArrayList<>();
2829

2930
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
3031
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
@@ -36,6 +37,11 @@ public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler
3637
return this;
3738
}
3839

40+
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
41+
eventNotificationListener.add(new RegisteredEventListener<>(eventName, fn, eventClass));
42+
return this;
43+
}
44+
3945
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass){
4046
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
4147
return this;

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.reactivecommons.async.impl.converters.MessageConverter;
1818
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1919
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
20+
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
2021
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
2122
import org.springframework.beans.factory.annotation.Value;
2223
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -48,9 +49,21 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
4849
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
4950
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
5051
discardNotifier);
51-
5252
listener.startListener();
53+
return listener;
54+
}
5355

56+
@Bean
57+
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
58+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
59+
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
60+
receiver,
61+
asyncProps.getDomain().getEvents().getExchange(),
62+
asyncProps.getNotificationProps().getQueueName(appName),
63+
resolver,
64+
messageConverter,
65+
discardNotifier);
66+
listener.startListener();
5467
return listener;
5568
}
5669

@@ -62,9 +75,7 @@ public ApplicationQueryListener queryListener(MessageConverter converter, Handle
6275
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
6376
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
6477
asyncProps.getRetryDelay(), discardNotifier);
65-
6678
listener.startListener();
67-
6879
return listener;
6980
}
7081

@@ -75,9 +86,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
7586
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
7687
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
7788
asyncProps.getRetryDelay(), discardNotifier);
78-
7989
commandListener.startListener();
80-
8190
return commandListener;
8291
}
8392

@@ -108,7 +117,14 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
108117
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
109118
ConcurrentHashMap::putAll);
110119

111-
return new HandlerResolver(handlers, eventListeners, commandHandlers) {
120+
final ConcurrentMap<String, RegisteredEventListener> eventNotificationListener = registries
121+
.values()
122+
.stream()
123+
.flatMap(r -> r.getEventNotificationListener().stream())
124+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
125+
ConcurrentHashMap::putAll);
126+
127+
return new HandlerResolver(handlers, eventListeners, commandHandlers, eventNotificationListener) {
112128
@Override
113129
@SuppressWarnings("unchecked")
114130
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
@@ -125,7 +141,6 @@ public DefaultQueryHandler defaultHandler() {
125141
Mono.error(new RuntimeException("No Handler Registered"));
126142
}
127143

128-
129144
@Bean
130145
@ConditionalOnMissingBean
131146
public DefaultCommandHandler defaultCommandHandler() {

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ public class AsyncProps {
2020
@NestedConfigurationProperty
2121
private DirectProps direct = new DirectProps();
2222

23+
@NestedConfigurationProperty
24+
private NotificationProps notificationProps = new NotificationProps();
25+
2326
private Integer maxRetries = 10;
2427

2528
private Integer prefetchCount = 250;
2629

2730
private Integer retryDelay = 1000;
2831

2932
private Boolean withDLQRetry = false;
30-
3133
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Getter;
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.async.impl.config.IBrokerConfigProps;
6+
import org.reactivecommons.async.impl.utils.NameGenerator;
67
import org.springframework.beans.factory.annotation.Value;
78
import org.springframework.context.annotation.Configuration;
89
import org.springframework.util.Base64Utils;
@@ -19,9 +20,7 @@ public class BrokerConfigProps implements IBrokerConfigProps {
1920

2021
@Value("${spring.application.name}")
2122
private String appName;
22-
2323
private final AsyncProps asyncProps;
24-
2524
private final AtomicReference<String> replyQueueName = new AtomicReference<>();
2625

2726
@Override
@@ -43,7 +42,7 @@ public String getCommandsQueue() {
4342
public String getReplyQueue() {
4443
final String name = replyQueueName.get();
4544
if (name == null) {
46-
final String replyName = newRandomQueueName();
45+
final String replyName = NameGenerator.generateNameFrom(appName);
4746
if (replyQueueName.compareAndSet(null, replyName)) {
4847
return replyName;
4948
} else {
@@ -62,14 +61,4 @@ public String getDomainEventsExchangeName() {
6261
public String getDirectMessagesExchangeName() {
6362
return asyncProps.getDirect().getExchange();
6463
}
65-
66-
private String newRandomQueueName() {
67-
UUID uuid = UUID.randomUUID();
68-
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
69-
bb.putLong(uuid.getMostSignificantBits())
70-
.putLong(uuid.getLeastSignificantBits());
71-
return appName + Base64Utils.encodeToUrlSafeString(bb.array())
72-
.replaceAll("=", "");
73-
}
74-
7564
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
import org.reactivecommons.async.impl.utils.NameGenerator;
6+
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
@Getter
10+
@RequiredArgsConstructor
11+
public class NotificationProps {
12+
13+
private final AtomicReference<String> queueName = new AtomicReference<>();
14+
private final String queueSuffix = "notification";
15+
16+
public String getQueueName(String applicationName) {
17+
final String name = this.queueName.get();
18+
if(name == null) return getGeneratedName(applicationName);
19+
return name;
20+
}
21+
22+
private String getGeneratedName(String applicationName) {
23+
String generatedName = NameGenerator.generateNameFrom(applicationName, queueSuffix);
24+
return this.queueName
25+
.compareAndSet(null, generatedName) ?
26+
generatedName : this.queueName.get();
27+
}
28+
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/HandlerResolver.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class HandlerResolver {
1414
private final Map<String, RegisteredQueryHandler> queryHandlers;
1515
private final Map<String, RegisteredEventListener> eventListeners;
1616
private final Map<String, RegisteredCommandHandler> commandHandlers;
17-
17+
private final Map<String , RegisteredEventListener> eventNotificationListeners;
1818

1919
@SuppressWarnings("unchecked")
2020
public <T, R> RegisteredQueryHandler<T, R> getQueryHandler(String path) {
@@ -31,6 +31,14 @@ public <T> RegisteredEventListener<T> getEventListener(String path) {
3131
return eventListeners.get(path);
3232
}
3333

34+
public Collection<RegisteredEventListener> getNotificationListeners() {
35+
return eventNotificationListeners.values();
36+
}
37+
38+
public <T> RegisteredEventListener<T> getNotificationListener(String path) {
39+
return eventNotificationListeners.get(path);
40+
}
41+
3442
public Collection<RegisteredEventListener> getEventListeners() {
3543
return eventListeners.values();
3644
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package org.reactivecommons.async.impl.listeners;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import lombok.extern.java.Log;
5+
import org.reactivecommons.api.domain.DomainEvent;
6+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
7+
import org.reactivecommons.async.impl.DiscardNotifier;
8+
import org.reactivecommons.async.impl.EventExecutor;
9+
import org.reactivecommons.async.impl.HandlerResolver;
10+
import org.reactivecommons.async.impl.communications.Message;
11+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
12+
import org.reactivecommons.async.impl.communications.TopologyCreator;
13+
import org.reactivecommons.async.impl.converters.MessageConverter;
14+
import reactor.core.publisher.Flux;
15+
import reactor.core.publisher.Mono;
16+
import reactor.rabbitmq.AcknowledgableDelivery;
17+
18+
import java.util.function.Function;
19+
20+
import static reactor.core.publisher.Flux.fromIterable;
21+
import static reactor.rabbitmq.BindingSpecification.binding;
22+
import static reactor.rabbitmq.ExchangeSpecification.exchange;
23+
import static reactor.rabbitmq.QueueSpecification.queue;
24+
25+
@Log
26+
public class ApplicationNotificationListener extends GenericMessageListener {
27+
28+
private final MessageConverter messageConverter;
29+
private final HandlerResolver resolver;
30+
private final String exchangeName;
31+
32+
33+
public ApplicationNotificationListener(ReactiveMessageListener receiver,
34+
String exchangeName,
35+
String queueName,
36+
HandlerResolver handlerResolver,
37+
MessageConverter messageConverter,
38+
DiscardNotifier discardNotifier) {
39+
super(queueName,receiver,false,1,discardNotifier,"event");
40+
this.resolver = handlerResolver;
41+
this.messageConverter = messageConverter;
42+
this.exchangeName = exchangeName;
43+
}
44+
45+
protected Mono<Void> setUpBindings(TopologyCreator creator) {
46+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
47+
.type("topic")
48+
.durable(true));
49+
50+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
51+
queue(queueName)
52+
.durable(false)
53+
.autoDelete(true)
54+
.exclusive(true));
55+
56+
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getNotificationListeners())
57+
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));
58+
59+
return declareExchange
60+
.then(declareQueue)
61+
.thenMany(bindings)
62+
.then();
63+
}
64+
65+
@Override
66+
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
67+
final RegisteredEventListener<Object> eventListener = resolver.getNotificationListener(executorPath);
68+
final Function<Message, DomainEvent<Object>> converter = message -> messageConverter
69+
.readDomainEvent(message, eventListener.getInputClass());
70+
final EventExecutor<Object> executor = new EventExecutor<>(eventListener.getHandler(), converter);
71+
return message -> executor
72+
.execute(message)
73+
.cast(Object.class);
74+
}
75+
76+
@Override
77+
protected String getExecutorPath(AcknowledgableDelivery message) {
78+
return message.getEnvelope()
79+
.getRoutingKey();
80+
}
81+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.reactivecommons.async.impl.utils;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.Base64;
5+
import java.util.UUID;
6+
7+
public class NameGenerator {
8+
9+
public static String generateNameFrom(String applicationName, String suffix) {
10+
return generateName(applicationName,suffix);
11+
}
12+
13+
public static String generateNameFrom(String applicationName) {
14+
return generateName(applicationName,"");
15+
}
16+
17+
private static String generateName(String applicationName, String suffix) {
18+
UUID uuid = UUID.randomUUID();
19+
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
20+
bb.putLong(uuid.getMostSignificantBits())
21+
.putLong(uuid.getLeastSignificantBits());
22+
// Convert to base64 and remove trailing =
23+
return applicationName+"-"+ suffix + "-" + encodeToUrlSafeString(bb.array())
24+
.replace("=", "");
25+
}
26+
27+
private static String encodeToUrlSafeString(byte[] src) {
28+
return new String(encodeUrlSafe(src));
29+
}
30+
31+
private static byte[] encodeUrlSafe(byte[] src) {
32+
if (src.length == 0) {
33+
return src;
34+
}
35+
return Base64.getUrlEncoder().encode(src);
36+
}
37+
}

async/async-commons/src/test/java/org/reactivecommons/async/impl/DynamicRegistryImpTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void init() {
4242
Map<String, RegisteredCommandHandler> commandHandlers = new ConcurrentHashMap<>();
4343
Map<String, RegisteredEventListener> eventListeners = new ConcurrentHashMap<>();
4444
Map<String, RegisteredQueryHandler> queryHandlers = new ConcurrentHashMap<>();
45-
resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers);
45+
Map<String, RegisteredEventListener> notificationEventListeners = new ConcurrentHashMap<>();
46+
resolver = new HandlerResolver(queryHandlers, eventListeners, commandHandlers,notificationEventListeners);
4647
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
4748
when(props.getEventsQueue()).thenReturn("events.queue");
4849
registry = new DynamicRegistryImp(resolver, topologyCreator, props);

0 commit comments

Comments
 (0)