Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/query/service/src/physical_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub use physical_window_partition::*;

pub mod explain;
mod format;
mod physical_asof_join;
mod physical_cte_consumer;
mod physical_materialized_cte;
mod physical_plan;
Expand Down
425 changes: 0 additions & 425 deletions src/query/service/src/physical_plans/physical_asof_join.rs

This file was deleted.

22 changes: 11 additions & 11 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ impl IPhysicalPlan for HashJoin {
probe_keys: self.probe_keys.clone(),
is_null_equal: self.is_null_equal.clone(),
non_equi_conditions: self.non_equi_conditions.clone(),
join_type: self.join_type.clone(),
join_type: self.join_type,
marker_index: self.marker_index,
from_correlated_subquery: self.from_correlated_subquery,
probe_to_build: self.probe_to_build.clone(),
output_schema: self.output_schema.clone(),
need_hold_hash_table: self.need_hold_hash_table,
stat_info: self.stat_info.clone(),
single_to_inner: self.single_to_inner.clone(),
single_to_inner: self.single_to_inner,
build_side_cache_info: self.build_side_cache_info.clone(),
runtime_filter: self.runtime_filter.clone(),
broadcast_id: self.broadcast_id,
Expand Down Expand Up @@ -445,7 +445,7 @@ impl HashJoin {
build_input.clone(),
probe_input.clone(),
joined_output.clone(),
factory.create_hash_join(self.join_type.clone(), 0)?,
factory.create_hash_join(self.join_type, 0)?,
stage_sync_barrier.clone(),
self.projections.clone(),
rf_desc.clone(),
Expand Down Expand Up @@ -505,8 +505,8 @@ impl PhysicalPlanBuilder {
left_required: ColumnSet,
right_required: ColumnSet,
) -> Result<(PhysicalPlan, PhysicalPlan)> {
let probe_side = self.build(s_expr.child(0)?, left_required).await?;
let build_side = self.build(s_expr.child(1)?, right_required).await?;
let probe_side = self.build(s_expr.left_child(), left_required).await?;
let build_side = self.build(s_expr.right_child(), right_required).await?;

Ok((probe_side, build_side))
}
Expand Down Expand Up @@ -537,7 +537,7 @@ impl PhysicalPlanBuilder {
/// * `Result<DataSchemaRef>` - The prepared schema for the build side
pub fn prepare_build_schema(
&self,
join_type: &JoinType,
join_type: JoinType,
build_side: &PhysicalPlan,
) -> Result<DataSchemaRef> {
match join_type {
Expand Down Expand Up @@ -577,7 +577,7 @@ impl PhysicalPlanBuilder {
/// * `Result<DataSchemaRef>` - The prepared schema for the probe side
pub fn prepare_probe_schema(
&self,
join_type: &JoinType,
join_type: JoinType,
probe_side: &PhysicalPlan,
) -> Result<DataSchemaRef> {
match join_type {
Expand Down Expand Up @@ -1210,7 +1210,7 @@ impl PhysicalPlanBuilder {
probe_projections,
build: build_side,
probe: probe_side,
join_type: join.join_type.clone(),
join_type: join.join_type,
build_keys: right_join_conditions,
probe_keys: left_join_conditions,
is_null_equal,
Expand All @@ -1222,7 +1222,7 @@ impl PhysicalPlanBuilder {
output_schema,
need_hold_hash_table: join.need_hold_hash_table,
stat_info: Some(stat_info),
single_to_inner: join.single_to_inner.clone(),
single_to_inner: join.single_to_inner,
build_side_cache_info,
runtime_filter,
broadcast_id,
Expand Down Expand Up @@ -1252,8 +1252,8 @@ impl PhysicalPlanBuilder {
self.unify_keys(&mut probe_side, &mut build_side)?;

// Step 4: Prepare schemas for both sides
let build_schema = self.prepare_build_schema(&join.join_type, &build_side)?;
let probe_schema = self.prepare_probe_schema(&join.join_type, &probe_side)?;
let build_schema = self.prepare_build_schema(join.join_type, &build_side)?;
let probe_schema = self.prepare_probe_schema(join.join_type, &probe_side)?;

// Step 5: Process join conditions
let (
Expand Down
155 changes: 70 additions & 85 deletions src/query/service/src/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::binder::JoinPredicate;
use databend_common_sql::binder::is_range_join_condition;
use databend_common_sql::optimizer::ir::RelExpr;
use databend_common_sql::optimizer::ir::RelationalProperty;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::FunctionCall;
use databend_common_sql::plans::Join;
use databend_common_sql::plans::JoinType;
use databend_common_sql::ColumnSet;
Expand All @@ -27,109 +27,60 @@ use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::PhysicalPlanBuilder;

pub enum PhysicalJoinType {
enum PhysicalJoinType {
Hash,
// The first arg is range conditions, the second arg is other conditions
RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
AsofJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
}

// Choose physical join type by join conditions
pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
let check_asof = matches!(
join.join_type,
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
);

fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
return Err(ErrorCode::SemanticError(
"ANY JOIN only supports equality-based hash joins",
));
}
if !join.equi_conditions.is_empty() && !check_asof {

if !join.equi_conditions.is_empty() {
// Contain equi condition, use hash join
return Ok(PhysicalJoinType::Hash);
}

if join.build_side_cache_info.is_some() && !check_asof {
if join.build_side_cache_info.is_some() {
// There is a build side cache, use hash join.
return Ok(PhysicalJoinType::Hash);
}

let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
let right_stat_info = right_rel_expr.derive_cardinality()?;
if !check_asof
&& (matches!(right_stat_info.statistics.precise_cardinality, Some(1))
|| right_stat_info.cardinality == 1.0)
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
|| right_stat_info.cardinality == 1.0
{
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
return Ok(PhysicalJoinType::Hash);
}

let left_prop = left_rel_expr.derive_relational_prop()?;
let right_prop = right_rel_expr.derive_relational_prop()?;
let mut range_conditions = vec![];
let mut other_conditions = vec![];
for condition in join.non_equi_conditions.iter() {
check_condition(
condition,
&left_prop,
&right_prop,
&mut range_conditions,
&mut other_conditions,
)
}
let (range_conditions, other_conditions) = join
.non_equi_conditions
.iter()
.cloned()
.partition::<Vec<_>, _>(|condition| {
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
});

if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
return Ok(PhysicalJoinType::RangeJoin(
range_conditions,
other_conditions,
));
}
if check_asof {
return Ok(PhysicalJoinType::AsofJoin(
range_conditions,
other_conditions,
));
}
// Leverage hash join to execute nested loop join
Ok(PhysicalJoinType::Hash)
}

fn check_condition(
expr: &ScalarExpr,
left_prop: &RelationalProperty,
right_prop: &RelationalProperty,
range_conditions: &mut Vec<ScalarExpr>,
other_conditions: &mut Vec<ScalarExpr>,
) {
if let ScalarExpr::FunctionCall(func) = expr {
if func.arguments.len() != 2
|| !matches!(func.func_name.as_str(), "gt" | "lt" | "gte" | "lte")
{
other_conditions.push(expr.clone());
return;
}
let mut left = false;
let mut right = false;
for arg in func.arguments.iter() {
let join_predicate = JoinPredicate::new(arg, left_prop, right_prop);
match join_predicate {
JoinPredicate::Left(_) => left = true,
JoinPredicate::Right(_) => right = true,
JoinPredicate::Both { .. } | JoinPredicate::Other(_) | JoinPredicate::ALL(_) => {
return;
}
}
}
if left && right {
range_conditions.push(expr.clone());
return;
}
}
other_conditions.push(expr.clone());
}

impl PhysicalPlanBuilder {
pub async fn build_join(
&mut self,
Expand Down Expand Up @@ -175,27 +126,61 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
// Choose physical join type by join conditions
let physical_join = physical_join(join, s_expr)?;
match physical_join {
PhysicalJoinType::Hash => {
self.build_hash_join(
join,
s_expr,
required,
others_required,
left_required,
right_required,
stat_info,
)
.await
}
PhysicalJoinType::AsofJoin(range, other) => {
self.build_asof_join(join, s_expr, (left_required, right_required), range, other)
if join.join_type.is_asof_join() {
let left_prop = s_expr.left_child().derive_relational_prop()?;
let right_prop = s_expr.right_child().derive_relational_prop()?;

let (range_conditions, other_conditions) = join
.non_equi_conditions
.iter()
.cloned()
.chain(join.equi_conditions.iter().cloned().map(|condition| {
FunctionCall {
span: condition.left.span(),
func_name: "eq".to_string(),
params: vec![],
arguments: vec![condition.left, condition.right],
}
.into()
}))
.partition(|condition| {
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
});

self.build_range_join(
join.join_type,
s_expr,
left_required,
right_required,
range_conditions,
other_conditions,
)
.await
} else {
match physical_join(join, s_expr)? {
PhysicalJoinType::Hash => {
self.build_hash_join(
join,
s_expr,
required,
others_required,
left_required,
right_required,
stat_info,
)
.await
}
PhysicalJoinType::RangeJoin(range, other) => {
self.build_range_join(join, s_expr, left_required, right_required, range, other)
}
PhysicalJoinType::RangeJoin(range, other) => {
self.build_range_join(
join.join_type,
s_expr,
left_required,
right_required,
range,
other,
)
.await
}
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/query/service/src/physical_plans/physical_range_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use databend_common_sql::binder::JoinPredicate;
use databend_common_sql::optimizer::ir::RelExpr;
use databend_common_sql::optimizer::ir::RelationalProperty;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::Join;
use databend_common_sql::plans::JoinType;
use databend_common_sql::ColumnSet;
use databend_common_sql::ScalarExpr;
Expand Down Expand Up @@ -134,7 +133,7 @@ impl IPhysicalPlan for RangeJoin {
right: right_child,
conditions: self.conditions.clone(),
other_conditions: self.other_conditions.clone(),
join_type: self.join_type.clone(),
join_type: self.join_type,
range_join_type: self.range_join_type.clone(),
output_schema: self.output_schema.clone(),
stat_info: self.stat_info.clone(),
Expand Down Expand Up @@ -203,15 +202,15 @@ pub struct RangeJoinCondition {
impl PhysicalPlanBuilder {
pub async fn build_range_join(
&mut self,
join: &Join,
join_type: JoinType,
s_expr: &SExpr,
left_required: ColumnSet,
right_required: ColumnSet,
mut range_conditions: Vec<ScalarExpr>,
mut other_conditions: Vec<ScalarExpr>,
) -> Result<PhysicalPlan> {
let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?;
let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?;
let left_prop = RelExpr::with_s_expr(s_expr.right_child()).derive_relational_prop()?;
let right_prop = RelExpr::with_s_expr(s_expr.left_child()).derive_relational_prop()?;

debug_assert!(!range_conditions.is_empty());

Expand All @@ -230,8 +229,8 @@ impl PhysicalPlanBuilder {
.build_join_sides(s_expr, left_required, right_required)
.await?;

let left_schema = self.prepare_probe_schema(&join.join_type, &left_side)?;
let right_schema = self.prepare_build_schema(&join.join_type, &right_side)?;
let left_schema = self.prepare_probe_schema(join_type, &left_side)?;
let right_schema = self.prepare_build_schema(join_type, &right_side)?;

let mut output_schema = Vec::clone(left_schema.fields());
output_schema.extend_from_slice(right_schema.fields());
Expand Down Expand Up @@ -265,7 +264,7 @@ impl PhysicalPlanBuilder {
.iter()
.map(|scalar| resolve_scalar(scalar, &merged_schema))
.collect::<Result<_>>()?,
join_type: join.join_type.clone(),
join_type,
range_join_type,
output_schema: Arc::new(DataSchema::new(output_schema)),
stat_info: Some(self.build_plan_stat_info(s_expr)?),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl HashJoinDesc {
.collect();

Ok(HashJoinDesc {
join_type: join.join_type.clone(),
join_type: join.join_type,
build_keys,
probe_keys,
is_null_equal: join.is_null_equal.clone(),
Expand All @@ -128,7 +128,7 @@ impl HashJoinDesc {
// marker_index: join.marker_index,
},
from_correlated_subquery: join.from_correlated_subquery,
single_to_inner: join.single_to_inner.clone(),
single_to_inner: join.single_to_inner,
runtime_filter: (&join.runtime_filter).into(),
probe_to_build: join.probe_to_build.clone(),
build_projection: join.build_projections.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ impl HashJoinBuildState {
}

pub(crate) fn join_type(&self) -> JoinType {
self.hash_join_state.hash_join_desc.join_type.clone()
self.hash_join_state.hash_join_desc.join_type
}

pub fn runtime_filter_desc(&self) -> &[RuntimeFilterDesc] {
Expand Down
Loading