From 0d82565b15752843b2bd591e6195fdaffa28bd49 Mon Sep 17 00:00:00 2001
From: Pau
Date: Sat, 2 Oct 2021 23:20:18 +0200
Subject: [PATCH 1/2] Implements a KafkaConsumerResource to solve the
 "consumer already closed" issue

Reproducible test KafkaConsumerResourceSpec

Scaladocs

Revert unwanted changes
---
 .bsp/sbt.json                                 |   1 +
 build.sbt                                     |  32 +--
 .../monix/kafka/KafkaConsumerObservable.scala |  13 +-
 .../KafkaConsumerObservableAutoCommit.scala   |   3 +-
 .../KafkaConsumerObservableManualCommit.scala |   3 +-
 .../monix/kafka/KafkaConsumerResource.scala   | 203 ++++++++++++++++++
 .../kafka/config/ObservableCommitOrder.scala  |   3 +-
 .../kafka/KafkaConsumerResourceSpec.scala     |  87 ++++++++
 .../kafka/MergeByCommitCallbackTest.scala     |  63 +++---
 .../scala/monix/kafka/SerializationTest.scala | 135 ------------
 10 files changed, 348 insertions(+), 195 deletions(-)
 create mode 100644 .bsp/sbt.json
 create mode 100644 kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala
 create mode 100644 kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
 delete mode 100644 kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala

diff --git a/.bsp/sbt.json b/.bsp/sbt.json
new file mode 100644
index 00000000..889c5517
--- /dev/null
+++ b/.bsp/sbt.json
@@ -0,0 +1 @@
+{"name":"sbt","version":"1.4.7","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.3.3/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp"]}
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 62bfc348..5984497a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -31,9 +31,8 @@ lazy val warnUnusedImport = Seq(
 
 lazy val sharedSettings = warnUnusedImport ++ Seq(
   organization := "io.monix",
-  scalaVersion := "2.12.14",
-  crossScalaVersions := Seq("2.11.12", "2.12.14", "2.13.6"),
-
+  scalaVersion := "2.12.15",
+  crossScalaVersions := Seq("2.11.12", "2.12.15", "2.13.6"),
   scalacOptions ++= Seq(
     // warnings
     "-unchecked", // able additional warnings where generated code depends on assumptions
@@ -84,11 +83,11 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
   scalacOptions ++= Seq(
     // Turns all warnings into errors ;-)
     // TODO: enable after fixing deprecations for Scala 2.13
-    "-Xfatal-warnings",
+    //"-Xfatal-warnings",
     // Enables linter options
     "-Xlint:adapted-args", // warn if an argument list is modified to match the receiver
-    "-Xlint:nullary-unit", // warn when nullary methods return Unit
-    "-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
+    //"-Xlint:nullary-unit", // warn when nullary methods return Unit
+    //"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
     "-Xlint:infer-any", // warn when a type argument is inferred to be `Any`
     "-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id
     "-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element
@@ -197,8 +196,9 @@ lazy val commonDependencies = Seq(
     // For testing ...
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test", "org.scalatest" %% "scalatest" % "3.0.9" % "test", - "org.scalacheck" %% "scalacheck" % "1.15.2" % "test" - ) + "org.scalacheck" %% "scalacheck" % "1.15.2" % "test", + "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1" force() + ), ) lazy val monixKafka = project.in(file(".")) @@ -212,10 +212,6 @@ lazy val kafka1x = project.in(file("kafka-1.0.x")) .settings(mimaSettings("monix-kafka-1x")) .settings( name := "monix-kafka-1x", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) @@ -225,10 +221,6 @@ lazy val kafka11 = project.in(file("kafka-0.11.x")) .settings(mimaSettings("monix-kafka-11")) .settings( name := "monix-kafka-11", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.3" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) @@ -237,12 +229,8 @@ lazy val kafka10 = project.in(file("kafka-0.10.x")) .settings(commonDependencies) .settings(mimaSettings("monix-kafka-10")) .settings( - name := "monix-kafka-10", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "0.16.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, - libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") + libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" force(), + //dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" // exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) lazy val kafka9 = project.in(file("kafka-0.9.x")) diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index a65eee76..f0877236 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -38,6 +38,7 @@ import scala.util.matching.Regex trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] + protected val shouldClose: Boolean /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement @@ -90,7 +91,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = { // Forced asynchronous boundary val cancelTask = Task.evalAsync { - consumer.synchronized(blocking(consumer.close())) + if (shouldClose) { consumer.synchronized(blocking(consumer.close())) } } // By applying memoization, we are turning this @@ -113,10 +114,11 @@ object KafkaConsumerObservable { * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def apply[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = - 
-    new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer)
+    new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer, shouldClose = true)
 
   /** Builds a [[KafkaConsumerObservable]] instance.
    *
@@ -126,6 +128,7 @@ object KafkaConsumerObservable {
    *
    * @param topics is the list of Kafka topics to subscribe to.
    */
+  @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
   def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
     K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
@@ -142,6 +145,7 @@ object KafkaConsumerObservable {
    *
    * @param topicsRegex is the pattern of Kafka topics to subscribe to.
    */
+  @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
   def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
     K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
@@ -173,12 +177,13 @@ object KafkaConsumerObservable {
    *        `org.apache.kafka.clients.consumer.KafkaConsumer`
    *        instance to use for consuming from Kafka
    */
+  @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
   def manualCommit[K, V](
     cfg: KafkaConsumerConfig,
     consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
 
     val manualCommitConfig = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false)
-    new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer)
+    new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer, shouldClose = true)
   }
 
   /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
@@ -202,6 +207,7 @@ object KafkaConsumerObservable {
    *
    * @param topics is the list of Kafka topics to subscribe to.
    */
+  @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
   def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
     K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
@@ -231,6 +237,7 @@ object KafkaConsumerObservable {
    *
    * @param topicsRegex is the pattern of Kafka topics to subscribe to.
   */
+  @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
   def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
     K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala
index 6406c224..296fa5c0 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala
@@ -34,7 +34,8 @@ import scala.util.{Failure, Success}
   */
 final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
   override protected val config: KafkaConsumerConfig,
-  override protected val consumer: Task[Consumer[K, V]])
+  override protected val consumer: Task[Consumer[K, V]],
+  override protected val shouldClose: Boolean)
   extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] {
 
   /* Based on the [[KafkaConsumerConfig.observableCommitType]] it
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
index cb3dc5cf..b16cc794 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -36,7 +36,8 @@ import scala.jdk.CollectionConverters._
   */
 final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
   override protected val config: KafkaConsumerConfig,
-  override protected val consumer: Task[Consumer[K, V]])
+  override protected val consumer: Task[Consumer[K, V]],
+  override protected val shouldClose: Boolean)
   extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] {
 
   // Caching value to save CPU cycles
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala
new file mode 100644
index 00000000..9268df54
--- /dev/null
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2014-2021 by The Monix Project Developers.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monix.kafka
+
+import cats.effect.Resource
+import monix.eval.Task
+import monix.kafka.KafkaConsumerObservable.createConsumer
+import monix.kafka.config.ObservableCommitOrder
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+
+import scala.concurrent.blocking
+import scala.util.matching.Regex
+
+/** Exposes an `Observable` that consumes a Kafka stream by
+ * means of a Kafka Consumer client.
+ *
+ * In order to get initialized, it needs a configuration. See the
+ * [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`,
+ * (in the resource files) that is exposing all default values.
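+ *
+ * ==Example==
+ * A minimal auto-commit usage sketch; `topicName` and `process`
+ * (a function returning a `Task`) are illustrative placeholders:
+ * {{{
+ *   KafkaConsumerResource[String, String](consumerCfg, List(topicName)).use { records =>
+ *     records
+ *       .map(record => record.value())
+ *       .mapEval(value => process(value))
+ *       .completedL
+ *   }
+ * }}}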
+ */
+object KafkaConsumerResource {
+
+  /** A [[Resource]] that acquires a [[KafkaConsumer]] used
+    * to build a [[KafkaConsumerObservableAutoCommit]] instance,
+    * which will be released after its usage.
+    *
+    * @note The consumer behaves according to the [[ObservableCommitOrder]]
+    *       chosen in the configuration, which can be set via the key
+    *       `monix.observable.commit.order`.
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used.
+    * @param consumer is a factory for the
+    *        `org.apache.kafka.clients.consumer.KafkaConsumer`
+    *        instance to use for consuming from Kafka
+    */
+  def apply[K, V](
+    cfg: KafkaConsumerConfig,
+    consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = {
+    for {
+      consumer <- Resource.make(consumer) { consumer =>
+        Task.evalAsync(consumer.synchronized(blocking(consumer.close())))
+      }
+      consumerObservable <- Resource.liftF(
+        Task(new KafkaConsumerObservableAutoCommit[K, V](cfg, Task.pure(consumer), shouldClose = false)))
+    } yield consumerObservable
+  }
+
+  /** A [[Resource]] that acquires a [[KafkaConsumer]] used
+    * to build a [[KafkaConsumerObservableAutoCommit]] instance,
+    * which will be released after its usage.
+    *
+    * @note The consumer behaves according to the [[ObservableCommitOrder]]
+    *       chosen in the configuration, which can be set via the key
+    *       `monix.observable.commit.order`.
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used.
+    * @param topics is the list of Kafka topics to subscribe to.
+    */
+  def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
+    K: Deserializer[K],
+    V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = {
+    val consumer = createConsumer[K, V](cfg, topics)
+    apply(cfg, consumer)
+  }
+
+  /** A [[Resource]] that acquires a [[KafkaConsumer]] used
+    * to build a [[KafkaConsumerObservableAutoCommit]] instance,
+    * which will be released after its usage.
+    *
+    * @note The consumer behaves according to the [[ObservableCommitOrder]]
+    *       chosen in the configuration, which can be set via the key
+    *       `monix.observable.commit.order`.
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used.
+    * @param topicsRegex is the pattern of Kafka topics to subscribe to.
+    */
+  def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+    K: Deserializer[K],
+    V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = {
+
+    val consumer = createConsumer[K, V](cfg, topicsRegex)
+    apply(cfg, consumer)
+  }
+
+  /**
+    * A [[Resource]] that acquires a [[KafkaConsumer]] used
+    * to build a [[KafkaConsumerObservableManualCommit]] instance,
+    * which provides the ability to manually commit offsets and
+    * forcibly disables auto commits in configuration.
+    * Such instances emit [[CommittableMessage]] instead of Kafka's [[ConsumerRecord]].
+    *
+    * ==Example==
+    * {{{
+    *   KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use { committableMessages =>
+    *     committableMessages.map(message => message.record.value() -> message.committableOffset)
+    *       .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
+    *       .bufferTimedAndCounted(1.second, 1000)
+    *       .mapEval(offsets => CommittableOffsetBatch(offsets).commitAsync())
+    *       .completedL
+    *   }
+    * }}}
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used. Auto commit will be disabled and
+    *        the observable commit order will be forcibly set to
+    *        [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]]!
+    *
+    * @param consumer is a factory for the
+    *        `org.apache.kafka.clients.consumer.KafkaConsumer`
+    *        instance to use for consuming from Kafka
+    */
+  def manualCommit[K, V](
+    cfg: KafkaConsumerConfig,
+    consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
+    for {
+      consumer <- Resource.make(consumer) { consumer =>
+        Task.evalAsync(consumer.synchronized(blocking(consumer.close())))
+      }
+      manualCommitConf = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false)
+      consumerObservable <- Resource.liftF(
+        Task(new KafkaConsumerObservableManualCommit[K, V](manualCommitConf, Task.pure(consumer), shouldClose = false)))
+    } yield consumerObservable
+  }
+
+  /** Builds a [[Resource]] of [[KafkaConsumerObservable]] with the ability to manually
+    * commit offsets; it forcibly disables auto commits in the configuration.
+    * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
+    *
+    * ==Example==
+    * {{{
+    *   KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use { committableMessages =>
+    *     committableMessages.map(message => message.record.value() -> message.committableOffset)
+    *       .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
+    *       .bufferTimedAndCounted(1.second, 1000)
+    *       .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
+    *       .completedL
+    *   }
+    * }}}
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used. Auto commit will be disabled and
+    *        the observable commit order will be forcibly set to
+    *        [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]]!
+    *
+    * @param topics is the list of Kafka topics to subscribe to.
+    */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
+    K: Deserializer[K],
+    V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
+
+    val consumer = createConsumer[K, V](cfg, topics)
+    manualCommit(cfg, consumer)
+  }
+
+  /** Builds a [[Resource]] of [[KafkaConsumerObservable]] with the ability to manually
+    * commit offsets; it forcibly disables auto commits in the configuration.
+    * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
+    *
+    * ==Example==
+    * {{{
+    *   KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use { committableMessages =>
+    *     committableMessages.map(message => message.record.value() -> message.committableOffset)
+    *       .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
+    *       .bufferTimedAndCounted(1.second, 1000)
+    *       .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
+    *       .completedL
+    *   }
+    * }}}
+    *
+    * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
+    *        consumer; also make sure to see `monix/kafka/default.conf` for
+    *        the default values being used. Auto commit will be disabled and
+    *        the observable commit order will be forcibly set to
+    *        [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]]!
+    *
+    * @param topicsRegex is the pattern of Kafka topics to subscribe to.
+    */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+    K: Deserializer[K],
+    V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
+
+    val consumer = createConsumer[K, V](cfg, topicsRegex)
+    manualCommit(cfg, consumer)
+  }
+
+}
\ No newline at end of file
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala
index d6f02929..3286847e 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala
@@ -46,6 +46,7 @@ sealed trait ObservableCommitOrder extends Serializable {
   }
 }
 
+
 object ObservableCommitOrder {
 
   @throws(classOf[BadValue])
@@ -61,7 +62,7 @@ object ObservableCommitOrder {
 
   /** Do a `commit` in the Kafka Consumer before
    * receiving an acknowledgement from downstream.
   */
-  case object BeforeAck extends ObservableCommitOrder {
+  case object BeforeAck extends ObservableCommitOrder {
     val id = "before-ack"
   }
 
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
new file mode 100644
index 00000000..e7a5d1a7
--- /dev/null
+++ b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
@@ -0,0 +1,87 @@
+package monix.kafka
+
+import monix.eval.Task
+import monix.execution.Scheduler.Implicits.global
+import monix.kafka.config.AutoOffsetReset
+import monix.reactive.Observable
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.scalatest.{FunSuite, Matchers}
+import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class KafkaConsumerResourceSpec extends FunSuite with KafkaTestKit with ScalaCheckDrivenPropertyChecks with Matchers {
+
+  val consumerConf: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
+    bootstrapServers = List("127.0.0.1:6001"),
+    groupId = "failing-logic",
+    autoOffsetReset = AutoOffsetReset.Earliest
+  )
+
+  val producerCfg = KafkaProducerConfig.default.copy(
+    bootstrapServers = List("127.0.0.1:6001"),
+    clientId = "monix-kafka-producer-test"
+  )
+
+  test("async commit fails when observable was already cancelled") {
+
+    withRunningKafka {
+      val count = 11
+      val topicName = "monix-closeable-consumer-test"
+
+      val producer = KafkaProducer[String, String](producerCfg, io)
+      val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerConf, List(topicName))
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", msg.toString))
+        .mapEvalF(producer.send)
+        .completedL
+
+      val listT = consumer
+        .executeOn(io)
+        .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty)
+        .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
+        .mapEval(completeBatch => Task.sleep(3.second) >> completeBatch.commitAsync().as(completeBatch.offsets.size))
+        .headL
+
+      // The downstream timeout cancels the stream, which closes the consumer
+      // before the delayed commit runs, so the commit is expected to fail.
+      val result = Await.result(Task.parZip2(listT, pushT).attempt.runToFuture, 60.seconds)
+      result.isLeft shouldBe true
+      result.left.get.getMessage shouldBe "This consumer has already been closed."
+    }
+  }
+
+  test("consumer resource succeeds to commit even though observable was cancelled") {
+
+    withRunningKafka {
+      val count = 11
+      val topicName = "monix-closeable-consumer-test"
+
+      val producer = KafkaProducer[String, String](producerCfg, io)
+      val consumer = KafkaConsumerResource.manualCommit[String, String](consumerConf, List(topicName))
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", msg.toString))
+        .mapEvalF(producer.send)
+        .completedL
+
+      val listT = consumer.use { records =>
+        records.executeOn(io)
+          .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty)
+          .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
+          .mapEval(completeBatch => Task.sleep(3.second) >> completeBatch.commitAsync().as(completeBatch.offsets.size))
+          .headL
+      }
+
+      // Here the commit succeeds: the consumer is owned by the Resource and
+      // is only closed after `use` completes.
+      val result = Await.result(Task.parZip2(listT, pushT).attempt.runToFuture, 60.seconds)
+      result.isRight shouldBe true
+      result.map(_._1) shouldBe Right(count)
+    }
+  }
+
+}
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
index 6a03f5b4..7f0a8c7b 100644
--- a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
+++ b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
@@ -1,5 +1,6 @@
 package monix.kafka
 
+import cats.effect.Resource
 import monix.eval.Task
 import monix.kafka.config.AutoOffsetReset
 import monix.reactive.Observable
@@ -7,8 +8,8 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.concurrent.duration._
-import scala.concurrent.Await
 import monix.execution.Scheduler.Implicits.global
+import net.manub.embeddedkafka.EmbeddedKafka
 import org.apache.kafka.clients.consumer.OffsetCommitCallback
 import org.apache.kafka.common.TopicPartition
 import org.scalacheck.Gen
@@ -29,45 +30,43 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
     commit <- Gen.oneOf(commitCallbacks)
   } yield CommittableOffset(new TopicPartition("topic", partition), offset, commit)
 
-  test("merge by commit callback works") {
-    forAll(Gen.nonEmptyListOf(committableOffsetsGen)) { offsets =>
-      val partitions = offsets.map(_.topicPartition)
-      val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets)
-
-      received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys }
-
-      received.size should be <= 4
-    }
+  private def logic(bootstrapServer: String, topic: String) = {
+    val kafkaConfig: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
+      bootstrapServers = List(bootstrapServer),
+      groupId = "failing-logic",
+      autoOffsetReset = AutoOffsetReset.Earliest
+    )
+    KafkaConsumerObservable
+      .manualCommit[String, String](kafkaConfig, List(topic))
+      .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty)
+      .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
+      .map{completeBatch =>
+        {Task.unit >> Task.sleep(3.second) >> Task.evalAsync(println("Committing async!!!")) >> completeBatch.commitAsync()}.runSyncUnsafe()
+      }
+      .headL
   }
 
-  test("merge by commit callback for multiple consumers") {
-    withRunningKafka {
-      val count = 10000
-      val topicName = "monix-kafka-merge-by-commit"
+  test("async commit finalizes successfully after cancellation") {
+    EmbeddedKafka.start()
+    val batchSize = 10
+
+    val topicName = "random_topic"
 
       val producerCfg =
        KafkaProducerConfig.default.copy(
           bootstrapServers = List("127.0.0.1:6001"),
-          clientId = "monix-kafka-1-0-producer-test"
+          clientId = "monix-kafka-producer-test"
         )
 
-      val producer = KafkaProducerSink[String, String](producerCfg, io)
-
-      val pushT = Observable
-        .range(0, count)
-        .map(msg => new ProducerRecord(topicName, "obs", msg.toString))
-        .bufferIntrospective(1024)
-        .consumeWith(producer)
-
-      val listT = Observable
-        .range(0, 4)
-        .mergeMap(i => createConsumer(i.toInt, topicName).take(500))
-        .bufferTumbling(2000)
-        .map(CommittableOffsetBatch.mergeByCommitCallback)
-        .map { offsetBatches => assert(offsetBatches.length == 4) }
-        .completedL
+    val t = for {
+      _ <- Resource.liftF(Task(KafkaProducer[String, String](producerCfg, io))).use { producer =>
+        Task(producer.send(new ProducerRecord(topicName, "message1"))) >>
+          Task(producer.send(new ProducerRecord(topicName, "message2")))
+      }
+      _ <- logic("127.0.0.1:6001", topicName)
+    } yield ()
+    t.runSyncUnsafe()
 
-      Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
-    }
+    EmbeddedKafka.stop()
   }
 
   private def createConsumer(i: Int, topicName: String): Observable[CommittableOffset] = {
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
deleted file mode 100644
index e1941115..00000000
--- a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-package monix.kafka
-
-import java.util
-
-import monix.eval.Task
-import monix.kafka.config.AutoOffsetReset
-import monix.reactive.Observable
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.scalatest.FunSuite
-import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer}
-import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}
-
-import scala.concurrent.duration._
-import scala.concurrent.Await
-import monix.execution.Scheduler.Implicits.global
-import monix.execution.exceptions.DummyException
-
-class SerializationTest extends FunSuite with KafkaTestKit {
-
-  val producerCfg = KafkaProducerConfig.default.copy(
-    bootstrapServers = List("127.0.0.1:6001"),
-    clientId = "monix-kafka-1-0-serialization-test"
-  )
-
-  val consumerCfg = KafkaConsumerConfig.default.copy(
-    bootstrapServers = List("127.0.0.1:6001"),
-    groupId = "kafka-tests",
-    clientId = "monix-kafka-1-0-serialization-test",
-    autoOffsetReset = AutoOffsetReset.Earliest
-  )
-
-  test("serialization/deserialization using kafka.common.serialization") {
-    withRunningKafka {
-      val topicName = "monix-kafka-serialization-tests"
-      val count = 10000
-
-      implicit val serializer: KafkaSerializer[A] = new ASerializer
-      implicit val deserializer: KafkaDeserializer[A] = new ADeserializer
-
-      val producer = KafkaProducerSink[String, A](producerCfg, io)
-      val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count)
-
-      val pushT = Observable
-        .range(0, count)
-        .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
-        .bufferIntrospective(1024)
-        .consumeWith(producer)
-
-      val listT = consumer
-        .map(_.value())
-        .toListL
-
-      val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
-      assert(result.map(_.value.toInt).sum === (0 until count).sum)
-    }
-  }
-
-  test("allow to fail the stream if serialization throws") {
-    withRunningKafka {
-      val topicName = "monix-kafka-serialization-failing-tests"
-      val dummy = DummyException("boom")
-
-      implicit val serializer: KafkaSerializer[A] = new AFailingSerializer
-
-      val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy))
-
-      val pushT = Observable
-        .evalOnce(new ProducerRecord(topicName, "obs", A(1.toString)))
-        .bufferIntrospective(1024)
-        .consumeWith(producer)
-
-      assertThrows[DummyException] {
-        Await.result(pushT.runToFuture, 60.seconds)
-      }
-    }
-  }
-
-  test("allow to recover from serialization errors") {
-    withRunningKafka {
-      val topicName = "monix-kafka-serialization-continuing-tests"
-      val count = 100
-
-      implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer
-      implicit val deserializer: KafkaDeserializer[A] = new ADeserializer
-
-      val producer = KafkaProducerSink[String, A](producerCfg, io)
-      val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2)
-
-      val pushT = Observable
-        .range(0, count)
-        .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
-        .bufferIntrospective(1024)
-        .consumeWith(producer)
-
-      val listT = consumer
-        .map(_.value())
-        .toListL
-
-      val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
-      assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum)
-    }
-  }
-
-}
-
-case class A(value: String)
-
-class ASerializer extends KafkaSerializer[A] {
-  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def serialize(topic: String, data: A): Array[Byte] =
-    if (data == null) null else data.value.getBytes
-
-  override def close(): Unit = ()
-}
-
-class ADeserializer extends KafkaDeserializer[A] {
-  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data))
-
-  override def close(): Unit = ()
-}
-
-class AFailingSerializer extends ASerializer {
-  override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail")
-}
-
-class AHalfFailingSerializer extends ASerializer {
-
-  override def serialize(topic: String, data: A): Array[Byte] = {
-    if (data.value.toInt % 2 == 0) super.serialize(topic, data)
-    else throw new RuntimeException("fail")
-  }
-}

From 5e238d1388e9414e2732bf6de26b234d0e1db771 Mon Sep 17 00:00:00 2001
From: Pau
Date: Sun, 3 Oct 2021 21:52:08 +0200
Subject: [PATCH 2/2] Implements a KafkaConsumerResource (#240)

Trigger ci 1

Remove println

Update scala version build

Refinement

Bring back serialization test
---
 .bsp/sbt.json                                 |   1 -
 .github/workflows/build.yml                   |  45 ++++++
 .gitignore                                    |   2 +
 build.sbt                                     |   3 +-
 kafka-0.11.x/src/test/resources/logback.xml   |   2 +-
 .../monix/kafka/KafkaConsumerResource.scala   |   9 +-
 .../src/test/resources/logback-test.xml       |   2 +-
 .../kafka/KafkaConsumerResourceSpec.scala     |   4 +-
 .../kafka/MergeByCommitCallbackTest.scala     |  63 ++++----
 .../scala/monix/kafka/SerializationTest.scala | 135 ++++++++++++++++++
 10 files changed, 225 insertions(+), 41 deletions(-)
 delete mode 100644 .bsp/sbt.json
 create mode 100644 .github/workflows/build.yml
 create mode 100644 kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala

diff --git a/.bsp/sbt.json b/.bsp/sbt.json
deleted file mode 100644
index 889c5517..00000000
--- a/.bsp/sbt.json
+++ /dev/null
@@ -1 +0,0 @@
-{"name":"sbt","version":"1.4.7","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.3.3/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp"]} \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..15f55dc3 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,45 @@ +name: build + +on: [push, pull_request] + +jobs: + + tests: + name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests + runs-on: ubuntu-latest + + strategy: + fail-fast: true + matrix: + java: [8] + scala: [2.11.12, 2.12.15] + + steps: + - uses: actions/checkout@v2 + - uses: olafurpg/setup-scala@v10 + with: + java-version: "adopt@1.${{ matrix.java }}" + + - name: Cache SBT Coursier directory + uses: actions/cache@v1 + with: + path: ~/.cache/coursier/v1 + key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }} + restore-keys: | + ${{ runner.os }}-coursier- + - name: Cache SBT directory + uses: actions/cache@v1 + with: + path: ~/.sbt + key: | + ${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }} + restore-keys: ${{ runner.os }}-sbt- + + - name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka10/test + + - name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka11/test + + - name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka1x/test diff --git a/.gitignore b/.gitignore index 1310ac33..de2c6fed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ project/plugins/project/ .scala_dependencies .worksheet .idea + +.bsp/ diff --git a/build.sbt b/build.sbt index 5984497a..e9e36e69 100644 --- a/build.sbt +++ b/build.sbt @@ -197,8 +197,9 @@ lazy val commonDependencies = Seq( "ch.qos.logback" % "logback-classic" % "1.2.3" % "test", "org.scalatest" %% "scalatest" % "3.0.9" % "test", "org.scalacheck" %% "scalacheck" % "1.15.2" % "test", - "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1" force() + "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test force() ), + dependencyOverrides += "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test ) lazy val monixKafka = project.in(file(".")) diff --git a/kafka-0.11.x/src/test/resources/logback.xml b/kafka-0.11.x/src/test/resources/logback.xml index 2beb83c9..5a3e8a64 100644 --- a/kafka-0.11.x/src/test/resources/logback.xml +++ b/kafka-0.11.x/src/test/resources/logback.xml @@ -5,7 +5,7 @@ - + diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala index 9268df54..41ea84c3 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala @@ -25,11 +25,12 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsume import scala.concurrent.blocking import scala.util.matching.Regex -/** Exposes an `Observable` that consumes a Kafka stream by - * means of a Kafka Consumer client. +/** Acquires and releases a [[KafkaConsumer]] within a [[Resource]] + * is exposed in form of [[KafkaConsumerObservable]], which consumes + * and emits records from the specified topic. 
  *
- * In order to get initialized, it needs a configuration. See the
- * [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`,
- * (in the resource files) that is exposing all default values.
+ * In order to get initialized, it needs a configuration.
+ * @see [[KafkaConsumerConfig]] and `monix/kafka/default.conf`
+ * (in the resource files), which exposes all the default values.
  *
  * ==Example==
  * A minimal auto-commit usage sketch; `topicName` and `process`
diff --git a/kafka-1.0.x/src/test/resources/logback-test.xml b/kafka-1.0.x/src/test/resources/logback-test.xml
index cc97f771..157645e0 100644
--- a/kafka-1.0.x/src/test/resources/logback-test.xml
+++ b/kafka-1.0.x/src/test/resources/logback-test.xml
@@ -5,7 +5,7 @@
-
+
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
index e7a5d1a7..80dd020c 100644
--- a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
+++ b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala
@@ -16,13 +16,13 @@ class KafkaConsumerResourceSpec extends FunSuite with KafkaTestKit with ScalaChe
 
   val consumerConf: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
     bootstrapServers = List("127.0.0.1:6001"),
-    groupId = "failing-logic",
+    groupId = "monix-closeable-consumer-test",
     autoOffsetReset = AutoOffsetReset.Earliest
   )
 
   val producerCfg = KafkaProducerConfig.default.copy(
     bootstrapServers = List("127.0.0.1:6001"),
-    clientId = "monix-kafka-producer-test"
+    clientId = "monix-closeable-consumer-test"
   )
 
   test("async commit fails when observable was already cancelled") {
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
index 7f0a8c7b..6a03f5b4 100644
--- a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
+++ b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala
@@ -1,6 +1,5 @@
 package monix.kafka
 
-import cats.effect.Resource
 import monix.eval.Task
 import monix.kafka.config.AutoOffsetReset
 import monix.reactive.Observable
@@ -8,8 +7,8 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.concurrent.duration._
+import scala.concurrent.Await
 import monix.execution.Scheduler.Implicits.global
-import net.manub.embeddedkafka.EmbeddedKafka
 import org.apache.kafka.clients.consumer.OffsetCommitCallback
 import org.apache.kafka.common.TopicPartition
 import org.scalacheck.Gen
@@ -30,43 +29,45 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
     commit <- Gen.oneOf(commitCallbacks)
   } yield CommittableOffset(new TopicPartition("topic", partition), offset, commit)
 
-  private def logic(bootstrapServer: String, topic: String) = {
-    val kafkaConfig: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
-      bootstrapServers = List(bootstrapServer),
-      groupId = "failing-logic",
-      autoOffsetReset = AutoOffsetReset.Earliest
-    )
-    KafkaConsumerObservable
-      .manualCommit[String, String](kafkaConfig, List(topic))
-      .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty)
-      .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
-      .map{completeBatch =>
-        {Task.unit >> Task.sleep(3.second) >> Task.evalAsync(println("Committing async!!!")) >> completeBatch.commitAsync()}.runSyncUnsafe()
-      }
-      .headL
-  }
+  test("merge by commit callback works") {
+    forAll(Gen.nonEmptyListOf(committableOffsetsGen)) { offsets =>
+      val partitions = offsets.map(_.topicPartition)
+      val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets)
+
+      received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys }
+
+      received.size should be <= 4
+    }
+  }
+
+  test("merge by commit callback for multiple consumers") {
+    withRunningKafka {
+      val count = 10000
+      val topicName = "monix-kafka-merge-by-commit"
 
       val producerCfg =
         KafkaProducerConfig.default.copy(
           bootstrapServers = List("127.0.0.1:6001"),
-          clientId = "monix-kafka-producer-test"
+          clientId = "monix-kafka-1-0-producer-test"
         )
 
-    val t = for {
-      _ <- Resource.liftF(Task(KafkaProducer[String, String](producerCfg, io))).use { producer =>
-        Task(producer.send(new ProducerRecord(topicName, "message1"))) >>
-          Task(producer.send(new ProducerRecord(topicName, "message2")))
-      }
-      _ <- logic("127.0.0.1:6001", topicName)
-    } yield ()
-    t.runSyncUnsafe()
+      val producer = KafkaProducerSink[String, String](producerCfg, io)
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", msg.toString))
+        .bufferIntrospective(1024)
+        .consumeWith(producer)
+
+      val listT = Observable
+        .range(0, 4)
+        .mergeMap(i => createConsumer(i.toInt, topicName).take(500))
+        .bufferTumbling(2000)
+        .map(CommittableOffsetBatch.mergeByCommitCallback)
+        .map { offsetBatches => assert(offsetBatches.length == 4) }
+        .completedL
 
-    EmbeddedKafka.stop()
+      Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
+    }
   }
 
   private def createConsumer(i: Int, topicName: String): Observable[CommittableOffset] = {
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
new file mode 100644
index 00000000..e1941115
--- /dev/null
+++ b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
@@ -0,0 +1,135 @@
+package monix.kafka
+
+import java.util
+
+import monix.eval.Task
+import monix.kafka.config.AutoOffsetReset
+import monix.reactive.Observable
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.scalatest.FunSuite
+import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer}
+import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import monix.execution.Scheduler.Implicits.global
+import monix.execution.exceptions.DummyException
+
+class SerializationTest extends FunSuite with KafkaTestKit {
+
+  val producerCfg = KafkaProducerConfig.default.copy(
+    bootstrapServers = List("127.0.0.1:6001"),
+    clientId = "monix-kafka-1-0-serialization-test"
+  )
+
+  val consumerCfg = KafkaConsumerConfig.default.copy(
+    bootstrapServers = List("127.0.0.1:6001"),
+    groupId = "kafka-tests",
+    clientId = "monix-kafka-1-0-serialization-test",
+    autoOffsetReset = AutoOffsetReset.Earliest
+  )
+
+  test("serialization/deserialization using kafka.common.serialization") {
+    withRunningKafka {
+      val topicName = "monix-kafka-serialization-tests"
+      val count = 10000
+
+      implicit val serializer: KafkaSerializer[A] = new ASerializer
+      implicit val deserializer: KafkaDeserializer[A] = new ADeserializer
+
+      val producer = KafkaProducerSink[String, A](producerCfg, io)
+      val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count)
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
+        .bufferIntrospective(1024)
+        .consumeWith(producer)
+
+      val listT = consumer
+        .map(_.value())
+        .toListL
+
+      val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
+      assert(result.map(_.value.toInt).sum === (0 until count).sum)
+    }
+  }
+
+  test("allow to fail the stream if serialization throws") {
+    withRunningKafka {
+      val topicName = "monix-kafka-serialization-failing-tests"
+      val dummy = DummyException("boom")
+
+      implicit val serializer: KafkaSerializer[A] = new AFailingSerializer
+
+      val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy))
+
+      val pushT = Observable
+        .evalOnce(new ProducerRecord(topicName, "obs", A(1.toString)))
+        .bufferIntrospective(1024)
+        .consumeWith(producer)
+
+      assertThrows[DummyException] {
+        Await.result(pushT.runToFuture, 60.seconds)
+      }
+    }
+  }
+
+  test("allow to recover from serialization errors") {
+    withRunningKafka {
+      val topicName = "monix-kafka-serialization-continuing-tests"
+      val count = 100
+
+      implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer
+      implicit val deserializer: KafkaDeserializer[A] = new ADeserializer
+
+      val producer = KafkaProducerSink[String, A](producerCfg, io)
+      val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2)
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
+        .bufferIntrospective(1024)
+        .consumeWith(producer)
+
+      val listT = consumer
+        .map(_.value())
+        .toListL
+
+      val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
+      assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum)
+    }
+  }
+
+}
+
+case class A(value: String)
+
+class ASerializer extends KafkaSerializer[A] {
+  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+
+  override def serialize(topic: String, data: A): Array[Byte] =
+    if (data == null) null else data.value.getBytes
+
+  override def close(): Unit = ()
+}
+
+class ADeserializer extends KafkaDeserializer[A] {
+  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+
+  override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data))
+
+  override def close(): Unit = ()
+}
+
+class AFailingSerializer extends ASerializer {
+  override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail")
+}
+
+class AHalfFailingSerializer extends ASerializer {
+
+  override def serialize(topic: String, data: A): Array[Byte] = {
+    if (data.value.toInt % 2 == 0) super.serialize(topic, data)
+    else throw new RuntimeException("fail")
+  }
+}