Skip to content

Commit 8c8f997

Browse files
authored
fix: only initial backtracking up to start offset when far behind (#109)
1 parent 8827aca commit 8c8f997

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ import org.slf4j.Logger
269269

270270
def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = {
271271
val aheadOfInitial =
272-
initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
272+
initialOffset == TimestampOffset.Zero ||
273+
state.latestBacktracking.timestamp.compareTo(initialOffset.timestamp) >= 0
273274

274275
val previousTimestamp =
275276
if (state.previous == TimestampOffset.Zero) state.latest.timestamp

core/src/test/scala/akka/persistence/dynamodb/query/EventsBySliceBacktrackingSpec.scala

+9-4
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ class EventsBySliceBacktrackingSpec
258258
result1.cancel()
259259
}
260260

261-
"still make initial backtracking until ahead of start offset" in {
261+
"still make initial backtracking until caught up to start offset, then skip backtracking" in {
262262
val entityType = nextEntityType()
263263
val pid1 = nextPersistenceId(entityType)
264264
val slice = query.sliceForPersistenceId(pid1.id)
@@ -275,7 +275,11 @@ class EventsBySliceBacktrackingSpec
275275
writeEvent(slice, pid, seqNr, startTime.plusMillis(n), s"e${mod}-${seqNr}")
276276
}
277277

278-
(3 to 10).foreach { n =>
278+
// will start query at next event
279+
val startOffset = TimestampOffset(startTime.plusSeconds(23).plusMillis(1), Map.empty)
280+
281+
// go past switch-to-backtracking trigger of 3 * buffer size (of 10)
282+
(3 to 30).foreach { n =>
279283
writeEvent(slice, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n")
280284
writeEvent(slice, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n")
281285
}
@@ -298,15 +302,16 @@ class EventsBySliceBacktrackingSpec
298302
env.offset
299303
}
300304

301-
val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty))
305+
val result1 = startQuery(startOffset)
302306
// from backtracking
303307
expect(result1.expectNext(), pid1, 1, None)
304308
expect(result1.expectNext(), pid2, 1, None)
305309
expect(result1.expectNext(), pid1, 2, None)
306310
expect(result1.expectNext(), pid2, 2, None)
311+
expect(result1.expectNext(), pid1, 3, None) // start offset
307312

308313
// from normal
309-
(3 to 10).foreach { n =>
314+
(3 to 30).foreach { n =>
310315
expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
311316
expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
312317
}

0 commit comments

Comments
 (0)