Skip to content

Commit

Permalink
chore(optimizer): add schema check for col_prune && predicate push do…
Browse files Browse the repository at this point in the history
…wn && fix LogicalTableFunction::prune_col (risingwavelabs#11200)
  • Loading branch information
st1page authored Jul 28, 2023
1 parent fb9a324 commit b9e7fe2
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10031,
"plan_node_id": 10035,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/heuristic_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> HeuristicOptimizer<'a> {
for rule in self.rules {
if let Some(applied) = rule.apply(plan.clone()) {
#[cfg(debug_assertions)]
Self::check_equivalent_plan(rule, &plan, &applied);
Self::check_equivalent_plan(rule.description(), &plan, &applied);

plan = applied;
self.stats.count_rule(rule);
Expand Down Expand Up @@ -88,10 +88,10 @@ impl<'a> HeuristicOptimizer<'a> {
}

#[cfg(debug_assertions)]
fn check_equivalent_plan(rule: &BoxedRule, input_plan: &PlanRef, output_plan: &PlanRef) {
pub fn check_equivalent_plan(rule_desc: &str, input_plan: &PlanRef, output_plan: &PlanRef) {
if !input_plan.schema().type_eq(output_plan.schema()) {
panic!("{} fails to generate equivalent plan.\nInput schema: {:?}\nInput plan: \n{}\nOutput schema: {:?}\nOutput plan: \n{}\nSQL: {}",
rule.description(),
rule_desc,
input_plan.schema(),
input_plan.explain_to_string(),
output_plan.schema(),
Expand Down
4 changes: 1 addition & 3 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ mod tests {
Field::with_name(ty.clone(), "v2"),
Field::with_name(ty.clone(), "v3"),
];
let values = LogicalValues::new(
let values: LogicalValues = LogicalValues::new(
vec![],
Schema {
fields: fields.clone(),
Expand All @@ -1501,12 +1501,10 @@ mod tests {
let project = plan.as_logical_project().unwrap();
assert_eq!(project.exprs().len(), 1);
assert_eq_input_ref!(&project.exprs()[0], 1);
assert_eq!(project.id().0, 5);

let agg_new = project.input();
let agg_new = agg_new.as_logical_agg().unwrap();
assert_eq!(agg_new.group_key(), &vec![0].into());
assert_eq!(agg_new.id().0, 4);

assert_eq!(agg_new.agg_calls().len(), 1);
let agg_call_new = agg_new.agg_calls()[0].clone();
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ mod tests {
assert_eq!(filter.schema().fields().len(), 2);
assert_eq!(filter.schema().fields()[0], fields[1]);
assert_eq!(filter.schema().fields()[1], fields[2]);
assert_eq!(filter.id().0, 4);

let expr: ExprImpl = filter.predicate().clone().into();
let call = expr.as_function_call().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use risingwave_common::types::DataType;

use super::utils::{childless_record, Distill};
use super::{
ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown, ToBatch,
ToStream,
ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase, PlanRef,
PredicatePushdown, ToBatch, ToStream,
};
use crate::expr::{Expr, ExprRewriter, TableFunction};
use crate::optimizer::optimizer_context::OptimizerContextRef;
Expand Down Expand Up @@ -69,11 +69,10 @@ impl Distill for LogicalTableFunction {
}
}

// the leaf node don't need colprunable
impl ColPrunable for LogicalTableFunction {
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
let _ = required_cols;
self.clone().into()
// No pruning.
LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into()
}
}

Expand Down
44 changes: 39 additions & 5 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ impl RewriteExprsRecursive for PlanRef {
}
}

impl ColPrunable for PlanRef {
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
impl PlanRef {
fn prune_col_inner(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
if let Some(logical_share) = self.as_logical_share() {
// Check the share cache first. If cache exists, it means this is the second round of
// column pruning.
Expand Down Expand Up @@ -306,10 +306,8 @@ impl ColPrunable for PlanRef {
dyn_t.prune_col(required_cols, ctx)
}
}
}

impl PredicatePushdown for PlanRef {
fn predicate_pushdown(
fn predicate_pushdown_inner(
&self,
predicate: Condition,
ctx: &mut PredicatePushdownContext,
Expand Down Expand Up @@ -346,6 +344,42 @@ impl PredicatePushdown for PlanRef {
}
}

impl ColPrunable for PlanRef {
#[allow(clippy::let_and_return)]
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let res = self.prune_col_inner(required_cols, ctx);
#[cfg(debug_assertions)]
super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
"column pruning",
&LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
&res,
);
res
}
}

impl PredicatePushdown for PlanRef {
#[allow(clippy::let_and_return)]
fn predicate_pushdown(
&self,
predicate: Condition,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
#[cfg(debug_assertions)]
let predicate_clone = predicate.clone();

let res = self.predicate_pushdown_inner(predicate, ctx);

#[cfg(debug_assertions)]
super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
"predicate push down",
&LogicalFilter::new(self.clone(), predicate_clone).into(),
&res,
);
res
}
}

impl PlanTreeNode for PlanRef {
fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
// Dispatch to dyn PlanNode instead of PlanRef.
Expand Down

0 comments on commit b9e7fe2

Please sign in to comment.