From 3edf48741f87d2f758e681d5e89b78873061ea49 Mon Sep 17 00:00:00 2001 From: Chiyu Zhong Date: Tue, 28 Aug 2018 19:34:35 +0800 Subject: [PATCH] passing cause to BackpressureTimeout --- src/main/scala/aserralle/akka/stream/kcl/Errors.scala | 10 ++-------- .../akka/stream/kcl/scaladsl/KinesisWorkerSource.scala | 2 +- .../stream/kcl/KinesisWorkerSourceSourceSpec.scala | 2 +- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/main/scala/aserralle/akka/stream/kcl/Errors.scala b/src/main/scala/aserralle/akka/stream/kcl/Errors.scala index bec9f92..a9226d3 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/Errors.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/Errors.scala @@ -4,14 +4,8 @@ package aserralle.akka.stream.kcl -import scala.util.control.NoStackTrace - object Errors { + case class WorkerUnexpectedShutdown(cause: Throwable) extends Throwable(cause) - sealed trait KinesisWorkerSourceError extends NoStackTrace - case class WorkerUnexpectedShutdown(cause: Throwable) - extends KinesisWorkerSourceError - - case object BackpressureTimeout extends KinesisWorkerSourceError - + case class BackpressureTimeout(cause: Throwable) extends Throwable(cause) } diff --git a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala index cecf093..2a92c6e 100644 --- a/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala +++ b/src/main/scala/aserralle/akka/stream/kcl/scaladsl/KinesisWorkerSource.scala @@ -53,7 +53,7 @@ object KinesisWorkerSource { (Exception.nonFatalCatch either Await.result( queue.offer(record), settings.backpressureTimeout) left) - .foreach(_ => queue.fail(BackpressureTimeout)) + .foreach(err => queue.fail(BackpressureTimeout(err))) semaphore.release() }, settings.terminateStreamGracePeriod diff --git a/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala b/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala index 67443f0..6f183e1 100644 --- a/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala +++ b/src/test/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceSourceSpec.scala @@ -202,7 +202,7 @@ class KinesisWorkerSourceSourceSpec Await.ready(watch, 5.seconds) val Failure(exception) = watch.value.get - assert(exception == BackpressureTimeout) + assert(exception.getCause.getMessage.contains("Futures timed out after [100 milliseconds]")) killSwitch.shutdown() }