diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index a51aa1c1de7..090876a59c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -279,6 +279,19 @@ object GpuSemaphore { def memToPermitsWithMax(memory: Long): Long = math.min(computeMaxPermits(), memToPermits(memory)) + /** + * Number of permits a task needs for a given memory estimate, after applying the adaptive + * back-pressure factor (1.0, i.e. no-op, when the feature is disabled). A factor > 1.0 inflates + * the estimate so each task consumes more permits, reducing the number of tasks allowed on the + * GPU concurrently. The result is still clamped to the max permits by `memToPermitsWithMax`, so + * at peak back-pressure a single task can take all permits (concurrency of 1). + */ + def permitsForEstimate(memory: Long): Long = { + val factor = PressureMonitor.factor() + val adjusted = if (factor > 1.0) (memory.toDouble * factor).toLong else memory + memToPermitsWithMax(adjusted) + } + private def computeMaxPermits(): Long = memToPermits(GpuDeviceManager.getMemorySize) private def isDynamicEnabled(conf: SQLConf): Boolean = { @@ -423,8 +436,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long, if (!done && shouldBlockOnSemaphore) { // We cannot be in a synchronized block and wait on the semaphore // so we have to release it and grab it again afterwards. - val used = semaphore.acquire(() => - GpuSemaphore.memToPermitsWithMax(memoryEstimator.estimate()), + val used = semaphore.acquire(() => + GpuSemaphore.permitsForEstimate(memoryEstimator.estimate()), () => lastAcquired > 0, TaskPriority.getTaskPriority(taskAttemptId), taskAttemptId) synchronized { @@ -469,7 +482,7 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long, } else { if (blockedThreads.size() == 0) { // No other threads for this task are waiting, so we might be able to grab this directly - val numPermits = GpuSemaphore.memToPermitsWithMax(memoryEstimator.estimate()) + val numPermits = GpuSemaphore.permitsForEstimate(memoryEstimator.estimate()) val ret = semaphore.tryAcquire(numPermits, TaskPriority.getTaskPriority(taskAttemptId), () => lastAcquired > 0, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 7aad76138b9..c7fa3ba36c7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -700,6 +700,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { s"${extraExecutorPlugins.map(_.getClass.getName).mkString(",")}") extraExecutorPlugins.foreach(_.init(pluginContext, extraConf)) GpuSemaphore.initialize(conf.maxConcurrentGpuTasks) + PressureMonitor.initialize(conf) FileCache.init(pluginContext) TrafficController.initialize(conf) } catch { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PressureMonitor.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PressureMonitor.scala new file mode 100644 index 00000000000..50dad48600a --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PressureMonitor.scala @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2025-2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.lang.management.ManagementFactory + +import ai.rapids.cudf.Rmm + +import org.apache.spark.internal.Logging + +/** + * Executor-level adaptive back-pressure monitor (Phase 0/1 of the adaptive + * back-pressure design). + * + * A background daemon thread periodically samples two signals: + * - the JVM GC time fraction (GC ms per wall ms across all collectors), and + * - GPU device-memory utilization (`Rmm.getTotalBytesAllocated` / pool size). + * + * From these it maintains a single multiplicative back-pressure `factor` (>= 1.0). + * [[GpuSemaphore]] multiplies its per-task GPU memory estimate by this factor, so a + * larger factor makes each task take more permits and therefore reduces the number of + * tasks allowed on the GPU concurrently. The factor uses AIMD control: it is increased + * multiplicatively the moment either signal crosses its guard threshold, and decays + * additively back to 1.0 while healthy. + * + * This is deliberately a WAIT-at-acquisition brake: a back-pressured task simply blocks + * at the semaphore acquire *before* it holds any scarce GPU/host resource, so it cannot + * deadlock and does no recompute. It throws no exception. The monitor is fully inert + * (`factor() == 1.0`, no thread started) unless + * `spark.rapids.adaptive.backpressure.enabled` is set. + * + * The GC guard targets the failure mode seen in production GPU runs where a CPU-era + * `executor.cores` over-subscribes host memory, GC time climbs, and the executor is + * eventually lost to a GC-overhead heartbeat timeout. Reducing GPU concurrency when GC + * climbs keeps concurrent host/pinned memory in check adaptively, instead of relying on a + * statically hand-tuned `concurrentGpuTasks` / `executor.cores`. + */ +object PressureMonitor extends Logging { + @volatile private var enabled: Boolean = false + @volatile private var started: Boolean = false + @volatile private var currentFactor: Double = 1.0 + + // Tunables, populated from RapidsConf at initialize(). + private var gcGuard: Double = 0.12 + private var deviceGuard: Double = 0.9 + private var sampleMs: Long = 200L + private var maxFactor: Double = 8.0 + // Multiplicative increase on a guard trip; additive decay otherwise (AIMD). + private[rapids] val IncreaseMult: Double = 1.5 + private[rapids] val DecayStep: Double = 0.25 + + /** + * Pure AIMD decision: given the previous back-pressure factor and the latest GC/device + * signals, return the next factor. The factor increases multiplicatively (capped at + * `maxFactor`) when either signal is at or above its guard, and decays additively toward a + * floor of 1.0 otherwise. Extracted as a side-effect-free function so it can be unit tested + * without a GPU, a sampler thread, or live JVM/GC state. + */ + private[rapids] def computeNextFactor( + prev: Double, + gcFrac: Double, + devRatio: Double, + gcGuard: Double, + deviceGuard: Double, + increaseMult: Double, + decayStep: Double, + maxFactor: Double): Double = { + val tripped = gcFrac >= gcGuard || devRatio >= deviceGuard + if (tripped) { + math.min(maxFactor, prev * increaseMult) + } else { + math.max(1.0, prev - decayStep) + } + } + + private val gcBeans = ManagementFactory.getGarbageCollectorMXBeans + + @volatile private var sampler: Thread = _ + + /** + * Initialize the monitor from the executor's RapidsConf. Idempotent; only the first + * call has any effect. Starts the sampling thread only when the feature is enabled. + */ + def initialize(conf: RapidsConf): Unit = synchronized { + if (started) { + return + } + enabled = conf.adaptiveBackpressureEnabled + if (!enabled) { + started = true + return + } + gcGuard = conf.adaptiveBackpressureGcGuard + deviceGuard = conf.adaptiveBackpressureDeviceGuard + sampleMs = conf.adaptiveBackpressureSampleMs + maxFactor = conf.adaptiveBackpressureMaxFactor + val t = new Thread(new SamplerLoop(), "rapids-pressure-monitor") + t.setDaemon(true) + sampler = t + t.start() + started = true + logWarning(s"Adaptive back-pressure enabled (gcGuard=$gcGuard, deviceGuard=$deviceGuard, " + + s"sampleMs=$sampleMs, maxFactor=$maxFactor)") + } + + /** + * The current multiplicative back-pressure factor, always >= 1.0. A value of 1.0 means + * no back-pressure. Returns 1.0 whenever the feature is disabled. + */ + def factor(): Double = if (enabled) currentFactor else 1.0 + + /** + * Scale a host-side in-flight byte budget down by the current back-pressure factor. When + * back-pressure is active (factor > 1) the effective budget shrinks, so host-side work + * (shuffle serialization, async IO) blocks at its existing in-flight gate sooner. This + * throttles the off-GPU host/pinned memory that the GPU semaphore cannot see -- the actual + * driver of GC-overhead executor loss under an over-subscribed `executor.cores`. Returns the + * base unchanged when the feature is disabled or the factor is 1.0. The caller's gate is + * expected to always admit at least one in-flight item, so this can never deadlock even if + * the scaled budget falls below a single item's size. + */ + def scaleHostLimit(base: Long): Long = scaleLimit(base, factor()) + + /** + * Pure host-budget scaling: divide `base` by the back-pressure factor when `f > 1`, with a + * floor of 1 so the budget never reaches zero. Side-effect free for unit testing. + */ + private[rapids] def scaleLimit(base: Long, f: Double): Long = { + if (f > 1.0) math.max(1L, (base / f).toLong) else base + } + + /** For tests: stop the sampler and reset state. */ + def shutdown(): Unit = synchronized { + if (sampler != null) { + sampler.interrupt() + sampler = null + } + enabled = false + started = false + currentFactor = 1.0 + } + + private def totalGcMillis(): Long = { + var sum = 0L + val it = gcBeans.iterator() + while (it.hasNext) { + val t = it.next().getCollectionTime + if (t > 0) { + sum += t + } + } + sum + } + + private def deviceMemRatio(): Double = { + try { + val total = GpuDeviceManager.getMemorySize + if (total > 0) { + Rmm.getTotalBytesAllocated.toDouble / total.toDouble + } else { + 0.0 + } + } catch { + // Rmm may not be initialized in every context; treat as no device pressure. + case _: Throwable => 0.0 + } + } + + private class SamplerLoop extends Runnable { + override def run(): Unit = { + var lastGc = totalGcMillis() + var lastWall = System.nanoTime() + var keepGoing = true + while (keepGoing) { + try { + Thread.sleep(sampleMs) + } catch { + case _: InterruptedException => keepGoing = false + } + if (keepGoing) { + val nowGc = totalGcMillis() + val nowWall = System.nanoTime() + val wallMs = (nowWall - lastWall) / 1.0e6 + val gcFrac = if (wallMs > 0) (nowGc - lastGc) / wallMs else 0.0 + lastGc = nowGc + lastWall = nowWall + val devRatio = deviceMemRatio() + val prev = currentFactor + val next = computeNextFactor(prev, gcFrac, devRatio, gcGuard, deviceGuard, + IncreaseMult, DecayStep, maxFactor) + currentFactor = next + if (next != prev) { + if (next > prev) { + logWarning(f"Adaptive back-pressure: gcFrac=$gcFrac%.3f devRatio=$devRatio%.3f " + + f"factor $prev%.2f -> $next%.2f") + } else { + logDebug(f"Adaptive back-pressure relaxing: factor $prev%.2f -> $next%.2f") + } + } + } + } + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 39a459a10a1..aef8f0c3f8c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -657,6 +657,45 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .integerConf .createWithDefault(0) + val ADAPTIVE_BACKPRESSURE_ENABLED = conf("spark.rapids.adaptive.backpressure.enabled") + .doc("Enable the executor-level adaptive back-pressure monitor that dynamically " + + "reduces GPU task concurrency when JVM GC time or GPU device-memory utilization " + + "crosses a guard threshold. This adapts the effective concurrentGpuTasks at runtime " + + "to avoid GC-driven executor loss, instead of relying on a statically tuned value. " + + "Disabled by default.") + .internal() + .booleanConf + .createWithDefault(false) + + val ADAPTIVE_BACKPRESSURE_GC_GUARD = conf("spark.rapids.adaptive.backpressure.gcGuard") + .doc("When adaptive back-pressure is enabled, the JVM GC time fraction (GC ms per " + + "wall ms, sampled periodically) at or above which back-pressure is increased.") + .internal() + .doubleConf + .createWithDefault(0.12) + + val ADAPTIVE_BACKPRESSURE_DEVICE_GUARD = conf("spark.rapids.adaptive.backpressure.deviceGuard") + .doc("When adaptive back-pressure is enabled, the GPU device-memory utilization ratio " + + "(RMM allocated / pool size) at or above which back-pressure is increased.") + .internal() + .doubleConf + .createWithDefault(0.9) + + val ADAPTIVE_BACKPRESSURE_SAMPLE_MS = conf("spark.rapids.adaptive.backpressure.sampleMs") + .doc("When adaptive back-pressure is enabled, the sampling interval in milliseconds " + + "for the GC and device-memory monitor.") + .internal() + .longConf + .createWithDefault(200L) + + val ADAPTIVE_BACKPRESSURE_MAX_FACTOR = conf("spark.rapids.adaptive.backpressure.maxFactor") + .doc("When adaptive back-pressure is enabled, the maximum multiplicative factor applied " + + "to the per-task GPU memory estimate (higher means fewer concurrent GPU tasks at peak " + + "back-pressure).") + .internal() + .doubleConf + .createWithDefault(8.0) + val GPU_BATCH_SIZE_BYTES = conf("spark.rapids.sql.batchSizeBytes") .doc("Set the target number of bytes for a GPU batch. Splits sizes for input data " + "is covered by separate configs.") @@ -3433,6 +3472,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val maxConcurrentGpuTasks: Integer = get(MAX_CONCURRENT_GPU_TASKS) + lazy val adaptiveBackpressureEnabled: Boolean = get(ADAPTIVE_BACKPRESSURE_ENABLED) + + lazy val adaptiveBackpressureGcGuard: Double = get(ADAPTIVE_BACKPRESSURE_GC_GUARD) + + lazy val adaptiveBackpressureDeviceGuard: Double = get(ADAPTIVE_BACKPRESSURE_DEVICE_GUARD) + + lazy val adaptiveBackpressureSampleMs: Long = get(ADAPTIVE_BACKPRESSURE_SAMPLE_MS) + + lazy val adaptiveBackpressureMaxFactor: Double = get(ADAPTIVE_BACKPRESSURE_MAX_FACTOR) + lazy val isTestEnabled: Boolean = get(TEST_CONF) lazy val isRetryContextCheckEnabled: Boolean = get(TEST_RETRY_CONTEXT_CHECK_ENABLED) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala index 7bb150f9433..a5e49107171 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala @@ -20,7 +20,7 @@ import java.util.concurrent.Callable import java.util.concurrent.locks.ReentrantLock import javax.annotation.concurrent.GuardedBy -import com.nvidia.spark.rapids.{RapidsConf, TaskRegistryTracker} +import com.nvidia.spark.rapids.{PressureMonitor, RapidsConf, TaskRegistryTracker} /** * Simple wrapper around a [[Callable]] that also keeps track of the host memory bytes used by @@ -66,7 +66,11 @@ class HostMemoryThrottle(val maxInFlightHostMemoryBytes: Long) extends Throttle private var totalHostMemoryBytes: Long = 0 override def canAccept[T](task: Task[T]): Boolean = { - totalHostMemoryBytes + task.hostMemoryBytes <= maxInFlightHostMemoryBytes + // The effective budget is scaled down by the adaptive back-pressure factor (a no-op when + // the feature is off). TrafficController always admits a task when none are in flight, so a + // shrunk budget throttles async host IO without deadlocking. + totalHostMemoryBytes + task.hostMemoryBytes <= PressureMonitor.scaleHostLimit( + maxInFlightHostMemoryBytes) } override def taskScheduled[T](task: Task[T]): Unit = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 69eff4ca387..6fd316af1f7 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -1079,7 +1079,10 @@ class BytesInFlightLimiter(maxBytesInFlight: Long) { true } else { synchronized { - if (inFlight == 0 || sz + inFlight < maxBytesInFlight) { + // The effective budget is scaled down by the adaptive back-pressure factor (a no-op + // when the feature is off). The `inFlight == 0` clause always admits one item, so a + // shrunk budget throttles host serialization without deadlocking. + if (inFlight == 0 || sz + inFlight < PressureMonitor.scaleHostLimit(maxBytesInFlight)) { inFlight += sz true } else { diff --git a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/PressureMonitorSuite.scala b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/PressureMonitorSuite.scala new file mode 100644 index 00000000000..345678818ff --- /dev/null +++ b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/PressureMonitorSuite.scala @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2025-2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.scalatest.funsuite.AnyFunSuite + +class PressureMonitorSuite extends AnyFunSuite { + + private val GcGuard = 0.12 + private val DeviceGuard = 0.9 + private val MaxFactor = 8.0 + private val Inc = PressureMonitor.IncreaseMult + private val Dec = PressureMonitor.DecayStep + + private def nextFactor(prev: Double, gcFrac: Double, devRatio: Double): Double = + PressureMonitor.computeNextFactor(prev, gcFrac, devRatio, GcGuard, DeviceGuard, + Inc, Dec, MaxFactor) + + private def assertClose(actual: Double, expected: Double): Unit = + assert(math.abs(actual - expected) < 1e-9, s"expected $expected but got $actual") + + test("factor is 1.0 when the feature is disabled (default)") { + // The monitor is inert unless explicitly enabled via initialize(); factor() must be a no-op. + assertClose(PressureMonitor.factor(), 1.0) + } + + test("GC over guard increases the factor multiplicatively") { + assertClose(nextFactor(1.0, gcFrac = 0.2, devRatio = 0.0), 1.0 * Inc) + assertClose(nextFactor(2.0, gcFrac = 0.2, devRatio = 0.0), 2.0 * Inc) + } + + test("device-memory over guard increases the factor (independent of GC)") { + assertClose(nextFactor(1.0, gcFrac = 0.0, devRatio = 0.95), 1.0 * Inc) + } + + test("a signal exactly at its guard trips back-pressure (>= semantics)") { + assertClose(nextFactor(1.0, gcFrac = GcGuard, devRatio = 0.0), 1.0 * Inc) + assertClose(nextFactor(1.0, gcFrac = 0.0, devRatio = DeviceGuard), 1.0 * Inc) + } + + test("factor is capped at maxFactor under sustained pressure") { + // 6.0 * 1.5 = 9.0 would exceed the cap, so it clamps to 8.0. + assertClose(nextFactor(6.0, gcFrac = 0.5, devRatio = 0.0), MaxFactor) + // already at the cap stays at the cap. + assertClose(nextFactor(MaxFactor, gcFrac = 0.5, devRatio = 0.0), MaxFactor) + } + + test("healthy signals decay the factor additively toward 1.0") { + assertClose(nextFactor(2.0, gcFrac = 0.05, devRatio = 0.5), 2.0 - Dec) + } + + test("decay is floored at 1.0 (never below no-back-pressure)") { + // 1.1 - 0.25 = 0.85 would drop below 1.0, so it clamps to 1.0. + assertClose(nextFactor(1.1, gcFrac = 0.0, devRatio = 0.0), 1.0) + assertClose(nextFactor(1.0, gcFrac = 0.0, devRatio = 0.0), 1.0) + } + + test("a full AIMD trajectory: ramp up under pressure, decay when healthy") { + // Sustained pressure ramps multiplicatively up to the cap. + var f = 1.0 + for (_ <- 0 until 10) { + f = nextFactor(f, gcFrac = 0.3, devRatio = 0.0) + } + assertClose(f, MaxFactor) + // Then health decays it additively back to the 1.0 floor. + for (_ <- 0 until 100) { + f = nextFactor(f, gcFrac = 0.0, devRatio = 0.0) + } + assertClose(f, 1.0) + } + + test("scaleLimit divides the host budget by the factor, with a floor of 1") { + // factor == 1.0 (or below) is a pass-through. + assert(PressureMonitor.scaleLimit(128L * 1024 * 1024, 1.0) == 128L * 1024 * 1024) + // factor > 1 shrinks the budget proportionally. + assert(PressureMonitor.scaleLimit(128L * 1024 * 1024, 2.0) == 64L * 1024 * 1024) + assert(PressureMonitor.scaleLimit(128L * 1024 * 1024, 8.0) == 16L * 1024 * 1024) + // never reaches zero. + assert(PressureMonitor.scaleLimit(4L, 8.0) == 1L) + } + + test("scaleHostLimit is a pass-through when the feature is disabled") { + assert(PressureMonitor.scaleHostLimit(128L * 1024 * 1024) == 128L * 1024 * 1024) + } + + test("RapidsConf wires the adaptive back-pressure defaults") { + val conf = new RapidsConf(Map.empty[String, String]) + assert(!conf.adaptiveBackpressureEnabled) + assertClose(conf.adaptiveBackpressureGcGuard, 0.12) + assertClose(conf.adaptiveBackpressureDeviceGuard, 0.9) + assert(conf.adaptiveBackpressureSampleMs == 200L) + assertClose(conf.adaptiveBackpressureMaxFactor, 8.0) + } + + test("RapidsConf honors adaptive back-pressure overrides") { + val conf = new RapidsConf(Map( + RapidsConf.ADAPTIVE_BACKPRESSURE_ENABLED.key -> "true", + RapidsConf.ADAPTIVE_BACKPRESSURE_GC_GUARD.key -> "0.2", + RapidsConf.ADAPTIVE_BACKPRESSURE_DEVICE_GUARD.key -> "0.85", + RapidsConf.ADAPTIVE_BACKPRESSURE_SAMPLE_MS.key -> "500", + RapidsConf.ADAPTIVE_BACKPRESSURE_MAX_FACTOR.key -> "4.0")) + assert(conf.adaptiveBackpressureEnabled) + assertClose(conf.adaptiveBackpressureGcGuard, 0.2) + assertClose(conf.adaptiveBackpressureDeviceGuard, 0.85) + assert(conf.adaptiveBackpressureSampleMs == 500L) + assertClose(conf.adaptiveBackpressureMaxFactor, 4.0) + } +}