Skip to content
This repository has been archived by the owner on May 21, 2020. It is now read-only.

Commit

Permalink
fix BackpressureTimeout after checkpoint throw ShutdownException (#15)
Browse files Browse the repository at this point in the history
* fix substream exception cause BackpressureTimeout

* update jdk to openjdk8
  • Loading branch information
CatTail authored and aserrallerios committed Aug 22, 2019
1 parent 1a9c22d commit c6fd1d7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ scala:
- 2.11.12
- 2.12.7
jdk:
- oraclejdk8
- openjdk8
script:
- sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test:compile"
# make 'git branch' work again
Expand Down
13 changes: 10 additions & 3 deletions src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package aserralle.akka.stream.kcl

import akka.Done
import software.amazon.kinesis.exceptions.ShutdownException
import software.amazon.kinesis.lifecycle.ShutdownReason
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.KinesisClientRecord
Expand All @@ -30,10 +31,16 @@ class CommittableRecord(
def canBeCheckpointed(): Boolean =
recordProcessorShutdownReason().isEmpty

def tryToCheckpoint(): Future[Done] =
def tryToCheckpoint(): Future[Boolean] =
Future {
checkpointer.checkpoint(sequenceNumber, subSequenceNumber)
Done
try {
checkpointer.checkpoint(sequenceNumber, subSequenceNumber)
true
} catch {
case _: ShutdownException =>
false
case exception => throw exception
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,23 @@ object KinesisWorkerSource {

val `{` =
b.add(scaladsl.Broadcast[immutable.Seq[CommittableRecord]](2))
val `}` = b.add(Zip[Done, immutable.Seq[CommittableRecord]])
val `}` = b.add(Zip[Boolean, immutable.Seq[CommittableRecord]])
val `=` = b.add(Flow[KinesisClientRecord])

`{`.out(0)
.map(_.max)
.mapAsync(1)(r =>
if (r.canBeCheckpointed()) r.tryToCheckpoint()
else Future.successful(Done)) ~> `}`.in0
.mapAsync(1)(
r =>
if (r.canBeCheckpointed()) r.tryToCheckpoint()
else Future.successful(true)
) ~> `}`.in0
`{`.out(1) ~> `}`.in1

`}`.out.map(_._2).mapConcat(identity).map(_.record) ~> `=`
`}`.out.filter(_._1).map(_._2).mapConcat(identity).map(_.record) ~> `=`

FlowShape(`{`.in, `=`.out)
})
.mergeSubstreams
.withAttributes(ActorAttributes.supervisionStrategy {
case _: ShutdownException =>
Resume
case _ => Stop
})

def checkpointRecordsSink(
settings: KinesisWorkerCheckpointSettings =
Expand Down

0 comments on commit c6fd1d7

Please sign in to comment.