Skip to content

Commit 10abaf7

Browse files
committed
fix(notifier): discard notifier for eda variant
1 parent a829053 commit 10abaf7

File tree

3 files changed

+15
-14
lines changed

3 files changed

+15
-14
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public void forListener(BiConsumer<String, ReactiveMessageListener> consumer) {
3232
connections.forEach((key, conn) -> consumer.accept(key, conn.getListener()));
3333
}
3434

35-
public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) {
36-
getChecked(domain).setDiscardNotifier(discardNotifier);
35+
public void setDiscardNotifierForAll(DiscardNotifier discardNotifier) {
36+
connections.forEach((key, conn) -> conn.setDiscardNotifier(discardNotifier));
3737
}
3838

3939
public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender,
Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import org.reactivecommons.api.domain.DomainEventBus;
4-
import org.reactivecommons.async.commons.DiscardNotifier;
54
import org.reactivecommons.async.commons.config.BrokerConfig;
6-
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
7-
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
85
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
96
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
107
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
@@ -22,19 +19,13 @@ public class EventBusConfig {
2219

2320
@Bean // app connection
2421
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfig config,
25-
AsyncPropsDomain asyncPropsDomain, ObjectMapperSupplier objectMapperSupplier) {
22+
AsyncPropsDomain asyncPropsDomain) {
2623
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
2724
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
2825
final String exchangeName = asyncProps.getBrokerConfigProps().getDomainEventsExchangeName();
2926
if (asyncProps.getCreateTopology()) {
3027
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
3128
}
32-
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
33-
manager.setDiscardNotifier(DEFAULT_DOMAIN, createDiscardNotifier(domainEventBus, objectMapperSupplier));
34-
return domainEventBus;
35-
}
36-
37-
private DiscardNotifier createDiscardNotifier(DomainEventBus domainEventBus, ObjectMapperSupplier objectMapperSupplier) {
38-
return new RabbitDiscardNotifier(domainEventBus, objectMapperSupplier.get());
29+
return new RabbitDomainEventBus(sender, exchangeName, config);
3930
}
4031
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import lombok.extern.java.Log;
88
import org.reactivecommons.api.domain.Command;
99
import org.reactivecommons.api.domain.DomainEvent;
10+
import org.reactivecommons.api.domain.DomainEventBus;
1011
import org.reactivecommons.async.api.AsyncQuery;
1112
import org.reactivecommons.async.api.DefaultCommandHandler;
1213
import org.reactivecommons.async.api.DefaultQueryHandler;
1314
import org.reactivecommons.async.api.DynamicRegistry;
1415
import org.reactivecommons.async.api.HandlerRegistry;
16+
import org.reactivecommons.async.commons.DiscardNotifier;
1517
import org.reactivecommons.async.commons.communications.Message;
1618
import org.reactivecommons.async.commons.config.BrokerConfig;
1719
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
@@ -21,6 +23,8 @@
2123
import org.reactivecommons.async.commons.ext.CustomReporter;
2224
import org.reactivecommons.async.rabbit.DynamicRegistryImp;
2325
import org.reactivecommons.async.rabbit.HandlerResolver;
26+
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
27+
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
2428
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
2529
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
2630
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
@@ -67,14 +71,20 @@ public class RabbitMqConfig {
6771

6872

6973
@Bean
70-
public ConnectionManager buildConnectionManager(AsyncPropsDomain props, MessageConverter converter) {
74+
public ConnectionManager buildConnectionManager(AsyncPropsDomain props, MessageConverter converter,
75+
BrokerConfig brokerConfig, ObjectMapperSupplier objectMapperSupplier) {
7176
ConnectionManager connectionManager = new ConnectionManager();
7277
props.forEach((domain, properties) -> {
7378
ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties.getConnectionProperties());
7479
ReactiveMessageSender sender = createMessageSender(provider, properties, converter);
7580
ReactiveMessageListener listener = createMessageListener(provider, properties);
7681
connectionManager.addDomain(domain, listener, sender, provider);
7782
});
83+
ReactiveMessageSender appDomainSender = connectionManager.getSender(DEFAULT_DOMAIN);
84+
DomainEventBus appDomainEventBus = new RabbitDomainEventBus(appDomainSender, props.getProps(DEFAULT_DOMAIN)
85+
.getBrokerConfigProps().getDomainEventsExchangeName(), brokerConfig);
86+
DiscardNotifier notifier = new RabbitDiscardNotifier(appDomainEventBus, objectMapperSupplier.get());
87+
connectionManager.setDiscardNotifierForAll(notifier);
7888
return connectionManager;
7989
}
8090

0 commit comments

Comments
 (0)