Skip to content

Commit 3b47ead

Browse files
authored
refactor(binder): move the rewrite of ASOF JOIN to the logical plan and remove scalar_expr from DerivedColumn (#18938)
* asof * refine * is_asof_join * refine * fix
1 parent 33102cc commit 3b47ead

File tree

43 files changed

+529
-835
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+529
-835
lines changed

โ€Žsrc/query/service/src/physical_plans/mod.rsโ€Ž

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ pub use physical_window_partition::*;
113113

114114
pub mod explain;
115115
mod format;
116-
mod physical_asof_join;
117116
mod physical_cte_consumer;
118117
mod physical_materialized_cte;
119118
mod physical_plan;

โ€Žsrc/query/service/src/physical_plans/physical_asof_join.rsโ€Ž

Lines changed: 0 additions & 425 deletions
This file was deleted.

โ€Žsrc/query/service/src/physical_plans/physical_hash_join.rsโ€Ž

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,14 @@ impl IPhysicalPlan for HashJoin {
240240
probe_keys: self.probe_keys.clone(),
241241
is_null_equal: self.is_null_equal.clone(),
242242
non_equi_conditions: self.non_equi_conditions.clone(),
243-
join_type: self.join_type.clone(),
243+
join_type: self.join_type,
244244
marker_index: self.marker_index,
245245
from_correlated_subquery: self.from_correlated_subquery,
246246
probe_to_build: self.probe_to_build.clone(),
247247
output_schema: self.output_schema.clone(),
248248
need_hold_hash_table: self.need_hold_hash_table,
249249
stat_info: self.stat_info.clone(),
250-
single_to_inner: self.single_to_inner.clone(),
250+
single_to_inner: self.single_to_inner,
251251
build_side_cache_info: self.build_side_cache_info.clone(),
252252
runtime_filter: self.runtime_filter.clone(),
253253
broadcast_id: self.broadcast_id,
@@ -445,7 +445,7 @@ impl HashJoin {
445445
build_input.clone(),
446446
probe_input.clone(),
447447
joined_output.clone(),
448-
factory.create_hash_join(self.join_type.clone(), 0)?,
448+
factory.create_hash_join(self.join_type, 0)?,
449449
stage_sync_barrier.clone(),
450450
self.projections.clone(),
451451
rf_desc.clone(),
@@ -505,8 +505,8 @@ impl PhysicalPlanBuilder {
505505
left_required: ColumnSet,
506506
right_required: ColumnSet,
507507
) -> Result<(PhysicalPlan, PhysicalPlan)> {
508-
let probe_side = self.build(s_expr.child(0)?, left_required).await?;
509-
let build_side = self.build(s_expr.child(1)?, right_required).await?;
508+
let probe_side = self.build(s_expr.left_child(), left_required).await?;
509+
let build_side = self.build(s_expr.right_child(), right_required).await?;
510510

511511
Ok((probe_side, build_side))
512512
}
@@ -537,7 +537,7 @@ impl PhysicalPlanBuilder {
537537
/// * `Result<DataSchemaRef>` - The prepared schema for the build side
538538
pub fn prepare_build_schema(
539539
&self,
540-
join_type: &JoinType,
540+
join_type: JoinType,
541541
build_side: &PhysicalPlan,
542542
) -> Result<DataSchemaRef> {
543543
match join_type {
@@ -577,7 +577,7 @@ impl PhysicalPlanBuilder {
577577
/// * `Result<DataSchemaRef>` - The prepared schema for the probe side
578578
pub fn prepare_probe_schema(
579579
&self,
580-
join_type: &JoinType,
580+
join_type: JoinType,
581581
probe_side: &PhysicalPlan,
582582
) -> Result<DataSchemaRef> {
583583
match join_type {
@@ -1210,7 +1210,7 @@ impl PhysicalPlanBuilder {
12101210
probe_projections,
12111211
build: build_side,
12121212
probe: probe_side,
1213-
join_type: join.join_type.clone(),
1213+
join_type: join.join_type,
12141214
build_keys: right_join_conditions,
12151215
probe_keys: left_join_conditions,
12161216
is_null_equal,
@@ -1222,7 +1222,7 @@ impl PhysicalPlanBuilder {
12221222
output_schema,
12231223
need_hold_hash_table: join.need_hold_hash_table,
12241224
stat_info: Some(stat_info),
1225-
single_to_inner: join.single_to_inner.clone(),
1225+
single_to_inner: join.single_to_inner,
12261226
build_side_cache_info,
12271227
runtime_filter,
12281228
broadcast_id,
@@ -1252,8 +1252,8 @@ impl PhysicalPlanBuilder {
12521252
self.unify_keys(&mut probe_side, &mut build_side)?;
12531253

12541254
// Step 4: Prepare schemas for both sides
1255-
let build_schema = self.prepare_build_schema(&join.join_type, &build_side)?;
1256-
let probe_schema = self.prepare_probe_schema(&join.join_type, &probe_side)?;
1255+
let build_schema = self.prepare_build_schema(join.join_type, &build_side)?;
1256+
let probe_schema = self.prepare_probe_schema(join.join_type, &probe_side)?;
12571257

12581258
// Step 5: Process join conditions
12591259
let (

โ€Žsrc/query/service/src/physical_plans/physical_join.rsโ€Ž

Lines changed: 70 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
use databend_common_exception::ErrorCode;
1616
use databend_common_exception::Result;
17-
use databend_common_sql::binder::JoinPredicate;
17+
use databend_common_sql::binder::is_range_join_condition;
1818
use databend_common_sql::optimizer::ir::RelExpr;
19-
use databend_common_sql::optimizer::ir::RelationalProperty;
2019
use databend_common_sql::optimizer::ir::SExpr;
20+
use databend_common_sql::plans::FunctionCall;
2121
use databend_common_sql::plans::Join;
2222
use databend_common_sql::plans::JoinType;
2323
use databend_common_sql::ColumnSet;
@@ -27,109 +27,60 @@ use crate::physical_plans::explain::PlanStatsInfo;
2727
use crate::physical_plans::physical_plan::PhysicalPlan;
2828
use crate::physical_plans::PhysicalPlanBuilder;
2929

30-
pub enum PhysicalJoinType {
30+
enum PhysicalJoinType {
3131
Hash,
3232
// The first arg is range conditions, the second arg is other conditions
3333
RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
34-
AsofJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
3534
}
3635

3736
// Choose physical join type by join conditions
38-
pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
39-
let check_asof = matches!(
40-
join.join_type,
41-
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
42-
);
43-
37+
fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
4438
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
4539
return Err(ErrorCode::SemanticError(
4640
"ANY JOIN only supports equality-based hash joins",
4741
));
4842
}
49-
if !join.equi_conditions.is_empty() && !check_asof {
43+
44+
if !join.equi_conditions.is_empty() {
5045
// Contain equi condition, use hash join
5146
return Ok(PhysicalJoinType::Hash);
5247
}
5348

54-
if join.build_side_cache_info.is_some() && !check_asof {
49+
if join.build_side_cache_info.is_some() {
5550
// There is a build side cache, use hash join.
5651
return Ok(PhysicalJoinType::Hash);
5752
}
5853

5954
let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
6055
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
6156
let right_stat_info = right_rel_expr.derive_cardinality()?;
62-
if !check_asof
63-
&& (matches!(right_stat_info.statistics.precise_cardinality, Some(1))
64-
|| right_stat_info.cardinality == 1.0)
57+
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
58+
|| right_stat_info.cardinality == 1.0
6559
{
6660
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
6761
return Ok(PhysicalJoinType::Hash);
6862
}
6963

7064
let left_prop = left_rel_expr.derive_relational_prop()?;
7165
let right_prop = right_rel_expr.derive_relational_prop()?;
72-
let mut range_conditions = vec![];
73-
let mut other_conditions = vec![];
74-
for condition in join.non_equi_conditions.iter() {
75-
check_condition(
76-
condition,
77-
&left_prop,
78-
&right_prop,
79-
&mut range_conditions,
80-
&mut other_conditions,
81-
)
82-
}
66+
let (range_conditions, other_conditions) = join
67+
.non_equi_conditions
68+
.iter()
69+
.cloned()
70+
.partition::<Vec<_>, _>(|condition| {
71+
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
72+
});
73+
8374
if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
8475
return Ok(PhysicalJoinType::RangeJoin(
8576
range_conditions,
8677
other_conditions,
8778
));
8879
}
89-
if check_asof {
90-
return Ok(PhysicalJoinType::AsofJoin(
91-
range_conditions,
92-
other_conditions,
93-
));
94-
}
9580
// Leverage hash join to execute nested loop join
9681
Ok(PhysicalJoinType::Hash)
9782
}
9883

99-
fn check_condition(
100-
expr: &ScalarExpr,
101-
left_prop: &RelationalProperty,
102-
right_prop: &RelationalProperty,
103-
range_conditions: &mut Vec<ScalarExpr>,
104-
other_conditions: &mut Vec<ScalarExpr>,
105-
) {
106-
if let ScalarExpr::FunctionCall(func) = expr {
107-
if func.arguments.len() != 2
108-
|| !matches!(func.func_name.as_str(), "gt" | "lt" | "gte" | "lte")
109-
{
110-
other_conditions.push(expr.clone());
111-
return;
112-
}
113-
let mut left = false;
114-
let mut right = false;
115-
for arg in func.arguments.iter() {
116-
let join_predicate = JoinPredicate::new(arg, left_prop, right_prop);
117-
match join_predicate {
118-
JoinPredicate::Left(_) => left = true,
119-
JoinPredicate::Right(_) => right = true,
120-
JoinPredicate::Both { .. } | JoinPredicate::Other(_) | JoinPredicate::ALL(_) => {
121-
return;
122-
}
123-
}
124-
}
125-
if left && right {
126-
range_conditions.push(expr.clone());
127-
return;
128-
}
129-
}
130-
other_conditions.push(expr.clone());
131-
}
132-
13384
impl PhysicalPlanBuilder {
13485
pub async fn build_join(
13586
&mut self,
@@ -175,27 +126,61 @@ impl PhysicalPlanBuilder {
175126

176127
// 2. Build physical plan.
177128
// Choose physical join type by join conditions
178-
let physical_join = physical_join(join, s_expr)?;
179-
match physical_join {
180-
PhysicalJoinType::Hash => {
181-
self.build_hash_join(
182-
join,
183-
s_expr,
184-
required,
185-
others_required,
186-
left_required,
187-
right_required,
188-
stat_info,
189-
)
190-
.await
191-
}
192-
PhysicalJoinType::AsofJoin(range, other) => {
193-
self.build_asof_join(join, s_expr, (left_required, right_required), range, other)
129+
if join.join_type.is_asof_join() {
130+
let left_prop = s_expr.left_child().derive_relational_prop()?;
131+
let right_prop = s_expr.right_child().derive_relational_prop()?;
132+
133+
let (range_conditions, other_conditions) = join
134+
.non_equi_conditions
135+
.iter()
136+
.cloned()
137+
.chain(join.equi_conditions.iter().cloned().map(|condition| {
138+
FunctionCall {
139+
span: condition.left.span(),
140+
func_name: "eq".to_string(),
141+
params: vec![],
142+
arguments: vec![condition.left, condition.right],
143+
}
144+
.into()
145+
}))
146+
.partition(|condition| {
147+
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
148+
});
149+
150+
self.build_range_join(
151+
join.join_type,
152+
s_expr,
153+
left_required,
154+
right_required,
155+
range_conditions,
156+
other_conditions,
157+
)
158+
.await
159+
} else {
160+
match physical_join(join, s_expr)? {
161+
PhysicalJoinType::Hash => {
162+
self.build_hash_join(
163+
join,
164+
s_expr,
165+
required,
166+
others_required,
167+
left_required,
168+
right_required,
169+
stat_info,
170+
)
194171
.await
195-
}
196-
PhysicalJoinType::RangeJoin(range, other) => {
197-
self.build_range_join(join, s_expr, left_required, right_required, range, other)
172+
}
173+
PhysicalJoinType::RangeJoin(range, other) => {
174+
self.build_range_join(
175+
join.join_type,
176+
s_expr,
177+
left_required,
178+
right_required,
179+
range,
180+
other,
181+
)
198182
.await
183+
}
199184
}
200185
}
201186
}

โ€Žsrc/query/service/src/physical_plans/physical_range_join.rsโ€Ž

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_common_sql::binder::JoinPredicate;
3030
use databend_common_sql::optimizer::ir::RelExpr;
3131
use databend_common_sql::optimizer::ir::RelationalProperty;
3232
use databend_common_sql::optimizer::ir::SExpr;
33-
use databend_common_sql::plans::Join;
3433
use databend_common_sql::plans::JoinType;
3534
use databend_common_sql::ColumnSet;
3635
use databend_common_sql::ScalarExpr;
@@ -134,7 +133,7 @@ impl IPhysicalPlan for RangeJoin {
134133
right: right_child,
135134
conditions: self.conditions.clone(),
136135
other_conditions: self.other_conditions.clone(),
137-
join_type: self.join_type.clone(),
136+
join_type: self.join_type,
138137
range_join_type: self.range_join_type.clone(),
139138
output_schema: self.output_schema.clone(),
140139
stat_info: self.stat_info.clone(),
@@ -203,15 +202,15 @@ pub struct RangeJoinCondition {
203202
impl PhysicalPlanBuilder {
204203
pub async fn build_range_join(
205204
&mut self,
206-
join: &Join,
205+
join_type: JoinType,
207206
s_expr: &SExpr,
208207
left_required: ColumnSet,
209208
right_required: ColumnSet,
210209
mut range_conditions: Vec<ScalarExpr>,
211210
mut other_conditions: Vec<ScalarExpr>,
212211
) -> Result<PhysicalPlan> {
213-
let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?;
214-
let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?;
212+
let left_prop = RelExpr::with_s_expr(s_expr.right_child()).derive_relational_prop()?;
213+
let right_prop = RelExpr::with_s_expr(s_expr.left_child()).derive_relational_prop()?;
215214

216215
debug_assert!(!range_conditions.is_empty());
217216

@@ -230,8 +229,8 @@ impl PhysicalPlanBuilder {
230229
.build_join_sides(s_expr, left_required, right_required)
231230
.await?;
232231

233-
let left_schema = self.prepare_probe_schema(&join.join_type, &left_side)?;
234-
let right_schema = self.prepare_build_schema(&join.join_type, &right_side)?;
232+
let left_schema = self.prepare_probe_schema(join_type, &left_side)?;
233+
let right_schema = self.prepare_build_schema(join_type, &right_side)?;
235234

236235
let mut output_schema = Vec::clone(left_schema.fields());
237236
output_schema.extend_from_slice(right_schema.fields());
@@ -265,7 +264,7 @@ impl PhysicalPlanBuilder {
265264
.iter()
266265
.map(|scalar| resolve_scalar(scalar, &merged_schema))
267266
.collect::<Result<_>>()?,
268-
join_type: join.join_type.clone(),
267+
join_type,
269268
range_join_type,
270269
output_schema: Arc::new(DataSchema::new(output_schema)),
271270
stat_info: Some(self.build_plan_stat_info(s_expr)?),

โ€Žsrc/query/service/src/pipelines/processors/transforms/hash_join/desc.rsโ€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl HashJoinDesc {
118118
.collect();
119119

120120
Ok(HashJoinDesc {
121-
join_type: join.join_type.clone(),
121+
join_type: join.join_type,
122122
build_keys,
123123
probe_keys,
124124
is_null_equal: join.is_null_equal.clone(),
@@ -128,7 +128,7 @@ impl HashJoinDesc {
128128
// marker_index: join.marker_index,
129129
},
130130
from_correlated_subquery: join.from_correlated_subquery,
131-
single_to_inner: join.single_to_inner.clone(),
131+
single_to_inner: join.single_to_inner,
132132
runtime_filter: (&join.runtime_filter).into(),
133133
probe_to_build: join.probe_to_build.clone(),
134134
build_projection: join.build_projections.clone(),

โ€Žsrc/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rsโ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ impl HashJoinBuildState {
870870
}
871871

872872
pub(crate) fn join_type(&self) -> JoinType {
873-
self.hash_join_state.hash_join_desc.join_type.clone()
873+
self.hash_join_state.hash_join_desc.join_type
874874
}
875875

876876
pub fn runtime_filter_desc(&self) -> &[RuntimeFilterDesc] {

0 commit comments

Comments
ย (0)