Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public Behavior<MyMsg> receive(TypedActorContext<MyMsg> context, MyMsg message)
Behaviors.supervise(Behaviors.<MyMsg>ignore())
.onFailure(IllegalStateException.class, strategy6))
.onFailure(RuntimeException.class, strategy1);
// or using flattern API:
Behavior<MyMsg> flatternBehv =
Behaviors.supervise(Behaviors.<MyMsg>ignore())
.whenFailure(IllegalStateException.class, strategy6)
.whenFailure(RuntimeException.class, strategy1);
}

// actor context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,9 @@ class StubbedSupervisionSpec extends AnyWordSpec with Matchers with LogCapturing
inbox.receiveMessage() should ===(State(1, Map.empty))
}

"support nesting to handle different exceptions" in {
def testNestedSupervision[T](supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = {
val inbox = TestInbox[Event]("evt")
val behv =
supervise(supervise(targetBehavior(inbox.ref)).onFailure[Exc2](SupervisorStrategy.resume))
.onFailure[Exc3](SupervisorStrategy.restart)
val behv = supervisedBehavior(targetBehavior(inbox.ref))
val testkit = BehaviorTestKit(behv)
testkit.run(IncrementState)
testkit.run(GetState)
Expand All @@ -192,6 +190,14 @@ class StubbedSupervisionSpec extends AnyWordSpec with Matchers with LogCapturing
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
}

"support nesting to handle different exceptions" in testNestedSupervision { behv =>
supervise(supervise(behv).onFailure[Exc2](SupervisorStrategy.resume)).onFailure[Exc3](SupervisorStrategy.restart)
}

"flatten support nesting to handle different exceptions" in testNestedSupervision { behv =>
supervise(behv).whenFailure[Exc2](SupervisorStrategy.resume).whenFailure[Exc3](SupervisorStrategy.restart)
}

"not catch fatal error" in {
val inbox = TestInbox[Event]()
val behv = Behaviors.supervise(targetBehavior(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart)
Expand Down Expand Up @@ -397,11 +403,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}

"support nesting exceptions with different strategies" in {
def testNestedSupervision[T](supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = {
val probe = TestProbe[Event]("evt")
val behv =
supervise(supervise(targetBehavior(probe.ref)).onFailure[RuntimeException](SupervisorStrategy.stop))
.onFailure[Exception](SupervisorStrategy.restart)
val behv = supervisedBehavior(targetBehavior(probe.ref))

val ref = spawn(behv)

Expand All @@ -416,13 +420,21 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}

"support nesting exceptions with outer restart and inner backoff strategies" in {
"support nesting exceptions with different strategies" in testNestedSupervision { behv =>
supervise(supervise(behv).onFailure[RuntimeException](SupervisorStrategy.stop))
.onFailure[Exception](SupervisorStrategy.restart)
}

"flatten support nesting exceptions with different strategies" in testNestedSupervision { behv =>
supervise(behv)
.whenFailure[RuntimeException](SupervisorStrategy.stop)
.whenFailure[Exception](SupervisorStrategy.restart)
}

def testNestedSupervisionWithRestartThenBackoff[T](
supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = {
val probe = TestProbe[Event]("evt")
val behv =
supervise(
supervise(targetBehavior(probe.ref))
.onFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)))
.onFailure[IOException](SupervisorStrategy.restart)
val behv = supervisedBehavior(targetBehavior(probe.ref))

val ref = spawn(behv)

Expand All @@ -444,11 +456,25 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage(Pong(2))
}

"support nesting exceptions with inner restart and outer backoff strategies" in {
"support nesting exceptions with outer restart and inner backoff strategies" in testNestedSupervisionWithRestartThenBackoff {
behv =>
supervise(
supervise(behv).onFailure[IllegalArgumentException](
SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)))
.onFailure[IOException](SupervisorStrategy.restart)
}

"flatten support nesting exceptions with outer restart and inner backoff strategies" in testNestedSupervisionWithRestartThenBackoff {
behv =>
supervise(behv)
.whenFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
.whenFailure[IOException](SupervisorStrategy.restart)
}

def testNestedSupervisionWithBackoffThenRestart[T](
supervisedBehavior: Behavior[Command] => Behavior[Command]): Unit = {
val probe = TestProbe[Event]("evt")
val behv =
supervise(supervise(targetBehavior(probe.ref)).onFailure[IllegalArgumentException](SupervisorStrategy.restart))
.onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
val behv = supervisedBehavior(targetBehavior(probe.ref))

val ref = spawn(behv)

Expand All @@ -470,6 +496,19 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage(Pong(2))
}

"support nesting exceptions with inner restart and outer backoff strategies" in testNestedSupervisionWithBackoffThenRestart {
behv =>
supervise(supervise(behv).onFailure[IllegalArgumentException](SupervisorStrategy.restart))
.onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
}

"flatten support nesting exceptions with inner restart and outer backoff strategies" in testNestedSupervisionWithBackoffThenRestart {
behv =>
supervise(behv)
.whenFailure[IllegalArgumentException](SupervisorStrategy.restart)
.whenFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
}

"stop when not supervised" in {
val probe = TestProbe[Event]("evt")
val behv = targetBehavior(probe.ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ object SupervisionCompileOnly {
Behaviors
.supervise(Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.restart))
.onFailure[IllegalArgumentException](SupervisorStrategy.stop)

// or flatten ways
Behaviors
.supervise(behavior)
.whenFailure[IllegalStateException](SupervisorStrategy.restart)
.whenFailure[IllegalArgumentException](SupervisorStrategy.stop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should show this as the main way now that we have it, not sure we need to show the other one at all

//#multiple

//#wrap
Expand Down
32 changes: 31 additions & 1 deletion akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior
import akka.actor.typed.internal.BehaviorTags
import akka.actor.typed.internal.CachedProps
import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.Supervisor
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.util.OptionVal
Expand Down Expand Up @@ -113,6 +114,32 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible
def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T]
}

/**
* INTERNAL API
* A behavior type that could be supervised, Not for user extension.
*/
@InternalApi
class SuperviseBehavior[T] private[akka] (val wrapped: Behavior[T])
extends Behavior[T](BehaviorTags.SuperviseBehavior) {
private final val ThrowableClassTag = ClassTag(classOf[Throwable])

/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def whenFailure[Thr <: Throwable](strategy: SupervisorStrategy)(implicit tag: ClassTag[Thr]): SuperviseBehavior[T] = {
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
new SuperviseBehavior[T](Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag))
}

/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws.
*
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] =
whenFailure(strategy)(ClassTag(clazz))

private[akka] def unwrap: Behavior[T] = wrapped
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunate that the entire class needs to be public, but maybe separating in public trait and impl would be tricky. Make it final and replace "not for user extension" with "Not for user instantiation.

@InternalApi would mean the entire type is internal but we return it so drop that but put it on the unwrap method which becomes public from Java.


object Behavior {

final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal {
Expand Down Expand Up @@ -179,7 +206,8 @@ object Behavior {
val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[TypedActorContext[Any]])
if (startedInner eq wrapped.nestedBehavior) wrapped
else wrapped.replaceNested(startedInner)
case _ => behavior
case supervise: SuperviseBehavior[T] => start(supervise.unwrap, ctx)
case _ => behavior
}
}

Expand Down Expand Up @@ -265,6 +293,8 @@ object Behavior {
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case BehaviorTags.DeferredBehavior =>
throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter")
case BehaviorTags.SuperviseBehavior =>
throw new IllegalArgumentException(s"supervise [$behavior] should not be passed to interpreter")
case BehaviorTags.IgnoreBehavior =>
BehaviorImpl.same[T]
case BehaviorTags.StoppedBehavior =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private[akka] object BehaviorTags {
final val SameBehavior = 6
final val FailedBehavior = 7
final val StoppedBehavior = 8
final val SuperviseBehavior = 9

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ import java.util.function.{ Supplier, Function => JFunction }
import scala.reflect.ClassTag

import akka.actor.typed._
import akka.actor.typed.internal.{
BehaviorImpl,
StashBufferImpl,
Supervisor,
TimerSchedulerImpl,
WithMdcBehaviorInterceptor
}
import akka.actor.typed.internal.{ BehaviorImpl, StashBufferImpl, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
import akka.japi.function.{ Effect, Function2 => JapiFunction2 }
import akka.japi.pf.PFBuilder
import akka.util.ccompat.JavaConverters._
Expand Down Expand Up @@ -261,7 +255,7 @@ object Behaviors {
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] =
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz))
new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy).unwrap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not be safe to change the return type here to a subtype of Behavior and not need the separate whenFailure name?


/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws.
Expand All @@ -270,6 +264,14 @@ object Behaviors {
*/
def onFailure(strategy: SupervisorStrategy): Behavior[T] =
onFailure(classOf[Exception], strategy)

/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws by use flatten ways.
*
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] =
new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ object Behaviors {
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
}

/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws by use flatten ways. */
def whenFailure[Thr <: Throwable](strategy: SupervisorStrategy)(
implicit tag: ClassTag[Thr] = ThrowableClassTag): SuperviseBehavior[T] = {
new SuperviseBehavior[T](wrapped).whenFailure(strategy)(tag)
}
}

/**
Expand Down