Skip to content

Commit 49a552f

Browse files
committed
Annotated queries to review as part of #155
1 parent 1a85e4d commit 49a552f

File tree

4 files changed

+15
-2
lines changed

4 files changed

+15
-2
lines changed

Diff for: core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
2020
compiledJournalTable ++= xs.sortBy(_.sequenceNumber)
2121

2222
private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
23-
journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)
23+
journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc) // why .desc ?
2424

2525
def delete(persistenceId: String, toSequenceNr: Long): FixedSqlAction[Int, NoStream, slick.dbio.Effect.Write] = {
2626
journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
@@ -54,20 +54,28 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
5454
private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
5555
journalTable.filter(_.deleted === true).filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max
5656

57+
// metadata lookup
5758
val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
5859

60+
// metadata lookup
5961
val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)
6062

63+
// not used?
6164
private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
6265
selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)
6366

67+
// not used?
6468
val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)
6569

70+
// not used, remove or dedup with read journal
6671
private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
6772
journalTable.map(_.persistenceId).distinct
6873

74+
// not used, remove or dedup with read journal
6975
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)
7076

77+
// for partition pruning, how to integrate min/max ordering?
78+
// at least do where ordering >= metadata.min_ordering
7179
private def _messagesQuery(
7280
persistenceId: Rep[String],
7381
fromSequenceNr: Rep[Long],

Diff for: core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
3232
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
3333

3434
override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] =
35+
// this is fully buffered and not streamed, see note at https://scala-slick.org/doc/3.3.2/dbio.html#streaming
36+
// either batch or stream
3537
Source.fromPublisher(db.stream(queries.allPersistenceIdsDistinct(max).result))
3638

3739
override def eventsByTag(

Diff for: core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
1414

1515
private val journalTable: TableQuery[JournalTable] = FlatJournalTable(readJournalConfig.journalTableConfiguration)
1616

17+
// from metadata table to avoid distinct call
1718
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
1819
baseTableQuery().map(_.persistenceId).distinct.take(max)
1920

@@ -23,6 +24,7 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
2324

2425
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
2526

27+
// for partition pruning, consider adding a filter on ordering >= metadata.min_ordering
2628
private def _messagesQuery(
2729
persistenceId: Rep[String],
2830
fromSequenceNr: Rep[Long],
@@ -54,6 +56,7 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
5456

5557
val orderingByOrdering = Compiled(_journalSequenceQuery _)
5658

59+
// this could also be done using metadata table, to be evaluated
5760
val maxOrdering = Compiled {
5861
journalTable.map(_.ordering).max.getOrElse(0L)
5962
}

Diff for: core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
111111
.repeat(0)
112112
.flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))
113113
.statefulMapConcat[String] { () =>
114-
var knownIds = Set.empty[String]
114+
var knownIds = Set.empty[String] // get rid of that and instead use natural ordering of persistenceIds in metadata when sorted by id
115115
def next(id: String): Iterable[String] = {
116116
val xs = Set(id).diff(knownIds)
117117
knownIds += id

0 commit comments

Comments
 (0)