From 54e1a978944a2195e9d2069b345dac996d77f1a5 Mon Sep 17 00:00:00 2001
From: Oguzhan Unlu
Date: Mon, 6 May 2024 19:15:40 +0300
Subject: [PATCH] Produce Kafka records at once

---
 .../Sink.scala | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
index 1a8ecbad1..1b098d521 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
@@ -12,16 +12,13 @@ package com.snowplowanalytics.snowplow.enrich.kafka
 
 import java.util.UUID
-
 import cats.Parallel
 import cats.implicits._
-
 import cats.effect.kernel.{Async, Resource, Sync}
-
 import fs2.kafka._
-
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
+import fs2.Chunk
 
 object Sink {
 
@@ -40,12 +37,10 @@ object Sink {
     output match {
       case k: Output.Kafka =>
         mkProducer(k, authCallbackClass).map { producer => records =>
-          records.parTraverse_ { record =>
-            producer
-              .produceOne_(toProducerRecord(k.topicName, record))
-              .flatten
-              .void
-          }
+          producer
+            .produce(Chunk.from(records.map(toProducerRecord(k.topicName, _))))
+            .flatten
+            .void
         }
       case o => Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not Kafka")))
     }
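
Instead of producing each record with produceOne_ under parTraverse_, the sink
now converts the whole batch to producer records and hands them to the client
in a single produce call. In fs2-kafka 3.x, produce accepts a ProducerRecords
value (an alias for Chunk[ProducerRecord[K, V]]) and returns a nested effect:
the outer layer completes once the chunk has been enqueued in the producer's
buffer, and the inner layer, reached via .flatten, completes once the broker
has acknowledged every record. Below is a minimal, self-contained sketch of
the batched call, assuming fs2-kafka 3.x, a broker at localhost:9092, and an
illustrative "events" topic; none of these names come from the patch itself.

// Sketch only, not part of the patch: batched produce with fs2-kafka 3.x.
// Broker address, topic name, and record contents are illustrative assumptions.
import cats.effect.{IO, IOApp}
import fs2.Chunk
import fs2.kafka._

object BatchProduceSketch extends IOApp.Simple {

  val settings: ProducerSettings[IO, String, Array[Byte]] =
    ProducerSettings[IO, String, Array[Byte]]
      .withBootstrapServers("localhost:9092")

  def run: IO[Unit] =
    KafkaProducer.resource(settings).use { producer =>
      val records: Chunk[ProducerRecord[String, Array[Byte]]] =
        Chunk.from(
          List("a", "b", "c").map(v => ProducerRecord("events", v, v.getBytes))
        )
      producer
        .produce(records) // outer effect: chunk handed to the producer's buffer
        .flatten          // inner effect: completes when the broker acks every record
        .void
    }
}

Waiting on the flattened inner effect keeps the sink from completing before
all acknowledgements arrive, as the per-record version did, while leaving
actual wire batching to the underlying Java producer (linger.ms, batch.size).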