 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.ThreadUtils;
@@ -92,10 +93,10 @@ public ConsumerService(String bootstrapServer, Properties properties) {
      * @param config consumer configuration
      * @return the number of consumers created
      */
-    public int createConsumers(List<Topic> topics, ConsumersConfig config) {
+    public int createConsumers(List<Topic> topics, ConsumersConfig config, Stats stats) {
         int count = 0;
         for (int g = 0; g < config.groupsPerTopic; g++) {
-            Group group = new Group(g, config.consumersPerGroup, topics, config);
+            Group group = new Group(g, config.consumersPerGroup, topics, config, stats);
             groups.add(group);
             count += group.consumerCount();
         }
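The new `Stats` parameter threads a shared metrics sink from the service entry point down to every `Consumer`. As a hedged sketch of how a benchmark harness might drive the reworked API — the `Stats`, `Topic`, and `ConsumersConfig` shapes are taken from elsewhere in this diff, everything else is assumed for illustration:

```java
// Hypothetical wiring, not part of this PR.
Stats stats = new Stats();                        // assumed metrics sink
ConsumerService service = new ConsumerService(bootstrapServer, properties);

ConsumersConfig config = new ConsumersConfig();   // groupsPerTopic, consumersPerGroup, ...
int created = service.createConsumers(topics, config, stats);
LOGGER.info("Created {} consumers", created);

// The callback no longer carries payload or send time; per-batch
// accounting now happens inside pollRecords via the Stats instance.
service.start(topicPartition -> { /* per-message hook */ }, pollRate);
```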
@@ -104,40 +105,36 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
 
     public void start(ConsumerCallback callback, int pollRate) {
         BlockingBucket bucket = rateLimitBucket(pollRate);
-        ConsumerCallback callbackWithRateLimit = (tp, p, st) -> {
-            callback.messageReceived(tp, p, st);
-            bucket.consume(1);
-        };
         CompletableFuture.allOf(
             groups.stream()
-                .map(group -> group.start(callbackWithRateLimit))
+                .map(group -> group.start(callback, bucket))
                 .toArray(CompletableFuture[]::new)
         ).join();
     }

     public void pause() {
         groups.forEach(Group::pause);
     }
-
+
     /**
      * Resume all consumer groups
      */
     public void resume() {
         groups.forEach(Group::resume);
     }
-
+
     /**
      * Resume only a percentage of consumer groups
-     *
+     *
      * @param percentage The percentage of consumers to resume (0-100)
      */
     public void resume(int percentage) {
         int size = groups.size();
         int consumersToResume = (int) Math.ceil(size * (percentage / 100.0));
         consumersToResume = Math.max(1, Math.min(size, consumersToResume)); // Ensure at least 1 and at most size
-
+
         LOGGER.info("Resuming {}% of consumers ({} out of {})", percentage, consumersToResume, size);
-
+
         for (int i = 0; i < consumersToResume; i++) {
             groups.get(i).resume();
         }
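Note the rate-limiting change above: previously the service wrapped the callback and took one bucket token per message; the bucket is now handed down to each consumer and drained once per poll batch (see `pollRecords` below). `BlockingBucket` itself is not part of this diff; a minimal blocking token bucket consistent with the `consume(long)` call site might look like the following — the name is real, the refill policy and constructor are assumptions:

```java
// Sketch only: callers block until enough tokens have accumulated.
public class BlockingBucket {
    private final double tokensPerNano;
    private double tokens;
    private long lastRefillNanos = System.nanoTime();

    public BlockingBucket(double tokensPerSecond) {
        this.tokensPerNano = tokensPerSecond / 1_000_000_000.0;
    }

    public synchronized void consume(long n) throws InterruptedException {
        refill();
        while (tokens < n) {
            wait(10);   // re-check periodically while tokens accumulate
            refill();
        }
        tokens -= n;
    }

    private void refill() {
        long now = System.nanoTime();
        tokens += (now - lastRefillNanos) * tokensPerNano;
        lastRefillNanos = now;
    }
}
```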
@@ -177,7 +174,7 @@ public void resumeTopics(Collection<String> topics) {
 
     /**
      * Reset consumer offsets for catch-up reading.
-     *
+     *
      * @param startMillis The timestamp to start seeking from
      * @param intervalMillis The interval between group starts
      * @param percentage The percentage of consumers to activate (0-100)
@@ -187,21 +184,21 @@ public void resetOffset(long startMillis, long intervalMillis, int percentage) {
         int size = groups.size();
         int consumersToActivate = (int) Math.ceil(size * (percentage / 100.0));
         consumersToActivate = Math.max(1, Math.min(size, consumersToActivate)); // Ensure at least 1 and at most size
-
+
         LOGGER.info("Activating {}% of consumers ({} out of {})", percentage, consumersToActivate, size);
-
+
         for (int i = 0; i < consumersToActivate; i++) {
             Group group = groups.get(i);
             group.seek(timestamp.getAndAdd(intervalMillis));
             LOGGER.info("Reset consumer group offsets: {}/{}", i + 1, consumersToActivate);
         }
-
+
         // Keep the remaining consumers paused
         if (consumersToActivate < size) {
             LOGGER.info("Keeping {} consumer groups paused during catch-up", size - consumersToActivate);
         }
     }
-
+
     /**
      * Reset all consumer offsets (100% consumers)
      * @param startMillis The timestamp to start seeking from
@@ -238,10 +235,8 @@ public interface ConsumerCallback {
          * Called when a message is received.
          *
          * @param topicPartition the topic partition of the received message
-         * @param payload the received message payload
-         * @param sendTimeNanos the time in nanoseconds when the message was sent
          */
-        void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException;
+        void messageReceived(TopicPartition topicPartition) throws InterruptedException;
     }

     public static class ConsumersConfig {
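With payload and send time removed from the signature, a `ConsumerCallback` implementation only learns which partition delivered a message; latency and byte accounting move into `Stats`. A callback can now be a one-liner, e.g. — hypothetical usage, where `received` is an illustrative `ConcurrentHashMap<TopicPartition, LongAdder>`:

```java
// Example: count receipts per partition with the slimmed-down callback.
ConsumerCallback callback = tp ->
    received.computeIfAbsent(tp, ignored -> new LongAdder()).increment();
```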
@@ -263,22 +258,22 @@ private class Group implements AutoCloseable {
         private final int index;
         private final Map<Topic, List<Consumer>> consumers = new HashMap<>();

-        public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config) {
+        public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config, Stats stats) {
             this.index = index;

             Properties common = toProperties(config);
             for (Topic topic : topics) {
                 List<Consumer> topicConsumers = new ArrayList<>();
                 for (int c = 0; c < consumersPerGroup; c++) {
-                    Consumer consumer = newConsumer(topic, common);
+                    Consumer consumer = newConsumer(topic, common, stats);
                     topicConsumers.add(consumer);
                 }
                 consumers.put(topic, topicConsumers);
             }
         }

-        public CompletableFuture<Void> start(ConsumerCallback callback) {
-            consumers().forEach(consumer -> consumer.start(callback));
+        public CompletableFuture<Void> start(ConsumerCallback callback, BlockingBucket bucket) {
+            consumers().forEach(consumer -> consumer.start(callback, bucket));

             // wait for all consumers to join the group
             return CompletableFuture.allOf(consumers()
@@ -336,11 +331,11 @@ private Properties toProperties(ConsumersConfig config) {
             return properties;
         }

-        private Consumer newConsumer(Topic topic, Properties common) {
+        private Consumer newConsumer(Topic topic, Properties common, Stats stats) {
             Properties properties = new Properties();
             properties.putAll(common);
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(topic));
-            return new Consumer(properties, topic.name);
+            return new Consumer(properties, topic.name, stats);
         }

         private Stream<Consumer> consumers() {
@@ -369,16 +364,17 @@ private static class Consumer {
         private final CompletableFuture<Void> started = new CompletableFuture<>();
         private boolean paused = false;
         private volatile boolean closing = false;
+        private final Stats stats;

-        public Consumer(Properties properties, String topic) {
+        public Consumer(Properties properties, String topic, Stats stats) {
             this.consumer = new KafkaConsumer<>(properties);
             this.executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("perf-consumer", false));
-
+            this.stats = stats;
             consumer.subscribe(List.of(topic), subscribeListener());
         }

-        public void start(ConsumerCallback callback) {
-            this.task = this.executor.submit(() -> pollRecords(consumer, callback));
+        public void start(ConsumerCallback callback, BlockingBucket bucket) {
+            this.task = this.executor.submit(() -> pollRecords(consumer, callback, bucket));
         }

         public CompletableFuture<Void> started() {
@@ -408,18 +404,28 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             };
         }

-        private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
+        private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback, BlockingBucket bucket) {
             while (!closing) {
                 try {
                     while (paused) {
                         Thread.sleep(PAUSE_INTERVAL);
                     }
                     ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT);
+                    int numMessages = records.count();
+                    if (numMessages == 0) {
+                        continue;
+                    }
+                    ConsumerRecord<String, byte[]> firstRecord = records.iterator().next();
+                    Header header = firstRecord.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS);
+                    long bytes = 0;
+                    long sendTimeNanos = Longs.fromByteArray(header.value());
                     for (ConsumerRecord<String, byte[]> record : records) {
-                        long sendTimeNanos = Longs.fromByteArray(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value());
                         TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
-                        callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
+                        bytes += record.value().length;
+                        callback.messageReceived(topicPartition);
                     }
+                    stats.messageReceived(numMessages, bytes, sendTimeNanos);
+                    bucket.consume(records.count());
                 } catch (InterruptException | InterruptedException e) {
                     // ignore, as we are closing
                 } catch (Exception e) {
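Two behavioral notes on the reworked poll loop: the send-time header is now read once, from the first record of each batch, so latency is sampled per batch rather than per record (lower per-message overhead, at the cost of assuming records within a poll batch were produced close together in time); and the rate-limit bucket is drained once per batch instead of once per message. `Stats` is not shown in this diff; a minimal sink matching the `messageReceived(int, long, long)` call site might be (illustrative only):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Sketch of a Stats sink consistent with the pollRecords call site.
// The real perf-tool class likely records latency histograms as well.
public class Stats {
    private final LongAdder messages = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    private final AtomicLong maxLatencyNanos = new AtomicLong();

    public void messageReceived(int count, long batchBytes, long sendTimeNanos) {
        messages.add(count);
        bytes.add(batchBytes);
        long latencyNanos = System.nanoTime() - sendTimeNanos;
        maxLatencyNanos.accumulateAndGet(latencyNanos, Math::max);
    }
}
```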