diff --git a/.travis.yml b/.travis.yml index b8c69d6..c565461 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ scala: - 2.11.12 - 2.12.7 jdk: -- oraclejdk8 +- openjdk8 script: - sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test:compile" # make 'git branch' work again diff --git a/src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala b/src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala index 3db0623..251f569 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala @@ -5,6 +5,7 @@ package aserralle.akka.stream.kcl import akka.Done +import software.amazon.kinesis.exceptions.ShutdownException import software.amazon.kinesis.lifecycle.ShutdownReason import software.amazon.kinesis.processor.RecordProcessorCheckpointer import software.amazon.kinesis.retrieval.KinesisClientRecord @@ -30,10 +31,16 @@ class CommittableRecord( def canBeCheckpointed(): Boolean = recordProcessorShutdownReason().isEmpty - def tryToCheckpoint(): Future[Done] = + def tryToCheckpoint(): Future[Boolean] = Future { - checkpointer.checkpoint(sequenceNumber, subSequenceNumber) - Done + try { + checkpointer.checkpoint(sequenceNumber, subSequenceNumber) + true + } catch { + case _: ShutdownException => + false + case exception => throw exception + } } } diff --git a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala index 4e98693..83fd545 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala @@ -86,26 +86,23 @@ object KinesisWorkerSource { val `{` = b.add(scaladsl.Broadcast[immutable.Seq[CommittableRecord]](2)) - val `}` = b.add(Zip[Done, immutable.Seq[CommittableRecord]]) + val `}` = b.add(Zip[Boolean, immutable.Seq[CommittableRecord]]) val `=` = b.add(Flow[KinesisClientRecord]) `{`.out(0) .map(_.max) - .mapAsync(1)(r => - if (r.canBeCheckpointed()) r.tryToCheckpoint() - else Future.successful(Done)) ~> `}`.in0 + .mapAsync(1)( + r => + if (r.canBeCheckpointed()) r.tryToCheckpoint() + else Future.successful(true) + ) ~> `}`.in0 `{`.out(1) ~> `}`.in1 - `}`.out.map(_._2).mapConcat(identity).map(_.record) ~> `=` + `}`.out.filter(_._1).map(_._2).mapConcat(identity).map(_.record) ~> `=` FlowShape(`{`.in, `=`.out) }) .mergeSubstreams - .withAttributes(ActorAttributes.supervisionStrategy { - case _: ShutdownException => - Resume - case _ => Stop - }) def checkpointRecordsSink( settings: KinesisWorkerCheckpointSettings =