Skip to content

Commit d4eeb31

Browse files
author
Daniel Bustamante Ospina
committed
Implement Discart notification by events.
1 parent ea17d84 commit d4eeb31

File tree

27 files changed

+170
-78
lines changed

27 files changed

+170
-78
lines changed

acceptance/async-tests/src/test/java/org/reactivecommons/test/DynamicRegistryTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,10 @@
22

33
import org.junit.Test;
44
import org.junit.runner.RunWith;
5-
import org.mockito.Mockito;
65
import org.reactivecommons.api.domain.DomainEvent;
76
import org.reactivecommons.api.domain.DomainEventBus;
8-
import org.reactivecommons.async.api.AsyncQuery;
9-
import org.reactivecommons.async.api.DirectAsyncGateway;
107
import org.reactivecommons.async.api.DynamicRegistry;
11-
import org.reactivecommons.async.api.HandlerRegistry;
128
import org.reactivecommons.async.api.handlers.EventHandler;
13-
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
149
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
1510
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
1611
import org.reactivestreams.Publisher;
@@ -19,9 +14,7 @@
1914
import org.springframework.boot.SpringApplication;
2015
import org.springframework.boot.autoconfigure.SpringBootApplication;
2116
import org.springframework.boot.test.context.SpringBootTest;
22-
import org.springframework.context.annotation.Bean;
2317
import org.springframework.test.context.junit4.SpringRunner;
24-
import reactor.core.publisher.Mono;
2518
import reactor.core.publisher.UnicastProcessor;
2619
import reactor.test.StepVerifier;
2720

acceptance/async-tests/src/test/java/org/reactivecommons/test/SimpleDirectCommunicationTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616
import org.springframework.boot.test.context.SpringBootTest;
1717
import org.springframework.context.annotation.Bean;
1818
import org.springframework.test.context.junit4.SpringRunner;
19-
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
21-
import reactor.core.publisher.SignalType;
2220
import reactor.core.publisher.UnicastProcessor;
2321
import reactor.test.StepVerifier;
2422

2523
import java.time.Duration;
2624
import java.util.concurrent.ThreadLocalRandom;
27-
import java.util.logging.Level;
2825

2926
import static org.assertj.core.api.Assertions.assertThat;
3027
import static reactor.core.publisher.Mono.*;

acceptance/async-tests/src/test/java/org/reactivecommons/test/SimpleEventNotificationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import reactor.test.StepVerifier;
1919

2020
import java.util.concurrent.ThreadLocalRandom;
21-
import java.util.concurrent.TimeUnit;
2221

2322
import static org.assertj.core.api.Assertions.assertThat;
2423
import static reactor.core.publisher.Mono.*;

acceptance/async-tests/src/test/java/org/reactivecommons/test/perf/BlockingCommandHandlePerfTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivecommons.test.perf;
22

3-
import org.junit.Ignore;
43
import org.junit.Test;
54
import org.junit.runner.RunWith;
65
import org.reactivecommons.api.domain.Command;
@@ -17,9 +16,7 @@
1716
import org.springframework.context.annotation.Bean;
1817
import org.springframework.test.context.junit4.SpringRunner;
1918
import reactor.core.publisher.Flux;
20-
import reactor.core.publisher.Mono;
2119
import reactor.core.publisher.UnicastProcessor;
22-
import reactor.core.scheduler.Schedulers;
2320
import reactor.test.StepVerifier;
2421

2522
import java.time.Duration;

acceptance/async-tests/src/test/java/org/reactivecommons/test/perf/ParallelOnBlockingInSubscriptionTimeTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@
2121
import reactor.test.StepVerifier;
2222

2323
import java.time.Duration;
24-
import java.util.List;
2524
import java.util.concurrent.ThreadLocalRandom;
2625
import java.util.concurrent.TimeUnit;
2726

2827
import static java.lang.System.out;
2928
import static org.assertj.core.api.Assertions.assertThat;
3029
import static reactor.core.publisher.Mono.empty;
31-
import static reactor.core.publisher.Mono.fromRunnable;
3230

3331
@SpringBootTest
3432
@RunWith(SpringRunner.class)

acceptance/async-tests/src/test/java/org/reactivecommons/test/perf/SimpleCommandHandlePerfTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@
1313
import org.springframework.beans.factory.annotation.Value;
1414
import org.springframework.boot.SpringApplication;
1515
import org.springframework.boot.autoconfigure.SpringBootApplication;
16-
import org.springframework.boot.context.properties.ConfigurationProperties;
1716
import org.springframework.boot.test.context.SpringBootTest;
1817
import org.springframework.context.annotation.Bean;
19-
import org.springframework.context.annotation.ImportResource;
20-
import org.springframework.context.annotation.PropertySource;
2118
import org.springframework.test.context.junit4.SpringRunner;
2219
import reactor.core.publisher.Flux;
2320
import reactor.core.publisher.UnicastProcessor;

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> f
4646
return this;
4747
}
4848

49-
@SuppressWarnings("unchecked")
5049
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler){
5150
return serveQuery(resource, handler, inferGenericParameterType(handler));
5251
}

async/async-commons-standalone/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.rabbitmq.client.Connection;
44
import com.rabbitmq.client.ConnectionFactory;
55
import lombok.extern.java.Log;
6-
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
76
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
87
import org.reactivecommons.async.impl.communications.TopologyCreator;
98
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/ReplyCommandSender.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivecommons.async.impl;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
43
import org.reactivecommons.async.api.AsyncQuery;
54
import org.reactivecommons.async.api.DirectAsyncGateway;
65
import org.springframework.beans.factory.annotation.Autowired;

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@
44
import org.reactivecommons.async.api.DefaultQueryHandler;
55
import org.reactivecommons.async.api.DynamicRegistry;
66
import org.reactivecommons.async.api.HandlerRegistry;
7-
import org.reactivecommons.async.api.handlers.QueryHandler;
87
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
98
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
109
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
10+
import org.reactivecommons.async.impl.DiscardNotifier;
1111
import org.reactivecommons.async.impl.DynamicRegistryImp;
1212
import org.reactivecommons.async.impl.HandlerResolver;
1313
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1414
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
15-
import org.reactivecommons.async.impl.communications.TopologyCreator;
16-
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
1715
import org.reactivecommons.async.impl.converters.MessageConverter;
1816
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1917
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
@@ -54,22 +52,22 @@ public class MessageListenersConfig {
5452
private boolean withDLQRetry;
5553

5654
@Bean //TODO: move to own config (QueryListenerConfig)
57-
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter, ReactiveMessageListener receiver) throws Exception {
58-
final ApplicationEventListener listener = new ApplicationEventListener(receiver, appName + ".subsEvents", resolver, domainEventsExchangeName, messageConverter, withDLQRetry, maxRetries, retryDelay);
55+
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter, ReactiveMessageListener receiver, DiscardNotifier discardNotifier) throws Exception {
56+
final ApplicationEventListener listener = new ApplicationEventListener(receiver, appName + ".subsEvents", resolver, domainEventsExchangeName, messageConverter, withDLQRetry, maxRetries, retryDelay, discardNotifier);
5957
listener.startListener();
6058
return listener;
6159
}
6260

6361
@Bean //TODO: move to own config (QueryListenerConfig)
64-
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver, ReactiveMessageSender sender, ReactiveMessageListener rlistener) throws Exception {
65-
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener, appName+".query", resolver, sender, directMessagesExchangeName, converter, "globalReply", withDLQRetry, maxRetries, retryDelay);
62+
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver, ReactiveMessageSender sender, ReactiveMessageListener rlistener, DiscardNotifier discardNotifier) throws Exception {
63+
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener, appName+".query", resolver, sender, directMessagesExchangeName, converter, "globalReply", withDLQRetry, maxRetries, retryDelay, discardNotifier);
6664
listener.startListener();
6765
return listener;
6866
}
6967

7068
@Bean
71-
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener, HandlerResolver resolver, MessageConverter converter){
72-
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, directMessagesExchangeName, converter, withDLQRetry, maxRetries, retryDelay);
69+
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener, HandlerResolver resolver, MessageConverter converter, DiscardNotifier discardNotifier){
70+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, directMessagesExchangeName, converter, withDLQRetry, maxRetries, retryDelay, discardNotifier);
7371
commandListener.startListener();
7472
return commandListener;
7573
}

0 commit comments

Comments
 (0)