Skip to content

Commit 590ee2f

Browse files
committed
next(shared-starter): Create shared starter classes for any broker type to enable many brokers usage for 5 version
1 parent 2fa73b5 commit 590ee2f

File tree

116 files changed

+2513
-1709
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+2513
-1709
lines changed
Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package org.reactivecommons.async.commons.config;
22

3+
import lombok.Getter;
4+
35
import java.time.Duration;
46
import java.util.UUID;
57

8+
@Getter
69
public class BrokerConfig {
710
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
811
private final boolean persistentQueries;
@@ -24,24 +27,4 @@ public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boole
2427
this.replyTimeout = replyTimeout;
2528
}
2629

27-
public boolean isPersistentQueries() {
28-
return persistentQueries;
29-
}
30-
31-
public boolean isPersistentCommands() {
32-
return persistentCommands;
33-
}
34-
35-
public boolean isPersistentEvents() {
36-
return persistentEvents;
37-
}
38-
39-
public Duration getReplyTimeout() {
40-
return replyTimeout;
41-
}
42-
43-
public String getRoutingKey() {
44-
return routingKey;
45-
}
46-
4730
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.reactivecommons.async.kafka;
2+
3+
import io.cloudevents.CloudEvent;
4+
import org.reactivecommons.api.domain.Command;
5+
import org.reactivecommons.async.api.AsyncQuery;
6+
import org.reactivecommons.async.api.DirectAsyncGateway;
7+
import org.reactivecommons.async.api.From;
8+
import reactor.core.publisher.Mono;
9+
10+
public class KafkaDirectAsyncGateway implements DirectAsyncGateway {
11+
@Override
12+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
13+
throw new UnsupportedOperationException("Not implemented yet");
14+
}
15+
16+
@Override
17+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
18+
throw new UnsupportedOperationException("Not implemented yet");
19+
}
20+
21+
@Override
22+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
23+
throw new UnsupportedOperationException("Not implemented yet");
24+
}
25+
26+
@Override
27+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
28+
throw new UnsupportedOperationException("Not implemented yet");
29+
}
30+
31+
@Override
32+
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
33+
throw new UnsupportedOperationException("Not implemented yet");
34+
}
35+
36+
@Override
37+
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
38+
throw new UnsupportedOperationException("Not implemented yet");
39+
}
40+
41+
@Override
42+
public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
43+
throw new UnsupportedOperationException("Not implemented yet");
44+
}
45+
46+
@Override
47+
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
48+
throw new UnsupportedOperationException("Not implemented yet");
49+
}
50+
51+
@Override
52+
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
53+
throw new UnsupportedOperationException("Not implemented yet");
54+
}
55+
56+
@Override
57+
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
58+
throw new UnsupportedOperationException("Not implemented yet");
59+
}
60+
61+
@Override
62+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
63+
throw new UnsupportedOperationException("Not implemented yet");
64+
}
65+
66+
@Override
67+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
68+
throw new UnsupportedOperationException("Not implemented yet");
69+
}
70+
71+
@Override
72+
public <T> Mono<Void> reply(T response, From from) {
73+
throw new UnsupportedOperationException("Not implemented yet");
74+
}
75+
}

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,18 @@ public <T> Publisher<Void> emit(DomainEvent<T> event) {
1616
return sender.send(event);
1717
}
1818

19+
@Override
20+
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
21+
throw new UnsupportedOperationException("Not implemented yet");
22+
}
23+
1924
@Override
2025
public Publisher<Void> emit(CloudEvent event) {
2126
return sender.send(event);
2227
}
28+
29+
@Override
30+
public Publisher<Void> emit(String domain, CloudEvent event) {
31+
throw new UnsupportedOperationException("Not implemented yet");
32+
}
2333
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package org.reactivecommons.async.rabbit;
22

33
import io.cloudevents.CloudEvent;
4-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
4+
import org.reactivecommons.api.domain.DomainEvent;
5+
import org.reactivecommons.api.domain.DomainEventBus;
56
import org.reactivecommons.async.commons.config.BrokerConfig;
7+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
68
import org.reactivestreams.Publisher;
79
import reactor.core.publisher.Mono;
8-
import org.reactivecommons.api.domain.DomainEvent;
9-
import org.reactivecommons.api.domain.DomainEventBus;
1010

1111
import java.util.Collections;
1212

@@ -29,7 +29,12 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
2929
@Override
3030
public <T> Mono<Void> emit(DomainEvent<T> event) {
3131
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
32-
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
32+
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
33+
}
34+
35+
@Override
36+
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
37+
throw new UnsupportedOperationException("Not implemented yet");
3338
}
3439

3540
@Override
@@ -39,4 +44,9 @@ public Publisher<Void> emit(CloudEvent cloudEvent) {
3944
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
4045
}
4146

47+
@Override
48+
public Publisher<Void> emit(String domain, CloudEvent event) {
49+
throw new UnsupportedOperationException("Not implemented yet");
50+
}
51+
4252
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationNotificationListener.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
88
import org.reactivecommons.async.commons.DiscardNotifier;
99
import org.reactivecommons.async.commons.EventExecutor;
10+
import org.reactivecommons.async.commons.HandlerResolver;
1011
import org.reactivecommons.async.commons.communications.Message;
1112
import org.reactivecommons.async.commons.converters.MessageConverter;
1213
import org.reactivecommons.async.commons.ext.CustomReporter;
13-
import org.reactivecommons.async.commons.HandlerResolver;
1414
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1515
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1616
import reactor.core.publisher.Flux;
@@ -51,9 +51,6 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
5151
}
5252

5353
protected Mono<Void> setUpBindings(TopologyCreator creator) {
54-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
55-
.type("topic")
56-
.durable(true));
5754

5855
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
5956
queue(queueName)
@@ -65,6 +62,10 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
6562
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));
6663

6764
if (createTopology) {
65+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
66+
.type("topic")
67+
.durable(true));
68+
6869
return declareExchange
6970
.then(declareQueue)
7071
.thenMany(bindings)

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
139139
}
140140

141141
private void onTerminate() {
142-
messageFlux.doOnTerminate(this::onTerminate)
142+
messageFlux
143+
.doOnTerminate(this::onTerminate)
143144
.subscribe(new LoggerSubscriber<>(getClass().getName()));
144145
}
145146

build.gradle

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,6 @@ plugins {
1818
id 'co.com.bancolombia.cleanArchitecture' version '3.17.13'
1919
}
2020

21-
sonar {
22-
properties {
23-
property 'sonar.projectKey', 'reactive-commons_reactive-commons-java'
24-
property 'sonar.coverage.exclusions', 'samples/**/*'
25-
property 'sonar.organization', 'reactive-commons'
26-
property 'sonar.host.url', 'https://sonarcloud.io'
27-
}
28-
}
29-
3021
repositories {
3122
mavenCentral()
3223
}

docs/docs/reactive-commons/1-getting-started.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ spring:
8383

8484
You can also set it in runtime for example from a secret, so you can create the `RabbitProperties` bean like:
8585

86-
```java title="org.reactivecommons.async.rabbit.config.RabbitProperties"
86+
```java title="org.reactivecommons.async.rabbit.standalone.config.RabbitProperties"
8787
8888
@Configuration
8989
public class MyRabbitMQConfig {

docs/docs/reactive-commons/9-configuration-properties.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ app:
2727
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
2828
delayedCommands: false # Enable to send a delayed command to an external target
2929
prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this settings acts per instance of your service
30+
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
31+
enabled: true # if you want to disable this domain you can set it to false
32+
brokerType: "rabbitmq" # please don't change this value
3033
flux:
3134
maxConcurrency: 250 # max concurrency of listener flow
3235
domain:
@@ -64,7 +67,7 @@ You can override this settings programmatically through a `AsyncPropsDomainPrope
6467
```java
6568
package sample;
6669
67-
import org.reactivecommons.async.rabbit.config.RabbitProperties;
70+
import org.reactivecommons.async.rabbit.standalone.config.RabbitProperties;
6871
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
6972
import org.reactivecommons.async.rabbit.config.props.AsyncRabbitPropsDomainProperties;
7073
import org.springframework.context.annotation.Bean;
@@ -133,6 +136,9 @@ reactive:
133136
retryDelay: 1000 # interval for message retries, with and without DLQRetry
134137
checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false
135138
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
139+
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
140+
enabled: true # if you want to disable this domain you can set it to false
141+
brokerType: "kafka" # please don't change this value
136142
domain:
137143
ignoreThisListener: false # Allows you to disable event listener for this specific domain
138144
connectionProperties: # you can override the connection properties of each domain

domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
public interface DomainEventBus {
77
<T> Publisher<Void> emit(DomainEvent<T> event);
8+
<T> Publisher<Void> emit(String domain, DomainEvent<T> event);
89

910
Publisher<Void> emit(CloudEvent event);
11+
Publisher<Void> emit(String domain, CloudEvent event);
1012
}

0 commit comments

Comments
 (0)