feat: Add ConfigOptions to ScalarFunctionArgs #13527
Conversation
There are a lot of file changes here, but most of the important ones are in scalar_function.rs. There is a TODO in expr_simplifier.rs that I would like feedback on.
I plan to review this carefully tomorrow
Thank you @Omega359 -- this is an epic plumbing exercise 🪠
The signature in ScalarFunctionArgs is 👌 very nice
This PR seems to require config_options to be cloned many times now. I wonder if it is possible to avoid that 🤔. I took a brief look and it seems to be somewhat challenging, as SessionState allows mutable access to the underlying SessionConfig.
Maybe we could change the semantics so that SessionConfig has an Arc<ConfigOptions> which is cloned when it is modified (Arc::unwrap_or_clone() style) 🤔
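For illustration, a minimal copy-on-write sketch of that idea, assuming a stripped-down stand-in for SessionConfig (the real struct has many more fields); Arc::make_mut gives the unwrap_or_clone-style behavior on mutation:

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;

/// Stripped-down stand-in for SessionConfig holding shared options
struct CowSessionConfig {
    options: Arc<ConfigOptions>,
}

impl CowSessionConfig {
    /// Read access stays cheap and shared
    fn options(&self) -> &ConfigOptions {
        &self.options
    }

    /// Mutation clones the inner ConfigOptions only while the Arc is shared
    fn options_mut(&mut self) -> &mut ConfigOptions {
        Arc::make_mut(&mut self.options)
    }

    /// Planning can then take a cheap shared handle instead of a deep copy
    fn options_arc(&self) -> Arc<ConfigOptions> {
        Arc::clone(&self.options)
    }
}
```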
I also think the const evaluator does need the actual correct ConfigOptions for correctness
let physical_expr =
    datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
let config_options = Arc::new(ConfigOptions::default());
let physical_expr = datafusion_physical_expr::create_physical_expr(
It seems somewhat inevitable that creating a physical expr will require the config options.
However, I also think threading the config options down to physical expression creation will (finally) permit people to pass things from the session down to function implementations (I think @cisaacson was also trying to do this in the past)
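As a hedged illustration of that end state (not code from this PR): a stand-in for the body of a function implementation that reads a session setting from the options it receives; batch_size is used only as a placeholder for whatever option (a timezone, an ANSI-mode flag, etc.) a real UDF would care about.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;

/// Hypothetical helper standing in for the body of a UDF's invoke method
/// once it receives the session's ConfigOptions (via Arc, as in this PR)
fn udf_body(config_options: &Arc<ConfigOptions>) -> String {
    // batch_size is an existing option used here only as a placeholder;
    // a date/timestamp UDF would read a timezone-style setting instead
    format!("session batch size: {}", config_options.execution.batch_size)
}
```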
@@ -283,10 +284,16 @@ async fn prune_partitions(

// TODO: Plumb this down
This TODO may now be complete
datafusion/expr/src/udf.rs

@@ -336,6 +337,8 @@ pub struct ScalarFunctionArgs<'a> {
// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
// when creating the physical expression from the logical expression
pub return_type: &'a DataType,
// The config options which can be used to lookup configuration properties
pub config_options: Arc<ConfigOptions>,
🎉
Ok(e) => e,
Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr),
};
// todo - should the config options be the actual options here or is this sufficient?
I think the actual configuration options are needed here. Otherwise what will happen is that any function whose behavior relies on the ConfigOptions may have different behavior on columns and constants (or other expressions that can be constant folded)
Yes, it's a bit annoying. I was tempted to see if I could switch to &'a ConfigOptions everywhere. There is at least one 'real' (vs Arc::clone) clone for every query, possibly more as I haven't checked.
Certainly possible, I can attempt that.
I was afraid of that. I was avoiding it because of the signature changes it would require just about everywhere, which would cause even more headaches for systems trying to upgrade.
Yeah, it is a tricky one for sure
Marking as draft as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look
@alamb I did a quick attempt at implementing that, however it breaks a commonly used method - SessionConfig.options_mut(). Not having that available breaks a bunch of stuff, and while switching to SessionConfig.set(..) is quite possible, it's not as clean. Trying with &ConfigOptions in ScalarFunctionExpr leads to lifetime hell in areas I have no idea how to overcome right now. As much as I want this feature, I'm going to put it aside for now.
I think this may be ready for review again. For this round I refactored the code to use &ConfigOptions everywhere except for ScalarFunctionExpr, so the cost of cloning ConfigOptions is only incurred when creating a scalar UDF.
The examples check failure is transient, I believe.
I restarted the checks
Thank you @Omega359 -- I think this PR is a major step forward and we could merge it as is.
However, I feel strongly there are two things that should be improved soon (if not in this PR then in follow-on ones). I left specific comments about each:
- Don't copy ConfigOptions in every call to ScalarFunction::create_physical_expr
- Add convenience methods to get a default &ConfigOptions and Arc<ConfigOptions>, which I think will help people upgrading to the next version of DataFusion quickly migrate their code
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
    .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
    .try_into_physical_plan(&ctx, config_options, runtime.deref(), &composed_codec)
This is an API change, but I think it is required to thread the config options through, as each argument is specific.
We could potentially improve the try_into_physical_plan API (as a follow-on PR) to make it easier to update the API in the future, using a trait or something like that. So this would look something like:
pub trait ProtobufContext {
    /// return a function registry
    fn function_registry(&self) -> &dyn FunctionRegistry;
    /// return the runtime env
    fn runtime_env(&self) -> &RuntimeEnv;
    /// return the config options
    fn config_options(&self) -> &ConfigOptions;
    /// return extension codec
    fn extension_codec(&self) -> &dyn PhysicalExtensionCodec;
}

impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    ...
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        config_options: &ConfigOptions,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -187,7 +188,9 @@ impl PruningStatistics for MyCatalog {
fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
    let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
    let props = ExecutionProps::new();
    let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
    let config_options = ConfigOptions::default();
Something we could potentially do to make this API slightly easier to use might be to create a static default ConfigOptions. Something like:

impl ConfigOptions {
    /// returns a reference to the default ConfigOptions
    pub fn default_singleton() -> &'static ConfigOptions
}

This would then make it easier to return &ConfigOptions in various places where only the default is needed. For example, in LocalCsvTableFunc you could avoid having to thread the ConfigOptions through, as in that example having the actual config options isn't important.
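A hedged sketch of how such a singleton might be provided (written as a free function so it compiles outside datafusion_common; the real thing would likely be the inherent method suggested above, and could use OnceLock or once_cell instead of LazyLock):

```rust
use std::sync::LazyLock;

use datafusion_common::config::ConfigOptions;

/// Process-wide default options, built lazily on first use
static DEFAULT_CONFIG_OPTIONS: LazyLock<ConfigOptions> =
    LazyLock::new(|| ConfigOptions::default());

/// Stand-in for the suggested `ConfigOptions::default_singleton`; an
/// inherent method could only be added inside datafusion_common itself
pub fn default_singleton() -> &'static ConfigOptions {
    &DEFAULT_CONFIG_OPTIONS
}
```

Call sites that only need the defaults (like the pruning example above) could then hand out a &'static reference instead of constructing and threading their own ConfigOptions.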
@@ -243,6 +292,7 @@ pub fn create_physical_expr(
    Arc::new(fun.clone()),
    input_phy_exprs.to_vec(),
    return_type,
    Arc::new(config_options.clone()),
This line worries me -- it means each distinct scalar function in the plan will get an entirely new copy of the ConfigOptions.
I think the Arc should be passed in as the argument like this (I realize this will be a significant code change) so that the config options are copied at most once per plan:

/// Create a physical expression for the UDF.
pub fn create_physical_expr(
    fun: &ScalarUDF,
    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
    input_schema: &Schema,
    args: &[Expr],
    input_dfschema: &DFSchema,
    config_options: &Arc<ConfigOptions>, // <--- I think this should be an `Arc`
) -> Result<Arc<dyn PhysicalExpr>> {

I think that would also make it clearer that physical planning makes a read-only copy of the configuration (Arc<ConfigOptions>) that is then unchanged during execution.
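A tiny sketch of the cost difference being pointed out here, under the assumption that deep clones of ConfigOptions are what we want to avoid:

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;

fn clone_costs(config_options: &Arc<ConfigOptions>) {
    // Deep copy: duplicates the whole ConfigOptions struct; with the current
    // signature this happens once per scalar function in the plan
    let _per_function = Arc::new(config_options.as_ref().clone());

    // Shallow copy: a reference-count bump, so the options are materialized
    // at most once per plan and shared from there
    let _per_plan = Arc::clone(config_options);
}
```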
That was my main concern as well. To be honest, I am unsure that this change should be merged in as is, since any fix will be just as disruptive, API-wise, as this PR.
Pushing the clone higher up the stack is possible, but I ran into major issues trying to push it all the way up, including all kinds of disruptive changes like changing OptimizerConfig from &ConfigOptions to the Arc version. Doing the opposite - pushing &ConfigOptions all the way down - ran into issues with ScalarFunctionExpr and DynEq/DynHash.
Speaking for myself, I prefer the latter as I feel it's less disruptive overall ... I just couldn't get it to work before. Changing the signature of the DynHash and DynEq traits may allow it to work, but I haven't tried that yet. Maybe with some more thought in January I can get it to work, but I expect my January time to be pretty limited.
> Pushing the clone higher up the stack is possible, but I ran into major issues trying to push it all the way up, including all kinds of disruptive changes like changing OptimizerConfig from &ConfigOptions to the Arc version. Doing the opposite - pushing &ConfigOptions all the way down - ran into issues with ScalarFunctionExpr and DynEq/DynHash.
If you have an Arc<ConfigOptions> you can always get a &ConfigOptions by calling options.as_ref().
So in other words, I don't know if you need to push the Arc through everywhere (at least at first) -- we could just change create_physical_expr. I would love to help with this -- perhaps I can find time later in the week (though I have a few other things going on too)
Another option to avoid cloning ConfigOptions for each instance of ScalarFunction might be to add some sort of API / way for the function to communicate "I need the config options"
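One hypothetical shape for that opt-in (the trait and method name are invented here, not DataFusion API): functions declare up front that they read ConfigOptions, and planning only attaches or clones the options for those that do.

```rust
/// Invented opt-in hook: a function implementation declares whether it
/// actually reads ConfigOptions; the default is "no config needed"
pub trait ConfigAwareFunction {
    fn needs_config_options(&self) -> bool {
        false
    }
}
```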
Converting to draft as this PR is accumulating conflicts and it seems like it is not ready for merge yet
datafusion/core/Cargo.toml

@@ -144,7 +144,7 @@ zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
async-trait = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
criterion = { workspace = true, features = ["async_tokio", "async_futures"] }
This was added so the spm benchmark would run. It's obvious that benchmarks are not being run regularly at all, which should be looked into.
I hit the same problem (and made the same fix) in
@Omega359 -- I started reviewing this PR today. I am still getting my head around what the API change is / what this would mean for downstream users. I'll try and spend more time over the next week giving it a more in-depth look.
Thank you again for this PR @Omega359
TLDR is I think the feature (config options to functions) is super valuable
My biggest concern is the impact it would have on downstream projects -- I don't have a good handle on how much churn it would create
What I suggest as a next step is:
- Send a note to the dev list (and maybe drop a link in discord and slack) asking people with such projects to offer opinions
- Maybe try to upgrade one of the downstream projects (e.g. comet or delta.rs) to see what the impact is

I'll also kick off some benchmarks on this PR too
CONFIG_OPTIONS_SINGLETON.as_ref()
}

/// this is a static singleton to be used for testing only where the default values are sufficient
💯
I keep thinking about this PR in the back of my head. The idea of being able to customize functions based on configuration options makes total sense to me, but having to plumb it down all the way through to execution is so disruptive on the APIs and adds non-trivial overhead to the whole system for just a few functions.
One alternate idea I had was to store whatever setting from the ConfigOptions in the function itself. Then, prior to execution, we could update all the functions that had option-specific state. Something like:

// update config options...
// Update all registered functions:
for name in ctx.udf_names() {
    let old_func = ctx.get_udf(&name);
    if let Some(new_func) = old_func.with_config_options(options) {
        ctx.register_udf(new_func)
    }
}

This would make updating config options potentially slower (as now functions would also need to be updated), but I think the change would be much more localized.
Another option would be to use the existing simplify API.
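The with_config_options call above is hypothetical; one possible signature for it, sketched as a standalone extension trait so it stands on its own (names invented here, not existing DataFusion API):

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_expr::ScalarUDF;

/// Invented extension trait for the hypothetical update hook used above
pub trait WithConfigOptions {
    /// Return Some(updated function) if any option-derived state changed,
    /// or None if the function is unaffected by the new options
    fn with_config_options(&self, options: &ConfigOptions) -> Option<Arc<ScalarUDF>>;
}
```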
Not sure I'd call a clone or two overhead, but I agree with you on the plumbing. It's a lot. In JVM land I would likely have used a thread-local variable to hold a session id and a static factory to look up the config based on that session id. While that could technically work here, I think @tustvold has advised in the past that thread locals with tokio are not the way to go. I had thought of the simplify option, but if you look at how that is implemented you would still have to thread the options through the Optimizer, and it would also require a signature change for OptimizerRule, the same as is required for this PR. It would be less invasive overall though. Your first option though, hmm. It would work well for config options, but long term it wouldn't allow session variables. That could be a future problem to solve, I guess. I'll look into what it would take to implement this option.
Another idea: since ExecutionProps is already plumbed through to the places that need this, what if you added the necessary state (like "SQL mode") there and then copied that through (possibly all the way to ScalarFunctionArgs) 🤔 https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarFunctionArgs.html It would still be some plumbing, but it might be much less invasive as ExecutionProps is already sent many places -- ExecutionProps is also already very small 🤔
Hmm. 🤔 It would mean that we would have to copy things at execution time from ConfigOptions to an instance of ExecutionProps (timezone, SQL mode, whatever we deem useful) and perhaps others (Extensions comes to mind), though I don't think that would be horribly difficult. Perhaps we could use Vec<ConfigEntry> for this - or a map<key, ConfigEntry> so consumers don't have to iterate the options to find the one they are interested in. I agree that this approach would be much less invasive. I actually like this approach a bit more than your other idea of adding yet another function to the UDFs to set the config_options on them. It would also allow for per-query changes, since session variables would then be accessible to UDFs. Unless someone can poke a pretty big hole in this approach, I'll start another branch and work on this.
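A hedged sketch of that copy step, using an invented struct in place of the real ExecutionProps and assuming ConfigOptions::entries() as the accessor (the map-of-entries shape follows the comment above):

```rust
use std::collections::HashMap;

use datafusion_common::config::ConfigOptions;

/// Invented stand-in for an ExecutionProps extended with config-derived state
struct PropsWithConfig {
    /// key -> stringified value, copied once when execution starts
    config_entries: HashMap<String, String>,
}

impl PropsWithConfig {
    fn from_config(options: &ConfigOptions) -> Self {
        let config_entries = options
            .entries() // assumed accessor returning the flattened config entries
            .into_iter()
            .filter_map(|entry| {
                let key = entry.key;
                entry.value.map(|value| (key, value))
            })
            .collect();
        Self { config_entries }
    }
}
```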
I've spent some time looking at using ExecutionProps for this, and while I think it'll work, it's still a lot of churn. That churn is largely due to two reasons:
Essentially, I think it'll be better than the ConfigOptions approach, but not by a huge amount.
Which issue does this PR close?
Closes #13519
Rationale for this change
Allow UDFs to access the DataFusion config so that their behaviour can change based on configuration. For example, this allows date and timestamp UDFs to use a timezone other than UTC, or date/timestamp parsing to have ANSI behaviour when parsing fails.
What changes are included in this PR?
Code. Most of the changes in this PR are plumbing related: threading ConfigOptions down through the API to the physical planner. The idea is to clone ConfigOptions the minimal number of times possible (essentially once, at the beginning of optimization and planning) for each execution of a query.
Are these changes tested?
Existing tests.
Are there any user-facing changes?
~~Not specifically, this is covered with the UDF signature change in #13290~~ Yes. A number of functions now include an &Arc<ConfigOptions> argument in the parameter list. As well, SessionState no longer has an OptimizerConfig implementation directly; rather, that was extracted out to a SessionStateOptimizerConfig struct. I'll work on a full, exact list to add here.