Skip to content
This repository has been archived by the owner on May 21, 2020. It is now read-only.

Commit

Permalink
Merge pull request #14 from DiceTechnology/kcl-2-upgrade
Browse files Browse the repository at this point in the history
Kcl 2 upgrade
  • Loading branch information
aserrallerios authored Jan 2, 2019
2 parents f1109ac + 505e3d4 commit 1a9c22d
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 264 deletions.
10 changes: 6 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import sbt._, Keys._
object Dependencies {

val AkkaVersion = sys.env.get("AKKA_SERIES") match {
case Some("2.5") => "2.5.11"
case Some("2.5") => "2.5.19"
case _ => "2.4.20"
}

val AwsSdkVersion = "1.11.311"
val AwsSdkVersion = "2.2.0"

val Common = Seq(
libraryDependencies ++= Seq(
Expand All @@ -21,8 +21,10 @@ object Dependencies {

val Kinesis = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-kinesis" % AwsSdkVersion % Provided, // ApacheV2
"com.amazonaws" % "amazon-kinesis-client" % "1.9.0" % Provided, // Amazon Software License
"software.amazon.awssdk" % "kinesis" % AwsSdkVersion % Provided, // ApacheV2
"software.amazon.awssdk" % "dynamodb" % AwsSdkVersion % Provided, // ApacheV2
"software.amazon.awssdk" % "cloudwatch" % AwsSdkVersion % Provided, // ApacheV2
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.0.5" % Provided, // Amazon Software License
"org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
)
)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.2.4
sbt.version=1.2.6
34 changes: 14 additions & 20 deletions src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,43 @@
package aserralle.akka.stream.kcl

import akka.Done
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.clientlibrary.types.{
ExtendedSequenceNumber,
UserRecord
}
import com.amazonaws.services.kinesis.model.Record
import software.amazon.kinesis.lifecycle.ShutdownReason
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.KinesisClientRecord
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import scala.concurrent.{ExecutionContext, Future}

class CommittableRecord(
val shardId: String,
val recordProcessorStartingSequenceNumber: ExtendedSequenceNumber,
val millisBehindLatest: Long,
val record: Record,
recordProcessor: IRecordProcessor,
checkpointer: IRecordProcessorCheckpointer
val record: KinesisClientRecord,
recordProcessor: ShardProcessor,
checkpointer: RecordProcessorCheckpointer
)(implicit executor: ExecutionContext) {

val sequenceNumber: String = record.getSequenceNumber
val sequenceNumber: String = record.sequenceNumber()
val subSequenceNumber: Long = record.subSequenceNumber()

def recordProcessorShutdownReason(): Option[ShutdownReason] =
recordProcessor.shutdown

def canBeCheckpointed(): Boolean =
recordProcessorShutdownReason().isEmpty

def tryToCheckpoint(): Future[Done] =
Future {
checkpointer.checkpoint(record)
checkpointer.checkpoint(sequenceNumber, subSequenceNumber)
Done
}

}

object CommittableRecord {

// Only makes sense to compare Records belonging to the same shard
// Records that have been batched by the KCL producer all have the
// Records that have been batched by the KPL producer all have the
// same sequence number but will differ by subsequence number
implicit val orderBySequenceNumber: Ordering[CommittableRecord] =
Ordering[(String, Long)].on(cr
(cr.sequenceNumber, cr.record match {
case ur: UserRecord ur.getSubSequenceNumber
case _ 0
}))

Ordering[(String, Long)].on(cr (cr.sequenceNumber, cr.subSequenceNumber))
}
62 changes: 0 additions & 62 deletions src/main/scala/aserralle/akka/stream/kcl/IRecordProcessor.scala

This file was deleted.

64 changes: 64 additions & 0 deletions src/main/scala/aserralle/akka/stream/kcl/ShardProcessor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2018 Albert Serrallé
*/

package aserralle.akka.stream.kcl

import software.amazon.kinesis.lifecycle.ShutdownReason
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.processor.ShardRecordProcessor
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

/**
 * Adapts the KCL 2.x `ShardRecordProcessor` lifecycle callbacks to the stream:
 * each received record is wrapped in a `CommittableRecord` (carrying the shard id,
 * starting sequence number and checkpointer) and handed to `callback`.
 *
 * @param callback invoked synchronously, once per record of each received batch
 * @param terminateStreamGracePeriod how long to block during shard-end / shutdown
 *        so that records still in flight can be checkpointed before the lease ends
 */
private[kcl] class ShardProcessor(
    callback: CommittableRecord => Unit,
    terminateStreamGracePeriod: FiniteDuration
)(implicit executionContext: ExecutionContext)
    extends ShardRecordProcessor {

  // Populated once in initialize(); assumes the KCL always calls initialize()
  // before any processRecords() on this instance — TODO confirm against KCL docs.
  private var shardId: String = _
  private var extendedSequenceNumber: ExtendedSequenceNumber = _

  // Set when the shard ends or shutdown is requested; read via
  // recordProcessorShutdownReason() to decide whether checkpointing is still allowed.
  // NOTE(review): written without synchronization from KCL lifecycle callbacks —
  // presumably the KCL serializes calls per shard; confirm cross-thread visibility.
  var shutdown: Option[ShutdownReason] = None

  override def initialize(initializationInput: InitializationInput): Unit = {
    shardId = initializationInput.shardId()
    extendedSequenceNumber = initializationInput.extendedSequenceNumber()
  }

  /** Wraps every record of the batch with shard/checkpoint context and forwards it. */
  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
    processRecordsInput.records().asScala.foreach { record =>
      callback(
        // The implicit executionContext of this class is consumed here by
        // CommittableRecord (it runs checkpoint futures on it).
        new CommittableRecord(
          shardId,
          extendedSequenceNumber,
          processRecordsInput.millisBehindLatest(),
          record,
          recordProcessor = this,
          processRecordsInput.checkpointer
        )
      )
    }
  }

  // Lease lost: checkpointing is no longer possible for this shard; nothing to do.
  override def leaseLost(leaseLostInput: LeaseLostInput): Unit = {}

  override def shardEnded(shardEndedInput: ShardEndedInput): Unit = {
    // We need to checkpoint, but if we do it immediately any records still
    // in flight may get lost, so we wait for the grace period
    shutdown = Some(ShutdownReason.SHARD_END)
    // Deliberately blocks the KCL's callback thread for the whole grace period.
    Thread.sleep(terminateStreamGracePeriod.toMillis)
    shardEndedInput.checkpointer.checkpoint()
  }

  override def shutdownRequested(shutdownInput: ShutdownRequestedInput): Unit = {
    // We don't checkpoint at this point as we assume the
    // standard mechanism will checkpoint when required
    shutdown = Some(ShutdownReason.REQUESTED)
    Thread.sleep(terminateStreamGracePeriod.toMillis)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import java.util.concurrent.Executor
import akka.NotUsed
import aserralle.akka.stream.kcl.{CommittableRecord, scaladsl, _}
import akka.stream.javadsl.{Flow, Sink, Source}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.processor.ShardRecordProcessorFactory
import software.amazon.kinesis.retrieval.KinesisClientRecord

import scala.concurrent.ExecutionContext

object KinesisWorkerSource {

abstract class WorkerBuilder {
def build(r: IRecordProcessorFactory): Worker
def build(r: ShardRecordProcessorFactory): Scheduler
}

def create(
workerBuilder: WorkerBuilder,
settings: KinesisWorkerSourceSettings,
workerExecutor: Executor
): Source[CommittableRecord, Worker] =
): Source[CommittableRecord, Scheduler] =
scaladsl.KinesisWorkerSource
.apply(workerBuilder.build, settings)(
ExecutionContext.fromExecutor(workerExecutor))
Expand All @@ -34,17 +34,18 @@ object KinesisWorkerSource {
def create(
workerBuilder: WorkerBuilder,
workerExecutor: Executor
): Source[CommittableRecord, Worker] =
): Source[CommittableRecord, Scheduler] =
create(workerBuilder,
KinesisWorkerSourceSettings.defaultInstance,
workerExecutor)

def checkpointRecordsFlow(
settings: KinesisWorkerCheckpointSettings
): Flow[CommittableRecord, Record, NotUsed] =
): Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
scaladsl.KinesisWorkerSource.checkpointRecordsFlow(settings).asJava

def checkpointRecordsFlow(): Flow[CommittableRecord, Record, NotUsed] =
def checkpointRecordsFlow()
: Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
checkpointRecordsFlow(KinesisWorkerCheckpointSettings.defaultInstance)

def checkpointRecordsSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ import aserralle.akka.stream.kcl.Errors.{
}
import aserralle.akka.stream.kcl.{
CommittableRecord,
IRecordProcessor,
KinesisWorkerCheckpointSettings,
KinesisWorkerSourceSettings
KinesisWorkerSourceSettings,
ShardProcessor
}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.exceptions.ShutdownException
import software.amazon.kinesis.processor.{
ShardRecordProcessor,
ShardRecordProcessorFactory
}
import software.amazon.kinesis.retrieval.KinesisClientRecord

import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
Expand All @@ -32,11 +36,11 @@ import scala.util.{Failure, Success}
object KinesisWorkerSource {

def apply(
workerBuilder: IRecordProcessorFactory => Worker,
workerBuilder: ShardRecordProcessorFactory => Scheduler,
settings: KinesisWorkerSourceSettings =
KinesisWorkerSourceSettings.defaultInstance
)(implicit workerExecutor: ExecutionContext)
: Source[CommittableRecord, Worker] =
: Source[CommittableRecord, Scheduler] =
Source
.queue[CommittableRecord](settings.bufferSize,
OverflowStrategy.backpressure)
Expand All @@ -45,9 +49,9 @@ object KinesisWorkerSource {
case (queue, watch) =>
val semaphore = new Semaphore(1, true)
val worker = workerBuilder(
new IRecordProcessorFactory {
override def createProcessor(): IRecordProcessor =
new IRecordProcessor(
new ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
new ShardProcessor(
record => {
semaphore.acquire(1)
(Exception.nonFatalCatch either Await.result(
Expand All @@ -73,7 +77,7 @@ object KinesisWorkerSource {
def checkpointRecordsFlow(
settings: KinesisWorkerCheckpointSettings =
KinesisWorkerCheckpointSettings.defaultInstance
): Flow[CommittableRecord, Record, NotUsed] =
): Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
Flow[CommittableRecord]
.groupBy(MAX_KINESIS_SHARDS, _.shardId)
.groupedWithin(settings.maxBatchSize, settings.maxBatchWait)
Expand All @@ -83,7 +87,7 @@ object KinesisWorkerSource {
val `{` =
b.add(scaladsl.Broadcast[immutable.Seq[CommittableRecord]](2))
val `}` = b.add(Zip[Done, immutable.Seq[CommittableRecord]])
val `=` = b.add(Flow[Record])
val `=` = b.add(Flow[KinesisClientRecord])

`{`.out(0)
.map(_.max)
Expand All @@ -98,7 +102,7 @@ object KinesisWorkerSource {
})
.mergeSubstreams
.withAttributes(ActorAttributes.supervisionStrategy {
case _: com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException =>
case _: ShutdownException =>
Resume
case _ => Stop
})
Expand Down
Loading

0 comments on commit 1a9c22d

Please sign in to comment.