diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala index 1a8ecbad1..1b098d521 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala @@ -12,16 +12,13 @@ package com.snowplowanalytics.snowplow.enrich.kafka import java.util.UUID - import cats.Parallel import cats.implicits._ - import cats.effect.kernel.{Async, Resource, Sync} - import fs2.kafka._ - import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output +import fs2.Chunk object Sink { @@ -40,12 +37,10 @@ object Sink { output match { case k: Output.Kafka => mkProducer(k, authCallbackClass).map { producer => records => - records.parTraverse_ { record => - producer - .produceOne_(toProducerRecord(k.topicName, record)) - .flatten - .void - } + producer + .produce(Chunk.from(records.map(toProducerRecord(k.topicName, _)))) + .flatten + .void } case o => Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not Kafka"))) }