Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,12 @@ object CometConf extends ShimCometConf {
.stringConf
.createOptional

val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
conf("spark.comet.maxTempDirectorySize")
.doc("The maximum amount of data (in bytes) stored inside the temporary directories.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Comet provides the following configuration settings.
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes) that can be stored in the temporary directories. | 107374182400b |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
Expand Down
14 changes: 11 additions & 3 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
debug_native: jboolean,
explain_native: jboolean,
tracing_enabled: jboolean,
max_temp_directory_size: jlong,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
Expand Down Expand Up @@ -231,8 +232,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// We need to keep the session context alive. Some session state like temporary
// dictionaries are stored in session context. If it is dropped, the temporary
// dictionaries will be dropped as well.
let session =
prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?;
let session = prepare_datafusion_session_context(
batch_size as usize,
memory_pool,
local_dirs,
max_temp_directory_size as u64,
)?;

let plan_creation_time = start.elapsed();

Expand Down Expand Up @@ -272,9 +277,12 @@ fn prepare_datafusion_session_context(
batch_size: usize,
memory_pool: Arc<dyn MemoryPool>,
local_dirs: Vec<String>,
max_temp_directory_size: u64,
) -> CometResult<SessionContext> {
let paths = local_dirs.into_iter().map(PathBuf::from).collect();
let disk_manager = DiskManagerBuilder::default().with_mode(DiskManagerMode::Directories(paths));
let disk_manager = DiskManagerBuilder::default()
.with_mode(DiskManagerMode::Directories(paths))
.with_max_temp_directory_size(max_temp_directory_size);
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
rt_config = rt_config.with_memory_pool(memory_pool);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class CometExecIterator(
taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
tracingEnabled)
tracingEnabled,
maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get())
}

private var nextBatch: Option[ColumnarBatch] = None
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class Native extends NativeBase {
taskAttemptId: Long,
debug: Boolean,
explain: Boolean,
tracingEnabled: Boolean): Long
tracingEnabled: Boolean,
maxTempDirectorySize: Long): Long
// scalastyle:on

/**
Expand Down
Loading