
Commit 1e5a502

chore(next): Update starters and extract common reusable code to an additional module
1 parent 1fc9a95 commit 1e5a502

7 files changed: +1954, -46 lines changed


docs/docs/reactive-commons/1-getting-started.md

Lines changed: 140 additions & 1 deletion
@@ -120,7 +120,146 @@ If you want to use it, you should read the [Creating a CloudEvent guide](11-crea
</TabItem>
<TabItem value="kafka" label="Kafka">
-Comming soon...

This quick start tutorial sets up a single-node Kafka and runs the sample reactive sender and consumer using Reactive Commons.

## Requirements

You need a Java JRE installed (Java 17 or later).

## Start Kafka

Start a Kafka broker on your local machine with all the defaults (e.g. port 9092).

### Containerized

You can run it with Docker or Podman.

The following Docker Compose file defines a Kafka broker, a Zookeeper node and a Kafka UI.

docker-compose.yml

```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    ports:
      - "8081:8080"
    depends_on:
      - kafka
```

```shell
docker-compose up
```

You may need to add the following entry to /etc/hosts (or equivalent), so that the advertised listener host `kafka` resolves to your local machine:

```txt
127.0.0.1 kafka
```

To open the Kafka UI, go to `http://localhost:8081` in your browser.
## Spring Boot Application

The Spring Boot sample publishes and consumes messages with the `DomainEventBus`. This application illustrates how to configure Reactive Commons with Kafka in a Spring Boot environment.

To build your own application using the Reactive Commons API, you need to include a dependency on Reactive Commons.

### Current version

![Maven metadata URL](https://img.shields.io/maven-metadata/v?metadataUrl=https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Forg%2Freactivecommons%2Fasync-commons-rabbit-starter%2Fmaven-metadata.xml)

### Dependency

```groovy
dependencies {
    implementation "org.reactivecommons:async-kafka-starter:<version>"
}
```
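Not part of this commit: for Maven builds, a minimal sketch of the equivalent dependency declaration would look like the following (replace the version placeholder with the current version shown above).

```xml
<dependency>
    <groupId>org.reactivecommons</groupId>
    <artifactId>async-kafka-starter</artifactId>
    <version><!-- current version --></version>
</dependency>
```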
### Configuration properties

You also need to set a name for your app in `application.properties`; this is important because the value is used to name the application's topics in Kafka:

```properties
spring.application.name=MyAppName
```

Or in your `application.yaml`:

```yaml
spring:
  application:
    name: MyAppName
```

You can set the Kafka connection properties through Spring Boot with the [`spring.kafka.*` properties](https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
```
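Not part of this diff: the same value in `application.properties` form is:

```properties
spring.kafka.bootstrap-servers=localhost:9092
```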
You can also set it at runtime, for example from a secret, by creating the `KafkaProperties` bean yourself:

```java title="org.reactivecommons.async.kafka.config.KafkaProperties"
import org.reactivecommons.async.kafka.config.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.List;

@Configuration
public class MyKafkaConfig {

    @Bean
    @Primary
    public KafkaProperties myRCKafkaProperties() {
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(List.of("localhost:9092"));
        return properties;
    }
}
```
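Not part of this commit: if the broker addresses come from a secret exposed as an environment variable, the same bean can resolve them at startup. The variable name `KAFKA_BOOTSTRAP_SERVERS`, the class name and the comma-separated format below are illustrative assumptions.

```java
import org.reactivecommons.async.kafka.config.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.List;

@Configuration
public class MyKafkaSecretConfig {

    @Bean
    @Primary
    public KafkaProperties myRCKafkaProperties() {
        // Illustrative: read the comma-separated broker list from an environment variable
        String servers = System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(List.of(servers.split(",")));
        return properties;
    }
}
```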
### Multi Broker Instances of Kafka or Multi Domain support

Gives you the ability to listen to events from different domains, each with its own broker configuration, as sketched below.
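For illustration, and mirroring the configuration added later in this same commit, a second domain is declared as a sibling of the default `app` domain under `reactive.commons.kafka`:

```yaml
reactive:
  commons:
    kafka:
      app: # default domain
        connectionProperties:
          bootstrap-servers: localhost:9092
      accounts: # a second domain with its own brokers
        connectionProperties:
          bootstrap-servers: localhost:9093
```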
### Cloud Events

Includes support for the CloudEvents specification.

If you want to use it, you should read the [Creating a CloudEvent guide](11-creating-a-cloud-event.md).
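Not part of this commit: for orientation only, a CloudEvent is typically built with the `io.cloudevents` SDK before being published; the id, source and type values below are illustrative, and the actual publishing API is covered in the linked guide.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

// Build an event with the CloudEvents v1 builder (values are illustrative)
CloudEvent event = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withSource(URI.create("https://example.org/sample"))
        .withType("sample.event.registered")
        .withTime(OffsetDateTime.now())
        .withData("application/json", "{\"name\":\"demo\"}".getBytes())
        .build();
```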
</TabItem>
</Tabs>

docs/docs/reactive-commons/9-configuration-properties.md

Lines changed: 78 additions & 4 deletions
@@ -13,7 +13,7 @@ import TabItem from '@theme/TabItem';
 You can customize some predefined variables of Reactive Commons

 This can be done by Spring Boot `application.yaml` or by overriding
-the [AsyncProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java)
+the [AsyncProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java)
 bean.

 ```yaml
@@ -24,7 +24,7 @@ app:
   maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely.
   retryDelay: 1000 # interval for message retries, with and without DLQRetry
   listenReplies: true # if you will not use ReqReply patter you can set it to false
-  createTopology: true # if your organization have restricctions with automatic topology creation you can set it to false and create it manually or by your organization process.
+  createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
   delayedCommands: false # Enable to send a delayed command to an external target
   prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this settings acts per instance of your service
   flux:
@@ -50,7 +50,7 @@ app:
     password: guest
     virtual-host: /
 # Another domain can be configured with same properties structure that app
-accounts: # this is a second domain name and can have another independent settup
+accounts: # this is a second domain name and can have another independent setup
   connectionProperties: # you can override the connection properties of each domain
     host: localhost
     port: 5672
@@ -117,6 +117,80 @@ public AsyncPropsDomain.SecretFiller customFiller() {
</TabItem>
<TabItem value="kafka" label="Kafka">
-Comming soon...

You can customize some predefined variables of Reactive Commons

This can be done by Spring Boot `application.yaml` or by overriding the [AsyncKafkaProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/AsyncKafkaProps.java) bean.
```yaml
reactive:
  commons:
    kafka:
      app: # this is the name of the default domain
        withDLQRetry: false # if you want to have DLQ topics with retries you can set it to true; you cannot change it after the topology is created, because you will get an error, so you should delete the topology before the change.
        maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely.
        retryDelay: 1000 # interval for message retries, with and without DLQRetry
        checkExistingTopics: true # if you don't want to verify topic existence before sending a record you can set it to false
        createTopology: true # if your organization has restrictions with automatic topology creation you can set it to false and create it manually or by your organization's process.
        domain:
          ignoreThisListener: false # Allows you to disable the event listener for this specific domain
        connectionProperties: # you can override the connection properties of each domain
          bootstrap-servers: localhost:9092
      # Another domain can be configured with the same properties structure as app
      accounts: # this is a second domain name and can have another independent setup
        connectionProperties: # you can override the connection properties of each domain
          bootstrap-servers: localhost:9093
```
You can override these settings programmatically through an `AsyncKafkaPropsDomainProperties` bean.

```java
package sample;

import org.reactivecommons.async.kafka.config.KafkaProperties;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaProps;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomainProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.List;

@Configuration
public class MyDomainConfig {

    @Bean
    @Primary
    public AsyncKafkaPropsDomainProperties customKafkaDomainProperties() {
        KafkaProperties propertiesApp = new KafkaProperties();
        propertiesApp.setBootstrapServers(List.of("localhost:9092"));

        KafkaProperties propertiesAccounts = new KafkaProperties();
        propertiesAccounts.setBootstrapServers(List.of("localhost:9093"));

        return AsyncKafkaPropsDomainProperties.builder()
                .withDomain("app", AsyncKafkaProps.builder()
                        .connectionProperties(propertiesApp)
                        .build())
                .withDomain("accounts", AsyncKafkaProps.builder()
                        .connectionProperties(propertiesAccounts)
                        .build())
                .build();
    }
}
```
Additionally, if you want to set only the connection properties you can use the `AsyncKafkaPropsDomain.KafkaSecretFiller` class.

```java
@Bean
@Primary
public AsyncKafkaPropsDomain.KafkaSecretFiller customKafkaFiller() {
    return (domain, asyncProps) -> {
        // customize asyncProps here by domain
    };
}
```
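Not part of this diff: a minimal sketch of what the filler body could do, assuming `asyncProps` exposes the domain's `KafkaProperties` through `getConnectionProperties()` as elsewhere in this starter; the `MySecretResolver` helper and its method are hypothetical.

```java
@Bean
@Primary
public AsyncKafkaPropsDomain.KafkaSecretFiller customKafkaFiller(MySecretResolver resolver) {
    return (domain, asyncProps) -> {
        // Hypothetical: look up the broker list for this domain in a secret store
        String servers = resolver.bootstrapServersFor(domain);
        asyncProps.getConnectionProperties().setBootstrapServers(List.of(servers.split(",")));
    };
}
```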
</TabItem>
</Tabs>
Lines changed: 2 additions & 0 deletions
@@ -1,6 +1,8 @@
 package org.reactivecommons.async.kafka.config;


+import org.reactivecommons.async.kafka.config.spring.KafkaPropertiesBase;
+
 public class KafkaProperties extends KafkaPropertiesBase {

 }
Lines changed: 2 additions & 1 deletion
@@ -1,11 +1,12 @@
 package org.reactivecommons.async.kafka.config;


+import org.reactivecommons.async.kafka.config.spring.KafkaPropertiesBase;
 import org.springframework.boot.context.properties.ConfigurationProperties;

 @ConfigurationProperties(prefix = "spring.kafka")
 public class KafkaPropertiesAutoConfig extends KafkaPropertiesBase {
     public KafkaPropertiesAutoConfig() {
-        put("bootstrap.servers", "localhost:9092");
+        // put("bootstrap.servers", "localhost:9092");
     }
 }

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/KafkaPropertiesBase.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaConfig.java

Lines changed: 21 additions & 21 deletions
@@ -28,6 +28,7 @@
 import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.ssl.SslBundles;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -42,11 +43,6 @@
 import java.nio.file.Path;
 import java.util.Map;

-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

 @Configuration
@@ -57,12 +53,13 @@ public class RCKafkaConfig {
     @Bean
     public ConnectionManager kafkaConnectionManager(AsyncKafkaPropsDomain props,
                                                     MessageConverter converter,
-                                                    KafkaCustomizations customizations) {
+                                                    KafkaCustomizations customizations,
+                                                    SslBundles sslBundles) {
         ConnectionManager connectionManager = new ConnectionManager();
         props.forEach((domain, properties) -> {
-            TopologyCreator creator = createTopologyCreator(properties, customizations);
-            ReactiveMessageSender sender = createMessageSender(properties, converter, creator);
-            ReactiveMessageListener listener = createMessageListener(properties);
+            TopologyCreator creator = createTopologyCreator(properties, customizations, sslBundles);
+            ReactiveMessageSender sender = createMessageSender(properties, converter, creator, sslBundles);
+            ReactiveMessageListener listener = createMessageListener(properties, sslBundles);
             connectionManager.addDomain(domain, listener, sender, creator);

             ReactiveMessageSender appDomainSender = connectionManager.getSender(domain);
@@ -98,29 +95,31 @@ public DomainEventBus kafkaDomainEventBus(ConnectionManager manager) {

     private static ReactiveMessageSender createMessageSender(AsyncKafkaProps config,
                                                              MessageConverter converter,
-                                                             TopologyCreator topologyCreator) {
+                                                             TopologyCreator topologyCreator,
+                                                             SslBundles sslBundles) {
         KafkaProperties props = config.getConnectionProperties();
-        props.put(CLIENT_ID_CONFIG, config.getAppName());
-        props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        SenderOptions<String, byte[]> senderOptions = SenderOptions.create(props);
+        props.setClientId(config.getAppName()); // CLIENT_ID_CONFIG
+        props.getProducer().setKeySerializer(StringSerializer.class); // KEY_SERIALIZER_CLASS_CONFIG
+        props.getProducer().setValueSerializer(ByteArraySerializer.class); // VALUE_SERIALIZER_CLASS_CONFIG
+        SenderOptions<String, byte[]> senderOptions = SenderOptions.create(props.buildProducerProperties(sslBundles));
         KafkaSender<String, byte[]> kafkaSender = KafkaSender.create(senderOptions);
         return new ReactiveMessageSender(kafkaSender, converter, topologyCreator);
     }

     // Receiver

-    private static ReactiveMessageListener createMessageListener(AsyncKafkaProps config) {
+    private static ReactiveMessageListener createMessageListener(AsyncKafkaProps config, SslBundles sslBundles) {
         KafkaProperties props = config.getConnectionProperties();
-        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        ReceiverOptions<String, byte[]> receiverOptions = ReceiverOptions.create(props);
+        props.getConsumer().setKeyDeserializer(StringDeserializer.class); // KEY_DESERIALIZER_CLASS_CONFIG
+        props.getConsumer().setValueDeserializer(ByteArrayDeserializer.class); // VALUE_DESERIALIZER_CLASS_CONFIG
+        ReceiverOptions<String, byte[]> receiverOptions = ReceiverOptions.create(props.buildConsumerProperties(sslBundles));
         return new ReactiveMessageListener(receiverOptions);
     }

     // Shared
-    private static TopologyCreator createTopologyCreator(AsyncKafkaProps config, KafkaCustomizations customizations) {
-        AdminClient adminClient = AdminClient.create(config.getConnectionProperties());
+    private static TopologyCreator createTopologyCreator(AsyncKafkaProps config, KafkaCustomizations customizations,
+                                                         SslBundles sslBundles) {
+        AdminClient adminClient = AdminClient.create(config.getConnectionProperties().buildAdminProperties(sslBundles));
         return new TopologyCreator(adminClient, customizations, config.getCheckExistingTopics());
     }
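Not part of this diff: wiring `SslBundles` into `buildProducerProperties`/`buildConsumerProperties`/`buildAdminProperties` suggests TLS can be configured through a Spring Boot SSL bundle. A rough sketch follows, assuming the mirrored `KafkaProperties` exposes the same `ssl.bundle` and `security.protocol` settings as Spring Boot's `spring.kafka.*`; the bundle name `kafka-tls` and the file locations are illustrative.

```yaml
spring:
  ssl:
    bundle:
      jks:
        kafka-tls: # illustrative bundle name
          truststore:
            location: classpath:kafka-truststore.p12
            password: changeit
            type: PKCS12

reactive:
  commons:
    kafka:
      app:
        connectionProperties:
          security:
            protocol: SSL
          ssl:
            bundle: kafka-tls # assumed to mirror spring.kafka.ssl.bundle
```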

@@ -179,12 +178,13 @@ public static KafkaProperties readPropsFromDotEnv(Path path) throws IOException
         String env = Files.readString(path);
         String[] split = env.split("\n");
         KafkaProperties props = new KafkaProperties();
+        Map<String, String> properties = props.getProperties();
         for (String s : split) {
             if (s.startsWith("#")) {
                 continue;
             }
             String[] split1 = s.split("=", 2);
-            props.put(split1[0], split1[1]);
+            properties.put(split1[0], split1[1]);
         }
         return props;
     }
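Not part of this diff: `readPropsFromDotEnv` parses plain `key=value` lines and skips lines starting with `#`, so a minimal dot-env file for local use might look like the sketch below; which keys you actually need depends on your cluster.

```properties
# illustrative local settings
bootstrap.servers=localhost:9092
security.protocol=PLAINTEXT
```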
