-
Notifications
You must be signed in to change notification settings - Fork 3.6k
feat: support the flattening syntax for supervising #32465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
25a6342
ed6e9b4
71e27a2
82ee194
ac9b572
2199266
92021e9
31a19df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior | |
| import akka.actor.typed.internal.BehaviorTags | ||
| import akka.actor.typed.internal.CachedProps | ||
| import akka.actor.typed.internal.InterceptorImpl | ||
| import akka.actor.typed.internal.Supervisor | ||
| import akka.annotation.DoNotInherit | ||
| import akka.annotation.InternalApi | ||
| import akka.util.OptionVal | ||
|
|
@@ -113,6 +114,32 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible | |
| def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] | ||
| } | ||
|
|
||
| /** | ||
| * INTERNAL API | ||
| * A behavior type that could be supervised, Not for user extension. | ||
| */ | ||
| @InternalApi | ||
| class SuperviseBehavior[T] private[akka] (val wrapped: Behavior[T]) | ||
| extends Behavior[T](BehaviorTags.SuperviseBehavior) { | ||
| private final val ThrowableClassTag = ClassTag(classOf[Throwable]) | ||
|
|
||
| /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ | ||
| def whenFailure[Thr <: Throwable](strategy: SupervisorStrategy)(implicit tag: ClassTag[Thr]): SuperviseBehavior[T] = { | ||
| val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag | ||
| new SuperviseBehavior[T](Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)) | ||
| } | ||
|
|
||
| /** | ||
| * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. | ||
| * | ||
| * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. | ||
| */ | ||
| def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] = | ||
| whenFailure(strategy)(ClassTag(clazz)) | ||
|
|
||
| private[akka] def unwrap: Behavior[T] = wrapped | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunate that the entire class needs to be public, but maybe separating in public trait and impl would be tricky. Make it final and replace "not for user extension" with "Not for user instantiation.
|
||
|
|
||
| object Behavior { | ||
|
|
||
| final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal { | ||
|
|
@@ -179,7 +206,8 @@ object Behavior { | |
| val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[TypedActorContext[Any]]) | ||
| if (startedInner eq wrapped.nestedBehavior) wrapped | ||
| else wrapped.replaceNested(startedInner) | ||
| case _ => behavior | ||
| case supervise: SuperviseBehavior[T] => start(supervise.unwrap, ctx) | ||
| case _ => behavior | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -265,6 +293,8 @@ object Behavior { | |
| throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") | ||
| case BehaviorTags.DeferredBehavior => | ||
| throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter") | ||
| case BehaviorTags.SuperviseBehavior => | ||
| throw new IllegalArgumentException(s"supervise [$behavior] should not be passed to interpreter") | ||
| case BehaviorTags.IgnoreBehavior => | ||
| BehaviorImpl.same[T] | ||
| case BehaviorTags.StoppedBehavior => | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,13 +10,7 @@ import java.util.function.{ Supplier, Function => JFunction } | |
| import scala.reflect.ClassTag | ||
|
|
||
| import akka.actor.typed._ | ||
| import akka.actor.typed.internal.{ | ||
| BehaviorImpl, | ||
| StashBufferImpl, | ||
| Supervisor, | ||
| TimerSchedulerImpl, | ||
| WithMdcBehaviorInterceptor | ||
| } | ||
| import akka.actor.typed.internal.{ BehaviorImpl, StashBufferImpl, TimerSchedulerImpl, WithMdcBehaviorInterceptor } | ||
| import akka.japi.function.{ Effect, Function2 => JapiFunction2 } | ||
| import akka.japi.pf.PFBuilder | ||
| import akka.util.ccompat.JavaConverters._ | ||
|
|
@@ -261,7 +255,7 @@ object Behaviors { | |
| * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. | ||
| */ | ||
| def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] = | ||
| Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz)) | ||
| new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy).unwrap | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it not be safe to change the return type here to a subtype of Behavior and not need the separate |
||
|
|
||
| /** | ||
| * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. | ||
|
|
@@ -270,6 +264,14 @@ object Behaviors { | |
| */ | ||
| def onFailure(strategy: SupervisorStrategy): Behavior[T] = | ||
| onFailure(classOf[Exception], strategy) | ||
|
|
||
| /** | ||
| * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws by use flatten ways. | ||
| * | ||
| * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. | ||
| */ | ||
| def whenFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] = | ||
| new SuperviseBehavior[T](wrapped).whenFailure(clazz, strategy) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should show this as the main way now that we have it, not sure we need to show the other one at all