diff --git a/config/config.kafka.extended.hocon b/config/config.kafka.extended.hocon
index 78965106f..e22126f54 100644
--- a/config/config.kafka.extended.hocon
+++ b/config/config.kafka.extended.hocon
@@ -96,8 +96,7 @@
     # Number of events that can get enriched at the same time within a chunk
     "enrich": 256
     # Number of chunks that can get sunk at the same time
-    # WARNING: if greater than 1, records can get checkpointed before they are sunk
-    "sink": 1
+    "sink": 8
   }
 
   # Optional, period after which enrich assets should be checked for updates
diff --git a/config/config.kinesis.extended.hocon b/config/config.kinesis.extended.hocon
index 46aa49adc..8980c023f 100644
--- a/config/config.kinesis.extended.hocon
+++ b/config/config.kinesis.extended.hocon
@@ -221,8 +221,7 @@
     # Number of events that can get enriched at the same time within a chunk
     "enrich": 256
     # Number of chunks that can get sunk at the same time
-    # WARNING: if greater than 1, records can get checkpointed before they are sunk
-    "sink": 1
+    "sink": 8
   }
 
   # Optional, period after which enrich assets should be checked for updates
diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf
index 5c5897eda..d5d0bb10c 100644
--- a/modules/kafka/src/main/resources/application.conf
+++ b/modules/kafka/src/main/resources/application.conf
@@ -53,7 +53,7 @@
 
   "concurrency" : {
     "enrich": 256
-    "sink": 1
+    "sink": 8
   }
 
   "remoteAdapters" : {
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
index 1a8ecbad1..1b098d521 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
@@ -12,16 +12,13 @@ package com.snowplowanalytics.snowplow.enrich.kafka
 
 import java.util.UUID
-
 import cats.Parallel
 import cats.implicits._
-
 import cats.effect.kernel.{Async, Resource, Sync}
-
 import fs2.kafka._
-
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
+import fs2.Chunk
 
 object Sink {
 
@@ -40,12 +37,10 @@
     output match {
       case k: Output.Kafka => mkProducer(k, authCallbackClass).map { producer => records =>
-        records.parTraverse_ { record =>
-          producer
-            .produceOne_(toProducerRecord(k.topicName, record))
-            .flatten
-            .void
-        }
+        producer
+          .produce(Chunk.from(records.map(toProducerRecord(k.topicName, _))))
+          .flatten
+          .void
       }
       case o => Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not Kafka")))
     }
 
diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf
index a75006bde..05145dbd2 100644
--- a/modules/kinesis/src/main/resources/application.conf
+++ b/modules/kinesis/src/main/resources/application.conf
@@ -68,7 +68,7 @@
 
   "concurrency" : {
     "enrich": 256
-    "sink": 1
+    "sink": 8
   }
 
   "remoteAdapters" : {
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 0da492983..cf99f3761 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -71,18 +71,18 @@ object Dependencies {
     val gcpSdk           = "2.36.1"
     val awsSdk2          = "2.25.24"
     val kinesisClient2   = "2.4.3"
-    val kafka            = "2.8.2"
+    val kafka            = "3.7.0"
     val mskAuth          = "2.0.3"
     val nsqClient        = "1.3.0"
    val jackson          = "2.16.1"
     val config           = "1.3.4"
     val decline          = "1.0.0"
-    val fs2              = "3.9.3"
-    val catsEffect       = "3.5.2"
+    val fs2              = "3.10.2"
+    val catsEffect       = "3.5.4"
     val fs2PubSub        = "0.22.0"
     val fs2Aws           = "4.1.0"
-    val fs2Kafka         = "3.2.0"
+    val fs2Kafka         = "3.5.1"
     val fs2BlobStorage   = "0.9.12"
     val azureIdentity    = "1.11.1"
     val nimbusJoseJwt    = "9.37.2"
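A note on the Sink.scala change above: in fs2-kafka 3.x, `KafkaProducer#produce` accepts a whole `Chunk[ProducerRecord[K, V]]`, so each incoming chunk of enriched events is now sent to Kafka with a single producer call instead of one `produceOne_` call per record fanned out with `parTraverse_`. `produce` returns a nested effect: the outer layer completes when the records are handed to the producer's send buffer, and the inner layer (reached via `.flatten`) completes when the broker acknowledges the batch. Below is a minimal standalone sketch of the same pattern; `sinkBatch`, `topic`, `events`, the constant `"key"`, and the `String`/`Array[Byte]` types are illustrative assumptions, not code from this patch:

    import cats.effect.IO
    import fs2.Chunk
    import fs2.kafka.{KafkaProducer, ProducerRecord}

    // Sketch only: `topic` and `events` are hypothetical inputs, and the
    // key/value serializers are assumed to be wired into the producer.
    def sinkBatch(
      producer: KafkaProducer[IO, String, Array[Byte]],
      topic: String,
      events: List[Array[Byte]]
    ): IO[Unit] =
      producer
        // One produce call for the whole chunk, instead of one per record.
        .produce(Chunk.from(events.map(e => ProducerRecord(topic, "key", e))))
        // Outer IO: records enqueued in the producer's send buffer.
        // Inner IO (via flatten): broker acks for the entire batch.
        .flatten
        .void

`Chunk.from` is a fairly recent fs2 addition, which is consistent with the accompanying fs2 bump to 3.10.2 in project/Dependencies.scala.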