Skip to content

Commit 75a273e

Browse files
authored
feat: detect clock skew on event replay (#111)
1 parent 92e3889 commit 75a273e

File tree

3 files changed

+89
-10
lines changed

3 files changed

+89
-10
lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala

+10-4
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
169169
}
170170

171171
def readHighestSequenceNr(persistenceId: String): Future[Long] = {
172+
readHighestSequenceNrAndTimestamp(persistenceId).map(_._1)(ExecutionContext.parasitic)
173+
}
174+
175+
def readHighestSequenceNrAndTimestamp(persistenceId: String): Future[(Long, Instant)] = {
172176
import JournalAttributes._
173177

174178
val attributeValues = Map(":pid" -> AttributeValue.fromS(persistenceId))
@@ -189,20 +193,22 @@ import software.amazon.awssdk.services.dynamodb.model.Update
189193
.consistentRead(true)
190194
.keyConditionExpression(s"$Pid = :pid")
191195
.expressionAttributeValues((attributeValues ++ filterAttributeValues).asJava)
192-
.projectionExpression(s"$SeqNr")
196+
.projectionExpression(s"$SeqNr, $Timestamp")
193197
.scanIndexForward(false) // get last item (highest sequence nr)
194198
.limit(1)
195199

196200
filterExpression.foreach(requestBuilder.filterExpression)
197201

198202
val result = client.query(requestBuilder.build()).asScala.map { response =>
199-
response.items().asScala.headOption.fold(0L) { item =>
200-
item.get(SeqNr).n().toLong
203+
response.items().asScala.headOption.fold((0L, Instant.EPOCH)) { item =>
204+
(item.get(SeqNr).n().toLong, InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong))
201205
}
202206
}
203207

204208
if (log.isDebugEnabled)
205-
result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr))
209+
result.foreach { case (seqNr, timestamp) =>
210+
log.debug("Highest sequence nr for persistenceId [{}]: [{}] (written at [{}])", persistenceId, seqNr, timestamp)
211+
}
206212

207213
result
208214
.recoverWith { case c: CompletionException =>

core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala

+18-6
Original file line numberDiff line numberDiff line change
@@ -229,25 +229,25 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
229229
case None => FutureDone
230230
}
231231
pendingWrite.flatMap { _ =>
232-
if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
232+
val highestSeqNrAndTimestamp = if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
233233
// this is the normal case, highest sequence number from last event
234234
query
235235
.internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted = true)
236-
.runWith(Sink.fold(0L) { (_, item) =>
236+
.runWith(Sink.fold((0L, Instant.EPOCH)) { (_, item) =>
237237
// payload is empty for deleted item
238238
if (item.payload.isDefined) {
239239
val repr = deserializeItem(serialization, item)
240240
recoveryCallback(repr)
241241
}
242-
item.seqNr
242+
(item.seqNr, item.writeTimestamp)
243243
})
244244
} else if (toSequenceNr <= 0) {
245245
// no replay
246-
journalDao.readHighestSequenceNr(persistenceId)
246+
journalDao.readHighestSequenceNrAndTimestamp(persistenceId)
247247
} else {
248248
// replay to custom sequence number
249249

250-
val highestSeqNr = journalDao.readHighestSequenceNr(persistenceId)
250+
val highestSeqNrAndTimestamp = journalDao.readHighestSequenceNrAndTimestamp(persistenceId)
251251

252252
val effectiveToSequenceNr =
253253
if (max == Long.MaxValue) toSequenceNr
@@ -264,7 +264,19 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
264264
val repr = deserializeItem(serialization, item)
265265
recoveryCallback(repr)
266266
})
267-
.flatMap(_ => highestSeqNr)
267+
.flatMap(_ => highestSeqNrAndTimestamp)
268+
}
269+
highestSeqNrAndTimestamp.map { case (highestSeqNr, timestamp) =>
270+
val now = Instant.now()
271+
if (now.isBefore(timestamp)) {
272+
log.warning(
273+
"Detected clock skew when replaying events: persistence id [{}], highest seq nr [{}] written at [{}], current time is [{}]",
274+
persistenceId,
275+
highestSeqNr,
276+
timestamp,
277+
now)
278+
}
279+
highestSeqNr
268280
}
269281
}
270282
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.dynamodb.journal
6+
7+
import java.time.temporal.ChronoUnit
8+
9+
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit }
10+
import akka.actor.typed.ActorSystem
11+
import akka.persistence.JournalProtocol.{ RecoverySuccess, ReplayMessages, ReplayedMessage }
12+
import akka.persistence.dynamodb.internal.InstantFactory
13+
import akka.persistence.dynamodb.query.EventsBySliceSpec
14+
import akka.persistence.dynamodb.{ TestData, TestDbLifecycle }
15+
import akka.testkit.TestProbe
16+
import org.scalatest.wordspec.AnyWordSpecLike
17+
18+
class ClockSkewDetectionSpec
19+
extends ScalaTestWithActorTestKit(EventsBySliceSpec.config)
20+
with AnyWordSpecLike
21+
with TestDbLifecycle
22+
with TestData
23+
with LogCapturing {
24+
25+
override def typedSystem: ActorSystem[_] = system
26+
27+
private val journal = persistenceExt.journalFor(null)
28+
29+
"DynamoDBJournal" should {
30+
31+
"detect clock skew on event replay" in {
32+
val entityType = nextEntityType()
33+
val pid = nextPersistenceId(entityType)
34+
val slice = persistenceExt.sliceForPersistenceId(pid.id)
35+
36+
val now = InstantFactory.now().truncatedTo(ChronoUnit.SECONDS)
37+
38+
// first 5 events in the past
39+
for (n <- 1 to 5) {
40+
writeEvent(slice, pid, n, now.minusSeconds(10).plusSeconds(n), s"e$n")
41+
}
42+
43+
// next 5 events over 1 minute in the future
44+
for (n <- 6 to 10) {
45+
writeEvent(slice, pid, n, now.plusSeconds(60).plusSeconds(n), s"e$n")
46+
}
47+
48+
val replayProbe = TestProbe()(system.classicSystem)
49+
50+
LoggingTestKit
51+
.warn("Detected clock skew when replaying events:" +
52+
s" persistence id [${pid.id}], highest seq nr [10] written at [${now.plusSeconds(70)}]")
53+
.expect {
54+
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid.id, replayProbe.ref)
55+
(1 to 10).foreach { _ => replayProbe.expectMsgType[ReplayedMessage] }
56+
replayProbe.expectMsg(RecoverySuccess(highestSequenceNr = 10L))
57+
}
58+
}
59+
60+
}
61+
}

0 commit comments

Comments
 (0)