Skip to content

Commit 621d3c9

Browse files
authored
Merge pull request #147 from SwissBorg/import-upstream-changes
Import upstream changes
2 parents a4565cc + cd48ce7 commit 621d3c9

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

Diff for: core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala

+10-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
5050
val readJournalConfig = new ReadJournalConfig(config)
5151

5252
private val writePluginId = config.getString("write-plugin")
53-
private val eventAdapters = Persistence(system).adaptersFor(writePluginId)
53+
// If 'config' is empty, or if the plugin reference is not found, then the write plugin will be resolved from the
54+
// ActorSystem configuration. Otherwise, it will be resolved from the provided 'config'.
55+
private val eventAdapters = Persistence(system).adaptersFor(writePluginId, config)
5456

5557
val readJournalDao: ReadJournalDao = {
5658
val slickDb = SlickExtension(system).database(config)
@@ -294,10 +296,13 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
294296
.mapConcat(identity)
295297
}
296298

297-
def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
298-
Source.future(readJournalDao.maxJournalSequence()).flatMapConcat { maxOrderingInDb =>
299-
eventsByTag(tag, offset, terminateAfterOffset = Some(maxOrderingInDb))
300-
}
299+
def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = {
300+
Source
301+
.futureSource(readJournalDao.maxJournalSequence().map { maxOrderingInDb =>
302+
eventsByTag(tag, offset, terminateAfterOffset = Some(maxOrderingInDb))
303+
})
304+
.mapMaterializedValue(_ => NotUsed)
305+
}
301306

302307
/**
303308
* Query events that have a specific tag.

Diff for: project/ProjectAutoPlugin.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ object ProjectAutoPlugin extends AutoPlugin {
2727
override val projectSettings: Seq[Setting[_]] = Seq(
2828
crossVersion := CrossVersion.binary,
2929
crossScalaVersions := Dependencies.ScalaVersions,
30-
scalaVersion := Dependencies.Scala212,
30+
scalaVersion := Dependencies.Scala213,
3131
Test / fork := true,
3232
Test / parallelExecution := false,
3333
Test / logBuffered := true,

0 commit comments

Comments
 (0)