Skip to content

Commit ea0046f

Browse files
committed
Use journal_metadata table on messagesQuery and highestSequenceNrForPersistenceId
1 parent f9644c2 commit ea0046f

File tree

8 files changed

+182
-16
lines changed

8 files changed

+182
-16
lines changed

core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,17 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
137137
private def highestMarkedSequenceNr(persistenceId: String) =
138138
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result
139139

140-
override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
141-
for {
142-
maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
143-
} yield maybeHighestSeqNo.getOrElse(0L)
140+
override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
141+
db.run(queries.highestStoredSequenceNrForPersistenceId(persistenceId).result.headOption).flatMap {
142+
case Some(maxSequenceNr) =>
143+
// journal_metadata has the max sequence nr stored
144+
Future.successful(maxSequenceNr)
145+
case None =>
146+
// journal_metadata has yet to store the max sequence number to this persistenceId
147+
db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
148+
.map(_.getOrElse(0L)) // Default to 0L when nothing is found for this persistenceId
149+
}
150+
}
144151

145152
override def messages(
146153
persistenceId: String,

core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala

+31-7
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import io.circe.Json
1010
import slick.lifted.TableQuery
1111
import slick.sql.FixedSqlAction
1212

13-
class JournalQueries(
14-
journalTable: TableQuery[JournalTable],
15-
journalMetadataTable: TableQuery[JournalMetadataTable]) {
13+
class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTable: TableQuery[JournalMetadataTable]) {
1614

1715
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
1816

@@ -52,16 +50,18 @@ class JournalQueries(
5250

5351
private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
5452
journalTable.filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max
55-
// journalMetadataTable
56-
// .filter(_.persistenceId === persistenceId)
57-
// .map(_.maxSequenceNumber)
58-
// .max // TODO replace with more appropriate combinator?
53+
54+
private def _highestStoredSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
55+
journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
56+
}
5957

6058
private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
6159
journalTable.filter(_.deleted === true).filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max
6260

6361
val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
6462

63+
val highestStoredSequenceNrForPersistenceId = Compiled(_highestStoredSequenceNrForPersistenceId _)
64+
6565
val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)
6666

6767
private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
@@ -74,6 +74,12 @@ class JournalQueries(
7474

7575
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)
7676

77+
private def _minAndMaxOrderingStoredForPersistenceId(
78+
persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
79+
journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))
80+
81+
val minAndMaxOrderingStoredForPersistenceId = Compiled(_minAndMaxOrderingStoredForPersistenceId _)
82+
7783
private def _messagesQuery(
7884
persistenceId: Rep[String],
7985
fromSequenceNr: Rep[Long],
@@ -87,6 +93,24 @@ class JournalQueries(
8793
.sortBy(_.sequenceNumber.asc)
8894
.take(max)
8995

96+
private def _messagesOrderingBoundedQuery(
97+
persistenceId: Rep[String],
98+
fromSequenceNr: Rep[Long],
99+
toSequenceNr: Rep[Long],
100+
max: ConstColumn[Long],
101+
minOrdering: Rep[Long],
102+
maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] =
103+
journalTable
104+
.filter(_.persistenceId === persistenceId)
105+
.filter(_.deleted === false)
106+
.filter(_.sequenceNumber >= fromSequenceNr)
107+
.filter(_.sequenceNumber <= toSequenceNr)
108+
.filter(_.ordering >= minOrdering)
109+
.filter(_.ordering <= maxOrdering)
110+
.sortBy(_.sequenceNumber.asc)
111+
.take(max)
112+
90113
val messagesQuery = Compiled(_messagesQuery _)
91114

115+
val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _)
92116
}

core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala

+23
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package akka.persistence.postgres.journal.dao
22

3+
import akka.NotUsed
4+
import akka.persistence.PersistentRepr
35
import akka.persistence.postgres.JournalRow
46
import akka.persistence.postgres.config.JournalConfig
57
import akka.persistence.postgres.db.DbErrors.{ withHandledIndexErrors, withHandledPartitionErrors }
68
import akka.serialization.Serialization
79
import akka.stream.Materializer
10+
import akka.stream.scaladsl.Source
811
import slick.jdbc.JdbcBackend.Database
912

1013
import java.util.concurrent.atomic.AtomicReference
1114
import scala.collection.immutable.{ Nil, Seq }
1215
import scala.concurrent.{ ExecutionContext, Future }
16+
import scala.util.Try
1317

1418
class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serialization: Serialization)(
1519
implicit ec: ExecutionContext,
@@ -86,4 +90,23 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ
8690
DBIO.successful(())
8791
}
8892
}
93+
94+
override def messages(
95+
persistenceId: String,
96+
fromSequenceNr: Long,
97+
toSequenceNr: Long,
98+
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
99+
// Query the metadata table to get the known min and max ordering a persistence_id has,
100+
// so that the postgres query planner might immediately discard scanning unnecessary partitions
101+
val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap {
102+
case Some((minOrdering, maxOrdering)) =>
103+
queries
104+
.messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
105+
.result
106+
case None =>
107+
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
108+
}
109+
110+
Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow)
111+
}
89112
}

core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala

+28-2
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,10 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
5151
persistenceId: String,
5252
fromSequenceNr: Long,
5353
toSequenceNr: Long,
54-
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
54+
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] =
5555
Source
5656
.fromPublisher(db.stream(queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result))
5757
.via(serializer.deserializeFlow)
58-
}
5958

6059
override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
6160
Source.fromPublisher(db.stream(queries.orderingByOrdering(offset, limit).result))
@@ -78,3 +77,30 @@ class ByteArrayReadJournalDao(
7877
new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration),
7978
readJournalConfig.tagsConfig))
8079
}
80+
81+
class PartitionedReadJournalDao(
82+
db: Database,
83+
readJournalConfig: ReadJournalConfig,
84+
serialization: Serialization,
85+
tagIdResolver: TagIdResolver)(implicit ec: ExecutionContext, mat: Materializer)
86+
extends ByteArrayReadJournalDao(db, readJournalConfig, serialization, tagIdResolver) {
87+
88+
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
89+
90+
override def messages(
91+
persistenceId: String,
92+
fromSequenceNr: Long,
93+
toSequenceNr: Long,
94+
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
95+
val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap {
96+
case Some((minOrdering, maxOrdering)) =>
97+
queries
98+
.messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
99+
.result
100+
case None =>
101+
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
102+
}
103+
104+
Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow)
105+
}
106+
}

core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala

+28-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
1313
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
1414

1515
private val journalTable: TableQuery[JournalTable] = FlatJournalTable(readJournalConfig.journalTableConfiguration)
16+
private val journalMetadataTable: TableQuery[JournalMetadataTable] =
17+
JournalMetadataTable.apply(readJournalConfig.journalMetadataTableConfiguration)
1618

1719
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
1820
baseTableQuery().map(_.persistenceId).distinct.take(max)
@@ -23,20 +25,45 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
2325

2426
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
2527

28+
private def _minAndMaxOrderingStoredForPersistenceId(
29+
persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
30+
journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))
31+
32+
val minAndMaxOrderingStoredForPersistenceId = Compiled(_minAndMaxOrderingStoredForPersistenceId _)
33+
2634
private def _messagesQuery(
2735
persistenceId: Rep[String],
2836
fromSequenceNr: Rep[Long],
2937
toSequenceNr: Rep[Long],
30-
max: ConstColumn[Long]) =
38+
max: ConstColumn[Long]): Query[JournalTable, JournalRow, Seq] =
3139
baseTableQuery()
3240
.filter(_.persistenceId === persistenceId)
3341
.filter(_.sequenceNumber >= fromSequenceNr)
3442
.filter(_.sequenceNumber <= toSequenceNr)
3543
.sortBy(_.sequenceNumber.asc)
3644
.take(max)
3745

46+
private def _messagesOrderingBoundedQuery(
47+
persistenceId: Rep[String],
48+
fromSequenceNr: Rep[Long],
49+
toSequenceNr: Rep[Long],
50+
max: ConstColumn[Long],
51+
minOrdering: Rep[Long],
52+
maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] =
53+
baseTableQuery()
54+
.filter(_.persistenceId === persistenceId)
55+
.filter(_.deleted === false)
56+
.filter(_.sequenceNumber >= fromSequenceNr)
57+
.filter(_.sequenceNumber <= toSequenceNr)
58+
.filter(_.ordering >= minOrdering)
59+
.filter(_.ordering <= maxOrdering)
60+
.sortBy(_.sequenceNumber.asc)
61+
.take(max)
62+
3863
val messagesQuery = Compiled(_messagesQuery _)
3964

65+
val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _)
66+
4067
protected def _eventsByTag(
4168
tag: Rep[List[Int]],
4269
offset: ConstColumn[Long],

core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala

+20-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ class JournalQueriesTest extends BaseQueryTest {
1818
it should "create SQL query for highestSequenceNrForPersistenceId" in withJournalQueries { queries =>
1919
queries.highestSequenceNrForPersistenceId(
2020
"aaa") shouldBeSQL """select max("sequence_number") from "journal" where "persistence_id" = ?"""
21-
// queries.highestSequenceNrForPersistenceId("aaa") shouldBeSQL """select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1"""
21+
}
22+
23+
it should "create SQL query for highestStoredSequenceNrForPersistenceId" in withJournalQueries { queries =>
24+
queries.highestStoredSequenceNrForPersistenceId(
25+
"aaa") shouldBeSQL """select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1"""
2226
}
2327

2428
it should "create SQL query for selectByPersistenceIdAndMaxSequenceNumber" in withJournalQueries { queries =>
@@ -27,6 +31,11 @@ class JournalQueriesTest extends BaseQueryTest {
2731
11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ("persistence_id" = ?) and ("sequence_number" <= ?) order by "sequence_number" desc"""
2832
}
2933

34+
it should "create SQL query for minAndMaxOrderingStoredForPersistenceId" in withJournalQueries { queries =>
35+
queries.minAndMaxOrderingStoredForPersistenceId(
36+
"aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1"""
37+
}
38+
3039
it should "create SQL query for messagesQuery" in withJournalQueries { queries =>
3140
queries.messagesQuery(
3241
"aaa",
@@ -35,6 +44,16 @@ class JournalQueriesTest extends BaseQueryTest {
3544
11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ((("persistence_id" = ?) and ("deleted" = false)) and ("sequence_number" >= ?)) and ("sequence_number" <= ?) order by "sequence_number" limit ?"""
3645
}
3746

47+
it should "create SQL query for messagesOrderingBoundedQuery" in withJournalQueries { queries =>
48+
queries.messagesOrderingBoundedQuery(
49+
"aaa",
50+
11L,
51+
11L,
52+
11L,
53+
11L,
54+
11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ((((("persistence_id" = ?) and ("deleted" = false)) and ("sequence_number" >= ?)) and ("sequence_number" <= ?)) and ("ordering" >= ?)) and ("ordering" <= ?) order by "sequence_number" limit ?"""
55+
}
56+
3857
it should "create SQL query for markJournalMessagesAsDeleted" in withJournalQueries { queries =>
3958
queries.markJournalMessagesAsDeleted(
4059
"aaa",

core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala

+26-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import akka.pattern.ask
1111
import akka.persistence.postgres.config.JournalSequenceRetrievalConfig
1212
import akka.persistence.postgres.db.ExtendedPostgresProfile
1313
import akka.persistence.postgres.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId }
14-
import akka.persistence.postgres.query.dao.{ ByteArrayReadJournalDao, TestProbeReadJournalDao }
14+
import akka.persistence.postgres.query.dao.{
15+
ByteArrayReadJournalDao,
16+
PartitionedReadJournalDao,
17+
TestProbeReadJournalDao
18+
}
1519
import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao }
1620
import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType }
1721
import akka.persistence.postgres.{ JournalRow, SharedActorSystemTestSpec }
@@ -22,6 +26,7 @@ import akka.testkit.TestProbe
2226
import io.circe.{ Json, JsonObject }
2327
import org.scalatest.time.Span
2428
import org.slf4j.LoggerFactory
29+
import slick.jdbc
2530
import slick.jdbc.{ JdbcBackend, JdbcCapabilities }
2631

2732
import scala.concurrent.Future
@@ -316,6 +321,26 @@ class PartitionedJournalSequenceActorTest extends JournalSequenceActorTest(Parti
316321
}
317322
}
318323
}
324+
325+
override def withJournalSequenceActor(db: jdbc.JdbcBackend.Database, maxTries: Int)(f: ActorRef => Unit)(
326+
implicit system: ActorSystem): Unit = {
327+
import system.dispatcher
328+
implicit val mat: Materializer = SystemMaterializer(system).materializer
329+
val readJournalDao =
330+
new PartitionedReadJournalDao(
331+
db,
332+
readJournalConfig,
333+
SerializationExtension(system),
334+
new CachedTagIdResolver(
335+
new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration),
336+
readJournalConfig.tagsConfig))
337+
val actor =
338+
system.actorOf(
339+
JournalSequenceActor
340+
.props(readJournalDao, readJournalConfig.journalSequenceRetrievalConfiguration.copy(maxTries = maxTries)))
341+
try f(actor)
342+
finally system.stop(actor)
343+
}
319344
}
320345

321346
class PlainJournalSequenceActorTest extends JournalSequenceActorTest(Plain)

core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala

+15
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ class ReadJournalQueriesTest extends BaseQueryTest {
88
queries.allPersistenceIdsDistinct(23L) shouldBeSQL """select distinct "persistence_id" from "journal" limit ?"""
99
}
1010

11+
it should "create SQL query for minAndMaxOrderingStoredForPersistenceId" in withReadJournalQueries { queries =>
12+
queries.minAndMaxOrderingStoredForPersistenceId(
13+
"aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1"""
14+
}
15+
1116
it should "create SQL query for messagesQuery" in withReadJournalQueries { queries =>
1217
queries.messagesQuery(
1318
"p1",
@@ -16,6 +21,16 @@ class ReadJournalQueriesTest extends BaseQueryTest {
1621
5L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where (("persistence_id" = ?) and ("sequence_number" >= ?)) and ("sequence_number" <= ?) order by "sequence_number" limit ?"""
1722
}
1823

24+
it should "create SQL query for messagesOrderingBoundedQuery" in withReadJournalQueries { queries =>
25+
queries.messagesOrderingBoundedQuery(
26+
"aaa",
27+
1L,
28+
4L,
29+
5L,
30+
1L,
31+
10L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ((((("persistence_id" = ?) and ("deleted" = false)) and ("sequence_number" >= ?)) and ("sequence_number" <= ?)) and ("ordering" >= ?)) and ("ordering" <= ?) order by "sequence_number" limit ?"""
32+
}
33+
1934
it should "create SQL query for eventsByTag" in withReadJournalQueries { queries =>
2035
queries.eventsByTag(
2136
List(11),

0 commit comments

Comments
 (0)