Skip to content
This repository has been archived by the owner on May 21, 2020. It is now read-only.

Commit

Permalink
passing cause to BackpressureTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
CatTail committed Aug 28, 2018
1 parent e5ad806 commit 3edf487
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 10 deletions.
10 changes: 2 additions & 8 deletions src/main/scala/aserralle/akka/stream/kcl/Errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@

package aserralle.akka.stream.kcl

import scala.util.control.NoStackTrace

object Errors {
case class WorkerUnexpectedShutdown(cause: Throwable) extends Throwable(cause)

sealed trait KinesisWorkerSourceError extends NoStackTrace
case class WorkerUnexpectedShutdown(cause: Throwable)
extends KinesisWorkerSourceError

case object BackpressureTimeout extends KinesisWorkerSourceError

case class BackpressureTimeout(cause: Throwable) extends Throwable(cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object KinesisWorkerSource {
(Exception.nonFatalCatch either Await.result(
queue.offer(record),
settings.backpressureTimeout) left)
.foreach(_ => queue.fail(BackpressureTimeout))
.foreach(err => queue.fail(BackpressureTimeout(err)))
semaphore.release()
},
settings.terminateStreamGracePeriod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class KinesisWorkerSourceSourceSpec

Await.ready(watch, 5.seconds)
val Failure(exception) = watch.value.get
assert(exception == BackpressureTimeout)
assert(exception.getCause.getMessage.contains("Futures timed out after [100 milliseconds]"))

killSwitch.shutdown()
}
Expand Down

0 comments on commit 3edf487

Please sign in to comment.