
Commit 2bfafae

Merge pull request #206 from SwissBorg/155-journal-persistence-ids-table
[#155] New table: journal_persistence_ids
2 parents be8c983 + 513e3ca commit 2bfafae

43 files changed, +1051 −146 lines


README.md (+4 −56)

@@ -21,15 +21,15 @@ You can read more about DAOs and schema variants in [the official documentation]
 To use `akka-persistence-postgres` in your SBT project, add the following to your `build.sbt`:
 
 ```scala
-libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.5.0"
+libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.6.0-RC1"
 ```
 
 For a maven project add:
 ```xml
 <dependency>
     <groupId>com.swissborg</groupId>
     <artifactId>akka-persistence-postgres_2.13</artifactId>
-    <version>0.5.0</version>
+    <version>0.6.0-RC1</version>
 </dependency>
 ```
 to your `pom.xml`.
@@ -113,62 +113,10 @@ Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
 Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your `persistenceId`s and keep the `prefix` reasonably short.
 
 > :warning: Once any of the partitioning setting under `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.
-## Migration
-
-### Migration from akka-persistence-jdbc 4.0.0
-It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
-Since we decided to extract metadata from the serialized payload and store it in a separate column it is not possible to migrate exiting journal and snapshot store using plain SQL scripts.
-
-#### How migration works
-Each journal event and snapshot has to be read, deserialized, metadata and tags must be extracted and then everything stored in the new table.
-
-We provide you with an optional artifact, `akka-persistence-postgres-migration` that brings to your project the necessary classes to automate the above process.
-
-**Important**: Our util classes neither drop nor update any old data. Original tables will be still there but renamed with an `old_` prefix. It's up to you when to drop them.
 
-#### How to use plugin provided migrations
-##### Add akka-persistence-migration to your project
-Add the following to your `build.sbt`
-```
-libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0"
-```
-For a maven project add:
-```xml
-<dependency>
-    <groupId>com.swisborg</groupId>
-    <artifactId>akka-persistence-postgres-migration_2.13</artifactId>
-    <version>0.5.0</version>
-</dependency>
-```
-to your `pom.xml`.
-
-##### Create and run migrations:
-```scala
-import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
-import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration
-
-for {
-  _ <- new Jdbc4JournalMigration(config).run()
-  _ <- new Jdbc4SnapshotStoreMigration(config).run()
-} yield ()
-```
-**Very important note**: The migration has to be finished before your application starts any persistent actors!
-
-It's your choice whether you want to trigger migration manually or (recommended) leverage a database version control system of your choice (e.g. Flyway).
-
-#### Examples
-An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala
-
-### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
-New indices need to be created on each partition, to avoid locking production databases for too long, it should be done in 2 steps:
-1. manually create indices CONCURRENTLY,
-2. deploy new release with migration scripts.
-
-#### Manually create indices CONCURRENTLY
-Execute DDL statements produced by the [sample migration script](scripts/migration-0.5.0/partitioned/1-add-indices-manually.sql), adapt top level variables to match your journal configuration before executing.
+## Migration
 
-#### Deploy new release with migration scripts
-See [sample flyway migration script](scripts/migration-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt top level variables to match your journal configuration.
+Please see the documentation regarding migrations [here](https://swissborg.github.io/akka-persistence-postgres/migration).
 
 ## Contributing
 We are also always looking for contributions and new ideas, so if you’d like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues), or post your own suggestions!

core/src/main/resources/reference.conf (+42 −3)

@@ -126,7 +126,17 @@ postgres-journal {
         metadata = "metadata"
       }
     }
-
+    # Used to hold journal information that can be used to speed up queries
+    journalMetadata {
+      tableName = "journal_metadata"
+      schemaName = ""
+      columnNames = {
+        persistenceId = "persistence_id"
+        maxSequenceNumber = "max_sequence_number"
+        maxOrdering = "max_ordering"
+        minOrdering = "min_ordering"
+      }
+    }
     tags {
       tableName = "tags"
      schameName = ""
@@ -176,6 +186,14 @@ postgres-journal {
   # to the same value for these other journals.
   use-shared-db = null
 
+  # This setting can be used to enable the usage of the data stored
+  # in the journal_metadata table, in order to speed up some queries that would
+  # otherwise use the journal table alone.
+  # In case the metadata table does not hold the required information yet,
+  # the logic falls back to the journal-only queries.
+  # This setting is disabled by default.
+  use-journal-metadata = false
+
   slick {
 
     db {
@@ -358,7 +376,18 @@ postgres-read-journal {
   # to the same value for these other journals.
   use-shared-db = null
 
-  dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
+  # This setting can be used to enable the usage of the data stored
+  # in the journal_metadata table, in order to speed up some queries that would
+  # otherwise use the journal table alone.
+  # In case the metadata table does not hold the required information yet,
+  # the logic falls back to the journal-only queries.
+  # This setting is disabled by default.
+  use-journal-metadata = false
+
+
+  # Replace with "akka.persistence.postgres.query.dao.PartitionedReadJournalDao" in order to leverage dedicated queries to
+  # the partitioned journal.
+  dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"
 
   # Confguration for akka.persistence.postgres.tag.TagIdResolver
   tags {
@@ -402,7 +431,17 @@ postgres-read-journal {
         message = "message"
       }
     }
-
+    # Used to hold journal information that can be used to speed up queries
+    journalMetadata {
+      tableName = "journal_metadata"
+      schemaName = ""
+      columnNames = {
+        persistenceId = "persistence_id"
+        maxSequenceNumber = "max_sequence_number"
+        maxOrdering = "max_ordering"
+        minOrdering = "min_ordering"
+      }
+    }
     tags {
       tableName = "tags"
      schameName = ""

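For orientation, here is a minimal sketch (not part of this commit) of how an application could flip the new switch on top of the defaults above, assuming the plugin's `reference.conf` is on the classpath:

```scala
import akka.persistence.postgres.config.JournalConfig
import com.typesafe.config.ConfigFactory

// Overlay that enables the new flag; every other key keeps the defaults shown above.
val overlay = ConfigFactory.parseString("postgres-journal.use-journal-metadata = true")

// Resolve the journal section the same way the plugin's config classes consume it.
val journalSection = overlay.withFallback(ConfigFactory.load()).getConfig("postgres-journal")
val journalConfig = new JournalConfig(journalSection)

assert(journalConfig.useJournalMetadata) // journal_metadata is now consulted before the journal table
println(journalConfig.journalMetadataTableConfiguration) // journal_metadata with the default column names
```

The same key exists under `postgres-read-journal`, so the read side can be enabled or left disabled independently.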
core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala (+31 −5)

@@ -12,6 +12,7 @@ import scala.concurrent.duration._
 
 object ConfigKeys {
   val useSharedDb = "use-shared-db"
+  val useJournalMetadata = "use-journal-metadata"
 }
 
 class SlickConfiguration(config: Config) {
@@ -49,6 +50,26 @@ class JournalTableConfiguration(config: Config) {
   override def toString: String = s"JournalTableConfiguration($tableName,$schemaName,$columnNames)"
 }
 
+class JournalMetadataTableColumnNames(config: Config) {
+  private val cfg = config.asConfig("tables.journalMetadata.columnNames")
+  val id: String = cfg.as[String]("id", "id")
+  val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
+  val maxSequenceNumber: String = cfg.as[String]("maxSequenceNumber", "max_sequence_number")
+  val maxOrdering: String = cfg.as[String]("maxOrdering", "max_ordering")
+  val minOrdering: String = cfg.as[String]("minOrdering", "min_ordering")
+
+  override def toString: String =
+    s"JournalMetadataTableColumnNames($id,$persistenceId,$maxSequenceNumber,$maxOrdering,$minOrdering)"
+}
+
+class JournalMetadataTableConfiguration(config: Config) {
+  private val cfg = config.asConfig("tables.journalMetadata")
+  val tableName: String = cfg.as[String]("tableName", "journal_metadata")
+  val schemaName: Option[String] = cfg.as[String]("schemaName").trim
+  val columnNames: JournalMetadataTableColumnNames = new JournalMetadataTableColumnNames(config)
+  override def toString: String = s"JournalMetadataTableConfiguration($tableName,$schemaName,$columnNames)"
+}
+
 class SnapshotTableColumnNames(config: Config) {
   private val cfg = config.asConfig("tables.snapshot.columnNames")
   val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
@@ -86,7 +107,7 @@ class TagsTableConfiguration(config: Config) {
 }
 
 class JournalPluginConfig(config: Config) {
-  val dao: String = config.asString("dao", "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao")
+  val dao: String = config.asString("dao", "akka.persistence.postgres.journal.dao.FlatJournalDao")
   override def toString: String = s"JournalPluginConfig($dao)"
 }
 
@@ -101,12 +122,12 @@ class BaseByteArrayJournalDaoConfig(config: Config) {
 }
 
 class ReadJournalPluginConfig(config: Config) {
-  val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao")
+  val dao: String = config.as[String]("dao", "akka.persistence.postgres.query.dao.FlatReadJournalDao")
   override def toString: String = s"ReadJournalPluginConfig($dao)"
 }
 
 class SnapshotPluginConfig(config: Config) {
-  val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao")
+  val dao: String = config.as[String]("dao", "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao")
   override def toString: String = s"SnapshotPluginConfig($dao)"
 }
 
@@ -122,13 +143,16 @@ class TagsConfig(config: Config) {
 class JournalConfig(config: Config) {
   val partitionsConfig = new JournalPartitionsConfiguration(config)
   val journalTableConfiguration = new JournalTableConfiguration(config)
+  val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
   val pluginConfig = new JournalPluginConfig(config)
   val daoConfig = new BaseByteArrayJournalDaoConfig(config)
   val tagsConfig = new TagsConfig(config)
   val tagsTableConfiguration = new TagsTableConfiguration(config)
   val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb)
+  val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)
+
   override def toString: String =
-    s"JournalConfig($journalTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)"
+    s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)"
 }
 
 class SnapshotConfig(config: Config) {
@@ -156,6 +180,7 @@ case class JournalSequenceRetrievalConfig(
 
 class ReadJournalConfig(config: Config) {
   val journalTableConfiguration = new JournalTableConfiguration(config)
+  val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
   val journalSequenceRetrievalConfiguration = JournalSequenceRetrievalConfig(config)
   val pluginConfig = new ReadJournalPluginConfig(config)
   val tagsConfig = new TagsConfig(config)
@@ -164,7 +189,8 @@ class ReadJournalConfig(config: Config) {
   val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt
   val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true)
   val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true)
+  val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)
 
   override def toString: String =
-    s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
+    s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)"
 }

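Like the existing table configurations, the new classes read each key with a default, so only the names that differ need to be overridden. A hedged sketch follows (the overridden names are made up; it assumes the plugin defaults are on the classpath):

```scala
import akka.persistence.postgres.config.ReadJournalConfig
import com.typesafe.config.ConfigFactory

// Hypothetical overrides: a renamed metadata table and one renamed column.
val custom = ConfigFactory.parseString("""
  use-journal-metadata = true
  tables.journalMetadata {
    tableName = "journal_metadata_v2"
    columnNames.maxSequenceNumber = "max_seq_nr"
  }
  """)

val readJournalConfig =
  new ReadJournalConfig(custom.withFallback(ConfigFactory.load().getConfig("postgres-read-journal")))

println(readJournalConfig.useJournalMetadata)                                              // true
println(readJournalConfig.journalMetadataTableConfiguration.tableName)                     // journal_metadata_v2
println(readJournalConfig.journalMetadataTableConfiguration.columnNames.maxSequenceNumber) // max_seq_nr
println(readJournalConfig.journalMetadataTableConfiguration.columnNames.minOrdering)       // min_ordering (default)
```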
core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala (+20 −4)

@@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source }
 import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
 import akka.{ Done, NotUsed }
 import org.slf4j.{ Logger, LoggerFactory }
+import slick.dbio.DBIOAction
 import slick.jdbc.JdbcBackend._
 
 import scala.collection.immutable._
@@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
 
   val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
+  lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries(
+    JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))
+
   // This logging may block since we don't control how the user will configure logback
   // We can't use a Akka logging neither because we don't have an ActorSystem in scope and
   // we should not introduce another dependency here.
@@ -137,10 +141,22 @@
   private def highestMarkedSequenceNr(persistenceId: String) =
     queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result
 
-  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
-    for {
-      maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
-    } yield maybeHighestSeqNo.getOrElse(0L)
+  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
+    val query = if (journalConfig.useJournalMetadata) {
+      metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap {
+        case Some(maxSequenceNr) =>
+          // Return the max sequence nr stored in the journal metadata table
+          DBIOAction.successful(Some(maxSequenceNr))
+        case None =>
+          // Journal metadata has no information for this persistenceId -> fall back to the standard behaviour
+          queries.highestSequenceNrForPersistenceId(persistenceId).result
+      }
+    } else
+      queries.highestSequenceNrForPersistenceId(persistenceId).result
+
+    // Default to 0L when nothing is found for this persistenceId
+    db.run(query).map(_.getOrElse(0L))
+  }
 
   override def messages(
       persistenceId: String,
core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala (new file, +19)

@@ -0,0 +1,19 @@
+package akka.persistence.postgres.journal.dao
+
+import slick.lifted.TableQuery
+
+class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
+  import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
+
+  private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
+    journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
+  }
+
+  val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
+
+  private def _minAndMaxOrderingForPersistenceId(
+      persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
+    journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))
+
+  val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
+}

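A usage sketch, not taken from the commit, showing how the two compiled queries can be run on their own. It assumes the plugin's defaults are on the classpath, a Slick `Database` reachable under `postgres-journal.slick.db`, and that `JournalMetadataTable` lives in the same `journal.dao` package as the class above:

```scala
import akka.persistence.postgres.config.JournalMetadataTableConfiguration
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import akka.persistence.postgres.journal.dao.{ JournalMetadataQueries, JournalMetadataTable }
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future

// Build the table query from configuration, mirroring what BaseByteArrayJournalDao does above.
val metadataCfg = new JournalMetadataTableConfiguration(ConfigFactory.load().getConfig("postgres-journal"))
val metadataQueries = new JournalMetadataQueries(JournalMetadataTable(metadataCfg))

// Assumed connection settings; any Slick Database handle works here.
val db = Database.forConfig("postgres-journal.slick.db")

// Highest stored sequence number for a persistence id, if a metadata row exists for it yet.
val maxSeqNr: Future[Option[Long]] =
  db.run(metadataQueries.highestSequenceNrForPersistenceId("my-persistence-id").result.headOption)

// Ordering bounds that let the read side restrict scans to the relevant journal partitions.
val orderingBounds: Future[Option[(Long, Long)]] =
  db.run(metadataQueries.minAndMaxOrderingForPersistenceId("my-persistence-id").result.headOption)
```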
core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala (+18 −13)

@@ -19,9 +19,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
   def writeJournalRows(xs: Seq[JournalRow]): FixedSqlAction[Option[Int], NoStream, slick.dbio.Effect.Write] =
     compiledJournalTable ++= xs.sortBy(_.sequenceNumber)
 
-  private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
-    journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)
-
   def delete(persistenceId: String, toSequenceNr: Long): FixedSqlAction[Int, NoStream, slick.dbio.Effect.Write] = {
     journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
   }
@@ -58,16 +55,6 @@
 
   val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)
 
-  private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
-    selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)
-
-  val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)
-
-  private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
-    journalTable.map(_.persistenceId).distinct
-
-  val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)
-
   private def _messagesQuery(
       persistenceId: Rep[String],
       fromSequenceNr: Rep[Long],
@@ -81,6 +68,24 @@
       .sortBy(_.sequenceNumber.asc)
       .take(max)
 
+  private def _messagesOrderingBoundedQuery(
+      persistenceId: Rep[String],
+      fromSequenceNr: Rep[Long],
+      toSequenceNr: Rep[Long],
+      max: ConstColumn[Long],
+      minOrdering: Rep[Long],
+      maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] =
+    journalTable
+      .filter(_.persistenceId === persistenceId)
+      .filter(_.deleted === false)
+      .filter(_.sequenceNumber >= fromSequenceNr)
+      .filter(_.sequenceNumber <= toSequenceNr)
+      .filter(_.ordering >= minOrdering)
+      .filter(_.ordering <= maxOrdering)
+      .sortBy(_.sequenceNumber.asc)
+      .take(max)
+
   val messagesQuery = Compiled(_messagesQuery _)
 
+  val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _)
 }

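To show how the pieces are meant to fit together, here is a hedged sketch, not code from this commit, of a replay that asks journal_metadata for the ordering bounds first and falls back to the plain sequence-number query when no metadata row exists yet. It assumes `journalQueries: JournalQueries` and `metadataQueries: JournalMetadataQueries` instances are in scope and that `JournalRow` lives in the same `journal.dao` package:

```scala
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import akka.persistence.postgres.journal.dao.JournalRow

import scala.concurrent.ExecutionContext

def boundedReplay(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, max: Long)(
    implicit ec: ExecutionContext): DBIO[Seq[JournalRow]] =
  metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap {
    case Some((minOrdering, maxOrdering)) =>
      // Bounds are known: the extra ordering filters let Postgres prune journal partitions.
      journalQueries
        .messagesOrderingBoundedQuery((persistenceId, fromSeqNr, toSeqNr, max, minOrdering, maxOrdering))
        .result
    case None =>
      // No metadata row yet: fall back to the sequence-number-only query.
      journalQueries.messagesQuery((persistenceId, fromSeqNr, toSeqNr, max)).result
  }
```

This is essentially the shape the DAOs take when `use-journal-metadata` is enabled: metadata first, journal-only behaviour as the safety net.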