Detect and down suspended process, #30323 #30335
New file: SuspendDetector.scala (package akka.dispatch), +78 lines
```scala
/*
 * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.dispatch

import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.event.Logging

object SuspendDetector extends ExtensionId[SuspendDetector] with ExtensionIdProvider {
  override def get(system: ActorSystem): SuspendDetector = super.get(system)

  override def get(system: ClassicActorSystemProvider): SuspendDetector = super.get(system)

  override def lookup = SuspendDetector

  override def createExtension(system: ExtendedActorSystem): SuspendDetector = new SuspendDetector(system)

  /**
   * Published to the ActorSystem's eventStream when process suspension has been detected.
   * Note that this message could be stale when it is received, so an additional check with
   * [[SuspendDetected#wasSuspended]] is recommended to be sure that the suspension occurred recently.
   */
  final class SuspendDetected(suspendDetectedNanoTime: Long) {
    def wasSuspended(since: FiniteDuration): Boolean =
      (System.nanoTime() - suspendDetectedNanoTime <= since.toNanos)
  }

}

class SuspendDetector(val system: ExtendedActorSystem) extends Extension {
  import SuspendDetector.SuspendDetected

  // FIXME config
  private val tickInterval = 100.millis
  private val tickDeadlineNanos = 5.seconds.toNanos // FIXME default should be > 30 seconds

  private val log = Logging(system, classOf[SuspendDetector])

  // suspendDetectedTime starts one day in the past so that wasSuspended is false at startup
  @volatile private var aliveTime = System.nanoTime()
  @volatile private var suspendDetectedTime = aliveTime - 1.day.toNanos

  system.scheduler.scheduleWithFixedDelay(tickInterval, tickInterval) { () =>
    checkTime()
  }(system.dispatcher)

  // returns true if the time elapsed since the previous tick exceeds the deadline,
  // i.e. the process (or the whole VM) was not running for a while
  private def checkTime(): Boolean = synchronized {
    val now = System.nanoTime()
    val suspendDetected =
      if (now - aliveTime >= tickDeadlineNanos) {
        suspendDetectedTime = now
        true
      } else {
        false
      }

    if (suspendDetected) {
      log.warning("Process was suspended for [{} seconds]", (now - aliveTime).nanos.toSeconds)
      system.eventStream.publish(new SuspendDetected(now))
    }

    aliveTime = now

    suspendDetected
  }

  def wasSuspended(since: FiniteDuration): Boolean = {
    checkTime() || (System.nanoTime() - suspendDetectedTime <= since.toNanos)
  }

}
```

Contributor (on the `scheduleWithFixedDelay` call): Wouldn't this approach (scheduling the check on the dispatcher) […]? It seems like the intent is to capture scenarios where nothing in the JVM process gets scheduled (suspended process/VM, but also sufficiently high levels of host load, priority inversions, etc.), so maybe having the check run in a thread that is nearly always sleeping would be more effective and could have a faster reaction time? My understanding is that kernel schedulers tend to be eager to give CPU to processes that haven't been runnable in a long time (e.g. because they're sleeping).
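A minimal sketch of the alternative the reviewer describes, assuming a dedicated daemon thread; the class and parameter names are illustrative, not part of this PR:

```scala
import scala.concurrent.duration._

// Hypothetical sketch, not from the PR: a dedicated, nearly-always-sleeping thread
// instead of a task scheduled on the shared dispatcher.
final class SleepingSuspendMonitor(tickInterval: FiniteDuration, threshold: FiniteDuration)(
    onSuspend: FiniteDuration => Unit) {

  private val thread = new Thread(() => {
    var last = System.nanoTime()
    while (!Thread.currentThread().isInterrupted) {
      try Thread.sleep(tickInterval.toMillis)
      catch { case _: InterruptedException => Thread.currentThread().interrupt() }
      val now = System.nanoTime()
      // if the whole process was suspended, the sleep overshoots by roughly the suspension time
      val overslept = (now - last).nanos - tickInterval
      if (overslept >= threshold) onSuspend(overslept)
      last = now
    }
  }, "suspend-monitor")

  thread.setDaemon(true)
  thread.start()
}
```

Because this thread is almost always asleep, it cannot be delayed by dispatcher backlog, and the kernel is likely to run it promptly after the process resumes, at the cost of one extra thread.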
Changed file: the split brain resolver actor (package akka.cluster.sbr)
```diff
@@ -27,6 +27,7 @@ import akka.cluster.Member
 import akka.cluster.Reachability
 import akka.cluster.UniqueAddress
 import akka.cluster.sbr.DowningStrategy.Decision
+import akka.dispatch.SuspendDetector
 import akka.event.DiagnosticMarkerBusLoggingAdapter
 import akka.event.Logging
 import akka.pattern.pipe
```
```diff
@@ -146,6 +147,10 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent
 
   val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this)
 
+  private val suspendTimeout = 1.minute
+  private val suspendDetector = SuspendDetector(context.system) // make sure it's started
+  context.system.eventStream.subscribe(self, classOf[SuspendDetector.SuspendDetected])
+
   @InternalStableApi
   def strategy: DowningStrategy = _strategy
```
```diff
@@ -287,6 +292,7 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent
     case Tick                                       => tick()
     case ThisActorSystemQuarantinedEvent(_, remote) => thisActorSystemWasQuarantined(remote)
     case _: ClusterDomainEvent                      => // not interested in other events
+    case s: SuspendDetector.SuspendDetected         => if (s.wasSuspended(suspendTimeout)) suspendDetected()
   }
 
   private def leaderChanged(leaderOption: Option[Address]): Unit = {
```
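Application code outside SBR could consume the same event. A hypothetical subscriber (illustrative names, not from the PR) that follows the staleness re-check recommended in the ScalaDoc might look like:

```scala
import scala.concurrent.duration._

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.dispatch.SuspendDetector

// Hypothetical listener, illustrative only.
class SuspendListener extends Actor with ActorLogging {
  private val staleAfter = 1.minute

  override def preStart(): Unit = {
    SuspendDetector(context.system) // make sure the extension (and its ticking) is started
    context.system.eventStream.subscribe(self, classOf[SuspendDetector.SuspendDetected])
  }

  def receive: Receive = {
    case s: SuspendDetector.SuspendDetected =>
      // the event may be stale by the time it is delivered, so re-check
      if (s.wasSuspended(staleAfter))
        log.warning("JVM was suspended recently")
  }
}
```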
`@@ -296,53 +302,58 @@` — `tick()` now checks for suspension first and keeps the previous body in the `else` branch.

Contributor (author): Changes here are mostly indentation; the suspended case is added in the `if` above.

```scala
private def tick(): Unit = {
  if (suspendDetector.wasSuspended(suspendTimeout)) {
    // note that suspend detection is running on all nodes
    suspendDetected()
  } else {
    // note the DownAll due to instability is running on all nodes to make that decision as quickly and
    // aggressively as possible if time is out
    if (reachabilityChangedStats.changeCount > 0) {
      val now = System.nanoTime()
      val durationSinceLatestChange = (now - reachabilityChangedStats.latestChangeTimestamp).nanos
      val durationSinceFirstChange = (now - reachabilityChangedStats.firstChangeTimestamp).nanos

      val downAllWhenUnstableEnabled = downAllWhenUnstable > Duration.Zero
      if (downAllWhenUnstableEnabled && durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) {
        log.warning(
          ClusterLogMarker.sbrInstability,
          "SBR detected instability and will down all nodes: {}",
          reachabilityChangedStats)
        actOnDecision(DownAll)
      } else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > (stableAfter * 2)) {
        // downAllWhenUnstable is disabled but reset for meaningful logging
        log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis)
        resetReachabilityChangedStats()
      }
    }

    if (isResponsible && strategy.unreachable.nonEmpty && stableDeadline.isOverdue()) {
      strategy.decide() match {
        case decision: AcquireLeaseDecision =>
          strategy.lease match {
            case Some(lease) =>
              if (lease.checkLease()) {
                log.info(
                  ClusterLogMarker.sbrLeaseAcquired(decision),
                  "SBR has acquired lease for decision [{}]",
                  decision)
                actOnDecision(decision)
              } else {
                if (decision.acquireDelay == Duration.Zero)
                  acquireLease() // reply message is AcquireLeaseResult
                else {
                  log.debug("SBR delayed attempt to acquire lease for [{} ms]", decision.acquireDelay.toMillis)
                  timers.startSingleTimer(AcquireLease, AcquireLease, decision.acquireDelay)
                }
                context.become(waitingForLease(decision))
              }
            case None =>
              throw new IllegalStateException("Unexpected lease decision although lease is not configured")
          }
        case decision =>
          actOnDecision(decision)
      }
    }
  }
}
```
```diff
@@ -364,6 +375,15 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent
     }
   }
 
+  private def suspendDetected(): Unit = {
+    if (strategy.allMembersInDC.size > 1) {
+      log.warning(
+        ClusterLogMarker.sbrInstability,
+        "SBR detected that the process was suspended too long and will down itself.")
+      actOnDecision(DowningStrategy.DownSelfSuspended)
+    }
+  }
+
   private def acquireLease(): Unit = {
     log.debug("SBR trying to acquire lease")
     implicit val ec: ExecutionContext = internalDispatcher
```

Contributor (author): This is how the logs from a […]

Contributor (author): One thing to note is that sharding already acts quickly on the downing, see […]
Contributor (on the tick interval): This might not have to be this frequent, because I think the scheduled task will trigger immediately when the process is woken up.
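For completeness, a hypothetical way to exercise the detector manually (not from the PR), assuming the 5 second deadline from the sketch above: suspend the whole JVM from a shell with `kill -STOP <pid>`, wait about 10 seconds, resume it with `kill -CONT <pid>`, and observe `wasSuspended` flip to true.

```scala
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.dispatch.SuspendDetector

// Hypothetical manual experiment, illustrative only.
object SuspendExperiment extends App {
  val system = ActorSystem("suspend-test")
  val detector = SuspendDetector(system) // starts the extension and its ticking
  sys.addShutdownHook(system.terminate())

  // poll once per second; after a STOP/CONT pause longer than the deadline,
  // this should print true for up to a minute
  while (true) {
    println(s"wasSuspended(1.minute) = ${detector.wasSuspended(1.minute)}")
    Thread.sleep(1000)
  }
}
```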