Skip to content

Commit 091e34a

Browse files
authored
Merge pull request #159 from aguibert/kafka
Kafka Support
2 parents 3796ed4 + 6c10259 commit 091e34a

File tree

18 files changed

+789
-17
lines changed

18 files changed

+789
-17
lines changed

core/src/main/java/org/microshed/testing/jupiter/MicroShedTestExtension.java

+96-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import java.lang.reflect.Modifier;
2424
import java.net.URL;
2525
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.Collection;
2628
import java.util.List;
2729
import java.util.Optional;
30+
import java.util.Properties;
2831

2932
import org.junit.jupiter.api.extension.BeforeAllCallback;
3033
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
@@ -37,6 +40,8 @@
3740
import org.microshed.testing.jaxrs.RestClientBuilder;
3841
import org.microshed.testing.jwt.JwtBuilder;
3942
import org.microshed.testing.jwt.JwtConfig;
43+
import org.microshed.testing.kafka.KafkaConsumerConfig;
44+
import org.microshed.testing.kafka.KafkaProducerConfig;
4045
import org.slf4j.Logger;
4146
import org.slf4j.LoggerFactory;
4247

@@ -64,6 +69,7 @@ public void beforeAll(ExtensionContext context) throws Exception {
6469
config.start();
6570
configureRestAssured(config);
6671
injectRestClients(testClass);
72+
injectKafkaClients(testClass);
6773
}
6874

6975
private static void injectRestClients(Class<?> clazz) {
@@ -105,7 +111,96 @@ private static void injectRestClients(Class<?> clazz) {
105111
restClientField.set(null, restClient);
106112
LOG.debug("Injected rest client for " + restClientField);
107113
} catch (Exception e) {
108-
throw new ExtensionConfigurationException("Unable to set field " + restClientField, e);
114+
throw new ExtensionConfigurationException("Unable to inject field " + restClientField, e);
115+
}
116+
}
117+
}
118+
119+
private void injectKafkaClients(Class<?> clazz) {
120+
// Verify kafka-client and testcontainers-kafka is on classpath
121+
Class<?> KafkaProducer = tryLoad("org.apache.kafka.clients.producer.KafkaProducer");
122+
Class<?> KafkaConsumer = tryLoad("org.apache.kafka.clients.consumer.KafkaConsumer");
123+
if (KafkaProducer == null || KafkaConsumer == null)
124+
return;
125+
String globalBootstrapServers = System.getProperty("org.microshed.kafka.bootstrap.servers");
126+
127+
List<Field> kafkaProducerFields = AnnotationSupport.findAnnotatedFields(clazz, KafkaProducerConfig.class);
128+
for (Field producerField : kafkaProducerFields) {
129+
if (!KafkaProducer.isAssignableFrom(producerField.getType())) {
130+
throw new ExtensionConfigurationException("Fields annotated with @KafkaProducerConfig must be of the type " + KafkaProducer.getName());
131+
}
132+
if (!Modifier.isPublic(producerField.getModifiers()) ||
133+
!Modifier.isStatic(producerField.getModifiers()) ||
134+
Modifier.isFinal(producerField.getModifiers())) {
135+
throw new ExtensionConfigurationException("The KafkaProducer field annotated with @KafkaProducerConfig " +
136+
"must be public, static, and non-final: " + producerField);
137+
}
138+
139+
KafkaProducerConfig producerConfig = producerField.getAnnotation(KafkaProducerConfig.class);
140+
Properties properties = new Properties();
141+
String bootstrapServers = producerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : producerConfig.bootstrapServers();
142+
if (bootstrapServers.isEmpty())
143+
throw new ExtensionConfigurationException("To use @KafkaProducerConfig on a KafkaProducer a bootstrap server must be " +
144+
"defined in the @KafkaProducerConfig annotation or using the " +
145+
"'org.microshed.kafka.bootstrap.servers' system property");
146+
properties.put("bootstrap.servers", bootstrapServers);
147+
properties.put("key.serializer", producerConfig.keySerializer().getName());
148+
properties.put("value.serializer", producerConfig.valueSerializer().getName());
149+
for (String prop : producerConfig.properties()) {
150+
int split = prop.indexOf("=");
151+
if (split < 2)
152+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + producerField + " must be in the format 'key=value'");
153+
properties.put(prop.substring(0, split), prop.substring(split + 1));
154+
}
155+
try {
156+
Object producer = KafkaProducer.getConstructor(Properties.class).newInstance(properties);
157+
producerField.set(null, producer);
158+
LOG.debug("Injected kafka producer for " + producerField + " with config " + producerConfig);
159+
} catch (Exception e) {
160+
throw new ExtensionConfigurationException("Unable to inject field " + producerField, e);
161+
}
162+
}
163+
164+
List<Field> kafkaConsumerFields = AnnotationSupport.findAnnotatedFields(clazz, KafkaConsumerConfig.class);
165+
for (Field consumerField : kafkaConsumerFields) {
166+
if (!KafkaConsumer.isAssignableFrom(consumerField.getType())) {
167+
throw new ExtensionConfigurationException("Fields annotated with @KafkaConsumerConfig must be of the type " + KafkaConsumer.getName());
168+
}
169+
if (!Modifier.isPublic(consumerField.getModifiers()) ||
170+
!Modifier.isStatic(consumerField.getModifiers()) ||
171+
Modifier.isFinal(consumerField.getModifiers())) {
172+
throw new ExtensionConfigurationException("The KafkaProducer field annotated with @KafkaConsumerConfig " +
173+
"must be public, static, and non-final: " + consumerField);
174+
}
175+
176+
KafkaConsumerConfig consumerConfig = consumerField.getAnnotation(KafkaConsumerConfig.class);
177+
Properties properties = new Properties();
178+
String bootstrapServers = consumerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : consumerConfig.bootstrapServers();
179+
if (bootstrapServers.isEmpty())
180+
throw new ExtensionConfigurationException("To use @KafkaConsumerConfig on a KafkaConsumer a bootstrap server must be " +
181+
"defined in the @KafkaConsumerConfig annotation or using the " +
182+
"'org.microshed.kafka.bootstrap.servers' system property");
183+
properties.put("bootstrap.servers", bootstrapServers);
184+
properties.put("group.id", consumerConfig.groupId());
185+
properties.put("key.deserializer", consumerConfig.keyDeserializer().getName());
186+
properties.put("value.deserializer", consumerConfig.valueDeserializer().getName());
187+
for (String prop : consumerConfig.properties()) {
188+
int split = prop.indexOf("=");
189+
if (split < 2)
190+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + consumerField + " must be in the format 'key=value'");
191+
properties.put(prop.substring(0, split), prop.substring(split + 1));
192+
}
193+
try {
194+
Object consumer = KafkaConsumer.getConstructor(Properties.class).newInstance(properties);
195+
consumerField.set(null, consumer);
196+
LOG.debug("Injected kafka consumer for " + consumerField + " with config " + consumerConfig);
197+
if (consumerConfig.topics().length > 0) {
198+
Collection<String> topics = Arrays.asList(consumerConfig.topics());
199+
KafkaConsumer.getMethod("subscribe", Collection.class).invoke(consumer, topics);
200+
LOG.debug("Subscribed kafka consumer for " + consumerField + " to topics " + topics);
201+
}
202+
} catch (Exception e) {
203+
throw new ExtensionConfigurationException("Unable to inject field " + consumerField, e);
109204
}
110205
}
111206
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2020 IBM Corporation and others
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.microshed.testing.kafka;
20+
21+
import java.lang.annotation.ElementType;
22+
import java.lang.annotation.Retention;
23+
import java.lang.annotation.RetentionPolicy;
24+
import java.lang.annotation.Target;
25+
26+
/**
27+
* Identifies an injection point for a <code>org.apache.kafka.clients.consumer.KafkaConsumer</code>
28+
* The annotated field MUST be <code>public static</code> and non-final.
29+
*
30+
* The injected <code>KafkaConsumer</code> will be auto-configured according the values
31+
* in this annotation.
32+
*/
33+
@Target({ ElementType.FIELD })
34+
@Retention(RetentionPolicy.RUNTIME)
35+
public @interface KafkaConsumerConfig {
36+
37+
/**
38+
* @return Sets the <code>bootstrap.servers</code> property for the injected <code>KafkaConsumer</code>.
39+
* Otherwise, the <code>org.microshed.kafka.bootstrap.servers</code> system property is used if set.
40+
* Otherwise, any <code>org.testcontainers.containers.KafkaContainer</code> discovered in the test
41+
* will be used.
42+
* If none of the previous options are discovered, an error is raised.
43+
*/
44+
String bootstrapServers() default "";
45+
46+
/**
47+
* @return Sets the <code>key.deserializer</code> property for the injected <code>KafkaConsumer</code>.
48+
*/
49+
Class<?> keyDeserializer();
50+
51+
/**
52+
* @return Sets the <code>value.deserializer</code> property for the injected <code>KafkaConsumer</code>.
53+
*/
54+
Class<?> valueDeserializer();
55+
56+
/**
57+
* @return Sets the <code>group.id</code> property for the injected <code>KafkaConsumer</code>.
58+
*/
59+
String groupId();
60+
61+
/**
62+
* @return The topics that the injected <code>KafkaConsumer</code> will be automatically subscribed to.
63+
*/
64+
String[] topics() default {};
65+
66+
/**
67+
* @return An optional array of <code>key=value</code> strings, which will be used as configuration options
68+
* in the injected <code>KafkaConsumer</code>.
69+
*/
70+
String[] properties() default {};
71+
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2020 IBM Corporation and others
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.microshed.testing.kafka;
20+
21+
import java.lang.annotation.ElementType;
22+
import java.lang.annotation.Retention;
23+
import java.lang.annotation.RetentionPolicy;
24+
import java.lang.annotation.Target;
25+
26+
/**
27+
* Identifies an injection point for a <code>org.apache.kafka.clients.producer.KafkaProducer</code>
28+
* The annotated field MUST be <code>public static</code> and non-final.
29+
*
30+
* The injected <code>KafkaProducer</code> will be auto-configured according the values
31+
* in this annotation.
32+
*/
33+
@Target({ ElementType.FIELD })
34+
@Retention(RetentionPolicy.RUNTIME)
35+
public @interface KafkaProducerConfig {
36+
37+
/**
38+
* @return Sets the <code>bootstrap.servers</code> property for the injected <code>KafkaProducer</code>.
39+
* Otherwise, the <code>org.microshed.kafka.bootstrap.servers</code> system property is used if set.
40+
* Otherwise, any <code>org.testcontainers.containers.KafkaContainer</code> discovered in the test
41+
* will be used.
42+
* If none of the previous options are discovered, an error is raised.
43+
*/
44+
String bootstrapServers() default "";
45+
46+
/**
47+
* @return Sets the <code>key.serializer</code> property for the injected <code>KafkaProducer</code>.
48+
*/
49+
Class<?> keySerializer();
50+
51+
/**
52+
* @return Sets the <code>value.serializer</code> property for the injected <code>KafkaProducer</code>.
53+
*/
54+
Class<?> valueSerializer();
55+
56+
/**
57+
* @return An optional array of <code>key=value</code> strings, which will be used as configuration options
58+
* in the injected <code>KafkaProducer</code>.
59+
*/
60+
String[] properties() default {};
61+
62+
}

docs/features/Examples.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Sometimes code is worth a thousand words. Here are some pointers to working exam
1515
- [JAX-RS application secured with Basic Auth](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/jaxrs-basicauth)
1616
- [JAX-RS application secured with MP JWT](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/jaxrs-mpjwt)
1717
- [JAX-RS and MongoDB application that depends on an external REST service](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/everything-app)
18+
- [Application using Apache Kafka messaging](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/kafka-app)
1819
- [Application with no Dockerfile using OpenLiberty adapter](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/liberty-app)
1920

2021
## Runtime examples:

docs/features/KafkaMessaging.md

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
---
2+
layout: post
3+
title: "Kafka Messaging"
4+
order: 22
5+
---
6+
7+
MicroShed Testing provides integration with applications using [Apache Kafka](https://kafka.apache.org/) for messaging. Apache Kafka is
8+
a messaging engine that is commonly used with Java microservice applications, and also is commonly used with [MicroProfile Reactive Messaging(https://github.com/eclipse/microprofile-reactive-messaging).
9+
10+
## Sending and receiving messages from tests
11+
12+
If an application purely uses Kafka Messaging for communication, a true-to-production way of testing is to also have the test client driving requests
13+
on the application via message passing. To do this, MicroShed Testing offers two annotations: `@KafkaConsumerConfig` and `@KafkaProducerConfig`
14+
15+
### Example setup
16+
17+
To begin using Kafka with MicroShed Testing, define a `KafkaContainer` in the test environment:
18+
19+
```java
20+
import org.testcontainers.containers.KafkaContainer;
21+
import org.testcontainers.containers.Network;
22+
// other imports ...
23+
24+
public class AppContainerConfig implements SharedContainerConfiguration {
25+
26+
private static Network network = Network.newNetwork();
27+
28+
@Container
29+
public static KafkaContainer kafka = new KafkaContainer()
30+
.withNetwork(network);
31+
32+
@Container
33+
public static ApplicationContainer app = new ApplicationContainer()
34+
.withNetwork(network)
35+
.dependsOn(kafka);
36+
}
37+
```
38+
39+
Runtimes such as OpenLiberty and Quarkus will be auto-configured together if a `KafkaContainer` is present
40+
in the test environment. For Quarkus, no `ApplicationContainer` or `Network` is needed either.
41+
For other runtimes, you can link the containers together by using `kafka.withNetworkAlias("kafka")`
42+
and `app.withEnv("<runtime-specific kafka bootstrap servers property>", "kafka:9092")`.
43+
44+
45+
### Example usage
46+
47+
```java
48+
import org.apache.kafka.clients.consumer.KafkaConsumer;
49+
import org.apache.kafka.clients.producer.KafkaProducer;
50+
import org.apache.kafka.common.serialization.StringDeserializer;
51+
import org.apache.kafka.common.serialization.StringSerializer;
52+
import org.microshed.testing.kafka.KafkaConsumerConfig;
53+
import org.microshed.testing.kafka.KafkaProducerConfig;
54+
// other imports ...
55+
56+
@MicroShedTest
57+
@SharedContainerConfig(AppContainerConfig.class)
58+
public class KitchenEndpointIT {
59+
60+
@KafkaProducerConfig(keySerializer = StringSerializer.class, // (1)
61+
valueSerializer = StringSerializer.class)
62+
public static KafkaProducer<String, String> producer;
63+
64+
@KafkaConsumerConfig(keyDeserializer = StringDeserializer.class,
65+
valueDeserializer = StringDeserializer.class,
66+
groupId = "update-status",
67+
topics = "statusTopic") // (2)
68+
public static KafkaConsumer<String, String> consumer;
69+
70+
@Test
71+
public void myTest() {
72+
// Use the producer to send messages
73+
producer.send(...);
74+
75+
// Use the consumer to poll for records
76+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
77+
// ...
78+
}
79+
}
80+
```
81+
82+
1. Each `@KafkaProducerConfig` and `@KafkaConsumerConfig` must define a set of key/value [de]serializers
83+
that correspond to the key/value types defined in the `KafkaProducer` and `KafkaConsumer`.
84+
2. For `@KafkaConsumerConfig` zero or more `topics` may be specified to automatically subscribe the
85+
injected `consumer` to the specified `topics`.
86+
87+
88+
## Additional resources
89+
90+
- [Example application using Apache Kafka messaging](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/kafka-app)
91+
- [OpenLiberty blog on using MicroProfile Reactive Messaging](https://openliberty.io/blog/2019/09/13/microprofile-reactive-messaging.html)
92+
- [Quarkus guide on using Apache Kafka with Reactive Messaging](https://quarkus.io/guides/kafka)

0 commit comments

Comments
 (0)