diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index 54b9d4ffba..966d587a97 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -36,7 +36,6 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.Instant; -import java.util.List; import java.util.Properties; import java.util.function.Consumer; import org.apache.calcite.adapter.enumerable.EnumerableConvention; @@ -59,7 +58,6 @@ import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelOptTable.ViewExpander; @@ -551,12 +549,26 @@ public RelNode visit(TableScan scan) { } } - /** Try to optimize the plan by using HepPlanner */ - private static final List hepRuleList = - List.of(FilterMergeRule.Config.DEFAULT.toRule(), PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE); - + /** + * Try to optimize the plan by using HepPlanner. + * + *

Rule order matters: {@link PPLSimplifyDedupRule#DEDUP_SIMPLIFY_RULE} must run to fixpoint + * before {@link FilterMergeRule}. The simplify rule's bottom operand only matches a pure {@code + * IS NOT NULL} (or AND-of-{@code IS NOT NULL}) bucket-non-null filter; if {@code FilterMergeRule} + * runs first when a user {@code WHERE} sits below the synthetic {@code IS NOT NULL} filter that + * PPL emits as part of {@code dedup}, the two adjacent filters are merged into a single filter + * whose condition includes the user predicate, the simplify rule's predicate fails, no {@link + * org.opensearch.sql.calcite.plan.rel.LogicalDedup} is produced, and dedup pushdown to the + * OpenSearch storage engine is silently disabled. Using separate {@code addRuleInstance} calls + * (rather than {@code addRuleCollection}) enforces deterministic ordering: dedup simplification + * fires first against the original adjacent-filter shape, then any remaining adjacent filters are + * merged. + */ private static final HepProgram HEP_PROGRAM = - new HepProgramBuilder().addRuleCollection(hepRuleList).build(); + new HepProgramBuilder() + .addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE) + .addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()) + .build(); public static RelNode optimize(RelNode plan, CalcitePlanContext context) { Util.discard(context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index ab07cd9b5c..3d746975d7 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -45,6 +45,7 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.calcite.SysLimit; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.ExpressionEvaluationException; @@ -110,6 +111,20 @@ public RelNode getRelNode(String ppl) { return root; } + /** + * Get the root RelNode of the given PPL query after running the production HEP program from + * {@code CalciteToolsHelper}. Use this in regression tests that exercise rules registered in the + * production HEP program (e.g. {@code PPLSimplifyDedupRule}) — those rules need to see the raw + * planner output, not the post-FilterMerge form returned by {@link #getRelNode(String)}. + */ + public RelNode getRelNodeAfterCalciteHep(String ppl) { + CalcitePlanContext context = createBuilderContext(); + Query query = (Query) plan(pplParser, ppl); + planTransformer.analyze(query.getPlan(), context); + RelNode root = context.relBuilder.build(); + return CalciteToolsHelper.optimize(root, context); + } + private RelNode mergeAdjacentFilters(RelNode relNode) { HepProgram program = new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build(); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java index ca1a789b0f..503e508036 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java @@ -5,9 +5,17 @@ package org.opensearch.sql.ppl.calcite; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; +import org.apache.calcite.plan.hep.HepProgramBuilder; import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; +import org.opensearch.sql.calcite.plan.rel.LogicalDedup; +import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule; public class CalcitePPLDedupTest extends CalcitePPLAbstractTest { @@ -353,4 +361,72 @@ public void testSortFieldProjectedAwayBeforeDedup() { + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); } + + /** + * Regression test for issue #7: when a user {@code where} sits below {@code dedup}, the HEP + * program in {@code CalciteToolsHelper} must still produce a {@link LogicalDedup}. Before the + * fix, both rules were registered via {@code addRuleCollection}, so {@code FilterMergeRule} could + * fire ahead of {@code PPLSimplifyDedupRule} and merge the user predicate into the + * bucket-non-null filter; the simplify rule's bottom operand then rejected the merged condition + * (it only accepts pure {@code IS NOT NULL}/AND-of-{@code IS NOT NULL}), no {@code LogicalDedup} + * was produced, and dedup pushdown to the OpenSearch storage engine was silently disabled. The + * fix is to register the two rules with separate {@code addRuleInstance} calls in the order + * simplify-dedup first (to fixpoint), then filter-merge. + */ + @Test + public void testWhereThenDedupProducesLogicalDedup() { + // Use a where predicate on a DIFFERENT column from the dedup column. With the same column, + // Calcite's RexSimplify can fold AND(IS_NOT_NULL(x), >(x, c)) down to >(x, c), masking the + // bug. The issue's reproducer (where on @timestamp, dedup on namespace) hits this exact + // shape. + String ppl = "source=EMP | where SAL > 1000 | dedup 1 DEPTNO | fields DEPTNO"; + RelNode optimized = getRelNodeAfterCalciteHep(ppl); + String optimizedPlan = optimized.explain(); + assertTrue( + "where + dedup must produce a LogicalDedup so OpenSearch DedupPushdownRule can match;" + + " actual plan was:\n" + + optimizedPlan, + optimizedPlan.contains("LogicalDedup")); + // The window-form leftover would indicate the simplify rule did not fire — assert it is gone. + assertFalse( + "ROW_NUMBER window must be consumed by PPLSimplifyDedupRule when where + dedup are" + + " combined; actual plan was:\n" + + optimizedPlan, + optimizedPlan.contains("ROW_NUMBER")); + } + + /** + * Adversarial regression test: simulates the pathological order described in issue #7 by forcing + * FilterMergeRule to run to fixpoint BEFORE PPLSimplifyDedupRule. This documents the failure mode + * the fix in {@code CalciteToolsHelper} prevents — once the bucket-non-null filter has been + * merged with the user {@code WHERE}, {@code mayBeFilterFromBucketNonNull} can never accept the + * combined condition, so {@code PPLSimplifyDedupRule} is permanently unable to produce a {@code + * LogicalDedup}. The production fix enforces order at the program level (sequential {@code + * addRuleInstance} calls), making this hazard unreachable. + */ + @Test + public void testFilterMergeBeforeSimplifyDedupBreaksPattern() { + String ppl = "source=EMP | where SAL > 1000 | dedup 1 DEPTNO | fields DEPTNO"; + // getRelNode already runs FilterMergeRule on the raw plan, simulating the pathological + // schedule where FilterMergeRule fires before PPLSimplifyDedupRule. + RelNode mergedFirst = getRelNode(ppl); + HepProgram simplifyOnly = + new HepProgramBuilder().addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE).build(); + HepPlanner planner = new HepPlanner(simplifyOnly); + planner.setRoot(mergedFirst); + RelNode result = planner.findBestExp(); + String plan = result.explain(); + assertFalse( + "If FilterMergeRule runs before PPLSimplifyDedupRule, the simplify rule must NOT recover" + + " — the merged AND(IS_NOT_NULL, user_predicate) filter fails the bucket-non-null" + + " predicate. This documents why the production HEP program enforces ordering via" + + " separate addRuleInstance calls (PPLSimplifyDedupRule first, then FilterMergeRule)." + + " Actual plan was:\n" + + plan, + plan.contains("LogicalDedup")); + assertTrue( + "Plan should still contain ROW_NUMBER window form when simplify fails. Actual plan was:\n" + + plan, + plan.contains("ROW_NUMBER")); + } }