@@ -236,9 +236,9 @@ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
236
236
try {
237
237
Config config = ConfigUtils .propertiesToConfig (state .getProperties ());
238
238
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
239
- .resolveClass (
240
- state . getProp ( GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS ,
241
- DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS )).newInstance ();
239
+ .resolveClass (state . getProp (
240
+ GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS ,
241
+ DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS )).newInstance ();
242
242
243
243
this .kafkaConsumerClient .set (kafkaConsumerClientFactory .create (config ));
244
244
@@ -440,29 +440,35 @@ private int calculateNumMappersForPacker(SourceState state,
440
440
/*
441
441
* This function need to be thread safe since it is called in the Runnable
442
442
*/
443
- private List <WorkUnit > getWorkUnitsForTopic (KafkaTopic topic , SourceState state ,
443
+ public List <WorkUnit > getWorkUnitsForTopic (KafkaTopic topic , SourceState state ,
444
444
Optional <State > topicSpecificState , Optional <Set <Integer >> filteredPartitions ) {
445
445
Timer .Context context = this .metricContext .timer ("isTopicQualifiedTimer" ).time ();
446
446
boolean topicQualified = isTopicQualified (topic );
447
447
context .close ();
448
448
449
- List <WorkUnit > workUnits = Lists .newArrayList ();
450
- List <KafkaPartition > topicPartitions = topic .getPartitions ();
451
- for (KafkaPartition partition : topicPartitions ) {
452
- if (filteredPartitions .isPresent () && !filteredPartitions .get ().contains (partition .getId ())) {
453
- continue ;
454
- }
455
- WorkUnit workUnit = getWorkUnitForTopicPartition (partition , state , topicSpecificState );
456
- if (workUnit != null ) {
457
- // For disqualified topics, for each of its workunits set the high watermark to be the same
458
- // as the low watermark, so that it will be skipped.
459
- if (!topicQualified ) {
460
- skipWorkUnit (workUnit );
461
- }
462
- workUnit .setProp (NUM_TOPIC_PARTITIONS , topicPartitions .size ());
463
- workUnits .add (workUnit );
464
- }
449
+ final List <WorkUnit > workUnits = Lists .newArrayList ();
450
+ final List <KafkaPartition > topicPartitions = topic .getPartitions ();
451
+ Map <KafkaPartition , WorkUnit > workUnitMap ;
452
+
453
+ if (filteredPartitions .isPresent ()) {
454
+ LOG .info ("Filtered partitions for topic {} are {}" , topic .getName (), filteredPartitions .get ());
455
+ final List <KafkaPartition > filteredPartitionsToBeProcessed = topicPartitions .stream ()
456
+ .filter (partition -> filteredPartitions .get ().contains (partition .getId ()))
457
+ .collect (Collectors .toList ());
458
+ workUnitMap = getWorkUnits (filteredPartitionsToBeProcessed , state , topicSpecificState );
459
+ } else {
460
+ workUnitMap = getWorkUnits (topicPartitions , state , topicSpecificState );
461
+ }
462
+
463
+ if (!topicQualified ) {
464
+ workUnitMap .values ().forEach (KafkaSource ::skipWorkUnit );
465
+ }
466
+
467
+ for (WorkUnit workUnit : workUnitMap .values ()) {
468
+ workUnit .setProp (NUM_TOPIC_PARTITIONS , topicPartitions .size ());
469
+ workUnits .add (workUnit );
465
470
}
471
+
466
472
this .partitionsToBeProcessed .addAll (topic .getPartitions ());
467
473
return workUnits ;
468
474
}
@@ -482,20 +488,61 @@ private static void skipWorkUnit(WorkUnit workUnit) {
482
488
workUnit .setProp (ConfigurationKeys .WORK_UNIT_HIGH_WATER_MARK_KEY , workUnit .getLowWaterMark ());
483
489
}
484
490
485
- private WorkUnit getWorkUnitForTopicPartition (KafkaPartition partition , SourceState state ,
491
+ /**
492
+ * Get the workunits of all the partitions passed, this method fetches all the offsets for the partitions
493
+ * at once from kafka, and for each partiton creates a workunit.
494
+ * @param partitions
495
+ * @param state
496
+ * @param topicSpecificState
497
+ * @return
498
+ */
499
+ private Map <KafkaPartition , WorkUnit > getWorkUnits (Collection <KafkaPartition > partitions , SourceState state ,
486
500
Optional <State > topicSpecificState ) {
487
- Offsets offsets = new Offsets ();
488
-
489
- boolean failedToGetKafkaOffsets = false ;
490
-
491
- try (Timer .Context context = this .metricContext .timer (OFFSET_FETCH_TIMER ).time ()) {
492
- offsets .setOffsetFetchEpochTime (System .currentTimeMillis ());
493
- offsets .setEarliestOffset (this .kafkaConsumerClient .get ().getEarliestOffset (partition ));
494
- offsets .setLatestOffset (this .kafkaConsumerClient .get ().getLatestOffset (partition ));
495
- } catch (Throwable t ) {
496
- failedToGetKafkaOffsets = true ;
497
- LOG .error ("Caught error in creating work unit for {}" , partition , t );
501
+ final Map <KafkaPartition , Offsets > partitionOffsetMap = Maps .newHashMap ();
502
+ final Set <KafkaPartition > failedOffsetsGetList = Sets .newHashSet ();
503
+ try (final Timer .Context context = this .metricContext .timer (OFFSET_FETCH_TIMER ).time ()) {
504
+ // Fetch the offsets for all the partitions at once
505
+ final Map <KafkaPartition , Long > earliestOffsetMap = this .kafkaConsumerClient .get ().getEarliestOffsets (partitions );
506
+ final Map <KafkaPartition , Long > latestOffsetMap = this .kafkaConsumerClient .get ().getLatestOffsets (partitions );
507
+ for (KafkaPartition partition : partitions ) {
508
+ final Offsets offsets = new Offsets ();
509
+ offsets .setOffsetFetchEpochTime (System .currentTimeMillis ());
510
+ // Check if both earliest and latest offset are fetched for the partition, then set the offsets
511
+ if (earliestOffsetMap .containsKey (partition ) && latestOffsetMap .containsKey (partition )) {
512
+ offsets .setEarliestOffset (earliestOffsetMap .get (partition ));
513
+ offsets .setLatestOffset (latestOffsetMap .get (partition ));
514
+ offsets .setOffsetFetchEpochTime (System .currentTimeMillis ());
515
+ partitionOffsetMap .put (partition , offsets );
516
+ // If either is not available, put it in the failed offsets list
517
+ } else {
518
+ failedOffsetsGetList .add (partition );
519
+ }
520
+ }
521
+ LOG .info ("Time taken to fetch offset for partitions {} is {} ms" , partitions ,
522
+ TimeUnit .NANOSECONDS .toMillis (context .stop ()));
523
+ } catch (KafkaOffsetRetrievalFailureException e ) {
524
+ // When exception occurred while fetching earliest or latest offset for all the partitions,
525
+ // add all the partitions to fetchOffsetsFailedPartitions
526
+ failedOffsetsGetList .addAll (partitions );
527
+ LOG .error ("Caught error in creating work unit for {}" , partitions , e );
498
528
}
529
+ if (!failedOffsetsGetList .isEmpty ()) {
530
+ LOG .error ("Failed to fetch offsets for partitions {}" , failedOffsetsGetList );
531
+ }
532
+ final Map <KafkaPartition , WorkUnit > workUnitMap = Maps .newHashMap ();
533
+ for (Map .Entry <KafkaPartition , Offsets > partitionOffset : partitionOffsetMap .entrySet ()) {
534
+ WorkUnit workUnit =
535
+ getWorkUnitForTopicPartition (partitionOffset .getKey (), state , topicSpecificState , partitionOffset .getValue (),
536
+ failedOffsetsGetList .contains (partitionOffset .getKey ()));
537
+ if (workUnit != null ) {
538
+ workUnitMap .put (partitionOffset .getKey (), workUnit );
539
+ }
540
+ }
541
+ return workUnitMap ;
542
+ }
543
+
544
+ private WorkUnit getWorkUnitForTopicPartition (KafkaPartition partition , SourceState state ,
545
+ Optional <State > topicSpecificState , Offsets offsets , boolean failedToGetKafkaOffsets ) {
499
546
500
547
long previousOffset = 0 ;
501
548
long previousOffsetFetchEpochTime = 0 ;
0 commit comments