Skip to content

Commit 258af4b

Browse files
authored
Add Analyzer phase to DataFusion , add basic validation logic to Subquery Plans and Expressions (#5570)
* Add a rule based Analyzer, add basic check for SubQuery expressions * remove the failed UT, add comments
1 parent 0f6931c commit 258af4b

File tree

5 files changed

+441
-2
lines changed

5 files changed

+441
-2
lines changed

datafusion/core/tests/sql/subqueries.rs

+34
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,37 @@ async fn in_subquery_with_same_table() -> Result<()> {
179179

180180
Ok(())
181181
}
182+
183+
#[tokio::test]
184+
async fn invalid_scalar_subquery() -> Result<()> {
185+
let ctx = create_join_context("t1_id", "t2_id", true)?;
186+
187+
let sql = "SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1";
188+
let msg = format!("Creating logical plan for '{sql}'");
189+
let dataframe = ctx.sql(sql).await.expect(&msg);
190+
let err = dataframe.into_optimized_plan().err().unwrap();
191+
assert_eq!(
192+
"Plan(\"Scalar subquery should only return one column\")",
193+
&format!("{err:?}")
194+
);
195+
196+
Ok(())
197+
}
198+
199+
#[tokio::test]
200+
async fn subquery_not_allowed() -> Result<()> {
201+
let ctx = create_join_context("t1_id", "t2_id", true)?;
202+
203+
// In/Exist Subquery is not allowed in ORDER BY clause.
204+
let sql = "SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int)";
205+
let msg = format!("Creating logical plan for '{sql}'");
206+
let dataframe = ctx.sql(sql).await.expect(&msg);
207+
let err = dataframe.into_optimized_plan().err().unwrap();
208+
209+
assert_eq!(
210+
"Plan(\"In/Exist subquery can not be used in Sort plan nodes\")",
211+
&format!("{err:?}")
212+
);
213+
214+
Ok(())
215+
}

datafusion/optimizer/src/analyzer.rs

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::rewrite::TreeNodeRewritable;
19+
use datafusion_common::config::ConfigOptions;
20+
use datafusion_common::{DataFusionError, Result};
21+
use datafusion_expr::expr_visitor::inspect_expr_pre;
22+
use datafusion_expr::{Expr, LogicalPlan};
23+
use log::{debug, trace};
24+
use std::sync::Arc;
25+
use std::time::Instant;
26+
27+
/// `AnalyzerRule` transforms the unresolved ['LogicalPlan']s and unresolved ['Expr']s into
28+
/// the resolved form.
29+
pub trait AnalyzerRule {
30+
/// Rewrite `plan`
31+
fn analyze(&self, plan: &LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
32+
33+
/// A human readable name for this analyzer rule
34+
fn name(&self) -> &str;
35+
}
36+
/// A rule-based Analyzer.
37+
#[derive(Clone)]
38+
pub struct Analyzer {
39+
/// All rules to apply
40+
pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
41+
}
42+
43+
impl Default for Analyzer {
44+
fn default() -> Self {
45+
Self::new()
46+
}
47+
}
48+
49+
impl Analyzer {
50+
/// Create a new analyzer using the recommended list of rules
51+
pub fn new() -> Self {
52+
let rules = vec![];
53+
Self::with_rules(rules)
54+
}
55+
56+
/// Create a new analyzer with the given rules
57+
pub fn with_rules(rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>) -> Self {
58+
Self { rules }
59+
}
60+
61+
/// Analyze the logical plan by applying analyzer rules, and
62+
/// do necessary check and fail the invalid plans
63+
pub fn execute_and_check(
64+
&self,
65+
plan: &LogicalPlan,
66+
config: &ConfigOptions,
67+
) -> Result<LogicalPlan> {
68+
let start_time = Instant::now();
69+
let mut new_plan = plan.clone();
70+
71+
// TODO add common rule executor for Analyzer and Optimizer
72+
for rule in &self.rules {
73+
new_plan = rule.analyze(&new_plan, config)?;
74+
}
75+
check_plan(&new_plan)?;
76+
log_plan("Final analyzed plan", &new_plan);
77+
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
78+
Ok(new_plan)
79+
}
80+
}
81+
82+
/// Log the plan in debug/tracing mode after some part of the optimizer runs
83+
fn log_plan(description: &str, plan: &LogicalPlan) {
84+
debug!("{description}:\n{}\n", plan.display_indent());
85+
trace!("{description}::\n{}\n", plan.display_indent_schema());
86+
}
87+
88+
/// Do necessary check and fail the invalid plan
89+
fn check_plan(plan: &LogicalPlan) -> Result<()> {
90+
plan.for_each_up(&|plan: &LogicalPlan| {
91+
plan.expressions().into_iter().try_for_each(|expr| {
92+
// recursively look for subqueries
93+
inspect_expr_pre(&expr, |expr| match expr {
94+
Expr::Exists { subquery, .. }
95+
| Expr::InSubquery { subquery, .. }
96+
| Expr::ScalarSubquery(subquery) => {
97+
check_subquery_expr(plan, &subquery.subquery, expr)
98+
}
99+
_ => Ok(()),
100+
})
101+
})
102+
})
103+
}
104+
105+
/// Do necessary check on subquery expressions and fail the invalid plan
106+
/// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions,
107+
/// the allowed while list: [Projection, Filter, Window, Aggregate, Sort, Join].
108+
/// 2) Check whether the inner plan is in the allowed inner plans list to use correlated(outer) expressions.
109+
/// 3) Check and validate unsupported cases to use the correlated(outer) expressions inside the subquery(inner) plans/inner expressions.
110+
/// For example, we do not want to support to use correlated expressions as the Join conditions in the subquery plan when the Join
111+
/// is a Full Out Join
112+
fn check_subquery_expr(
113+
outer_plan: &LogicalPlan,
114+
inner_plan: &LogicalPlan,
115+
expr: &Expr,
116+
) -> Result<()> {
117+
check_plan(inner_plan)?;
118+
119+
// Scalar subquery should only return one column
120+
if matches!(expr, Expr::ScalarSubquery(subquery) if subquery.subquery.schema().fields().len() > 1)
121+
{
122+
return Err(DataFusionError::Plan(
123+
"Scalar subquery should only return one column".to_string(),
124+
));
125+
}
126+
127+
match outer_plan {
128+
LogicalPlan::Projection(_)
129+
| LogicalPlan::Filter(_)
130+
| LogicalPlan::Window(_)
131+
| LogicalPlan::Aggregate(_)
132+
| LogicalPlan::Join(_) => Ok(()),
133+
LogicalPlan::Sort(_) => match expr {
134+
Expr::InSubquery { .. } | Expr::Exists { .. } => Err(DataFusionError::Plan(
135+
"In/Exist subquery can not be used in Sort plan nodes".to_string(),
136+
)),
137+
Expr::ScalarSubquery(_) => Ok(()),
138+
_ => Ok(()),
139+
},
140+
_ => Err(DataFusionError::Plan(
141+
"Subquery can only be used in Projection, Filter, \
142+
Window functions, Aggregate, Sort and Join plan nodes"
143+
.to_string(),
144+
)),
145+
}?;
146+
check_correlations_in_subquery(outer_plan, inner_plan, expr, true)
147+
}
148+
149+
// Recursively check the unsupported outer references in the sub query plan.
150+
fn check_correlations_in_subquery(
151+
outer_plan: &LogicalPlan,
152+
inner_plan: &LogicalPlan,
153+
expr: &Expr,
154+
can_contain_outer_ref: bool,
155+
) -> Result<()> {
156+
// We want to support as many operators as possible inside the correlated subquery
157+
if !can_contain_outer_ref && contains_outer_reference(outer_plan, inner_plan, expr) {
158+
return Err(DataFusionError::Plan(
159+
"Accessing outer reference column is not allowed in the plan".to_string(),
160+
));
161+
}
162+
match inner_plan {
163+
LogicalPlan::Projection(_)
164+
| LogicalPlan::Filter(_)
165+
| LogicalPlan::Window(_)
166+
| LogicalPlan::Aggregate(_)
167+
| LogicalPlan::Distinct(_)
168+
| LogicalPlan::Sort(_)
169+
| LogicalPlan::CrossJoin(_)
170+
| LogicalPlan::Union(_)
171+
| LogicalPlan::TableScan(_)
172+
| LogicalPlan::EmptyRelation(_)
173+
| LogicalPlan::Limit(_)
174+
| LogicalPlan::Subquery(_)
175+
| LogicalPlan::SubqueryAlias(_) => inner_plan.apply_children(|plan| {
176+
check_correlations_in_subquery(outer_plan, plan, expr, can_contain_outer_ref)
177+
}),
178+
LogicalPlan::Join(_) => {
179+
// TODO support correlation columns in the subquery join
180+
inner_plan.apply_children(|plan| {
181+
check_correlations_in_subquery(
182+
outer_plan,
183+
plan,
184+
expr,
185+
can_contain_outer_ref,
186+
)
187+
})
188+
}
189+
_ => Err(DataFusionError::Plan(
190+
"Unsupported operator in the subquery plan.".to_string(),
191+
)),
192+
}
193+
}
194+
195+
fn contains_outer_reference(
196+
_outer_plan: &LogicalPlan,
197+
_inner_plan: &LogicalPlan,
198+
_expr: &Expr,
199+
) -> bool {
200+
// TODO check outer references
201+
false
202+
}

datafusion/optimizer/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod alias;
19+
pub mod analyzer;
1920
pub mod common_subexpr_eliminate;
2021
pub mod decorrelate_where_exists;
2122
pub mod decorrelate_where_in;
@@ -35,6 +36,7 @@ pub mod push_down_filter;
3536
pub mod push_down_limit;
3637
pub mod push_down_projection;
3738
pub mod replace_distinct_aggregate;
39+
pub mod rewrite;
3840
pub mod rewrite_disjunctive_predicate;
3941
pub mod scalar_subquery_to_join;
4042
pub mod simplify_expressions;

datafusion/optimizer/src/optimizer.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Query optimizer traits
1919
20+
use crate::analyzer::Analyzer;
2021
use crate::common_subexpr_eliminate::CommonSubexprEliminate;
2122
use crate::decorrelate_where_exists::DecorrelateWhereExists;
2223
use crate::decorrelate_where_in::DecorrelateWhereIn;
@@ -266,9 +267,10 @@ impl Optimizer {
266267
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
267268
{
268269
let options = config.options();
270+
let analyzed_plan = Analyzer::default().execute_and_check(plan, options)?;
269271
let start_time = Instant::now();
270-
let mut old_plan = Cow::Borrowed(plan);
271-
let mut new_plan = plan.clone();
272+
let mut old_plan = Cow::Borrowed(&analyzed_plan);
273+
let mut new_plan = analyzed_plan.clone();
272274
let mut i = 0;
273275
while i < options.optimizer.max_passes {
274276
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

0 commit comments

Comments
 (0)