Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,18 @@ object PersistenceTestKitPlugin {
* Persistence testkit plugin for snapshots.
*/
@InternalApi
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
class PersistenceTestKitSnapshotPlugin(
// providing this parameter in first position as unused
// because Persistence extension that instantiates the plugins
// does not support constructors without it
@nowarn("msg=never used") cfg: Config,
cfgPath: String)
extends SnapshotStore {

private final val storage = SnapshotStorageEmulatorExtension(context.system)
private final val storage = {
log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath)
SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath)
}

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.testkit.internal

import java.util.concurrent.ConcurrentHashMap

import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
import akka.actor.Extension
import akka.annotation.InternalApi
Expand All @@ -14,17 +16,34 @@ import akka.persistence.testkit.scaladsl.SnapshotTestKit
* INTERNAL API
*/
@InternalApi
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
private[testkit] object SnapshotStorageEmulatorExtension
extends ExtensionId[SnapshotStorageEmulatorExtension]
with ExtensionIdProvider {

override def get(system: ActorSystem): SnapshotStorage = super.get(system)
override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system)

override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
if (SnapshotTestKit.Settings(system).serialize) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension =
new SnapshotStorageEmulatorExtension(system)

override def lookup: ExtensionId[_ <: Extension] =
SnapshotStorageEmulatorExtension
}

/**
* INTERNAL API
*/
@InternalApi
final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension {
private val stores = new ConcurrentHashMap[String, SnapshotStorage]()
private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize

def storageFor(key: String): SnapshotStorage =
stores.computeIfAbsent(key, _ => {
// we don't really care about the key here, we just want separate instances
if (shouldCreateSerializedSnapshotStorage) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ class SnapshotTestKit(system: ActorSystem)

import SnapshotTestKit._

override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
override protected val storage: SnapshotStorage =
SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId)

override def getItem(persistenceId: String, nextInd: Int): Option[Any] = {
storage.firstInExpectNextQueue(persistenceId).map(reprToAny)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (C) 2020-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.testkit.scaladsl

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.JournalProtocol.RecoverySuccess
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence.JournalProtocol.ReplayedMessage
import akka.persistence.Persistence
import akka.persistence.SelectedSnapshot
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria
import com.typesafe.config.ConfigFactory
import org.scalatest.Inside
import org.scalatest.wordspec.AnyWordSpecLike

object RuntimeJournalsSpec {

private object Actor {
sealed trait Command
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
case object Stop extends Command

def apply(persistenceId: String, journal: String): Behavior[Command] =
EventSourcedBehavior[Command, String, String](
PersistenceId.ofUniqueId(persistenceId),
"",
(state, cmd) =>
cmd match {
case Save(text, replyTo) =>
Effect.persist(text).thenRun(_ => replyTo ! Done)
case ShowMeWhatYouGot(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
},
(state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
.withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
.withJournalPluginId(s"$journal.journal")
.withJournalPluginConfig(Some(config(journal)))
.withSnapshotPluginId(s"$journal.snapshot")
.withSnapshotPluginConfig(Some(config(journal)))

}

private def config(journal: String) = {
ConfigFactory.parseString(s"""
$journal {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}"
}
""")
}
}

class RuntimeJournalsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing with Inside {

import RuntimeJournalsSpec._

"The testkit journal and snapshot store plugins" must {

"be possible to configure at runtime and use in multiple isolated instances" in {
val probe = createTestProbe[Any]()

{
// one actor in each journal with same id
val j1 = spawn(Actor("id1", "journal1"))
val j2 = spawn(Actor("id1", "journal2"))
j1 ! Actor.Save("j1m1", probe.ref)
probe.receiveMessage()
j2 ! Actor.Save("j2m1", probe.ref)
probe.receiveMessage()
}

{
def assertJournal(journal: String, expectedEvent: String) = {
val ref = Persistence(system).journalFor(s"$journal.journal", config(journal))
ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic)
inside(probe.receiveMessage()) {
case ReplayedMessage(persistentRepr) =>
persistentRepr.persistenceId shouldBe "id1"
persistentRepr.payload shouldBe expectedEvent
}
probe.expectMessage(RecoverySuccess(1))
}

assertJournal("journal1", "j1m1")
assertJournal("journal2", "j2m1")
}

{
def assertSnapshot(journal: String, expectedShapshot: String) = {
val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal))
ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref.toClassic)
inside(probe.receiveMessage()) {
case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
snapshot shouldBe expectedShapshot
}
}

assertSnapshot("journal1", "j1m1")
assertSnapshot("journal2", "j2m1")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig")
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import akka.persistence.typed.scaladsl.ReplicationInterceptor
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentation
import akka.persistence.typed.scaladsl.SnapshotWhenPredicate
import akka.util.Helpers.ConfigOps
import akka.util.OptionVal
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.FiniteDuration

/**
* INTERNAL API
Expand Down Expand Up @@ -70,8 +74,11 @@ private[akka] final class BehaviorSetup[C, E, S](

val persistence: Persistence = Persistence(context.system.toClassic)

val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val journal: ClassicActorRef =
persistence.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR, I agree that it would be a nice feature. Before merging I would like to see that we can support it all the way in the r2dbc plugin, which currently isn't using that config. I recall we had problems and gave up in the Cassandra plugin akka/akka-persistence-cassandra#694

It's not only about the journal and snapshot. It should work with the queries too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the PR for Pekko I was planning to port to Akka too - apache/pekko-projection#225. The r2dbc plugin would just piggyback off that, IIUC. Is this fine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, I think I finally understood what you had in mind with potential issues - my comment about projections was not relevant for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thinking is that journal plugin should simply not use the query plugin - the code doing the replay could be shared between journal and query plugins without the need for the former to depend on latter. @patriknw Does that make sense?

val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(
settings.snapshotPluginId,
settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))

val (isSnapshotOptional: Boolean, isOnlyOneSnapshot: Boolean) = {
val snapshotStoreConfig = Persistence(context.system.classicSystem).configFor(snapshotStore)
Expand Down Expand Up @@ -125,16 +132,19 @@ private[akka] final class BehaviorSetup[C, E, S](

private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None

val recoveryEventTimeout: FiniteDuration = persistence
.journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
.getMillisDuration("recovery-event-timeout")

def startRecoveryTimer(snapshot: Boolean): Unit = {
cancelRecoveryTimer()
implicit val ec: ExecutionContext = context.executionContext
val timer =
if (snapshot)
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
else
context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) {
() =>
context.self ! RecoveryTickEvent(snapshot = false)
context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) { () =>
context.self ! RecoveryTickEvent(snapshot = false)
}
recoveryTimer = OptionVal.Some(timer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.typed.internal
import java.util.Optional
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import org.slf4j.LoggerFactory
import akka.Done
import akka.actor.typed
Expand Down Expand Up @@ -44,6 +45,7 @@ import akka.persistence.typed.scaladsl._
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentationProvider
import com.typesafe.config.Config

@InternalApi
private[akka] object EventSourcedBehaviorImpl {
Expand Down Expand Up @@ -93,6 +95,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
loggerClass: Class[_],
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
journalPluginConfig: Option[Config] = None,
snapshotPluginConfig: Option[Config] = None,
tagger: (State, Event) => Set[String] = (_: State, _: Event) => Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
Expand Down Expand Up @@ -139,7 +143,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
ctx.system,
journalPluginId.getOrElse(""),
snapshotPluginId.getOrElse(""),
customStashCapacity)
customStashCapacity,
journalPluginConfig,
snapshotPluginConfig)

// stashState outside supervise because StashState should survive restarts due to persist failures
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
Expand Down Expand Up @@ -271,6 +277,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(snapshotPluginId = if (id != "") Some(id) else None)
}

override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(journalPluginConfig = config)
}

override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(snapshotPluginConfig = config)
}

override def withSnapshotSelectionCriteria(
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection.toClassic))
Expand Down
Loading