Skip to content

Commit 37cf7df

Browse files
author
Stephen Powis
committed
final prep before release
1 parent e4abfd5 commit 37cf7df

File tree

7 files changed

+125
-44
lines changed

7 files changed

+125
-44
lines changed

dev-cluster/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,20 @@
111111
<skip>true</skip>
112112
</configuration>
113113
</plugin>
114+
115+
<!-- Copy dependencies over into packaged jar -->
116+
<plugin>
117+
<groupId>org.apache.maven.plugins</groupId>
118+
<artifactId>maven-dependency-plugin</artifactId>
119+
<version>3.1.1</version>
120+
<executions>
121+
<execution>
122+
<id>copy-dependencies</id>
123+
<phase>package</phase>
124+
<goals><goal>copy-dependencies</goal></goals>
125+
</execution>
126+
</executions>
127+
</plugin>
114128
</plugins>
115129
</build>
116130

dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import org.apache.commons.cli.Option;
3939
import org.apache.commons.cli.Options;
4040
import org.apache.commons.cli.ParseException;
41+
import org.apache.kafka.clients.admin.TopicDescription;
4142
import org.apache.kafka.clients.consumer.ConsumerRecords;
4243
import org.apache.kafka.clients.consumer.KafkaConsumer;
44+
import org.apache.kafka.common.TopicPartitionInfo;
4345
import org.apache.kafka.common.serialization.StringDeserializer;
4446
import org.slf4j.Logger;
4547
import org.slf4j.LoggerFactory;
@@ -141,12 +143,24 @@ public static void main(final String[] args) throws Exception {
141143
logger.info("Started broker with Id {} at {}", broker.getBrokerId(), broker.getConnectString());
142144
});
143145

146+
if (topicNames != null && topicNames.length > 0) {
147+
final KafkaTestUtils testUtils = new KafkaTestUtils(kafkaTestCluster);
148+
if (cmd.hasOption("consumer")) {
149+
for (final String topicName : topicNames) {
150+
runEndlessConsumer(topicName, testUtils);
151+
}
152+
}
153+
154+
if (cmd.hasOption("producer")) {
155+
for (final String topicName : topicNames) {
156+
runEndlessProducer(topicName, testUtils);
157+
}
158+
}
159+
}
160+
144161
// Log cluster connect string.
145162
logger.info("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
146163

147-
//runEndlessConsumer(topicName, utils);
148-
//runEndlessProducer(topicName, partitionsCount, utils);
149-
150164
// Wait forever.
151165
Thread.currentThread().join();
152166
}
@@ -189,6 +203,24 @@ private static CommandLine parseArguments(final String[] args) throws ParseExcep
189203
.build()
190204
);
191205

206+
// Optionally start an endless consumer
207+
options.addOption(Option.builder("consumer")
208+
.desc("Start an endless consumer for each of the defined topics.")
209+
.required(false)
210+
.hasArg(false)
211+
.type(Boolean.class)
212+
.build()
213+
);
214+
215+
// Optionally start an endless producer
216+
options.addOption(Option.builder("producer")
217+
.desc("Start an endless producer for each of the defined topics.")
218+
.required(false)
219+
.hasArg(false)
220+
.type(Boolean.class)
221+
.build()
222+
);
223+
192224
try {
193225
final CommandLineParser parser = new DefaultParser();
194226
return parser.parse(options, args);
@@ -206,20 +238,20 @@ private static CommandLine parseArguments(final String[] args) throws ParseExcep
206238
/**
207239
* Fire up a new thread running an endless producer script into the given topic and partitions.
208240
* @param topicName Name of the topic to produce records into.
209-
* @param partitionCount number of partitions that exist on that topic.
210241
* @param utils KafkaUtils instance.
211242
*/
212243
private static void runEndlessProducer(
213244
final String topicName,
214-
final int partitionCount,
215245
final KafkaTestUtils utils
216246
) {
217247
final Thread producerThread = new Thread(() -> {
218-
do {
248+
// Determine how many partitions there are
249+
final TopicDescription topicDescription = utils.describeTopic(topicName);
219250

220-
// Publish some data into that topic
221-
for (int partition = 0; partition < partitionCount; partition++) {
222-
utils.produceRecords(1000, topicName, partition);
251+
do {
252+
// Publish some data into that topic for each partition.
253+
for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
254+
utils.produceRecords(1000, topicName, partitionInfo.partition());
223255
try {
224256
Thread.sleep(1000L);
225257
} catch (InterruptedException e) {
@@ -233,8 +265,10 @@ private static void runEndlessProducer(
233265
}
234266
} while (true);
235267
});
236-
producerThread.start();
237268

269+
logger.info("Starting endless producer for topic {}", topicName);
270+
producerThread.setName("Endless producer for topic " + topicName);
271+
producerThread.start();
238272
}
239273

240274
/**
@@ -270,6 +304,9 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUt
270304
return;
271305
}
272306
});
307+
308+
logger.info("Starting endless consumer for topic {}", topicName);
309+
consumerThread.setName("Endless consumer for topic " + topicName);
273310
consumerThread.start();
274311
}
275312
}

dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/LdapServer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ public class LdapServer {
4242
* @param args command line arguments (none).
4343
*/
4444
public static void main(final String[] args) throws LDAPException, InterruptedException {
45+
final String baseDn = "dc=springframework,dc=org";
46+
4547
// Create a base configuration for the server.
4648
final InMemoryDirectoryServerConfig config = new InMemoryDirectoryServerConfig(
47-
"dc=springframework,dc=org"
49+
baseDn
4850
);
4951

5052
// Create new listener
@@ -66,9 +68,10 @@ public static void main(final String[] args) throws LDAPException, InterruptedEx
6668
server.startListening();
6769

6870
logger.info(
69-
"Server running at: {}:{}",
71+
"Server running at: ldap://{}:{}/{}",
7072
server.getConnection("LDAP").getConnectedAddress(),
71-
server.getConnection("LDAP").getConnectedPort()
73+
server.getConnection("LDAP").getConnectedPort(),
74+
baseDn
7275
);
7376

7477
Thread.currentThread().join();

dev-cluster/startDevCluster.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/bin/bash
2+
3+
set -e
4+
5+
TRUST_STORE_FILE="src/main/resources/kafka.truststore.jks"
6+
KEY_STORE_FILE="src/main/resources/kafka.keystore.jks"
7+
JAR_FILE="target/dev-cluster-2.1.0.jar"
8+
JAAS_FILE="src/main/resources/jaas.conf"
9+
10+
## Change to local directory
11+
cd "${0%/*}"
12+
13+
## Ensure certificates exist
14+
if [ ! -f $KEY_STORE_FILE ] || [ ! -f $TRUST_STORE_FILE ]; then
15+
echo "Key store files do not exist...generating now...";
16+
## Build certificates
17+
././../generateDummySslCertificates.sh
18+
fi
19+
20+
## If jar doesn't exist, build it.
21+
if [ ! -f $JAR_FILE ]; then
22+
echo "Building package...";
23+
mvn clean package -DskipTests=true -DskipLicenseCheck=true
24+
fi
25+
26+
## Execute.
27+
echo "starting..."
28+
java -Djava.security.auth.login.config=$JAAS_FILE -cp "${JAR_FILE}:target/dependency/*" org.sourcelab.kafka.devcluster.DevCluster "$@"

kafka-webview-ui/src/assembly/distribution/config.yml

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,40 +27,41 @@ app:
2727
## Require user authentication
2828
## Setting to false will disable login requirement.
2929
enabled: true
30-
## Optional: if you want to use LDAP for user authentication instead of locally defined users.
31-
ldap:
32-
## Disabled by default.
33-
enabled: false
3430

35-
## Example values defined below, adjust as needed.
36-
## How to find user records
37-
userDnPattern: "uid={0},ou=people"
31+
## Optional: if you want to use LDAP for user authentication instead of locally defined users.
32+
ldap:
33+
## Disabled by default.
34+
enabled: false
3835

39-
## The attribute in which the password is stored.
40-
passwordAttribute: "userPassword"
36+
## URL/Hostname for your LDAP server
37+
url: "ldap://localhost:8389/dc=example,dc=org"
4138

42-
## Where to find user group membership
43-
groupSearchBase: "ou=groups"
44-
groupRoleAttribute: "cn"
39+
## Example values defined below, adjust as needed.
40+
## How to find user records
41+
userDnPattern: "uid={0},ou=people"
4542

46-
## How passwords are validated, must implement PasswordEncoder interface
47-
passwordEncoderClass: "org.springframework.security.crypto.password.LdapShaPasswordEncoder"
43+
## The attribute in which the password is stored.
44+
passwordAttribute: "userPassword"
4845

49-
## Comma separated list of groups. A user which is a member of this group will be granted
50-
## administrator access to Kafka WebView.
51-
adminGroups: "ADMINGROUP1,ADMINGROUP2"
46+
## Where to find user group membership
47+
groupSearchBase: "ou=groups"
48+
groupRoleAttribute: "cn"
5249

53-
## Comma separated list of groups. A user which is a member of this group will be granted
54-
## standard user level access to Kafka WebView.
55-
userGroups: "USERGROUP1,USERGROUP2"
50+
## How passwords are validated, must implement PasswordEncoder interface
51+
passwordEncoderClass: "org.springframework.security.crypto.password.LdapShaPasswordEncoder"
5652

57-
## Any user who is not a member of at least one of the above groups will be denied access
58-
## to Kafka WebView.
53+
## Comma separated list of groups. A user which is a member of this group will be granted
54+
## administrator access to Kafka WebView.
55+
adminGroups: "ADMINGROUP1,ADMINGROUP2"
5956

60-
## URL/Hostname for your LDAP server
61-
url: "ldap://localhost:8389/dc=example,dc=org"
57+
## Comma separated list of groups. A user which is a member of this group will be granted
58+
## standard user level access to Kafka WebView.
59+
userGroups: "USERGROUP1,USERGROUP2"
6260

63-
## If LDAP does not allow anonymous access, define the user/password to connect using.
64-
## If not required, leave both fields empty
65-
bindUser: "cn=ManagementUser"
66-
bindUserPassword: "password-here"
61+
## Any user who is not a member of at least one of the above groups will be denied access
62+
## to Kafka WebView.
63+
64+
## If LDAP does not allow anonymous access, define the user/password to connect using.
65+
## If not required, leave both fields empty
66+
bindUser: "cn=ManagementUser"
67+
bindUserPassword: "password-here"

kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,6 @@
309309
} else {
310310
consumerPositions.push(0);
311311
}
312-
//tailPositions.push(offsetData.tail);
313-
//consumerPositions.push(offsetData.offset);
314312
});
315313

316314
// Push data into graph sources

kafka-webview-ui/src/main/resources/templates/configuration/cluster/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
</thead>
3838
<tbody>
3939
<tr th:if="${clusterList.isEmpty()}" align="center">
40-
<td colspan="5">
40+
<td colspan="6">
4141
No clusters found!
4242
</td>
4343
</tr>

0 commit comments

Comments
 (0)