
Commit

Merge pull request #6 from IMGGaming/fix-backpressure-across-shards
Fix backpressure across shards
aserrallerios authored May 19, 2018
2 parents 2d1e8b1 + 59b4f3a commit 8e6a8b9
Showing 2 changed files with 57 additions and 4 deletions.
@@ -4,6 +4,8 @@
 
 package aserralle.akka.stream.kcl.scaladsl
 
+import java.util.concurrent.Semaphore
+
 import akka.stream.Supervision.{Resume, Stop}
 import akka.stream._
 import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source, Zip}
@@ -41,15 +43,19 @@ object KinesisWorkerSource {
       .watchTermination()(Keep.both)
       .mapMaterializedValue {
         case (queue, watch) =>
+          val semaphore = new Semaphore(1, true)
           val worker = workerBuilder(
             new IRecordProcessorFactory {
               override def createProcessor(): IRecordProcessor =
                 new IRecordProcessor(
-                  record =>
+                  record => {
+                    semaphore.acquire(1)
                     (Exception.nonFatalCatch either Await.result(
                       queue.offer(record),
                       settings.backpressureTimeout) left)
-                      .foreach(_ => queue.fail(BackpressureTimeout)),
+                      .foreach(_ => queue.fail(BackpressureTimeout))
+                    semaphore.release()
+                  },
                   settings.terminateStreamGracePeriod
                 )
             }
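The hunk above serializes the blocking queue.offer calls from the per-shard worker threads behind a single fair Semaphore, so a slow downstream backpressures every shard instead of letting concurrent offers race each other. Below is a minimal, self-contained sketch of that pattern, not the connector's actual code: it assumes a plain akka-stream Source.queue in place of the queue materialized inside KinesisWorkerSource, a String in place of a Kinesis Record, and made-up names (FairOfferSketch, shardWorker); the real code also fails the queue when an offer times out, which is omitted here.

import java.util.concurrent.Semaphore

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

object FairOfferSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  // Fair, single-permit semaphore: shard threads get the queue in arrival order.
  val semaphore = new Semaphore(1, true)

  // Bounded buffer with backpressure, drained by a deliberately slow consumer.
  val queue = Source
    .queue[String](bufferSize = 10, OverflowStrategy.backpressure)
    .throttle(5, 1.second, 1, ThrottleMode.Shaping)
    .toMat(Sink.foreach(r => println(s"consumed $r")))(Keep.left)
    .run()

  // One blocking loop per shard, mirroring the record-processor callback above:
  // acquire, offer (and wait for the offer to complete), release.
  def shardWorker(shard: String): Future[Unit] = Future {
    for (i <- 1 to 25) {
      semaphore.acquire()
      try Await.result(queue.offer(s"$shard-record-$i"), 30.seconds)
      finally semaphore.release()
    }
  }

  Await.result(Future.sequence(Seq(shardWorker("shard1"), shardWorker("shard2"))), 1.minute)
  queue.complete()
  system.terminate()
}

The fairness flag matters here: java.util.concurrent.Semaphore grants permits in FIFO order when constructed as fair, whereas an unfair semaphore lets a busy shard thread re-acquire immediately and potentially starve the others, which is plausibly why the commit constructs it with new Semaphore(1, true).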
@@ -6,9 +6,9 @@ package aserralle.akka.stream.kcl
 
 import java.nio.ByteBuffer
 import java.util.Date
-import java.util.concurrent.Semaphore
-import aserralle.akka.stream.kcl.Errors.BackpressureTimeout
+import java.util.concurrent.{CountDownLatch, Semaphore}
 
+import aserralle.akka.stream.kcl.Errors.BackpressureTimeout
 import akka.stream.KillSwitches
 import akka.stream.scaladsl.Keep
 import akka.stream.testkit.scaladsl.{TestSink, TestSource}
@@ -144,6 +144,51 @@ class KinesisWorkerSourceSourceSpec
       sinkProbe.expectComplete()
     }
 
+    "not drop messages in case of back-pressure with multiple shard workers" in new KinesisWorkerContext
+    with TestData {
+      recordProcessor.initialize(initializationInput)
+      recordProcessor2.initialize(initializationInput.withShardId("shard2"))
+
+      for (i <- 1 to 5) { // 10 is a buffer size
+        val record = org.mockito.Mockito.mock(classOf[Record])
+        when(record.getSequenceNumber).thenReturn(i.toString)
+        recordProcessor.processRecords(
+          recordsInput.withRecords(List(record).asJava))
+        recordProcessor2.processRecords(
+          recordsInput.withRecords(List(record).asJava))
+      }
+
+      //expect to consume all 10 across both shards
+      for (_ <- 1 to 10) sinkProbe.requestNext()
+
+      // Each shard is assigned its own worker thread, so we get messages
+      // from each thread simultaneously.
+      def simulateWorkerThread(rp: v2.IRecordProcessor): Future[Unit] = {
+        Future {
+          for (i <- 1 to 25) { // 10 is a buffer size
+            val record = org.mockito.Mockito.mock(classOf[Record])
+            when(record.getSequenceNumber).thenReturn(i.toString)
+            rp.processRecords(recordsInput.withRecords(List(record).asJava))
+          }
+        }
+      }
+
+      //send another batch to exceed the queue size - this is shard 1
+      simulateWorkerThread(recordProcessor)
+
+      //send another batch to exceed the queue size - this is shard 2
+      simulateWorkerThread(recordProcessor2)
+
+      //expect to consume all 50 with slow consumer
+      for (_ <- 1 to 50) {
+        sinkProbe.requestNext()
+        Thread.sleep(100)
+      }
+
+      killSwitch.shutdown()
+      sinkProbe.expectComplete()
+    }
+
     "stop the stream when back pressure timeout elapsed" in new KinesisWorkerContext(
       backpressureTimeout = 100.milliseconds) with TestData {
       recordProcessor.initialize(initializationInput)
@@ -177,9 +222,11 @@ class KinesisWorkerSourceSourceSpec
 
     var recordProcessorFactory: IRecordProcessorFactory = _
     var recordProcessor: v2.IRecordProcessor = _
+    var recordProcessor2: v2.IRecordProcessor = _
    val workerBuilder = { x: IRecordProcessorFactory =>
      recordProcessorFactory = x
      recordProcessor = x.createProcessor()
+      recordProcessor2 = x.createProcessor()
      semaphore.release()
      worker
    }
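For readers without the test harness at hand, here is a stripped-down analogue of the new test using only the standard library, with hypothetical names (MultiShardSketch, shard): two "shard" threads push 25 records each through the same fair semaphore into a bounded queue of capacity 10 (standing in for the test's buffer size), while a slow consumer drains it; the assertion mirrored from the test is that all 50 records arrive.

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MultiShardSketch extends App {
  // Bounded buffer standing in for the source's internal queue (bufferSize = 10 in the test).
  val buffer    = new LinkedBlockingQueue[String](10)
  // Fair semaphore, as introduced in the production change above.
  val semaphore = new Semaphore(1, true)

  def shard(name: String): Future[Unit] = Future {
    for (i <- 1 to 25) {
      semaphore.acquire()
      // put() blocks while the buffer is full, i.e. the producer is backpressured.
      // (A LinkedBlockingQueue does not strictly need the semaphore; it is kept only
      // to mirror the serialization of offers introduced in the production change.)
      try buffer.put(s"$name-$i")
      finally semaphore.release()
    }
  }

  val producers = Future.sequence(Seq(shard("shard1"), shard("shard2")))

  var consumed = 0
  while (consumed < 50) {
    println(s"consumed ${buffer.take()}") // deliberately slow consumer
    Thread.sleep(20)
    consumed += 1
  }

  Await.result(producers, 1.minute)
  assert(consumed == 50) // nothing was dropped across the two shards
}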

0 comments on commit 8e6a8b9
