diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..15f55dc3 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,45 @@ +name: build + +on: [push, pull_request] + +jobs: + + tests: + name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests + runs-on: ubuntu-latest + + strategy: + fail-fast: true + matrix: + java: [8] + scala: [2.11.12, 2.12.15] + + steps: + - uses: actions/checkout@v2 + - uses: olafurpg/setup-scala@v10 + with: + java-version: "adopt@1.${{ matrix.java }}" + + - name: Cache SBT Coursier directory + uses: actions/cache@v1 + with: + path: ~/.cache/coursier/v1 + key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }} + restore-keys: | + ${{ runner.os }}-coursier- + - name: Cache SBT directory + uses: actions/cache@v1 + with: + path: ~/.sbt + key: | + ${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }} + restore-keys: ${{ runner.os }}-sbt- + + - name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka10/test + + - name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka11/test + + - name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka1x/test diff --git a/.gitignore b/.gitignore index 1310ac33..de2c6fed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ project/plugins/project/ .scala_dependencies .worksheet .idea + +.bsp/ diff --git a/build.sbt b/build.sbt index 62bfc348..e9e36e69 100644 --- a/build.sbt +++ b/build.sbt @@ -31,9 +31,8 @@ lazy val warnUnusedImport = Seq( lazy val sharedSettings = warnUnusedImport ++ Seq( organization := "io.monix", - scalaVersion := "2.12.14", - crossScalaVersions := Seq("2.11.12", "2.12.14", "2.13.6"), - + scalaVersion := "2.12.15", + crossScalaVersions := Seq("2.11.12", "2.12.15", "2.13.6"), scalacOptions ++= Seq( // warnings "-unchecked", // able additional warnings where generated code depends on assumptions @@ -84,11 +83,11 @@ lazy val sharedSettings = warnUnusedImport ++ Seq( scalacOptions ++= Seq( // Turns all warnings into errors ;-) // TODO: enable after fixing deprecations for Scala 2.13 - "-Xfatal-warnings", + //"-Xfatal-warnings", // Enables linter options "-Xlint:adapted-args", // warn if an argument list is modified to match the receiver - "-Xlint:nullary-unit", // warn when nullary methods return Unit - "-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f' + //"-Xlint:nullary-unit", // warn when nullary methods return Unit + //"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f' "-Xlint:infer-any", // warn when a type argument is inferred to be `Any` "-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id "-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element @@ -197,8 +196,10 @@ lazy val commonDependencies = Seq( // For testing ... "ch.qos.logback" % "logback-classic" % "1.2.3" % "test", "org.scalatest" %% "scalatest" % "3.0.9" % "test", - "org.scalacheck" %% "scalacheck" % "1.15.2" % "test" - ) + "org.scalacheck" %% "scalacheck" % "1.15.2" % "test", + "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test force() + ), + dependencyOverrides += "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test ) lazy val monixKafka = project.in(file(".")) @@ -212,10 +213,6 @@ lazy val kafka1x = project.in(file("kafka-1.0.x")) .settings(mimaSettings("monix-kafka-1x")) .settings( name := "monix-kafka-1x", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) @@ -225,10 +222,6 @@ lazy val kafka11 = project.in(file("kafka-0.11.x")) .settings(mimaSettings("monix-kafka-11")) .settings( name := "monix-kafka-11", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.3" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) @@ -237,12 +230,8 @@ lazy val kafka10 = project.in(file("kafka-0.10.x")) .settings(commonDependencies) .settings(mimaSettings("monix-kafka-10")) .settings( - name := "monix-kafka-10", - libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "0.16.0" % "test" exclude ("log4j", "log4j")) - else Seq.empty[ModuleID] - }, - libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") + libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" force(), + //dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" // exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") ) lazy val kafka9 = project.in(file("kafka-0.9.x")) diff --git a/kafka-0.11.x/src/test/resources/logback.xml b/kafka-0.11.x/src/test/resources/logback.xml index 2beb83c9..5a3e8a64 100644 --- a/kafka-0.11.x/src/test/resources/logback.xml +++ b/kafka-0.11.x/src/test/resources/logback.xml @@ -5,7 +5,7 @@ - + diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index a65eee76..f0877236 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -38,6 +38,7 @@ import scala.util.matching.Regex trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] + protected val shouldClose: Boolean /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement @@ -90,7 +91,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = { // Forced asynchronous boundary val cancelTask = Task.evalAsync { - consumer.synchronized(blocking(consumer.close())) + if (shouldClose) { consumer.synchronized(blocking(consumer.close())) } } // By applying memoization, we are turning this @@ -113,10 +114,11 @@ object KafkaConsumerObservable { * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def apply[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = - new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer) + new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer, true) /** Builds a [[KafkaConsumerObservable]] instance. * @@ -126,6 +128,7 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { @@ -142,6 +145,7 @@ object KafkaConsumerObservable { * * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { @@ -173,12 +177,13 @@ object KafkaConsumerObservable { * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val manualCommitConfig = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false) - new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) + new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer, shouldClose = true) } /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets @@ -202,6 +207,7 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { @@ -231,6 +237,7 @@ object KafkaConsumerObservable { * * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ + @deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8") def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala index 6406c224..296fa5c0 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -34,7 +34,8 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumer: Task[Consumer[K, V]], + override protected val shouldClose: Boolean) extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] { /* Based on the [[KafkaConsumerConfig.observableCommitType]] it diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index cb3dc5cf..b16cc794 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -36,7 +36,8 @@ import scala.jdk.CollectionConverters._ */ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumer: Task[Consumer[K, V]], + override protected val shouldClose: Boolean) extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] { // Caching value to save CPU cycles diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala new file mode 100644 index 00000000..41ea84c3 --- /dev/null +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2014-2021 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import cats.effect.Resource +import monix.eval.Task +import monix.kafka.KafkaConsumerObservable.createConsumer +import monix.kafka.config.ObservableCommitOrder +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} + +import scala.concurrent.blocking +import scala.util.matching.Regex + +/** Acquires and releases a [[KafkaConsumer]] within a [[Resource]] + * is exposed in form of [[KafkaConsumerObservable]], which consumes + * and emits records from the specified topic. + * + * In order to get initialized, it needs a configuration. + * @see the [[KafkaConsumerConfig]] needed and `monix/kafka/default.conf`, + * (in the resource files) that is exposing all default values. + */ +object KafkaConsumerResource { + + + /** A [[Resource]] that acquires a [[KafkaConsumer]] used + * to build a [[KafkaConsumerObservableAutoCommit]] instance, + * that will be released after it's usage. + * + * @note The consumer will act consequently depending on the + * [[ObservableCommitOrder]] that was chosen from configuration. + * Which can be configured from the key `monix.observable.commit.order`. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. + * @param consumer is a factory for the + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka + */ + def apply[K, V]( + cfg: KafkaConsumerConfig, + consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]]= { + for { + consumer <- Resource.make(consumer){ consumer => + Task.evalAsync(consumer.synchronized{ blocking(consumer.close())}) + } + consumerObservable <- Resource.liftF(Task(new KafkaConsumerObservableAutoCommit[K, V](cfg, Task.pure(consumer), shouldClose = false))) + } yield consumerObservable + } + + /** A [[Resource]] that acquires a [[KafkaConsumer]] used + * to build a [[KafkaConsumerObservableAutoCommit]] instance, + * that will be released after it's usage. + * + * @note The consumer will act consequently depending on the + * [[ObservableCommitOrder]] that was chosen from configuration. + * Which can be configured from the key `monix.observable.commit.order`. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. + * @param topics is the list of Kafka topics to subscribe to. + */ + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], + V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = { + val consumer = createConsumer[K, V](cfg, topics) + apply(cfg, consumer) + } + + /** A [[Resource]] that acquires a [[KafkaConsumer]] used + * to build a [[KafkaConsumerObservableAutoCommit]] instance, + * that will be released after it's usage. + * + * @note The consumer will act consequently depending on the + * [[ObservableCommitOrder]] that was chosen from configuration. + * Which can be configured from the key `monix.observable.commit.order`. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. + * @param topicsRegex is the pattern of Kafka topics to subscribe to. + */ + def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], + V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = { + + val consumer = createConsumer[K, V](cfg, topicsRegex) + apply(cfg, consumer) + } + + /** + * A [[Resource]] that acquires a [[KafkaConsumer]] used + * to build a [[KafkaConsumerObservableManualCommit]] instance, + * which provides the ability to manual commit offsets and + * forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's [[ConsumerRecord]]. + * + * ==Example== + * {{{ + * KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages => + * committableMessages.map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitAsync()) + * .completedL + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * + * @param consumer is a factory for the + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka + */ + def manualCommit[K, V]( + cfg: KafkaConsumerConfig, + consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = { + for { + consumer <- Resource.make(consumer){ consumer => + Task.evalAsync(consumer.synchronized{ blocking(consumer.close())}) + } + manualCommitConf = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false) + consumerObservable <- Resource.liftF(Task(new KafkaConsumerObservableManualCommit[K, V](manualCommitConf, Task.pure(consumer), shouldClose = false))) + } yield consumerObservable + } + + /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. + * + * ==Example== + * {{{ + * KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages => + * committableMessages.map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync()) + * .completedL + * } + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * + * @param topics is the list of Kafka topics to subscribe to. + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], + V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = { + + val consumer = createConsumer[K, V](cfg, topics) + manualCommit(cfg, consumer) + } + + /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. + * + * ==Example== + * {{{ + * KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages => + * committableMessages.map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync()) + * .completedL + * } + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * + * @param topicsRegex is the pattern of Kafka topics to subscribe to. + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], + V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = { + + val consumer = createConsumer[K, V](cfg, topicsRegex) + manualCommit(cfg, consumer) + } + +} \ No newline at end of file diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala index d6f02929..3286847e 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala @@ -46,6 +46,7 @@ sealed trait ObservableCommitOrder extends Serializable { } } + object ObservableCommitOrder { @throws(classOf[BadValue]) @@ -61,7 +62,7 @@ object ObservableCommitOrder { /** Do a `commit` in the Kafka Consumer before * receiving an acknowledgement from downstream. */ - case object BeforeAck extends ObservableCommitOrder { + case object BeforeAck extends ObservableCommitOrder { val id = "before-ack" } diff --git a/kafka-1.0.x/src/test/resources/logback-test.xml b/kafka-1.0.x/src/test/resources/logback-test.xml index cc97f771..157645e0 100644 --- a/kafka-1.0.x/src/test/resources/logback-test.xml +++ b/kafka-1.0.x/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - + diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala new file mode 100644 index 00000000..80dd020c --- /dev/null +++ b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala @@ -0,0 +1,87 @@ +package monix.kafka + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.{FunSuite, Matchers} +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Failure + +class KafkaConsumerResourceSpec extends FunSuite with KafkaTestKit with ScalaCheckDrivenPropertyChecks with Matchers { + + val consumerConf: KafkaConsumerConfig = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "monix-closeable-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-closeable-consumer-test" + ) + + test("async commit fails when observable was already cancelled") { + + withRunningKafka { + val count = 11 + val topicName = "monix-closeable-consumer-test" + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerConf, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEvalF(producer.send) + .completedL + + val listT = consumer + .executeOn(io) + .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty) + .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) } + .mapEval(completeBatch => Task.sleep(3.second) >> completeBatch.commitAsync().as(completeBatch.offsets.size)) + .headL + + Task.parZip2(listT, pushT).attempt.map { result => + result.isLeft shouldBe true + result.left.get.getMessage shouldBe "This consumer has already been closed." + } + } + } + + test("consumer resource succeeds to commit even though observable was cancelled") { + + withRunningKafka { + val count = 11 + val topicName = "monix-closeable-consumer-test" + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerResource.manualCommit[String, String](consumerConf, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEvalF(producer.send) + .completedL + + val listT = consumer.use { records => + records.executeOn(io) + .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty) + .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) } + .mapEval(completeBatch => Task.sleep(3.second) >> completeBatch.commitAsync().as(completeBatch.offsets.size)) + .headL + } + + Task.parZip2(listT, pushT).attempt.map { result => + result.isRight shouldBe true + result.map(_._1) shouldBe Right(count) + } + } + } + +}