This repository has been archived by the owner on May 21, 2020. It is now read-only.

Record processor doesn't checkpoint when shard has terminated #9

Open
ewestern opened this issue Sep 27, 2018 · 2 comments

Comments

@ewestern

I think the checkpointer should be called here, without arguments:
https://github.com/aserrallerios/kcl-akka-stream/blob/master/src/main/scala/aserralle/akka/stream/kcl/IRecordProcessor.scala#L56

As indicated in the docs, this tells the KCL that all records on the shard have been processed and that it is safe to mark the shard as complete in DynamoDB.

For a split or merge operation, the KCL won't start processing the new shards until the processors for the original shards have called checkpoint to signal that all processing on the original shards is complete.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html
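For illustration, the checkpoint-at-shard-end behaviour described in the docs could be sketched roughly as below. This is a minimal, self-contained Java sketch, not the code of this library or of the KCL: `ShutdownReason` and `Checkpointer` are simplified stand-ins for the KCL types, and `TerminateAwareProcessor` is a hypothetical name.

```java
// Simplified stand-ins (assumptions) for the KCL shutdown reason and checkpointer types.
enum ShutdownReason { TERMINATE, ZOMBIE }

interface Checkpointer {
    // With no arguments: "everything delivered on this shard has been processed".
    void checkpoint();
}

// Hypothetical processor that makes the final checkpoint only when the shard has ended.
class TerminateAwareProcessor {
    void shutdown(Checkpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            // End of shard (for example after a split or merge): signal completion.
            checkpointer.checkpoint();
        }
        // ZOMBIE: the lease was lost, so this worker must not checkpoint.
    }
}
```

The sketch assumes every record has already been processed by the time `shutdown` runs; that assumption is exactly what is discussed further down the thread.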

The problem is that, in my case, a shard split caused the end of each shard to be reached. But when shutdown is called without a final checkpoint, KCL throws the following error and doesn't let the application progress:

Sep 27 20:48:38 ip-172-31-81-83 java[12490]: 20:48:38.826 ERROR [RecordProcessor-0057] c.a.s.k.c.lib.worker.ShutdownTask - Application exception.
Sep 27 20:48:38 ip-172-31-81-83 java[12490]: java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000130

Thanks for the help.

@aserrallerios
Owner

Have you tried raising the terminateStreamGracePeriod parameter?

This parameter blocks the consumption of new records and gives the user time to checkpoint everything before records from the new shards are consumed or checkpointed.

Note that terminateStreamGracePeriod needs to be noticeably higher than maxBatchWait if you're using the provided checkpointer Flow.
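As a rough way to reason about that relationship: if checkpoints are batched for up to maxBatchWait, the last checkpoint of a shard may be delayed by that long, so the grace period should cover at least one full batch window plus some headroom. The helper below is only a sketch of that arithmetic. `GracePeriodCheck` and the `margin` parameter are hypothetical and not part of the library's settings.

```java
import java.time.Duration;

// Hypothetical sanity check for the two settings mentioned above.
class GracePeriodCheck {
    // True when the grace period is strictly longer than one batch window plus the margin.
    static boolean leavesRoomForFinalCheckpoint(Duration terminateStreamGracePeriod,
                                                Duration maxBatchWait,
                                                Duration margin) {
        return terminateStreamGracePeriod.compareTo(maxBatchWait.plus(margin)) > 0;
    }
}
```

For example, a 30 second grace period with a 10 second maxBatchWait and a 5 second margin passes the check, while a 10 second grace period does not.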

@aserrallerios
Owner

aserrallerios commented Sep 28, 2018

If we checkpoint here

https://github.com/aserrallerios/kcl-akka-stream/blob/master/src/main/scala/aserralle/akka/stream/kcl/IRecordProcessor.scala#L56

we cannot guarantee that the latest records of the shutting-down shard have been properly consumed by the user's stream.

The KCL worker is synchronous while Akka Streams works asynchronously.
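One possible way to bridge the two models, sketched under assumptions: the synchronous shutdown call waits, up to a grace period, for the asynchronous side to report that it has drained the shard, and checkpoints only if that happened in time. All names here (`GracefulShardEnd`, `markDrained`, `graceMillis`) are hypothetical, and this is not claimed to be how terminateStreamGracePeriod is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator between a blocking worker thread and an asynchronous stream.
class GracefulShardEnd {
    private final CountDownLatch drained = new CountDownLatch(1);
    private final long graceMillis;

    GracefulShardEnd(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    // Called from the asynchronous side once the last record of the shard is processed.
    void markDrained() {
        drained.countDown();
    }

    // Called on the synchronous worker thread at shard end.
    // Returns true if the final checkpoint was issued, false on timeout or interruption.
    boolean shutdown(Runnable finalCheckpoint) {
        try {
            if (drained.await(graceMillis, TimeUnit.MILLISECONDS)) {
                finalCheckpoint.run();
                return true;
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

If the wait times out, no checkpoint is made, so the grace period still has to be long enough for the stream to finish.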
