From 62718938cdfe02964b96f67ea0a5f4d4dddec641 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 29 Sep 2025 12:25:38 +0800 Subject: [PATCH] feat: Make DiskManager `max_temp_directory_size` configurable --- .../main/scala/org/apache/comet/CometConf.scala | 6 ++++++ docs/source/user-guide/latest/configs.md | 1 + native/core/src/execution/jni_api.rs | 14 +++++++++++--- .../scala/org/apache/comet/CometExecIterator.scala | 3 ++- spark/src/main/scala/org/apache/comet/Native.scala | 3 ++- 5 files changed, 22 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 249b0ea6bf..5931eb25bf 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -651,6 +651,12 @@ object CometConf extends ShimCometConf { .stringConf .createOptional + val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] = + conf("spark.comet.maxTempDirectorySize") + .doc("The maximum amount of data (in bytes) stored inside the temporary directories.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB + /** Create a config to enable a specific operator */ private def createExecEnabledConfig( exec: String, diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 4c8fe810e3..554d34e353 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -67,6 +67,7 @@ Comet provides the following configuration settings. | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false | +| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b | | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 52b8eb6a30..83dbd68e76 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -170,6 +170,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( debug_native: jboolean, explain_native: jboolean, tracing_enabled: jboolean, + max_temp_directory_size: jlong, ) -> jlong { try_unwrap_or_throw(&e, |mut env| { with_trace("createPlan", tracing_enabled != JNI_FALSE, || { @@ -231,8 +232,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( // We need to keep the session context alive. Some session state like temporary // dictionaries are stored in session context. If it is dropped, the temporary // dictionaries will be dropped as well. - let session = - prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?; + let session = prepare_datafusion_session_context( + batch_size as usize, + memory_pool, + local_dirs, + max_temp_directory_size as u64, + )?; let plan_creation_time = start.elapsed(); @@ -272,9 +277,12 @@ fn prepare_datafusion_session_context( batch_size: usize, memory_pool: Arc, local_dirs: Vec, + max_temp_directory_size: u64, ) -> CometResult { let paths = local_dirs.into_iter().map(PathBuf::from).collect(); - let disk_manager = DiskManagerBuilder::default().with_mode(DiskManagerMode::Directories(paths)); + let disk_manager = DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Directories(paths)) + .with_max_temp_directory_size(max_temp_directory_size); let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager); rt_config = rt_config.with_memory_pool(memory_pool); diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index f62dd13e81..a4e9494b69 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -120,7 +120,8 @@ class CometExecIterator( taskAttemptId, debug = COMET_DEBUG_ENABLED.get(), explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), - tracingEnabled) + tracingEnabled, + maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get()) } private var nextBatch: Option[ColumnarBatch] = None diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 7430a4322c..13edf2997c 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -68,7 +68,8 @@ class Native extends NativeBase { taskAttemptId: Long, debug: Boolean, explain: Boolean, - tracingEnabled: Boolean): Long + tracingEnabled: Boolean, + maxTempDirectorySize: Long): Long // scalastyle:on /**