diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 2c8634ca017..a407c49dda5 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -177,6 +177,11 @@ public Behavior receive(TypedActorContext context, MyMsg message) Behaviors.supervise(Behaviors.ignore()) .onFailure(IllegalStateException.class, strategy6)) .onFailure(RuntimeException.class, strategy1); + // or using flattern API: + Behavior flatternBehv = + Behaviors.supervise(Behaviors.ignore()) + .whenFailure(IllegalStateException.class, strategy6) + .whenFailure(RuntimeException.class, strategy1); } // actor context diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 33f93560868..e1d670cbb49 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -164,11 +164,9 @@ class StubbedSupervisionSpec extends AnyWordSpec with Matchers with LogCapturing inbox.receiveMessage() should ===(State(1, Map.empty)) } - "support nesting to handle different exceptions" in { + def testNestedSupervision[T](supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = { val inbox = TestInbox[Event]("evt") - val behv = - supervise(supervise(targetBehavior(inbox.ref)).onFailure[Exc2](SupervisorStrategy.resume)) - .onFailure[Exc3](SupervisorStrategy.restart) + val behv = supervisedBehavior(targetBehavior(inbox.ref)) val testkit = BehaviorTestKit(behv) testkit.run(IncrementState) testkit.run(GetState) @@ -192,6 +190,14 @@ class StubbedSupervisionSpec extends AnyWordSpec with Matchers with LogCapturing inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } + "support nesting to handle different exceptions" in testNestedSupervision { behv => + supervise(supervise(behv).onFailure[Exc2](SupervisorStrategy.resume)).onFailure[Exc3](SupervisorStrategy.restart) + } + + "flatten support nesting to handle different exceptions" in testNestedSupervision { behv => + supervise(behv).whenFailure[Exc2](SupervisorStrategy.resume).whenFailure[Exc3](SupervisorStrategy.restart) + } + "not catch fatal error" in { val inbox = TestInbox[Event]() val behv = Behaviors.supervise(targetBehavior(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) @@ -397,11 +403,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } - "support nesting exceptions with different strategies" in { + def testNestedSupervision[T](supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = { val probe = TestProbe[Event]("evt") - val behv = - supervise(supervise(targetBehavior(probe.ref)).onFailure[RuntimeException](SupervisorStrategy.stop)) - .onFailure[Exception](SupervisorStrategy.restart) + val behv = supervisedBehavior(targetBehavior(probe.ref)) val ref = spawn(behv) @@ -416,13 +420,21 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } - "support nesting exceptions with outer restart and inner backoff strategies" in { + "support nesting exceptions with different strategies" in testNestedSupervision { behv => + supervise(supervise(behv).onFailure[RuntimeException](SupervisorStrategy.stop)) + .onFailure[Exception](SupervisorStrategy.restart) + } + + "flatten support nesting exceptions with different strategies" in testNestedSupervision { behv => + supervise(behv) + .whenFailure[RuntimeException](SupervisorStrategy.stop) + .whenFailure[Exception](SupervisorStrategy.restart) + } + + def testNestedSupervisionWithRestartThenBackoff[T]( + supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = { val probe = TestProbe[Event]("evt") - val behv = - supervise( - supervise(targetBehavior(probe.ref)) - .onFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))) - .onFailure[IOException](SupervisorStrategy.restart) + val behv = supervisedBehavior(targetBehavior(probe.ref)) val ref = spawn(behv) @@ -444,11 +456,25 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage(Pong(2)) } - "support nesting exceptions with inner restart and outer backoff strategies" in { + "support nesting exceptions with outer restart and inner backoff strategies" in testNestedSupervisionWithRestartThenBackoff { + behv => + supervise( + supervise(behv).onFailure[IllegalArgumentException]( + SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))) + .onFailure[IOException](SupervisorStrategy.restart) + } + + "flatten support nesting exceptions with outer restart and inner backoff strategies" in testNestedSupervisionWithRestartThenBackoff { + behv => + supervise(behv) + .whenFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + .whenFailure[IOException](SupervisorStrategy.restart) + } + + def testNestedSupervisionWithBackoffThenRestart[T]( + supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = { val probe = TestProbe[Event]("evt") - val behv = - supervise(supervise(targetBehavior(probe.ref)).onFailure[IllegalArgumentException](SupervisorStrategy.restart)) - .onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + val behv = supervisedBehavior(targetBehavior(probe.ref)) val ref = spawn(behv) @@ -470,6 +496,19 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage(Pong(2)) } + "support nesting exceptions with inner restart and outer backoff strategies" in testNestedSupervisionWithBackoffThenRestart { + behv => + supervise(supervise(behv).onFailure[IllegalArgumentException](SupervisorStrategy.restart)) + .onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + } + + "flatten support nesting exceptions with inner restart and outer backoff strategies" in testNestedSupervisionWithBackoffThenRestart { + behv => + supervise(behv) + .whenFailure[IllegalArgumentException](SupervisorStrategy.restart) + .whenFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + } + "stop when not supervised" in { val probe = TestProbe[Event]("evt") val behv = targetBehavior(probe.ref) diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala index 54341c5f2d0..6de3b79e69e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala @@ -36,6 +36,12 @@ object SupervisionCompileOnly { Behaviors .supervise(Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.restart)) .onFailure[IllegalArgumentException](SupervisorStrategy.stop) + + // or flatten ways + Behaviors + .supervise(behavior) + .whenFailure[IllegalStateException](SupervisorStrategy.restart) + .whenFailure[IllegalArgumentException](SupervisorStrategy.stop) //#multiple //#wrap diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 6ed9db91a19..cf06569ef65 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -15,6 +15,7 @@ import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior import akka.actor.typed.internal.BehaviorTags import akka.actor.typed.internal.CachedProps import akka.actor.typed.internal.InterceptorImpl +import akka.actor.typed.internal.Supervisor import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.util.OptionVal @@ -113,6 +114,32 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] } +/** + * INTERNAL API + * A behavior type that could be supervised, Not for user extension. + */ +@InternalApi +class SuperviseBehavior[T] private[akka] (val wrapped: Behavior[T]) + extends Behavior[T](BehaviorTags.SuperviseBehavior) { + private final val ThrowableClassTag = ClassTag(classOf[Throwable]) + + /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ + def whenFailure[Thr <: Throwable](strategy: SupervisorStrategy)(implicit tag: ClassTag[Thr]): SuperviseBehavior[T] = { + val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag + new SuperviseBehavior[T](Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)) + } + + /** + * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. + * + * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. + */ + def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] = + whenFailure(strategy)(ClassTag(clazz)) + + private[akka] def unwrap: Behavior[T] = wrapped +} + object Behavior { final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal { @@ -179,7 +206,8 @@ object Behavior { val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[TypedActorContext[Any]]) if (startedInner eq wrapped.nestedBehavior) wrapped else wrapped.replaceNested(startedInner) - case _ => behavior + case supervise: SuperviseBehavior[T] => start(supervise.unwrap, ctx) + case _ => behavior } } @@ -265,6 +293,8 @@ object Behavior { throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") case BehaviorTags.DeferredBehavior => throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter") + case BehaviorTags.SuperviseBehavior => + throw new IllegalArgumentException(s"supervise [$behavior] should not be passed to interpreter") case BehaviorTags.IgnoreBehavior => BehaviorImpl.same[T] case BehaviorTags.StoppedBehavior => diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index c848ec344d6..eb207c1d85b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -29,6 +29,7 @@ private[akka] object BehaviorTags { final val SameBehavior = 6 final val FailedBehavior = 7 final val StoppedBehavior = 8 + final val SuperviseBehavior = 9 } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 91d7f99ed88..a74bbaadce5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -10,13 +10,7 @@ import java.util.function.{ Supplier, Function => JFunction } import scala.reflect.ClassTag import akka.actor.typed._ -import akka.actor.typed.internal.{ - BehaviorImpl, - StashBufferImpl, - Supervisor, - TimerSchedulerImpl, - WithMdcBehaviorInterceptor -} +import akka.actor.typed.internal.{ BehaviorImpl, StashBufferImpl, TimerSchedulerImpl, WithMdcBehaviorInterceptor } import akka.japi.function.{ Effect, Function2 => JapiFunction2 } import akka.japi.pf.PFBuilder import akka.util.ccompat.JavaConverters._ @@ -261,7 +255,7 @@ object Behaviors { * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. */ def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] = - Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz)) + new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy).unwrap /** * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. @@ -270,6 +264,14 @@ object Behaviors { */ def onFailure(strategy: SupervisorStrategy): Behavior[T] = onFailure(classOf[Exception], strategy) + + /** + * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws by use flatten ways. + * + * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. + */ + def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] = + new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy) } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 4145fb02046..e51541d9fe8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -227,6 +227,12 @@ object Behaviors { val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) } + + /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws by use flatten ways. */ + def whenFailure[Thr <: Throwable](strategy: SupervisorStrategy)( + implicit tag: ClassTag[Thr] = ThrowableClassTag): SuperviseBehavior[T] = { + new SuperviseBehavior[T](wrapped).whenFailure(strategy)(tag) + } } /**