Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -279,6 +279,19 @@ object GpuSemaphore {

def memToPermitsWithMax(memory: Long): Long = math.min(computeMaxPermits(), memToPermits(memory))

/**
* Number of permits a task needs for a given memory estimate, after applying the adaptive
* back-pressure factor (1.0, i.e. no-op, when the feature is disabled). A factor > 1.0 inflates
* the estimate so each task consumes more permits, reducing the number of tasks allowed on the
* GPU concurrently. The result is still clamped to the max permits by `memToPermitsWithMax`, so
* at peak back-pressure a single task can take all permits (concurrency of 1).
*/
def permitsForEstimate(memory: Long): Long = {
val factor = PressureMonitor.factor()
val adjusted = if (factor > 1.0) (memory.toDouble * factor).toLong else memory
memToPermitsWithMax(adjusted)
}

private def computeMaxPermits(): Long = memToPermits(GpuDeviceManager.getMemorySize)

private def isDynamicEnabled(conf: SQLConf): Boolean = {
Expand Down Expand Up @@ -423,8 +436,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long,
if (!done && shouldBlockOnSemaphore) {
// We cannot be in a synchronized block and wait on the semaphore
// so we have to release it and grab it again afterwards.
val used = semaphore.acquire(() =>
GpuSemaphore.memToPermitsWithMax(memoryEstimator.estimate()),
val used = semaphore.acquire(() =>
GpuSemaphore.permitsForEstimate(memoryEstimator.estimate()),
() => lastAcquired > 0,
TaskPriority.getTaskPriority(taskAttemptId), taskAttemptId)
synchronized {
Expand Down Expand Up @@ -469,7 +482,7 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long,
} else {
if (blockedThreads.size() == 0) {
// No other threads for this task are waiting, so we might be able to grab this directly
val numPermits = GpuSemaphore.memToPermitsWithMax(memoryEstimator.estimate())
val numPermits = GpuSemaphore.permitsForEstimate(memoryEstimator.estimate())
val ret = semaphore.tryAcquire(numPermits,
TaskPriority.getTaskPriority(taskAttemptId),
() => lastAcquired > 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
s"${extraExecutorPlugins.map(_.getClass.getName).mkString(",")}")
extraExecutorPlugins.foreach(_.init(pluginContext, extraConf))
GpuSemaphore.initialize(conf.maxConcurrentGpuTasks)
PressureMonitor.initialize(conf)
FileCache.init(pluginContext)
TrafficController.initialize(conf)
} catch {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.lang.management.ManagementFactory

import ai.rapids.cudf.Rmm

import org.apache.spark.internal.Logging

/**
* Executor-level adaptive back-pressure monitor (Phase 0/1 of the adaptive
* back-pressure design).
*
* A background daemon thread periodically samples two signals:
* - the JVM GC time fraction (GC ms per wall ms across all collectors), and
* - GPU device-memory utilization (`Rmm.getTotalBytesAllocated` / pool size).
*
* From these it maintains a single multiplicative back-pressure `factor` (>= 1.0).
* [[GpuSemaphore]] multiplies its per-task GPU memory estimate by this factor, so a
* larger factor makes each task take more permits and therefore reduces the number of
* tasks allowed on the GPU concurrently. The factor uses AIMD control: it is increased
* multiplicatively the moment either signal crosses its guard threshold, and decays
* additively back to 1.0 while healthy.
*
* This is deliberately a WAIT-at-acquisition brake: a back-pressured task simply blocks
* at the semaphore acquire *before* it holds any scarce GPU/host resource, so it cannot
* deadlock and does no recompute. It throws no exception. The monitor is fully inert
* (`factor() == 1.0`, no thread started) unless
* `spark.rapids.adaptive.backpressure.enabled` is set.
*
* The GC guard targets the failure mode seen in production GPU runs where a CPU-era
* `executor.cores` over-subscribes host memory, GC time climbs, and the executor is
* eventually lost to a GC-overhead heartbeat timeout. Reducing GPU concurrency when GC
* climbs keeps concurrent host/pinned memory in check adaptively, instead of relying on a
* statically hand-tuned `concurrentGpuTasks` / `executor.cores`.
*/
object PressureMonitor extends Logging {
@volatile private var enabled: Boolean = false
@volatile private var started: Boolean = false
@volatile private var currentFactor: Double = 1.0

// Tunables, populated from RapidsConf at initialize().
private var gcGuard: Double = 0.12
private var deviceGuard: Double = 0.9
private var sampleMs: Long = 200L
private var maxFactor: Double = 8.0
// Multiplicative increase on a guard trip; additive decay otherwise (AIMD).
private[rapids] val IncreaseMult: Double = 1.5
private[rapids] val DecayStep: Double = 0.25

/**
* Pure AIMD decision: given the previous back-pressure factor and the latest GC/device
* signals, return the next factor. The factor increases multiplicatively (capped at
* `maxFactor`) when either signal is at or above its guard, and decays additively toward a
* floor of 1.0 otherwise. Extracted as a side-effect-free function so it can be unit tested
* without a GPU, a sampler thread, or live JVM/GC state.
*/
private[rapids] def computeNextFactor(
prev: Double,
gcFrac: Double,
devRatio: Double,
gcGuard: Double,
deviceGuard: Double,
increaseMult: Double,
decayStep: Double,
maxFactor: Double): Double = {
val tripped = gcFrac >= gcGuard || devRatio >= deviceGuard
if (tripped) {
math.min(maxFactor, prev * increaseMult)
} else {
math.max(1.0, prev - decayStep)
}
}

private val gcBeans = ManagementFactory.getGarbageCollectorMXBeans

@volatile private var sampler: Thread = _

/**
* Initialize the monitor from the executor's RapidsConf. Idempotent; only the first
* call has any effect. Starts the sampling thread only when the feature is enabled.
*/
def initialize(conf: RapidsConf): Unit = synchronized {
if (started) {
return
}
enabled = conf.adaptiveBackpressureEnabled
if (!enabled) {
started = true
return
}
gcGuard = conf.adaptiveBackpressureGcGuard
deviceGuard = conf.adaptiveBackpressureDeviceGuard
sampleMs = conf.adaptiveBackpressureSampleMs
maxFactor = conf.adaptiveBackpressureMaxFactor
val t = new Thread(new SamplerLoop(), "rapids-pressure-monitor")
t.setDaemon(true)
sampler = t
t.start()
started = true
logWarning(s"Adaptive back-pressure enabled (gcGuard=$gcGuard, deviceGuard=$deviceGuard, " +
s"sampleMs=$sampleMs, maxFactor=$maxFactor)")
}

/**
* The current multiplicative back-pressure factor, always >= 1.0. A value of 1.0 means
* no back-pressure. Returns 1.0 whenever the feature is disabled.
*/
def factor(): Double = if (enabled) currentFactor else 1.0

/**
* Scale a host-side in-flight byte budget down by the current back-pressure factor. When
* back-pressure is active (factor > 1) the effective budget shrinks, so host-side work
* (shuffle serialization, async IO) blocks at its existing in-flight gate sooner. This
* throttles the off-GPU host/pinned memory that the GPU semaphore cannot see -- the actual
* driver of GC-overhead executor loss under an over-subscribed `executor.cores`. Returns the
* base unchanged when the feature is disabled or the factor is 1.0. The caller's gate is
* expected to always admit at least one in-flight item, so this can never deadlock even if
* the scaled budget falls below a single item's size.
*/
def scaleHostLimit(base: Long): Long = scaleLimit(base, factor())

/**
* Pure host-budget scaling: divide `base` by the back-pressure factor when `f > 1`, with a
* floor of 1 so the budget never reaches zero. Side-effect free for unit testing.
*/
private[rapids] def scaleLimit(base: Long, f: Double): Long = {
if (f > 1.0) math.max(1L, (base / f).toLong) else base
}

/** For tests: stop the sampler and reset state. */
def shutdown(): Unit = synchronized {
if (sampler != null) {
sampler.interrupt()
sampler = null
}
enabled = false
started = false
currentFactor = 1.0
}

private def totalGcMillis(): Long = {
var sum = 0L
val it = gcBeans.iterator()
while (it.hasNext) {
val t = it.next().getCollectionTime
if (t > 0) {
sum += t
}
}
sum
}

private def deviceMemRatio(): Double = {
try {
val total = GpuDeviceManager.getMemorySize
if (total > 0) {
Rmm.getTotalBytesAllocated.toDouble / total.toDouble
} else {
0.0
}
} catch {
// Rmm may not be initialized in every context; treat as no device pressure.
case _: Throwable => 0.0
}
}

private class SamplerLoop extends Runnable {
override def run(): Unit = {
var lastGc = totalGcMillis()
var lastWall = System.nanoTime()
var keepGoing = true
while (keepGoing) {
try {
Thread.sleep(sampleMs)
} catch {
case _: InterruptedException => keepGoing = false
}
if (keepGoing) {
val nowGc = totalGcMillis()
val nowWall = System.nanoTime()
val wallMs = (nowWall - lastWall) / 1.0e6
val gcFrac = if (wallMs > 0) (nowGc - lastGc) / wallMs else 0.0
lastGc = nowGc
lastWall = nowWall
val devRatio = deviceMemRatio()
val prev = currentFactor
val next = computeNextFactor(prev, gcFrac, devRatio, gcGuard, deviceGuard,
IncreaseMult, DecayStep, maxFactor)
currentFactor = next
if (next != prev) {
if (next > prev) {
logWarning(f"Adaptive back-pressure: gcFrac=$gcFrac%.3f devRatio=$devRatio%.3f " +
f"factor $prev%.2f -> $next%.2f")
} else {
logDebug(f"Adaptive back-pressure relaxing: factor $prev%.2f -> $next%.2f")
}
}
}
}
}
}
}
49 changes: 49 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,45 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.integerConf
.createWithDefault(0)

val ADAPTIVE_BACKPRESSURE_ENABLED = conf("spark.rapids.adaptive.backpressure.enabled")
.doc("Enable the executor-level adaptive back-pressure monitor that dynamically " +
"reduces GPU task concurrency when JVM GC time or GPU device-memory utilization " +
"crosses a guard threshold. This adapts the effective concurrentGpuTasks at runtime " +
"to avoid GC-driven executor loss, instead of relying on a statically tuned value. " +
"Disabled by default.")
.internal()
.booleanConf
.createWithDefault(false)

val ADAPTIVE_BACKPRESSURE_GC_GUARD = conf("spark.rapids.adaptive.backpressure.gcGuard")
.doc("When adaptive back-pressure is enabled, the JVM GC time fraction (GC ms per " +
"wall ms, sampled periodically) at or above which back-pressure is increased.")
.internal()
.doubleConf
.createWithDefault(0.12)

val ADAPTIVE_BACKPRESSURE_DEVICE_GUARD = conf("spark.rapids.adaptive.backpressure.deviceGuard")
.doc("When adaptive back-pressure is enabled, the GPU device-memory utilization ratio " +
"(RMM allocated / pool size) at or above which back-pressure is increased.")
.internal()
.doubleConf
.createWithDefault(0.9)

val ADAPTIVE_BACKPRESSURE_SAMPLE_MS = conf("spark.rapids.adaptive.backpressure.sampleMs")
.doc("When adaptive back-pressure is enabled, the sampling interval in milliseconds " +
"for the GC and device-memory monitor.")
.internal()
.longConf
.createWithDefault(200L)

val ADAPTIVE_BACKPRESSURE_MAX_FACTOR = conf("spark.rapids.adaptive.backpressure.maxFactor")
.doc("When adaptive back-pressure is enabled, the maximum multiplicative factor applied " +
"to the per-task GPU memory estimate (higher means fewer concurrent GPU tasks at peak " +
"back-pressure).")
.internal()
.doubleConf
.createWithDefault(8.0)

val GPU_BATCH_SIZE_BYTES = conf("spark.rapids.sql.batchSizeBytes")
.doc("Set the target number of bytes for a GPU batch. Splits sizes for input data " +
"is covered by separate configs.")
Expand Down Expand Up @@ -3433,6 +3472,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val maxConcurrentGpuTasks: Integer = get(MAX_CONCURRENT_GPU_TASKS)

lazy val adaptiveBackpressureEnabled: Boolean = get(ADAPTIVE_BACKPRESSURE_ENABLED)

lazy val adaptiveBackpressureGcGuard: Double = get(ADAPTIVE_BACKPRESSURE_GC_GUARD)

lazy val adaptiveBackpressureDeviceGuard: Double = get(ADAPTIVE_BACKPRESSURE_DEVICE_GUARD)

lazy val adaptiveBackpressureSampleMs: Long = get(ADAPTIVE_BACKPRESSURE_SAMPLE_MS)

lazy val adaptiveBackpressureMaxFactor: Double = get(ADAPTIVE_BACKPRESSURE_MAX_FACTOR)

lazy val isTestEnabled: Boolean = get(TEST_CONF)

lazy val isRetryContextCheckEnabled: Boolean = get(TEST_RETRY_CONTEXT_CHECK_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.concurrent.Callable
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy

import com.nvidia.spark.rapids.{RapidsConf, TaskRegistryTracker}
import com.nvidia.spark.rapids.{PressureMonitor, RapidsConf, TaskRegistryTracker}

/**
* Simple wrapper around a [[Callable]] that also keeps track of the host memory bytes used by
Expand Down Expand Up @@ -66,7 +66,11 @@ class HostMemoryThrottle(val maxInFlightHostMemoryBytes: Long) extends Throttle
private var totalHostMemoryBytes: Long = 0

override def canAccept[T](task: Task[T]): Boolean = {
totalHostMemoryBytes + task.hostMemoryBytes <= maxInFlightHostMemoryBytes
// The effective budget is scaled down by the adaptive back-pressure factor (a no-op when
// the feature is off). TrafficController always admits a task when none are in flight, so a
// shrunk budget throttles async host IO without deadlocking.
totalHostMemoryBytes + task.hostMemoryBytes <= PressureMonitor.scaleHostLimit(
maxInFlightHostMemoryBytes)
}

override def taskScheduled[T](task: Task[T]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,10 @@ class BytesInFlightLimiter(maxBytesInFlight: Long) {
true
} else {
synchronized {
if (inFlight == 0 || sz + inFlight < maxBytesInFlight) {
// The effective budget is scaled down by the adaptive back-pressure factor (a no-op
// when the feature is off). The `inFlight == 0` clause always admits one item, so a
// shrunk budget throttles host serialization without deadlocking.
if (inFlight == 0 || sz + inFlight < PressureMonitor.scaleHostLimit(maxBytesInFlight)) {
inFlight += sz
true
} else {
Expand Down
Loading
Loading