Skip to content

Commit 7bf984a

Browse files
authored
Merge pull request #2973 from rockwotj/kafka-output-perf
kafka: output perf improvement with large batches
2 parents 595c5d2 + c9363cc commit 7bf984a

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

internal/impl/kafka/output_kafka_franz.go

+21-7
Original file line numberDiff line numberDiff line change
@@ -366,24 +366,38 @@ func (f *FranzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc
366366
return service.ErrNotConnected
367367
}
368368

369+
topicExecutor := b.InterpolationExecutor(f.topic)
370+
var keyExecutor *service.MessageBatchInterpolationExecutor
371+
if f.key != nil {
372+
keyExecutor = b.InterpolationExecutor(f.key)
373+
}
374+
var partitionExecutor *service.MessageBatchInterpolationExecutor
375+
if f.partition != nil {
376+
partitionExecutor = b.InterpolationExecutor(f.partition)
377+
}
378+
var timestampExecutor *service.MessageBatchInterpolationExecutor
379+
if f.timestamp != nil {
380+
timestampExecutor = b.InterpolationExecutor(f.timestamp)
381+
}
382+
369383
records := make([]*kgo.Record, 0, len(b))
370384
for i, msg := range b {
371385
var topic string
372-
if topic, err = b.TryInterpolatedString(i, f.topic); err != nil {
386+
if topic, err = topicExecutor.TryString(i); err != nil {
373387
return fmt.Errorf("topic interpolation error: %w", err)
374388
}
375389

376390
record := &kgo.Record{Topic: topic}
377391
if record.Value, err = msg.AsBytes(); err != nil {
378392
return
379393
}
380-
if f.key != nil {
381-
if record.Key, err = b.TryInterpolatedBytes(i, f.key); err != nil {
394+
if keyExecutor != nil {
395+
if record.Key, err = keyExecutor.TryBytes(i); err != nil {
382396
return fmt.Errorf("key interpolation error: %w", err)
383397
}
384398
}
385-
if f.partition != nil {
386-
partStr, err := b.TryInterpolatedString(i, f.partition)
399+
if partitionExecutor != nil {
400+
partStr, err := partitionExecutor.TryString(i)
387401
if err != nil {
388402
return fmt.Errorf("partition interpolation error: %w", err)
389403
}
@@ -400,8 +414,8 @@ func (f *FranzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc
400414
})
401415
return nil
402416
})
403-
if f.timestamp != nil {
404-
if tsStr, err := b.TryInterpolatedString(i, f.timestamp); err != nil {
417+
if timestampExecutor != nil {
418+
if tsStr, err := timestampExecutor.TryString(i); err != nil {
405419
return fmt.Errorf("timestamp interpolation error: %w", err)
406420
} else {
407421
if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil {

internal/impl/kafka/output_sarama_kafka.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -505,17 +505,28 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch)
505505
return service.ErrNotConnected
506506
}
507507

508+
topicExecutor := msg.InterpolationExecutor(k.topic)
509+
keyExecutor := msg.InterpolationExecutor(k.key)
510+
var partitionExecutor *service.MessageBatchInterpolationExecutor
511+
if k.partition != nil {
512+
partitionExecutor = msg.InterpolationExecutor(k.partition)
513+
}
514+
var timestampExecutor *service.MessageBatchInterpolationExecutor
515+
if k.timestamp != nil {
516+
timestampExecutor = msg.InterpolationExecutor(k.timestamp)
517+
}
518+
508519
boff := k.backoffCtor()
509520

510521
userDefinedHeaders := k.buildUserDefinedHeaders(k.staticHeaders)
511522
msgs := []*sarama.ProducerMessage{}
512523

513524
for i := 0; i < len(msg); i++ {
514-
key, err := msg.TryInterpolatedBytes(i, k.key)
525+
key, err := keyExecutor.TryBytes(i)
515526
if err != nil {
516527
return fmt.Errorf("key interpolation error: %w", err)
517528
}
518-
topic, err := msg.TryInterpolatedString(i, k.topic)
529+
topic, err := topicExecutor.TryString(i)
519530
if err != nil {
520531
return fmt.Errorf("topic interpolation error: %w", err)
521532
}
@@ -543,8 +554,8 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch)
543554
// partitioner. Although samara will (currently) ignore the partition
544555
// field when not using a manual partitioner, we should only set it when
545556
// we explicitly want that.
546-
if k.partition != nil {
547-
partitionString, err := msg.TryInterpolatedString(i, k.partition)
557+
if partitionExecutor != nil {
558+
partitionString, err := partitionExecutor.TryString(i)
548559
if err != nil {
549560
return fmt.Errorf("partition interpolation error: %w", err)
550561
}
@@ -563,8 +574,8 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch)
563574
nextMsg.Partition = int32(partitionInt)
564575
}
565576

566-
if k.timestamp != nil {
567-
if tsStr, err := msg.TryInterpolatedString(i, k.timestamp); err != nil {
577+
if timestampExecutor != nil {
578+
if tsStr, err := timestampExecutor.TryString(i); err != nil {
568579
return fmt.Errorf("timestamp interpolation error: %w", err)
569580
} else {
570581
if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil {

0 commit comments

Comments
 (0)