Skip to content

Commit 291149d

Browse files
committed
asof
1 parent 5b874fa commit 291149d

File tree

14 files changed

+392
-516
lines changed

14 files changed

+392
-516
lines changed

src/query/service/src/physical_plans/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ pub use physical_window_partition::*;
113113

114114
pub mod explain;
115115
mod format;
116-
mod physical_asof_join;
117116
mod physical_cte_consumer;
118117
mod physical_materialized_cte;
119118
mod physical_plan;

src/query/service/src/physical_plans/physical_asof_join.rs

Lines changed: 0 additions & 425 deletions
This file was deleted.

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -505,8 +505,8 @@ impl PhysicalPlanBuilder {
505505
left_required: ColumnSet,
506506
right_required: ColumnSet,
507507
) -> Result<(PhysicalPlan, PhysicalPlan)> {
508-
let probe_side = self.build(s_expr.child(0)?, left_required).await?;
509-
let build_side = self.build(s_expr.child(1)?, right_required).await?;
508+
let probe_side = self.build(s_expr.left_child(), left_required).await?;
509+
let build_side = self.build(s_expr.right_child(), right_required).await?;
510510

511511
Ok((probe_side, build_side))
512512
}
@@ -537,7 +537,7 @@ impl PhysicalPlanBuilder {
537537
/// * `Result<DataSchemaRef>` - The prepared schema for the build side
538538
pub fn prepare_build_schema(
539539
&self,
540-
join_type: &JoinType,
540+
join_type: JoinType,
541541
build_side: &PhysicalPlan,
542542
) -> Result<DataSchemaRef> {
543543
match join_type {
@@ -577,7 +577,7 @@ impl PhysicalPlanBuilder {
577577
/// * `Result<DataSchemaRef>` - The prepared schema for the probe side
578578
pub fn prepare_probe_schema(
579579
&self,
580-
join_type: &JoinType,
580+
join_type: JoinType,
581581
probe_side: &PhysicalPlan,
582582
) -> Result<DataSchemaRef> {
583583
match join_type {
@@ -1252,8 +1252,8 @@ impl PhysicalPlanBuilder {
12521252
self.unify_keys(&mut probe_side, &mut build_side)?;
12531253

12541254
// Step 4: Prepare schemas for both sides
1255-
let build_schema = self.prepare_build_schema(&join.join_type, &build_side)?;
1256-
let probe_schema = self.prepare_probe_schema(&join.join_type, &probe_side)?;
1255+
let build_schema = self.prepare_build_schema(join.join_type, &build_side)?;
1256+
let probe_schema = self.prepare_probe_schema(join.join_type, &probe_side)?;
12571257

12581258
// Step 5: Process join conditions
12591259
let (

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_sql::binder::JoinPredicate;
1818
use databend_common_sql::optimizer::ir::RelExpr;
1919
use databend_common_sql::optimizer::ir::RelationalProperty;
2020
use databend_common_sql::optimizer::ir::SExpr;
21+
use databend_common_sql::plans::FunctionCall;
2122
use databend_common_sql::plans::Join;
2223
use databend_common_sql::plans::JoinType;
2324
use databend_common_sql::ColumnSet;
@@ -27,41 +28,35 @@ use crate::physical_plans::explain::PlanStatsInfo;
2728
use crate::physical_plans::physical_plan::PhysicalPlan;
2829
use crate::physical_plans::PhysicalPlanBuilder;
2930

30-
pub enum PhysicalJoinType {
31+
enum PhysicalJoinType {
3132
Hash,
3233
// The first arg is range conditions, the second arg is other conditions
3334
RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
34-
AsofJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
3535
}
3636

3737
// Choose physical join type by join conditions
38-
pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
39-
let check_asof = matches!(
40-
join.join_type,
41-
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
42-
);
43-
38+
fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
4439
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
4540
return Err(ErrorCode::SemanticError(
4641
"ANY JOIN only supports equality-based hash joins",
4742
));
4843
}
49-
if !join.equi_conditions.is_empty() && !check_asof {
44+
45+
if !join.equi_conditions.is_empty() {
5046
// Contain equi condition, use hash join
5147
return Ok(PhysicalJoinType::Hash);
5248
}
5349

54-
if join.build_side_cache_info.is_some() && !check_asof {
50+
if join.build_side_cache_info.is_some() {
5551
// There is a build side cache, use hash join.
5652
return Ok(PhysicalJoinType::Hash);
5753
}
5854

5955
let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
6056
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
6157
let right_stat_info = right_rel_expr.derive_cardinality()?;
62-
if !check_asof
63-
&& (matches!(right_stat_info.statistics.precise_cardinality, Some(1))
64-
|| right_stat_info.cardinality == 1.0)
58+
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
59+
|| right_stat_info.cardinality == 1.0
6560
{
6661
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
6762
return Ok(PhysicalJoinType::Hash);
@@ -86,12 +81,6 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
8681
other_conditions,
8782
));
8883
}
89-
if check_asof {
90-
return Ok(PhysicalJoinType::AsofJoin(
91-
range_conditions,
92-
other_conditions,
93-
));
94-
}
9584
// Leverage hash join to execute nested loop join
9685
Ok(PhysicalJoinType::Hash)
9786
}
@@ -175,27 +164,72 @@ impl PhysicalPlanBuilder {
175164

176165
// 2. Build physical plan.
177166
// Choose physical join type by join conditions
178-
let physical_join = physical_join(join, s_expr)?;
179-
match physical_join {
180-
PhysicalJoinType::Hash => {
181-
self.build_hash_join(
182-
join,
183-
s_expr,
184-
required,
185-
others_required,
186-
left_required,
187-
right_required,
188-
stat_info,
167+
if matches!(
168+
join.join_type,
169+
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
170+
) {
171+
let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
172+
let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
173+
let left_prop = left_rel_expr.derive_relational_prop()?;
174+
let right_prop = right_rel_expr.derive_relational_prop()?;
175+
let mut range_conditions = vec![];
176+
let mut other_conditions = vec![];
177+
178+
for condition in join.equi_conditions.iter().cloned() {
179+
other_conditions.push(
180+
FunctionCall {
181+
span: condition.left.span(),
182+
func_name: "eq".to_string(),
183+
params: vec![],
184+
arguments: vec![condition.left, condition.right],
185+
}
186+
.into(),
187+
);
188+
}
189+
for condition in join.non_equi_conditions.iter() {
190+
check_condition(
191+
condition,
192+
&left_prop,
193+
&right_prop,
194+
&mut range_conditions,
195+
&mut other_conditions,
189196
)
190-
.await
191197
}
192-
PhysicalJoinType::AsofJoin(range, other) => {
193-
self.build_asof_join(join, s_expr, (left_required, right_required), range, other)
198+
199+
self.build_range_join(
200+
join.join_type,
201+
s_expr,
202+
left_required,
203+
right_required,
204+
range_conditions,
205+
other_conditions,
206+
)
207+
.await
208+
} else {
209+
match physical_join(join, s_expr)? {
210+
PhysicalJoinType::Hash => {
211+
self.build_hash_join(
212+
join,
213+
s_expr,
214+
required,
215+
others_required,
216+
left_required,
217+
right_required,
218+
stat_info,
219+
)
194220
.await
195-
}
196-
PhysicalJoinType::RangeJoin(range, other) => {
197-
self.build_range_join(join, s_expr, left_required, right_required, range, other)
221+
}
222+
PhysicalJoinType::RangeJoin(range, other) => {
223+
self.build_range_join(
224+
join.join_type,
225+
s_expr,
226+
left_required,
227+
right_required,
228+
range,
229+
other,
230+
)
198231
.await
232+
}
199233
}
200234
}
201235
}

src/query/service/src/physical_plans/physical_range_join.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_common_sql::binder::JoinPredicate;
3030
use databend_common_sql::optimizer::ir::RelExpr;
3131
use databend_common_sql::optimizer::ir::RelationalProperty;
3232
use databend_common_sql::optimizer::ir::SExpr;
33-
use databend_common_sql::plans::Join;
3433
use databend_common_sql::plans::JoinType;
3534
use databend_common_sql::ColumnSet;
3635
use databend_common_sql::ScalarExpr;
@@ -203,15 +202,15 @@ pub struct RangeJoinCondition {
203202
impl PhysicalPlanBuilder {
204203
pub async fn build_range_join(
205204
&mut self,
206-
join: &Join,
205+
join_type: JoinType,
207206
s_expr: &SExpr,
208207
left_required: ColumnSet,
209208
right_required: ColumnSet,
210209
mut range_conditions: Vec<ScalarExpr>,
211210
mut other_conditions: Vec<ScalarExpr>,
212211
) -> Result<PhysicalPlan> {
213-
let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?;
214-
let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?;
212+
let left_prop = RelExpr::with_s_expr(s_expr.right_child()).derive_relational_prop()?;
213+
let right_prop = RelExpr::with_s_expr(s_expr.left_child()).derive_relational_prop()?;
215214

216215
debug_assert!(!range_conditions.is_empty());
217216

@@ -230,8 +229,8 @@ impl PhysicalPlanBuilder {
230229
.build_join_sides(s_expr, left_required, right_required)
231230
.await?;
232231

233-
let left_schema = self.prepare_probe_schema(&join.join_type, &left_side)?;
234-
let right_schema = self.prepare_build_schema(&join.join_type, &right_side)?;
232+
let left_schema = self.prepare_probe_schema(join_type, &left_side)?;
233+
let right_schema = self.prepare_build_schema(join_type, &right_side)?;
235234

236235
let mut output_schema = Vec::clone(left_schema.fields());
237236
output_schema.extend_from_slice(right_schema.fields());
@@ -265,7 +264,7 @@ impl PhysicalPlanBuilder {
265264
.iter()
266265
.map(|scalar| resolve_scalar(scalar, &merged_schema))
267266
.collect::<Result<_>>()?,
268-
join_type: join.join_type.clone(),
267+
join_type: join_type.clone(),
269268
range_join_type,
270269
output_schema: Arc::new(DataSchema::new(output_schema)),
271270
stat_info: Some(self.build_plan_stat_info(s_expr)?),

0 commit comments

Comments
 (0)