Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use journal plugin-dispatcher for eventsByPersistenceId recovery #889

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ akka.persistence.cassandra {
}

# The ExecutionContext to use for the session tasks and future composition.
session-dispatcher = "akka.persistence.cassandra.default-dispatcher"
session-dispatcher = "akka.persistence.cassandra.the-session-dispatcher"

# Full config path to the Datastax Java driver's configuration section.
# When connecting to more than one Cassandra cluster different session configuration can be
Expand Down Expand Up @@ -81,7 +81,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.journal.CassandraJournal"

# Dispatcher for the plugin actor
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.journal-dispatcher"

# Parameter indicating whether the journal keyspace should be auto created.
# Not all Cassandra settings are configurable when using autocreate and for
Expand Down Expand Up @@ -205,7 +205,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"

# Dispatcher for the plugin actors.
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.query-dispatcher"

read-profile = "akka-persistence-cassandra-profile"

Expand Down Expand Up @@ -458,7 +458,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"

# Dispatcher for the plugin actor
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.snapshot-dispatcher"

write-profile = "akka-persistence-cassandra-snapshot-profile"
read-profile = "akka-persistence-cassandra-snapshot-profile"
Expand Down Expand Up @@ -576,6 +576,46 @@ akka.persistence.cassandra {
parallelism-max = 6
}
}

journal-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

query-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

the-session-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

snapshot-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}
}

//#profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ClassicActorSystemProvider
import akka.stream.ActorAttributes

object EventsByTagMigration {
def apply(systemProvider: ClassicActorSystemProvider): EventsByTagMigration =
Expand Down Expand Up @@ -79,10 +80,9 @@ class EventsByTagMigration(
private lazy val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](pluginConfigPath + ".query")
private implicit val sys: ActorSystem = system

implicit val ec =
system.dispatchers.lookup(system.settings.config.getString(s"$pluginConfigPath.journal.plugin-dispatcher"))
private val settings: PluginSettings =
new PluginSettings(system, system.settings.config.getConfig(pluginConfigPath))
implicit val ec = system.dispatchers.lookup(settings.journalSettings.pluginDispatcher)
private val journalStatements = new CassandraJournalStatements(settings)
private val taggedPreparedStatements = new TaggedPreparedStatements(journalStatements, session.prepare)
private val tagWriterSession =
Expand Down Expand Up @@ -220,10 +220,12 @@ class EventsByTagMigration(
settings.querySettings.readProfile,
s"migrateToTag-$pid",
extractor =
EventsByTagMigration.rawPayloadOldTagSchemaExtractor(eventsByTagSettings.bucketSize, system))
EventsByTagMigration.rawPayloadOldTagSchemaExtractor(eventsByTagSettings.bucketSize, system),
ec)
.map(tagRecovery.sendMissingTagWriteRaw(tp, actorRunning = false))
.grouped(flushBatchSize)
.mapAsync(1)(_ => tagRecovery.flush(timeout))
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.uuid.Uuids
import com.datastax.oss.protocol.internal.util.Bytes

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.immutable
Expand All @@ -40,8 +39,12 @@ import scala.concurrent._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.compat.java8.FutureConverters._

import akka.annotation.DoNotInherit
import akka.annotation.InternalStableApi
import akka.dispatch.ExecutionContexts
import akka.stream.ActorAttributes
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source

/**
Expand Down Expand Up @@ -703,7 +706,9 @@ import akka.stream.scaladsl.Source
partitionNr * journalSettings.targetPartitionSize + 1

private def execute[T <: Statement[T]](stmt: Statement[T]): Future[Unit] = {
session.executeWrite(stmt.setExecutionProfileName(journalSettings.writeProfile)).map(_ => ())
session
.executeWrite(stmt.setExecutionProfileName(journalSettings.writeProfile))
.map(_ => ())(ExecutionContexts.parasitic)
}

// TODO this serialises and re-serialises the messages for fixing tag_views
Expand Down Expand Up @@ -733,6 +738,7 @@ import akka.stream.scaladsl.Source
fromSequenceNr,
toSequenceNr)

println(s"# asyncReplayMessages-1 Thread: ${Thread.currentThread().getName}") // FIXME
queries
.eventsByPersistenceId(
persistenceId,
Expand All @@ -742,12 +748,27 @@ import akka.stream.scaladsl.Source
None,
settings.journalSettings.readProfile,
"asyncReplayMessages",
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization))
.mapAsync(1)(tr.sendMissingTagWrite(tp))
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization),
ec)
.mapAsync(1) { x =>
println(s"# asyncReplayMessages-2 Thread: ${Thread.currentThread().getName}") // FIXME
tr.sendMissingTagWrite(tp)(x)
}
}))
.map(te => queries.mapEvent(te.pr))
.runForeach(replayCallback)
.map(_ => ())
.map { te =>
println(s"# asyncReplayMessages-3 Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

queries.mapEvent(te.pr)
}
.map(replayCallback)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
.run()
.map(_ => ())(ExecutionContexts.parasitic)

case None =>
queries
Expand All @@ -759,10 +780,22 @@ import akka.stream.scaladsl.Source
None,
settings.journalSettings.readProfile,
"asyncReplayMessages",
extractor = Extractors.persistentRepr(eventDeserializer, serialization))
extractor = Extractors.persistentRepr(eventDeserializer, serialization),
ec)
.map(queries.mapEvent)
.runForeach(replayCallback)
.map(_ => ())
.map { x =>
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

x
}
.map(replayCallback)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
.run()
.map(_ => ())(ExecutionContexts.parasitic)
}
}

Expand All @@ -789,15 +822,38 @@ import akka.stream.scaladsl.Source
None,
settings.journalSettings.readProfile,
"asyncReplayMessagesPreSnapshot",
Extractors.optionalTaggedPersistentRepr(eventDeserializer, serialization))
Extractors.optionalTaggedPersistentRepr(eventDeserializer, serialization),
ec)
.map { x =>
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

x
}
.mapAsync(1) { t =>
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}
t.tagged match {
case OptionVal.Some(tpr) =>
tr.sendMissingTagWrite(tp)(tpr)
case OptionVal.None => FutureDone // no tags, skip
}
}
.runWith(Sink.ignore)
.map { x =>
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

x
}
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
.run()
} else {
log.debug(
"[{}] Recovery is starting before the latest tag writes tag progress. Min progress [{}]. From sequence nr of recovery: [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ import com.typesafe.config.Config

val coordinatedShutdownOnError: Boolean = config.getBoolean("coordinated-shutdown-on-error")

val pluginDispatcher: String = journalConfig.getString("plugin-dispatcher")

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ import akka.persistence.cassandra.PluginSettings
}

def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: ExecutionContext): Future[Option[Row]] = {
println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr: JLong).setExecutionProfileName(profile)
session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one()))
}
Expand All @@ -79,8 +85,15 @@ import akka.persistence.cassandra.PluginSettings
executeStatement(selectDeletedToQuery.bind(persistenceId).setExecutionProfileName(profile)).map(r =>
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] =
private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] = {
println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

session.executeAsync(statement).toScala
}

}

Expand Down Expand Up @@ -110,6 +123,7 @@ import akka.persistence.cassandra.PluginSettings
refreshInterval: Option[FiniteDuration],
session: EventsByPersistenceIdStage.EventsByPersistenceIdSession,
settings: PluginSettings,
executionContext: ExecutionContext,
fastForwardEnabled: Boolean = false)
extends GraphStageWithMaterializedValue[SourceShape[Row], EventsByPersistenceIdStage.Control] {

Expand All @@ -123,11 +137,11 @@ import akka.persistence.cassandra.PluginSettings
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
val logic = new TimerGraphStageLogic(shape) with OutHandler with StageLogging with Control {

implicit def ec: ExecutionContext = executionContext

override protected def logSource: Class[_] =
classOf[EventsByPersistenceIdStage]

implicit def ec = materializer.executionContext

val donePromise = Promise[Done]()

var expectedNextSeqNr = 0L // initialized in preStart
Expand Down Expand Up @@ -248,6 +262,12 @@ import akka.persistence.cassandra.PluginSettings
queryState = QueryInProgress(switchPartition = false, fetchMore = false, System.nanoTime())
session.highestDeletedSequenceNumber(persistenceId).onComplete(highestDeletedSequenceNrCb.invoke)

println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
}

refreshInterval match {
case Some(interval) =>
val initial =
Expand Down
Loading