
Add DynamoDB item Time to Live #99

Open · wants to merge 23 commits into `master`
74 changes: 74 additions & 0 deletions README.md
@@ -83,6 +83,80 @@ The table to create for snapshot storage has the schema:

The DynamoDB item of a snapshot [can be at most 400 kB](http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html#limits-items). Using a binary serialisation format like ProtoBuf or Kryo will use that space most effectively.


DynamoDB Time To Live
---------------------
DynamoDB offers a Time to Live option to remove items after a delay. Once the time specified in an item has passed, the item is **eventually** removed from the table.

In DynamoDB, this feature is activated on a per-table basis by specifying the name of the attribute containing the expiration date.

Item expiry is available for both the journal and snapshot tables. To activate it, `dynamodb-item-ttl-config.field-name` and `dynamodb-item-ttl-config.ttl` need to be specified. For example, given a system with a snapshot interval of 10 days, the TTLs could be configured as:
~~~
// Journal
my-dynamodb-journal {
// ...
dynamodb-item-ttl-config {
field-name = "expiresAt" // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-how-to.html
ttl = 30d
}
// ...
}

// Snapshot
my-dynamodb-snapshot-store {
// ...
dynamodb-item-ttl-config {
field-name = "expiresAt" // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-how-to.html
ttl = 40d
}
// ...
}
~~~
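DynamoDB requires the TTL attribute to hold a Unix epoch timestamp in seconds. A minimal sketch of how such a value can be derived from a configured TTL (the function and variable names here are illustrative, not part of this library's API):

```scala
import java.time.OffsetDateTime
import scala.concurrent.duration._

// DynamoDB expects the TTL attribute as epoch time in *seconds*,
// not milliseconds.
def expiryEpochSeconds(now: OffsetDateTime, ttl: FiniteDuration): Long =
  now.plusNanos(ttl.toNanos).toEpochSecond

val expiresAt = expiryEpochSeconds(OffsetDateTime.parse("2024-01-01T00:00:00Z"), 30.days)
// expiresAt == 1706659200, i.e. 2024-01-31T00:00:00Z
```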

### Things to be aware of when using TTL
Before enabling the TTL feature, make sure to have an understanding of the potential impacts detailed below.

#### Don't rely on the TTL for business logic related to the expiration
As specified in the [AWS DynamoDB documentation](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html),

> Items that have expired, but haven’t yet been deleted by TTL, still appear in reads, queries, and scans. If you do not want expired items in the result set, you must filter them out.

For the moment, the TTL implementation in this library does not filter expired items out when reading from the database.

Similarly,
> TTL typically deletes expired items within 48 hours of expiration.

As a consequence, you should not rely on this feature for business logic related to expiration. Journal events and snapshots can still be returned after they have expired but before they are deleted.
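Applications that must not observe expired data can filter client-side after reading. A sketch of that idea on plain attribute maps (the `expiresAt` field name and the `Item` representation are assumptions for illustration):

```scala
// An item as a simple attribute map; "expiresAt" holds epoch seconds.
type Item = Map[String, Long]

// Drop items whose TTL attribute lies in the past. Items without the
// attribute (e.g. written before TTL was enabled) are kept.
def filterExpired(items: Seq[Item], nowEpochSeconds: Long): Seq[Item] =
  items.filter(item => item.get("expiresAt").forall(_ > nowEpochSeconds))
```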

#### Be careful about potential inconsistencies around the expiration time
This section explains the relationship between the snapshot TTL, the journal TTL and the snapshot interval.

TL;DR:
- different TTLs need to be used for the journal and the snapshot; the snapshot TTL needs to be at least 48 hours lower than the journal TTL.
- the snapshot interval needs to be much less than the snapshot TTL.


In DynamoDB, TTL is implemented by a per-partition background scan. This means that items are not deleted in global TTL order but per partition, which may result in odd recovery behavior.

Suppose the log (in insert time order) for a persistent entity is (E - event, S - snapshot):
```
E0, E1, E2, S0, E3, E4, S1.
```
As the Es may be spread over multiple partitions, TTL expiry may result in:
```
E0, E2, S0, E3, E4, S1.
```

Note that `E1` expired before `E0`, which is entirely possible given Amazon's description of how TTL is implemented.

As a consequence, configuring the TTLs requires careful consideration. For an entity to be recovered from the persisted snapshot and journal events, the snapshot interval must be less than the snapshot TTL.

Further, to ensure the useful property that the entity can be recovered by replaying the events since any persisted snapshot, the journal TTL should be larger than the snapshot TTL, by at least 48 hours.

Some consideration of the snapshot interval is also needed when defining the TTLs. The bigger the interval between snapshots, the bigger the difference between the snapshot TTL and the journal TTL should be. In general, the snapshot interval needs to be much less than the snapshot TTL. If the snapshot interval is not small enough compared to the journal TTL, some events could be lost. In other words, there needs to be at least one snapshot persisted before any journal item expires and gets deleted.
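The constraints above can be captured in a small sanity check. The 48-hour margin reflects DynamoDB's stated deletion lag; the factor of two for the snapshot interval is an assumption standing in for "much less than":

```scala
import scala.concurrent.duration._

// DynamoDB typically deletes expired items within 48 hours of expiration.
val DeletionLag: FiniteDuration = 48.hours

// Returns the violated constraints; an empty list means the settings look safe.
def validateTtls(
    journalTtl: FiniteDuration,
    snapshotTtl: FiniteDuration,
    snapshotInterval: FiniteDuration): List[String] = {
  val errors = List.newBuilder[String]
  if (journalTtl < snapshotTtl + DeletionLag)
    errors += "journal TTL should exceed the snapshot TTL by at least 48h"
  if (snapshotInterval * 2 > snapshotTtl)
    errors += "snapshot interval should be much less than the snapshot TTL"
  errors.result()
}
```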

#### Only new items will expire
Once the TTL feature is enabled, **only** newly inserted journal events and snapshots will eventually be removed. If events for an entity were persisted before the feature was enabled, only the newly inserted items will expire. The problem described above can then occur here as well: the recovered entity could end up in an inconsistent state.
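One way to mitigate this is to backfill the TTL attribute onto items written before the feature was enabled. A sketch of the selection logic on in-memory attribute maps (a real backfill would page through the table with `Scan` and issue `UpdateItem` calls; the `expiresAt` name is an assumption):

```scala
// Items as attribute maps; "expiresAt" is the assumed TTL field name.
type Item = Map[String, Long]

// Add the TTL attribute to items written before TTL was enabled;
// items that already carry it are left untouched.
def backfillTtl(items: Seq[Item], expiresAt: Long): Seq[Item] =
  items.map { item =>
    if (item.contains("expiresAt")) item
    else item + ("expiresAt" -> expiresAt)
  }
```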

Storage Semantics
-----------------

10 changes: 10 additions & 0 deletions src/main/resources/reference.conf
@@ -120,6 +120,11 @@ dynamodb-journal {
parallelism-max = 8
}
}

dynamodb-item-ttl-config {
field-name = ""
ttl = "0m" # duration
}
}
dynamodb-snapshot-store {

@@ -168,5 +173,10 @@ dynamodb-snapshot-store {
max-batch-write = ${dynamodb-journal.aws-api-limits.max-batch-write}
max-item-size = ${dynamodb-journal.aws-api-limits.max-item-size}
}

dynamodb-item-ttl-config {
field-name = ${dynamodb-journal.dynamodb-item-ttl-config.field-name}
ttl = ${dynamodb-journal.dynamodb-item-ttl-config.ttl}
}
}

9 changes: 4 additions & 5 deletions src/main/scala/akka/persistence/dynamodb/DynamoDBConfig.scala
@@ -3,16 +3,15 @@
*/
package akka.persistence.dynamodb

import java.net.InetAddress

import akka.persistence.dynamodb.journal.DynamoDBHelper
import akka.serialization.Serialization
import com.amazonaws.{ ClientConfiguration, Protocol }
import com.typesafe.config.Config

import java.net.InetAddress

trait ClientConfig {
val config: ClientConfiguration
}

trait DynamoDBConfig {
val AwsKey: String
val AwsSecret: String
@@ -25,7 +24,7 @@ trait DynamoDBConfig {
val MaxItemSize: Int
val Table: String
val JournalName: String

val TTLConfig: Option[DynamoDBTTLConfig]
}

class DynamoDBClientConfig(c: Config) extends ClientConfig {
41 changes: 41 additions & 0 deletions src/main/scala/akka/persistence/dynamodb/DynamoDBTTLConfig.scala
@@ -0,0 +1,41 @@
package akka.persistence.dynamodb

import com.typesafe.config.Config

import java.time.OffsetDateTime
import scala.concurrent.duration.{ Duration, DurationLong }
import scala.util.Try

case class DynamoDBTTLConfig(fieldName: String, ttl: DynamoDBTTL) {

override def toString: String =
s"DynamoDBTTLConfig(fieldName = ${fieldName}, ttl = ${ttl.ttl})"
}

case class DynamoDBTTL(ttl: Duration) {

// Important, the value needs to be Unix epoch time format in seconds
// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-before-you-start.html#time-to-live-ttl-before-you-start-formatting
def getItemExpiryTimeEpochSeconds(now: OffsetDateTime): Long =
now.plusNanos(ttl.toNanos).toEpochSecond
}

object DynamoDBTTL {
def fromJavaDuration(duration: java.time.Duration): DynamoDBTTL =
DynamoDBTTL(duration.toNanos.nanos)
}

object DynamoDBTTLConfigReader {

val configFieldName: String = "dynamodb-item-ttl-config.field-name"
val configTtlName: String = "dynamodb-item-ttl-config.ttl"

def readTTLConfig(c: Config): Option[DynamoDBTTLConfig] = {
for {
fieldName <- Try(c.getString(configFieldName)).toOption.map(_.trim)
if fieldName.nonEmpty
ttl <- Try(c.getDuration(configTtlName)).toOption
} yield DynamoDBTTLConfig(fieldName = fieldName, ttl = DynamoDBTTL.fromJavaDuration(ttl))
}

}
@@ -174,9 +174,6 @@ class DynamoDBJournal(config: Config)
case SetDBHelperReporter(ref) => dynamo.setReporter(ref)
}

def keyLength(persistenceId: String, sequenceNr: Long): Int =
persistenceId.length + JournalName.length + KeyPayloadOverhead

def messageKey(persistenceId: String, sequenceNr: Long): Item = {
val item: Item = new JHMap
item.put(Key, S(messagePartitionKey(persistenceId, sequenceNr)))
@@ -3,10 +3,9 @@
*/
package akka.persistence.dynamodb.journal

import akka.persistence.dynamodb.{ DynamoDBClientConfig, DynamoDBConfig, DynamoDBTTLConfig, DynamoDBTTLConfigReader }
import com.typesafe.config.Config

import akka.persistence.dynamodb.{ DynamoDBClientConfig, DynamoDBConfig }

class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
val JournalTable = c.getString("journal-table")
val Table = JournalTable
@@ -25,6 +24,8 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
val MaxBatchWrite = c.getInt("aws-api-limits.max-batch-write")
val MaxItemSize = c.getInt("aws-api-limits.max-item-size")

override val TTLConfig: Option[DynamoDBTTLConfig] = DynamoDBTTLConfigReader.readTTLConfig(c)

val client = new DynamoDBClientConfig(c)
override def toString: String =
"DynamoDBJournalConfig(" +
@@ -39,7 +40,8 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
",Tracing:" + Tracing +
",MaxBatchGet:" + MaxBatchGet +
",MaxBatchWrite:" + MaxBatchWrite +
",MaxItemSize:" + MaxItemSize +
",TTLConfig:" + TTLConfig.getOrElse("<undefined>") +
",client.config:" + client +
")"
}
@@ -3,27 +3,30 @@
*/
package akka.persistence.dynamodb.journal

import java.nio.ByteBuffer
import java.util.Collections

import akka.Done
import akka.actor.ExtendedActorSystem
import akka.pattern.after
import akka.persistence.dynamodb._
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.{ AsyncSerializer, Serialization, Serializer, Serializers }
import com.amazonaws.services.dynamodbv2.model._

import java.nio.ByteBuffer
import java.time.OffsetDateTime
import java.util.Collections
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.pattern.after
import akka.persistence.dynamodb._
import akka.serialization.{ AsyncSerializer, Serialization, Serializers }
import scala.util.{ Failure, Success, Try }

trait DynamoDBJournalRequests extends DynamoDBRequests {
this: DynamoDBJournal =>

import settings._

private lazy val itemSizeVerifier = new ItemSizeCalculator(settings)

/**
* Write all messages in a sequence of AtomicWrites. Care must be taken to
* not have concurrent writes happening that touch the highest sequence number.
@@ -44,6 +47,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
writeMessages(write).flatMap(result => rec(remainder, bubbleUpFailures(result) :: acc))
case Nil => Future.successful(acc.reverse)
}

rec(writes.toList, Nil)
}

@@ -165,54 +169,66 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
try {
val reprPayload: AnyRef = repr.payload.asInstanceOf[AnyRef]
val serializer = serialization.serializerFor(reprPayload.getClass)
val fut = serializer match {
case aS: AsyncSerializer =>
Serialization.withTransportInformation(context.system.asInstanceOf[ExtendedActorSystem]) { () =>
aS.toBinaryAsync(reprPayload)
}
case _ =>
Future {
ByteBuffer.wrap(serialization.serialize(reprPayload).get).array()
}
}

fut.map { serialized =>
serializePersistentRepr(reprPayload, serializer).map { serialized =>

val eventData = B(serialized)
val serializerId = N(serializer.identifier)

val fieldLength = repr.persistenceId.getBytes.length + repr.sequenceNr.toString.getBytes.length +
repr.writerUuid.getBytes.length + repr.manifest.getBytes.length

val manifest = Serializers.manifestFor(serializer, reprPayload)
val manifestLength = if (manifest.isEmpty) 0 else manifest.getBytes.length

val itemSize = keyLength(
repr.persistenceId,
repr.sequenceNr) + eventData.getB.remaining + serializerId.getN.getBytes.length + manifestLength + fieldLength
val manifest = Serializers.manifestFor(serializer, reprPayload)

val itemSize = itemSizeVerifier.getItemSize(repr, eventData, serializerId, manifest)
if (itemSize > MaxItemSize) {
throw new DynamoDBJournalRejection(s"MaxItemSize exceeded: $itemSize > $MaxItemSize")
}
val item: Item = messageKey(repr.persistenceId, repr.sequenceNr)

item.put(PersistentId, S(repr.persistenceId))
item.put(SequenceNr, N(repr.sequenceNr))
item.put(Event, eventData)
item.put(WriterUuid, S(repr.writerUuid))
item.put(SerializerId, serializerId)
if (repr.manifest.nonEmpty) {
item.put(Manifest, S(repr.manifest))
}
if (manifest.nonEmpty) {
item.put(SerializerManifest, S(manifest))
}
item

createItem(repr, eventData, serializerId, manifest)
}
} catch {
case NonFatal(e) => Future.failed(e)
}
}

private def createItem(
repr: PersistentRepr,
eventData: AttributeValue,
serializerId: AttributeValue,
manifest: String) = {
val item: Item = messageKey(repr.persistenceId, repr.sequenceNr)

item.put(PersistentId, S(repr.persistenceId))
item.put(SequenceNr, N(repr.sequenceNr))
item.put(Event, eventData)
item.put(WriterUuid, S(repr.writerUuid))
item.put(SerializerId, serializerId)

TTLConfig.foreach {
case DynamoDBTTLConfig(fieldName, ttl) =>
val expiresAt = ttl.getItemExpiryTimeEpochSeconds(OffsetDateTime.now)
item.put(fieldName, N(expiresAt))
}

if (repr.manifest.nonEmpty) {
item.put(Manifest, S(repr.manifest))
}
if (manifest.nonEmpty) {
item.put(SerializerManifest, S(manifest))
}
item
}

private def serializePersistentRepr(reprPayload: AnyRef, serializer: Serializer) = {
serializer match {
case aS: AsyncSerializer =>
Serialization.withTransportInformation(context.system.asInstanceOf[ExtendedActorSystem]) { () =>
aS.toBinaryAsync(reprPayload)
}
case _ =>
Future {
ByteBuffer.wrap(serialization.serialize(reprPayload).get).array()
}
}
}

/**
* Store the highest sequence number for this persistenceId.
*