Commit 8827aca

chore: More information in query log prefix (#112)
* and some more debug logging
1 parent: 75a273e

2 files changed, +29 −25 lines changed

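The log prefixes that were previously passed as plain query names (for example "eventsBySlices") are now interpolated strings that also carry the entity type and the slice. A minimal sketch of how such a prefix renders; the entity type and slice values below are made up for illustration:

  // hypothetical values, only to show the shape of the new prefix
  val entityType = "ShoppingCart"
  val slice = 17
  val logPrefix = s"[$entityType] eventsBySlice [$slice]: "
  // logPrefix == "[ShoppingCart] eventsBySlice [17]: "
  // every debug line emitted with this prefix now identifies the entity type and slice it belongs to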

core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala

+9 −18
@@ -150,10 +150,9 @@ import org.slf4j.Logger
 
     if (state.queryCount != 0 && log.isDebugEnabled())
       log.debug(
-        "{} next query [{}] from slice [{}], between time [{} - {}]. Found [{}] items in previous query.",
+        "{} next query [{}], between time [{} - {}]. Found [{}] items in previous query.",
         logPrefix,
         state.queryCount,
-        slice,
         state.latest.timestamp,
         toTimestamp,
         state.itemCount)
@@ -168,10 +167,9 @@ import org.slf4j.Logger
     } else {
       if (log.isDebugEnabled)
         log.debug(
-          "{} query [{}] from slice [{}] completed. Found [{}] items in previous query.",
+          "{} query [{}] completed. Found [{}] items in previous query.",
           logPrefix,
           state.queryCount,
-          slice,
           state.itemCount)
 
       state -> None
@@ -180,12 +178,7 @@ import org.slf4j.Logger
 
     val currentTimestamp = InstantFactory.now()
     if (log.isDebugEnabled())
-      log.debug(
-        "{} query slice [{}], from time [{}] until now [{}].",
-        logPrefix,
-        slice,
-        initialOffset.timestamp,
-        currentTimestamp)
+      log.debug("{} query from time [{}] until now [{}].", logPrefix, initialOffset.timestamp, currentTimestamp)
 
     ContinuousQuery[QueryState, Envelope](
       initialState = QueryState.empty.copy(latest = initialOffset),
@@ -205,7 +198,7 @@ import org.slf4j.Logger
     val initialOffset = toTimestampOffset(offset)
 
     if (log.isDebugEnabled())
-      log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)
+      log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp)
 
     def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
       if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
@@ -256,12 +249,7 @@ import org.slf4j.Logger
 
       if (log.isDebugEnabled)
         delay.foreach { d =>
-          log.debug(
-            "{} query [{}] from slice [{}] delay next [{}] ms.",
-            logPrefix,
-            state.queryCount,
-            slice,
-            d.toMillis)
+          log.debug("{} query [{}] delay next [{}] ms.", logPrefix, state.queryCount, d.toMillis)
        }
 
       delay
@@ -413,7 +401,10 @@ import org.slf4j.Logger
       val timestamp = state.startTimestamp.plus(
         JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
 
-      createHeartbeat(timestamp)
+      val h = createHeartbeat(timestamp)
+      if (h.isDefined)
+        log.debug("{} heartbeat timestamp [{}]", logPrefix, timestamp)
+      h
     } else None
   }
 
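Most of these debug statements keep the usual SLF4J guard, so the message arguments are only assembled and boxed when debug logging is actually enabled. A self-contained sketch of that pattern; the object name and values are illustrative, not taken from the source:

  import org.slf4j.{ Logger, LoggerFactory }

  object GuardedDebugSketch {
    private val log: Logger = LoggerFactory.getLogger(getClass)

    // same shape as the delay logging above: guard first, then a parameterized message
    def logDelay(logPrefix: String, queryCount: Int, delayMillis: Long): Unit =
      if (log.isDebugEnabled)
        log.debug("{} query [{}] delay next [{}] ms.", logPrefix, queryCount, delayMillis)
  }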

core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala

+20 −7
@@ -219,6 +219,8 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
       toSequenceNr: Long,
       includeDeleted: Boolean): Source[SerializedJournalItem, NotUsed] = {
 
+    log.debug("[{}] eventsByPersistenceId from seqNr [{}] to [{}]", persistenceId, fromSequenceNr, toSequenceNr)
+
     queryDao.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted)
   }
 
@@ -244,7 +246,11 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
       offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
     val bySliceQueries = (minSlice to maxSlice).map { slice =>
       bySlice[Event](entityType, slice)
-        .currentBySlice("currentEventsBySlices", entityType, slice, sliceStartOffset(slice, offset))
+        .currentBySlice(
+          s"[$entityType] currentEventsBySlice [$slice]: ",
+          entityType,
+          slice,
+          sliceStartOffset(slice, offset))
     }
     require(bySliceQueries.nonEmpty, s"maxSlice [$maxSlice] must be >= minSlice [$minSlice]")
 
@@ -286,7 +292,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
     val bySliceQueries = (minSlice to maxSlice).map { slice =>
       bySlice[Event](entityType, slice).liveBySlice(
-        "eventsBySlices",
+        s"[$entityType] eventsBySlice [$slice]: ",
         entityType,
         slice,
         sliceStartOffset(slice, offset))
@@ -333,7 +339,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
     val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))
 
     val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
-      .currentBySlice("currentSnapshotsBySlice", entityType, slice, timestampOffset)
+      .currentBySlice(s"[$entityType] currentSnapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)
 
     Source.fromGraph(
       new StartingFromSnapshotStage[Event](
@@ -357,7 +363,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
           snapshotOffsets.size)
 
       bySlice[Event](entityType, slice).currentBySlice(
-        "currentEventsBySlice",
+        s"[$entityType] currentEventsBySlice [$slice]: ",
        entityType,
        slice,
        initOffset,
@@ -395,7 +401,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
     val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))
 
     val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
-      .currentBySlice("snapshotsBySlice", entityType, slice, timestampOffset)
+      .currentBySlice(s"[$entityType] snapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)
 
     Source.fromGraph(
       new StartingFromSnapshotStage[Event](
@@ -419,7 +425,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
           snapshotOffsets.size)
 
       bySlice[Event](entityType, slice).liveBySlice(
-        "eventsBySlice",
+        s"[$entityType] eventsBySlice [$slice]: ",
        entityType,
        slice,
        initOffset,
@@ -624,11 +630,18 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
   // EventTimestampQuery
   override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
-    queryDao.timestampOfEvent(persistenceId, sequenceNr)
+    val result = queryDao.timestampOfEvent(persistenceId, sequenceNr)
+    if (log.isDebugEnabled) {
+      result.foreach { t =>
+        log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t)
+      }
+    }
+    result
   }
 
   //LoadEventQuery
   override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = {
+    log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr)
     queryDao
       .loadEvent(persistenceId, sequenceNr, includePayload = true)
       .map {
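
The timestampOf change logs the looked-up timestamp as a side effect of the returned Future without changing what the caller receives. A standalone sketch of that shape; the names and the explicit ExecutionContext parameter are illustrative, while in the journal itself an execution context is already in scope for the foreach callback:

  import java.time.Instant
  import scala.concurrent.{ ExecutionContext, Future }
  import org.slf4j.LoggerFactory

  object FutureDebugSketch {
    private val log = LoggerFactory.getLogger(getClass)

    def timestampOf(persistenceId: String, sequenceNr: Long)(lookup: => Future[Option[Instant]])(
        implicit ec: ExecutionContext): Future[Option[Instant]] = {
      val result = lookup
      if (log.isDebugEnabled) {
        // Future.foreach runs only on successful completion and does not alter the returned value
        result.foreach(t => log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t))
      }
      result
    }
  }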
