Skip to content

Commit 5df24a9

Browse files
author
Don Tregonning
committed
default command line values, javadoc work, cleanup
1 parent d9bca73 commit 5df24a9

File tree

6 files changed

+148
-68
lines changed

6 files changed

+148
-68
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77

88
### JAVA ###
99
*.class
10-
data-gen.log
10+
*.log
1111

src/main/java/generator/CommandLineParams.java

+29-17
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,58 @@
33
import org.kohsuke.args4j.Option;
44

55
/**
6-
* Created by dtregonning on 9/13/17.
6+
* CommandLineParams is the class implementation of commandline parameters for the kafka-data-gen application.
7+
*
8+
* @author Don Tregonning (dtregonning)
9+
* @version 1.0
10+
* @since 1.0
711
*/
812
public class CommandLineParams {
9-
@Option(name="-message-count", usage="Sets amount of messages to create")
13+
@Option(name="-message-count", usage="(Required)Sets amount of messages to create and send to kafka topic")
1014
public String messageCount;
1115

12-
@Option(name="-message-size", usage="Sets size of messages to create")
16+
@Option(name="-message-size", usage="(Required)Sets size of messages to create")
1317
public String messageSize;
1418

15-
@Option(name="-eps", usage="Amount of events per second to send to Kafka")
19+
@Option(name="-eps", usage="(Required)Amount of events per second to send to Kafka")
1620
public String eps;
1721

18-
@Option(name="-worker-thread-count", usage="Kafka buffer amount")
22+
@Option(name="-worker-thread-count", usage="Event generating worker threads, default = 4")
1923
public String workerThreadCount;
2024

21-
@Option(name="-topic", usage="Kafka Topic tp send messages to")
25+
@Option(name="-topic", usage="(Required)Kafka Topic to send messages to")
2226
public String topic;
2327

24-
@Option(name="-bootstrap.servers", usage="Kafka Servers to send messages to")
28+
@Option(name="-bootstrap.servers", usage="(Required) Kafka Servers to send messages to")
2529
public String bootStrapServers;
2630

27-
@Option(name="-acks", usage="Acknowledgement Scheme (all, 1, 0)")
31+
@Option(name="-acks", usage="Acknowledgement Scheme (all, 1, 0), default = all")
2832
public String acks;
2933

30-
@Option(name="-kafka-retries", usage="Kafka retries amount")
34+
@Option(name="-kafka-retries", usage="Kafka retries amount, default = 0")
3135
public String retries;
3236

33-
@Option(name="-kafka-batch-size", usage="Kafka batch size amount")
37+
@Option(name="-kafka-batch-size", usage="Kafka batch size amount, default = 1000")
3438
public String kafkaBatchSize;
3539

36-
@Option(name="-kafka-linger", usage="Kafka linger setting(ms)")
40+
@Option(name="-kafka-linger", usage="Kafka linger setting(ms) , default = 1ms")
3741
public String kafkaLingerms;
3842

39-
@Option(name="-kafka-buffer-memory", usage="Kafka buffer amount")
43+
@Option(name="-kafka-buffer-memory", usage="Kafka buffer amount, default = 16384")
4044
public String kafkaBufferMemory;
4145

42-
public void run() {
43-
System.out.println("Command Line Parameters");
44-
System.out.println("- message-count: " + messageCount);
45-
System.out.println("- message-size: " + messageSize);
46-
System.out.println("- topic: " + topic);
46+
public String toString() {
47+
return "[Command Line Parameters]"
48+
+ "{ message-count: " + messageCount
49+
+ ", message-size: " + messageSize
50+
+ ", topic : " + topic
51+
+ ", eps: " + eps
52+
+ ", worker-thread-count: " + workerThreadCount
53+
+ ", bootstrap.servers: " + bootStrapServers
54+
+ ", acks: " + acks
55+
+ ", kafka-retries: " + retries
56+
+ ", kafka-batch-size: " + kafkaBatchSize
57+
+ ", kafka-linger: " + kafkaLingerms
58+
+ ", kafka-buffer-memory: " + kafkaBufferMemory + "}";
4759
}
4860
}

src/main/java/generator/DataGenerator.java

+43-14
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,25 @@
88
import org.kohsuke.args4j.CmdLineParser;
99
import org.kohsuke.args4j.CmdLineException;
1010

11+
/**
12+
* DataGenerator contains the main method and is the entry point for the kafka-data-gen application. The DataGenerator will:
13+
* <ul>
14+
* <li> Read and Parse in command line values</li>
15+
* <li> Set any default values for parameters</li>
16+
* <li> Create all threads needed and loop till finished</li>
17+
* <li> Output metrics on program complete.</li>
18+
* </ul>
19+
*
20+
* @author Don Tregonning (dtregonning)
21+
* @version 1.0
22+
* @since 1.0
23+
*/
1124
public class DataGenerator {
12-
private static Logger logger = LogManager.getLogger(DataGenerator.class);
25+
private static final Logger logger = LogManager.getLogger(DataGenerator.class);
1326

1427
public static void main(String[] args) {
1528
logger.info("Starting Kafka Data Generator");
29+
1630
CommandLineParams params = new CommandLineParams();
1731
CmdLineParser parser = new CmdLineParser(params);
1832

@@ -22,28 +36,40 @@ public static void main(String[] args) {
2236
} catch (CmdLineException e) {
2337
logger.error(e.getMessage());
2438
parser.printUsage(System.err);
39+
System.exit(2);
2540
}
2641

27-
Properties props = new Properties();
28-
props = parseKafkaArguments(params, props);
42+
// Check for required parameters
43+
if(params.bootStrapServers == null || params.topic == null){
44+
logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1");
45+
System.exit(1);
46+
}
2947

30-
long startTime = System.currentTimeMillis();
48+
//Set defaults for non-required params, then log the resulting values (default or configured)
49+
if(params.workerThreadCount == null) { params.workerThreadCount = "4";}
50+
if(params.eps == null) { params.eps = "0";}
51+
if(params.messageSize == null) { params.messageSize = "256";}
52+
// Default the message count itself: it is later parsed with Integer.parseInt, so it must not stay null.
if(params.messageCount == null) { params.messageCount = "10000";}
53+
logger.info(params);
3154

32-
int workers;
33-
if(params.workerThreadCount != null) {
34-
workers = Integer.parseInt(params.workerThreadCount);
35-
}else {
36-
workers = 4;
37-
}
38-
logger.info("Worker count configured to: " + workers);
55+
//Create and configure Kafka Producer variables. Store in Properties object
56+
Properties props = new Properties();
57+
props = parseKafkaArguments(params, props);
3958

4059
EPSToken epsToken = new EPSToken();
60+
61+
/* Thread Implementation.
62+
* RefreshTokenThread will be used to throttle and control message creation. If a token is not available at a
63+
* given time a message cannot be created and sent. By using tokens we can control throughput. This thread will
64+
* refresh tokens. Logic implemented in EPSToken().
65+
* MetricsCalculatorThread is responsible for periodically reporting metrics to the log file.
66+
* WorkerThreads - create, batch and send events to a Kafka topic.
67+
* */
4168
EPSThread thread_01 = new EPSThread("RefreshTokenThread", epsToken, props, params);
4269
EPSThread thread_02 = new EPSThread("MetricsCalculatorThread", epsToken, props, params);
70+
EPSThread[] epsThreadArray = new EPSThread[Integer.parseInt(params.workerThreadCount)];
4371

44-
45-
46-
EPSThread[] epsThreadArray = new EPSThread[workers];
72+
long startTime = System.currentTimeMillis();
4773

4874
for(int i = 0; i < epsThreadArray.length; i++) {
4975
String threadName = "WorkerThread-" + i;
@@ -60,8 +86,11 @@ public static void main(String[] args) {
6086
}
6187

6288
long elapsedTime = System.currentTimeMillis() - startTime;
89+
90+
// Program Exit statistics
6391
logger.info(epsToken.getMessageKey() + " messages created and sent in " + (elapsedTime / 1000) + " seconds");
6492
// Clamp elapsed seconds to at least 1 so runs shorter than one second do not divide by zero.
logger.info("Events Per Second of " + (long)(epsToken.getMessageKey())/Math.max(1L, elapsedTime / 1000));
93+
logger.info("Program run and complete without interruption");
6594
}
6695

6796
public static Properties parseKafkaArguments(CommandLineParams params, Properties props) {

src/main/java/generator/EPSThread.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,19 @@ public void run() {
4646
Thread.sleep(500);
4747
} while(epsTokenObj.getMessageKey() < Integer.parseInt(params.messageCount));
4848
epsTokenObj.toggleFinished();
49-
logger.info("Total Message count reached, cleaning up.");
49+
logger.info("Total Message count reached, cleaning up program for exit.");
5050
}
5151

5252
} else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0) {
5353
do {
5454
Thread.sleep(5000);
55-
metricsCalc.getMetrics();
55+
logger.info("Current Record Send Rate is: " + metricsCalc.getKafkaProducerMetrics("record-send-rate", "producer-metrics"));
5656
} while (epsTokenObj.complete() == false);
5757
}
5858
else {
59-
6059
Producer<String, String> producer = new KafkaProducer<>(props);
61-
metricsCalc.addProducer(producer);
60+
if(!metricsCalc.addProducer(producer)) {logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName()); }
61+
6262
do {
6363
if (epsTokenObj.takeToken()) { shipEvent(producer, epsTokenObj, params); }
6464
} while (epsTokenObj.complete() == false);
@@ -72,7 +72,11 @@ public void run() {
7272
}
7373

7474
public static void shipEvent(Producer<String, String> producer,EPSToken epsTokenObj , CommandLineParams params) {
75-
int sequenceNumber = epsTokenObj.getKey();
75+
int sequenceNumber = epsTokenObj.getMessageKeyAndInc();
76+
77+
//TODO: Smarter live logging; the interval below is hardcoded to 100000 messages. Use 10% of total messages instead?
78+
if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); }
79+
7680
byte[] event = createEvent(params, sequenceNumber);
7781
try {
7882
ProducerRecord<String, String> record = new ProducerRecord<>(params.topic, Integer.toString(sequenceNumber), new String(event));

src/main/java/generator/EPSToken.java

+3-18
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package generator;
22

3-
import org.apache.logging.log4j.Logger;
4-
import org.apache.logging.log4j.LogManager;
5-
63
class EPSToken {
7-
private static Logger logger = LogManager.getLogger(EPSToken.class);
84
private int tokenCount;
95
private int messageKey;
106
private boolean finished;
@@ -14,9 +10,7 @@ public EPSToken() {
1410
messageKey = 0;
1511
finished = false;
1612
}
17-
public synchronized void increaseTokens(int tokenNumber) {
18-
tokenCount += tokenNumber;
19-
}
13+
public synchronized void increaseTokens(int tokenNumber) { tokenCount += tokenNumber; }
2014

2115
public synchronized boolean takeToken() {
2216
if(tokenCount != 0) {
@@ -26,22 +20,13 @@ public synchronized boolean takeToken() {
2620
return false;
2721
}
2822

29-
public synchronized void toggleFinished() {
30-
finished = !finished;
31-
}
32-
3323
public boolean complete() {
3424
if(getTokenCount() == 0 && getFinished() == true) { return true; }
3525
else {return false; }
3626
}
37-
27+
public synchronized void toggleFinished() { finished = !finished; }
3828
public synchronized int getTokenCount() { return tokenCount; }
39-
public synchronized int getKey() {
40-
if(messageKey % 10000 == 0) {
41-
logger.info(getMessageKey() + " events created and shipped");
42-
}
43-
44-
return ++messageKey; }
29+
public synchronized int getMessageKeyAndInc() { return ++messageKey; }
4530
public synchronized int getMessageKey() { return messageKey; }
4631
public synchronized boolean getFinished() { return finished; }
4732
}
+63-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package generator;
2+
23
import org.apache.kafka.clients.producer.*;
34
import org.apache.kafka.common.Metric;
45
import org.apache.kafka.common.MetricName;
@@ -7,35 +8,84 @@
78

89
import java.util.Map;
910

11+
/**
12+
* MetricsCalculator is a class responsible for producing consolidated metric counts from multiple threads and
13+
* returning a single consolidated metric value based on the parameters sent through.
14+
15+
*
16+
* @author Don Tregonning (dtregonning)
17+
* @version 1.0
18+
* @since 1.0
19+
*/
1020
public class MetricsCalculator {
11-
Producer<String, String>[] producerArray;
21+
/** Single Log4J Logger for class. */
1222
// Keyed to MetricsCalculator so log lines from this class are attributed to it.
private static final Logger logger = LogManager.getLogger(MetricsCalculator.class);
1323

14-
int arrayIndex = 0;
24+
/** Array of Kafka Producers to track and return metrics for. */
25+
protected static Producer<String, String>[] producerArray;
26+
27+
/** Current index value for the producerArray */
28+
private static int producerArrayIndex = 0;
1529

30+
/**
31+
* Constructor allocates the static array producerArray with the correct size based on the configurable
32+
* commandline parameter CommandLineParams.workerThreadCount.
33+
*
34+
* @param workerThreadCount the amount of worker threads configured within kafka-data-gen
35+
36+
* @see Producer
37+
* @since 1.0
38+
*/
1639
public MetricsCalculator(int workerThreadCount) {
1740
producerArray = (Producer<String, String>[])new Producer[workerThreadCount];
1841
}
1942

20-
public synchronized void addProducer(Producer<String, String> producer) {
21-
producerArray[arrayIndex] = producer;
22-
logger.info("Im here");
23-
arrayIndex++;
43+
/**
44+
* addProducer will add a Kafka Producer to the class array producerArray. Adding producers to this
45+
* array is used as a central lookup of the current producers inside of worker threads.
46+
*
47+
* @param producer the Kafka producer to store in producerArray for later metric lookups
48+
* @return <code>true</code> if the producer is successfully added to the array;
49+
* <code>false</code> otherwise.
50+
*
51+
* @since 1.0
52+
*/
53+
public synchronized boolean addProducer(Producer<String, String> producer) {
54+
try {
55+
producerArray[producerArrayIndex] = producer;
56+
producerArrayIndex++;
57+
} catch (Exception Ex) {
58+
logger.error(Ex.toString());
59+
return false;
60+
}
61+
return true;
2462
}
25-
26-
public void getMetrics() {
27-
double recordSendRate = 0;
28-
for(int i = 0; i < arrayIndex; i++){
63+
/**
64+
* Fetches a Kafka producer metric, summed across all registered producers. Metric names and groups are listed at
65+
* https://docs.confluent.io/current/kafka/monitoring.html#producer-metrics.
66+
*
67+
*
68+
*
69+
* @param metricName Name of the metric to fetch (matched case-insensitively)
70+
* @param metricGroup Group of metric
71+
* @return double value consolidatedMetricValue which will bring together metric value from all
72+
* worker threads and return a single value.
73+
*
74+
* @since 1.0
75+
*/
76+
public double getKafkaProducerMetrics(String metricName, String metricGroup) {
77+
double consolidatedMetricValue = 0;
78+
for(int i = 0; i < producerArrayIndex; i++){
2979
Producer<String, String> producer = producerArray[i];
3080
Map<MetricName, ? extends Metric> metrics = producer.metrics();
3181
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
3282
String name = entry.getKey().name();
3383
String group = entry.getKey().group();
34-
if (name.equalsIgnoreCase("record-send-rate") && group.equalsIgnoreCase("producer-metrics")) {
35-
recordSendRate += entry.getValue().value();
84+
if (name.equalsIgnoreCase(metricName) && group.equalsIgnoreCase(metricGroup)) {
85+
consolidatedMetricValue += entry.getValue().value();
3686
}
3787
}
3888
}
39-
System.out.println("Current Record Send Rate is: " + recordSendRate);
89+
return consolidatedMetricValue;
4090
}
4191
}

0 commit comments

Comments
 (0)