diff --git a/src/main/scala/aserralle/akka/stream/kcl/Errors.scala b/src/main/scala/aserralle/akka/stream/kcl/Errors.scala index bec9f92..a9226d3 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/Errors.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/Errors.scala @@ -4,14 +4,8 @@ package aserralle.akka.stream.kcl -import scala.util.control.NoStackTrace - object Errors { + case class WorkerUnexpectedShutdown(cause: Throwable) extends Throwable(cause) - sealed trait KinesisWorkerSourceError extends NoStackTrace - case class WorkerUnexpectedShutdown(cause: Throwable) - extends KinesisWorkerSourceError - - case object BackpressureTimeout extends KinesisWorkerSourceError - + case class BackpressureTimeout(cause: Throwable) extends Throwable(cause) } diff --git a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala index cecf093..2a92c6e 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala @@ -53,7 +53,7 @@ object KinesisWorkerSource { (Exception.nonFatalCatch either Await.result( queue.offer(record), settings.backpressureTimeout) left) - .foreach(_ => queue.fail(BackpressureTimeout)) + .foreach(err => queue.fail(BackpressureTimeout(err))) semaphore.release() }, settings.terminateStreamGracePeriod diff --git a/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala b/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala index 67443f0..6f183e1 100644 --- a/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala +++ b/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala @@ -202,7 +202,7 @@ class KinesisWorkerSourceSourceSpec Await.ready(watch, 5.seconds) val Failure(exception) = watch.value.get - assert(exception == BackpressureTimeout) + assert(exception.getCause.getMessage.contains("Futures timed out after [100 milliseconds]")) killSwitch.shutdown() }