Skip to content

POC: Add ConfigOptions to ExecutionProps when execution is started #16661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,7 @@ impl SessionContext {
/// [`ConfigOptions`]: crate::config::ConfigOptions
pub fn state(&self) -> SessionState {
let mut state = self.state.read().clone();
state.execution_props_mut().start_execution();
state.start_execution();
state
}

Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,12 @@ impl SessionState {
self.config.options()
}

/// Mark the start of the execution
pub fn start_execution(&mut self) {
let config = self.config.options_arc();
self.execution_props.start_execution(config);
}

/// Return the table options
pub fn table_options(&self) -> &TableOptions {
&self.table_options
Expand Down
69 changes: 39 additions & 30 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ use datafusion_common::{
/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
#[derive(Clone, Debug)]
pub struct SessionConfig {
/// Configuration options
options: ConfigOptions,
/// Configuration options, copy on write
options: Arc<ConfigOptions>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to pass this pointer down into the ExecutionProps

/// Opaque extensions.
extensions: AnyMap,
}

impl Default for SessionConfig {
fn default() -> Self {
Self {
options: ConfigOptions::new(),
options: Arc::new(ConfigOptions::new()),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
Expand Down Expand Up @@ -140,6 +140,11 @@ impl SessionConfig {
&self.options
}

/// Returns the config options as an Arc
pub fn options_arc(&self) -> Arc<ConfigOptions> {
Arc::clone(&self.options)
}

/// Return a mutable handle to the configuration options.
///
/// Can be used to set configuration options.
Expand All @@ -152,7 +157,7 @@ impl SessionConfig {
/// assert_eq!(config.options().execution.batch_size, 1024);
/// ```
pub fn options_mut(&mut self) -> &mut ConfigOptions {
&mut self.options
Arc::make_mut(&mut self.options)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key function call -- it basically does "copy on write" and if there are existing references to the options a new copy is created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the cost of options_mut was small, now it's considerable.
Not sure it matters, but worth calling out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the cost of options_mut was small, now it's considerable.
Not sure it matters, but worth calling out.

Technically I think it is considerable only on the first call -- subsequent calls will use the same copy (as I understand from the make_mut docs https://doc.rust-lang.org/std/sync/struct.Arc.html#method.make_mut)

}

/// Set a configuration option
Expand All @@ -177,23 +182,23 @@ impl SessionConfig {

/// Set a generic `str` configuration option
pub fn set_str(mut self, key: &str, value: &str) -> Self {
self.options.set(key, value).unwrap();
self.options_mut().set(key, value).unwrap();
self
}

/// Customize batch size
pub fn with_batch_size(mut self, n: usize) -> Self {
// batch size must be greater than zero
assert!(n > 0);
self.options.execution.batch_size = n;
self.options_mut().execution.batch_size = n;
self
}

/// Customize [`target_partitions`]
///
/// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
pub fn with_target_partitions(mut self, n: usize) -> Self {
self.options.execution.target_partitions = if n == 0 {
self.options_mut().execution.target_partitions = if n == 0 {
datafusion_common::config::ExecutionOptions::default().target_partitions
} else {
n
Expand Down Expand Up @@ -269,62 +274,64 @@ impl SessionConfig {
catalog: impl Into<String>,
schema: impl Into<String>,
) -> Self {
self.options.catalog.default_catalog = catalog.into();
self.options.catalog.default_schema = schema.into();
self.options_mut().catalog.default_catalog = catalog.into();
self.options_mut().catalog.default_schema = schema.into();
self
}

/// Controls whether the default catalog and schema will be automatically created
pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
self.options.catalog.create_default_catalog_and_schema = create;
self.options_mut().catalog.create_default_catalog_and_schema = create;
self
}

/// Enables or disables the inclusion of `information_schema` virtual tables
pub fn with_information_schema(mut self, enabled: bool) -> Self {
self.options.catalog.information_schema = enabled;
self.options_mut().catalog.information_schema = enabled;
self
}

/// Enables or disables the use of repartitioning for joins to improve parallelism
pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_joins = enabled;
self.options_mut().optimizer.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_aggregations = enabled;
self.options_mut().optimizer.repartition_aggregations = enabled;
self
}

/// Sets minimum file range size for repartitioning scans
pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
self.options.optimizer.repartition_file_min_size = size;
self.options_mut().optimizer.repartition_file_min_size = size;
self
}

/// Enables or disables the allowing unordered symmetric hash join
pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
self.options.optimizer.allow_symmetric_joins_without_pruning = enabled;
self.options_mut()
.optimizer
.allow_symmetric_joins_without_pruning = enabled;
self
}

/// Enables or disables the use of repartitioning for file scans
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_file_scans = enabled;
self.options_mut().optimizer.repartition_file_scans = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_windows = enabled;
self.options_mut().optimizer.repartition_windows = enabled;
self
}

/// Enables or disables the use of per-partition sorting to improve parallelism
pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_sorts = enabled;
self.options_mut().optimizer.repartition_sorts = enabled;
self
}

Expand All @@ -333,21 +340,21 @@ impl SessionConfig {
///
/// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
self.options.optimizer.prefer_existing_sort = enabled;
self.options_mut().optimizer.prefer_existing_sort = enabled;
self
}

/// Prefer existing union (true). See [prefer_existing_union] for more details
///
/// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
self.options.optimizer.prefer_existing_union = enabled;
self.options_mut().optimizer.prefer_existing_union = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
self.options_mut().execution.parquet.pruning = enabled;
self
}

Expand All @@ -363,7 +370,7 @@ impl SessionConfig {

/// Enables or disables the use of bloom filter for parquet readers to skip row groups
pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.bloom_filter_on_read = enabled;
self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
self
}

Expand All @@ -374,13 +381,13 @@ impl SessionConfig {

/// Enables or disables the use of page index for parquet readers to skip parquet data pages
pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.enable_page_index = enabled;
self.options_mut().execution.parquet.enable_page_index = enabled;
self
}

/// Enables or disables the collection of statistics after listing files
pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
self.options.execution.collect_statistics = enabled;
self.options_mut().execution.collect_statistics = enabled;
self
}

Expand All @@ -391,7 +398,7 @@ impl SessionConfig {

/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.options.execution.coalesce_batches = enabled;
self.options_mut().execution.coalesce_batches = enabled;
self
}

Expand All @@ -403,7 +410,7 @@ impl SessionConfig {

/// Enables or disables the round robin repartition for increasing parallelism
pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
self.options.optimizer.enable_round_robin_repartition = enabled;
self.options_mut().optimizer.enable_round_robin_repartition = enabled;
self
}

Expand All @@ -421,7 +428,7 @@ impl SessionConfig {
mut self,
sort_spill_reservation_bytes: usize,
) -> Self {
self.options.execution.sort_spill_reservation_bytes =
self.options_mut().execution.sort_spill_reservation_bytes =
sort_spill_reservation_bytes;
self
}
Expand All @@ -430,7 +437,7 @@ impl SessionConfig {
///
/// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
self.options.execution.spill_compression = spill_compression;
self.options_mut().execution.spill_compression = spill_compression;
self
}

Expand All @@ -442,7 +449,7 @@ impl SessionConfig {
mut self,
sort_in_place_threshold_bytes: usize,
) -> Self {
self.options.execution.sort_in_place_threshold_bytes =
self.options_mut().execution.sort_in_place_threshold_bytes =
sort_in_place_threshold_bytes;
self
}
Expand All @@ -452,7 +459,8 @@ impl SessionConfig {
mut self,
enforce_batch_size_in_joins: bool,
) -> Self {
self.options.execution.enforce_batch_size_in_joins = enforce_batch_size_in_joins;
self.options_mut().execution.enforce_batch_size_in_joins =
enforce_batch_size_in_joins;
self
}

Expand Down Expand Up @@ -590,6 +598,7 @@ impl SessionConfig {

impl From<ConfigOptions> for SessionConfig {
fn from(options: ConfigOptions) -> Self {
let options = Arc::new(options);
Self {
options,
..Default::default()
Expand Down
13 changes: 12 additions & 1 deletion datafusion/expr/src/execution_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::HashMap;
use std::sync::Arc;

Expand All @@ -35,6 +36,8 @@ pub struct ExecutionProps {
pub query_execution_start_time: DateTime<Utc>,
/// Alias generator used by subquery optimizer rules
pub alias_generator: Arc<AliasGenerator>,
/// Snapshot of config options
pub config_options: Option<Arc<ConfigOptions>>,
/// Providers for scalar variables
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
}
Expand All @@ -53,6 +56,7 @@ impl ExecutionProps {
// not being updated / propagated correctly
query_execution_start_time: Utc.timestamp_nanos(0),
alias_generator: Arc::new(AliasGenerator::new()),
config_options: None,
var_providers: None,
}
}
Expand All @@ -68,9 +72,10 @@ impl ExecutionProps {

/// Marks the execution of query started timestamp.
/// This also instantiates a new alias generator.
pub fn start_execution(&mut self) -> &Self {
pub fn start_execution(&mut self, config_options: Arc<ConfigOptions>) -> &Self {
self.query_execution_start_time = Utc::now();
self.alias_generator = Arc::new(AliasGenerator::new());
self.config_options = Some(config_options);
&*self
}

Expand Down Expand Up @@ -99,6 +104,12 @@ impl ExecutionProps {
.as_ref()
.and_then(|var_providers| var_providers.get(&var_type).cloned())
}

/// Returns the configuration properties for this execution
/// if the execution has started
pub fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
Comment on lines +108 to +110
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the with_query_execution_start_time makes / used to make ExecutionProps "as started"
these call sites should be updated as well

self.config_options.as_ref()
}
}

#[cfg(test)]
Expand Down
Loading