diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index 4f02e72a2b4..6b640c4af28 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -104,9 +104,18 @@ object PersistenceTestKitPlugin { * Persistence testkit plugin for snapshots. */ @InternalApi -class PersistenceTestKitSnapshotPlugin extends SnapshotStore { +class PersistenceTestKitSnapshotPlugin( + // providing this parameter in first position as unused + // because Persistence extension that instantiates the plugins + // does not support constructors without it + @nowarn("msg=never used") cfg: Config, + cfgPath: String) + extends SnapshotStore { - private final val storage = SnapshotStorageEmulatorExtension(context.system) + private final val storage = { + log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath) + SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath) + } override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = Future.fromTry(Try(storage.tryRead(persistenceId, criteria))) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala index cdd83fdb3f0..92041824755 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala @@ -4,6 +4,8 @@ package akka.persistence.testkit.internal +import java.util.concurrent.ConcurrentHashMap + import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider } import akka.actor.Extension import akka.annotation.InternalApi @@ -14,17 +16,34 @@ import akka.persistence.testkit.scaladsl.SnapshotTestKit * INTERNAL API */ @InternalApi -private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider { +private[testkit] object SnapshotStorageEmulatorExtension + extends ExtensionId[SnapshotStorageEmulatorExtension] + with ExtensionIdProvider { - override def get(system: ActorSystem): SnapshotStorage = super.get(system) + override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system) - override def createExtension(system: ExtendedActorSystem): SnapshotStorage = - if (SnapshotTestKit.Settings(system).serialize) { - new SerializedSnapshotStorageImpl(system) - } else { - new SimpleSnapshotStorageImpl - } + override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension = + new SnapshotStorageEmulatorExtension(system) override def lookup: ExtensionId[_ <: Extension] = SnapshotStorageEmulatorExtension } + +/** + * INTERNAL API + */ +@InternalApi +final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension { + private val stores = new ConcurrentHashMap[String, SnapshotStorage]() + private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize + + def storageFor(key: String): SnapshotStorage = + stores.computeIfAbsent(key, _ => { + // we don't really care about the key here, we just want separate instances + if (shouldCreateSerializedSnapshotStorage) { + new SerializedSnapshotStorageImpl(system) + } else { + new SimpleSnapshotStorageImpl + } + }) +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index cb51fa155d5..576c79b6528 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -328,7 +328,8 @@ class SnapshotTestKit(system: ActorSystem) import SnapshotTestKit._ - override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system) + override protected val storage: SnapshotStorage = + SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId) override def getItem(persistenceId: String, nextInd: Int): Option[Any] = { storage.firstInExpectNextQueue(persistenceId).map(reprToAny) diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala new file mode 100644 index 00000000000..1ed8817a80e --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2020-2024 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.adapter._ +import akka.persistence.JournalProtocol.RecoverySuccess +import akka.persistence.JournalProtocol.ReplayMessages +import akka.persistence.JournalProtocol.ReplayedMessage +import akka.persistence.Persistence +import akka.persistence.SelectedSnapshot +import akka.persistence.SnapshotProtocol.LoadSnapshot +import akka.persistence.SnapshotProtocol.LoadSnapshotResult +import akka.persistence.SnapshotSelectionCriteria +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.RetentionCriteria +import com.typesafe.config.ConfigFactory +import org.scalatest.Inside +import org.scalatest.wordspec.AnyWordSpecLike + +object RuntimeJournalsSpec { + + private object Actor { + sealed trait Command + case class Save(text: String, replyTo: ActorRef[Done]) extends Command + case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command + case object Stop extends Command + + def apply(persistenceId: String, journal: String): Behavior[Command] = + EventSourcedBehavior[Command, String, String]( + PersistenceId.ofUniqueId(persistenceId), + "", + (state, cmd) => + cmd match { + case Save(text, replyTo) => + Effect.persist(text).thenRun(_ => replyTo ! Done) + case ShowMeWhatYouGot(replyTo) => + replyTo ! state + Effect.none + case Stop => + Effect.stop() + }, + (state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|")) + .withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue)) + .withJournalPluginId(s"$journal.journal") + .withJournalPluginConfig(Some(config(journal))) + .withSnapshotPluginId(s"$journal.snapshot") + .withSnapshotPluginConfig(Some(config(journal))) + + } + + private def config(journal: String) = { + ConfigFactory.parseString(s""" + $journal { + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}" + } + """) + } +} + +class RuntimeJournalsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing with Inside { + + import RuntimeJournalsSpec._ + + "The testkit journal and snapshot store plugins" must { + + "be possible to configure at runtime and use in multiple isolated instances" in { + val probe = createTestProbe[Any]() + + { + // one actor in each journal with same id + val j1 = spawn(Actor("id1", "journal1")) + val j2 = spawn(Actor("id1", "journal2")) + j1 ! Actor.Save("j1m1", probe.ref) + probe.receiveMessage() + j2 ! Actor.Save("j2m1", probe.ref) + probe.receiveMessage() + } + + { + def assertJournal(journal: String, expectedEvent: String) = { + val ref = Persistence(system).journalFor(s"$journal.journal", config(journal)) + ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic) + inside(probe.receiveMessage()) { + case ReplayedMessage(persistentRepr) => + persistentRepr.persistenceId shouldBe "id1" + persistentRepr.payload shouldBe expectedEvent + } + probe.expectMessage(RecoverySuccess(1)) + } + + assertJournal("journal1", "j1m1") + assertJournal("journal2", "j2m1") + } + + { + def assertSnapshot(journal: String, expectedShapshot: String) = { + val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal)) + ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref.toClassic) + inside(probe.receiveMessage()) { + case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) => + snapshot shouldBe expectedShapshot + } + } + + assertSnapshot("journal1", "j1m1") + assertSnapshot("journal2", "j2m1") + } + } + } +} diff --git a/akka-persistence-typed/src/main/mima-filters/2.10.x.backwards.excludes/eventsourcedbehavior.excludes b/akka-persistence-typed/src/main/mima-filters/2.10.x.backwards.excludes/eventsourcedbehavior.excludes new file mode 100644 index 00000000000..9c7c45ac4da --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.10.x.backwards.excludes/eventsourcedbehavior.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 5248eadde58..c3cec0a55d8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -23,7 +23,11 @@ import akka.persistence.typed.scaladsl.ReplicationInterceptor import akka.persistence.typed.scaladsl.RetentionCriteria import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentation import akka.persistence.typed.scaladsl.SnapshotWhenPredicate +import akka.util.Helpers.ConfigOps import akka.util.OptionVal +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -70,8 +74,11 @@ private[akka] final class BehaviorSetup[C, E, S]( val persistence: Persistence = Persistence(context.system.toClassic) - val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId) - val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) + val journal: ClassicActorRef = + persistence.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty)) + val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor( + settings.snapshotPluginId, + settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty)) val (isSnapshotOptional: Boolean, isOnlyOneSnapshot: Boolean) = { val snapshotStoreConfig = Persistence(context.system.classicSystem).configFor(snapshotStore) @@ -125,16 +132,19 @@ private[akka] final class BehaviorSetup[C, E, S]( private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None + val recoveryEventTimeout: FiniteDuration = persistence + .journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty)) + .getMillisDuration("recovery-event-timeout") + def startRecoveryTimer(snapshot: Boolean): Unit = { cancelRecoveryTimer() implicit val ec: ExecutionContext = context.executionContext val timer = if (snapshot) - context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true)) + context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true)) else - context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) { - () => - context.self ! RecoveryTickEvent(snapshot = false) + context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) { () => + context.self ! RecoveryTickEvent(snapshot = false) } recoveryTimer = OptionVal.Some(timer) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index a92cdb46ba7..0857528f3c1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.internal import java.util.Optional import java.util.UUID import java.util.concurrent.atomic.AtomicInteger + import org.slf4j.LoggerFactory import akka.Done import akka.actor.typed @@ -44,6 +45,7 @@ import akka.persistence.typed.scaladsl._ import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery } import akka.persistence.typed.scaladsl.RetentionCriteria import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentationProvider +import com.typesafe.config.Config @InternalApi private[akka] object EventSourcedBehaviorImpl { @@ -93,6 +95,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( loggerClass: Class[_], journalPluginId: Option[String] = None, snapshotPluginId: Option[String] = None, + journalPluginConfig: Option[Config] = None, + snapshotPluginConfig: Option[Config] = None, tagger: (State, Event) => Set[String] = (_: State, _: Event) => Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State], @@ -139,7 +143,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""), - customStashCapacity) + customStashCapacity, + journalPluginConfig, + snapshotPluginConfig) // stashState outside supervise because StashState should survive restarts due to persist failures val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings) @@ -271,6 +277,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(snapshotPluginId = if (id != "") Some(id) else None) } + override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = { + copy(journalPluginConfig = config) + } + + override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = { + copy(snapshotPluginConfig = config) + } + override def withSnapshotSelectionCriteria( selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = { copy(recovery = Recovery(selection.toClassic)) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala index 5c3b500b792..b2d99eb1e38 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala @@ -4,15 +4,10 @@ package akka.persistence.typed.internal -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration._ - import com.typesafe.config.Config import akka.actor.typed.ActorSystem import akka.annotation.InternalApi -import akka.persistence.Persistence /** * INTERNAL API @@ -20,20 +15,37 @@ import akka.persistence.Persistence @InternalApi private[akka] object EventSourcedSettings { def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = - apply(system.settings.config, journalPluginId, snapshotPluginId, None) + apply(system.settings.config, journalPluginId, snapshotPluginId, None, None, None) def apply( system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String, customStashCapacity: Option[Int]): EventSourcedSettings = - apply(system.settings.config, journalPluginId, snapshotPluginId, customStashCapacity) + apply(system.settings.config, journalPluginId, snapshotPluginId, customStashCapacity, None, None) + + def apply( + system: ActorSystem[_], + journalPluginId: String, + snapshotPluginId: String, + customStashCapacity: Option[Int], + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config]): EventSourcedSettings = + apply( + system.settings.config, + journalPluginId, + snapshotPluginId, + customStashCapacity, + journalPluginConfig, + snapshotPluginConfig) def apply( config: Config, journalPluginId: String, snapshotPluginId: String, - customStashCapacity: Option[Int]): EventSourcedSettings = { + customStashCapacity: Option[Int], + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config]): EventSourcedSettings = { val typedConfig = config.getConfig("akka.persistence.typed") val stashOverflowStrategy = typedConfig.getString("stash-overflow-strategy").toLowerCase match { @@ -48,36 +60,18 @@ import akka.persistence.Persistence val logOnStashing = typedConfig.getBoolean("log-stashing") - val journalConfig = journalConfigFor(config, journalPluginId) - val recoveryEventTimeout: FiniteDuration = - journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis - val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging") - Persistence.verifyPluginConfigExists(config, snapshotPluginId, "Snapshot store") - EventSourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategy, logOnStashing = logOnStashing, - recoveryEventTimeout, journalPluginId, snapshotPluginId, + journalPluginConfig, + snapshotPluginConfig, useContextLoggerForInternalLogging) } - - private def journalConfigFor(config: Config, journalPluginId: String): Config = { - def defaultJournalPluginId = { - val configPath = config.getString("akka.persistence.journal.plugin") - Persistence.verifyPluginConfigIsDefined(configPath, "Default journal") - configPath - } - - val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId - Persistence.verifyPluginConfigExists(config, configPath, "Journal") - config.getConfig(configPath).withFallback(config.getConfig(Persistence.JournalFallbackConfigPath)) - } - } /** @@ -88,9 +82,10 @@ private[akka] final case class EventSourcedSettings( stashCapacity: Int, stashOverflowStrategy: StashOverflowStrategy, logOnStashing: Boolean, - recoveryEventTimeout: FiniteDuration, journalPluginId: String, snapshotPluginId: String, + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config], useContextLoggerForInternalLogging: Boolean) { require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 6c18dcb3f63..a6dbecd70c3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -237,7 +237,7 @@ private[akka] final class ReplayingEvents[C, E, S]( this } else { val msg = - s"Replay timed out, didn't get event within [${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" + s"Replay timed out, didn't get event within [${setup.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), None, "retrieving-events") } } else { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index b786f631689..96de17d36e9 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -120,8 +120,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup private def onRecoveryTick(snapshot: Boolean): Behavior[InternalProtocol] = if (snapshot) { // we know we're in snapshotting mode; snapshot recovery timeout arrived - val ex = new RecoveryTimedOut( - s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") + val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.recoveryEventTimeout}") onRecoveryFailure(ex) } else Behaviors.same // ignore, since we received the snapshot already diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index d6e4afc3413..b12643d69dc 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -14,11 +14,14 @@ import akka.annotation.InternalApi import akka.persistence.typed.EventAdapter import akka.persistence.typed._ import akka.persistence.typed.internal._ - import java.util.Collections import java.util.Optional import java.util.concurrent.CompletionStage + +import com.typesafe.config.Config + import scala.annotation.nowarn +import scala.jdk.OptionConverters._ /** * For projects using Java 17 and newer, also see [[EventSourcedOnCommandBehavior]] @@ -125,6 +128,16 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( */ def snapshotPluginId: String = "" + /** + * Override and define the journal plugin config that this actor should use instead of the default. + */ + def journalPluginConfig: Optional[Config] = Optional.empty() + + /** + * Override and define the snapshot store plugin config that this actor should use instead of the default. + */ + def snapshotPluginConfig: Optional[Config] = Optional.empty() + /** * Override and define the snapshot selection criteria used by this actor instead of the default. * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events @@ -241,6 +254,8 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( .withJournalPluginId(journalPluginId) .withSnapshotPluginId(snapshotPluginId) .withRecovery(recovery.asScala) + .withJournalPluginConfig(journalPluginConfig.toScala) + .withSnapshotPluginConfig(snapshotPluginConfig.toScala) val handler = signalHandler() if (!handler.isEmpty) behavior = behavior.receiveSignal(handler.handler) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 4635e42390e..6754511d622 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -23,6 +23,7 @@ import akka.persistence.typed.ReplicaId import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.internal._ +import com.typesafe.config.Config import scala.concurrent.Future @@ -156,6 +157,16 @@ object EventSourcedBehavior { */ def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] + /** + * Change the journal plugin config that this actor should use. + */ + def withJournalPluginConfig(id: Option[Config]): EventSourcedBehavior[Command, Event, State] + + /** + * Change the snapshot store plugin config that this actor should use. + */ + def withSnapshotPluginConfig(id: Option[Config]): EventSourcedBehavior[Command, Event, State] + /** * Changes the snapshot selection criteria used by this behavior. * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala index 5fadf563c96..7b171dfa74c 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala @@ -64,9 +64,10 @@ class StashStateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with stashCapacity = capacity, stashOverflowStrategy = StashOverflowStrategy.Fail, logOnStashing = false, - recoveryEventTimeout = 3.seconds, journalPluginId = "", snapshotPluginId = "", + journalPluginConfig = None, + snapshotPluginConfig = None, useContextLoggerForInternalLogging = false) }