diff --git a/README.md b/README.md index 808d497..71d0a19 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ For more information about KCL please visit the [official documentation](http:// aserrallerios kcl-akka-stream_2.11 - 0.3 + 0.4 pom ``` @@ -47,22 +47,23 @@ val workerSourceSettings = KinesisWorkerSourceSettings( bufferSize = 1000, terminateStreamGracePeriod = 1 minute, backpressureTimeout = 1 minute) - val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory => - new Worker.Builder() - .recordProcessorFactory(recordProcessorFactory) - .config( - new KinesisClientLibConfiguration( - "myApp", - "myStreamName", - DefaultAWSCredentialsProviderChain.getInstance(), - s"${ - import scala.sys.process._ - "hostname".!!.trim() - }:${java.util.UUID.randomUUID()}" - ) + +val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory => + new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config( + new KinesisClientLibConfiguration( + "myApp", + "myStreamName", + DefaultAWSCredentialsProviderChain.getInstance(), + s"${ + import scala.sys.process._ + "hostname".!!.trim() + }:${java.util.UUID.randomUUID()}" ) - .build() - } + ) + .build() +} ``` The Source also needs an `ExecutionContext` to run the Worker's thread and to commit/checkpoint records. Then the Source can be created as usual: @@ -70,6 +71,7 @@ The Source also needs an `ExecutionContext` to run the Worker's thread and to co ```scala implicit val _ = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1000)) + KinesisWorkerSource(builder, workerSourceSettings).to(Sink.ignore) ```