Skip to content

Commit 3dfd44b

Browse files
committed
peng - isolate the fix logic to its own visitor class
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent 35c56c5 commit 3dfd44b

3 files changed

Lines changed: 91 additions & 196 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
99

1010
import java.sql.Connection;
11-
import java.util.ArrayList;
1211
import java.util.HashMap;
1312
import java.util.List;
1413
import java.util.Map;
@@ -62,12 +61,6 @@ public class CalcitePlanContext {
6261

6362
@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;
6463

65-
/** Accumulated filter conditions to prevent deep Filter node chains */
66-
private final List<RexNode> pendingFilterConditions = new ArrayList<>();
67-
68-
/** Flag to indicate if filter accumulation mode is active */
69-
@Getter @Setter private boolean filterAccumulationEnabled = false;
70-
7164
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
7265
this.config = config;
7366
this.sysLimit = sysLimit;
@@ -141,52 +134,4 @@ public static boolean isLegacyPreferred() {
141134
public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
142135
this.rexLambdaRefMap.putAll(candidateMap);
143136
}
144-
145-
/**
146-
* Adds a filter condition to the accumulation list instead of creating immediate Filter RelNode.
147-
* This prevents deep Filter node chains that cause memory explosion.
148-
*/
149-
public void addFilterCondition(RexNode condition) {
150-
pendingFilterConditions.add(condition);
151-
}
152-
153-
/**
154-
* Applies all accumulated filter conditions as a single Filter RelNode with AND operations. This
155-
* creates a single Filter node instead of a deep chain of Filter nodes.
156-
*/
157-
public void flushFilterConditions() {
158-
if (pendingFilterConditions.isEmpty()) {
159-
return;
160-
}
161-
162-
if (pendingFilterConditions.size() == 1) {
163-
relBuilder.filter(pendingFilterConditions.get(0));
164-
} else {
165-
// Combine all filter conditions with AND
166-
RexNode combinedCondition = relBuilder.and(pendingFilterConditions);
167-
relBuilder.filter(combinedCondition);
168-
}
169-
pendingFilterConditions.clear();
170-
}
171-
172-
/**
173-
* Enables filter accumulation mode to prevent deep Filter node chains. Should be called before
174-
* processing multiple filter operations.
175-
*/
176-
public void enableFilterAccumulation() {
177-
filterAccumulationEnabled = true;
178-
}
179-
180-
/**
181-
* Disables filter accumulation mode. Should be called after processing multiple filter
182-
* operations.
183-
*/
184-
public void disableFilterAccumulation() {
185-
filterAccumulationEnabled = false;
186-
}
187-
188-
/** Returns true if there are pending filter conditions that need to be flushed. */
189-
public boolean hasPendingFilterConditions() {
190-
return !pendingFilterConditions.isEmpty();
191-
}
192137
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 8 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -176,31 +176,13 @@ public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
176176
}
177177

178178
public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
179-
// Enable filter accumulation if this plan contains multiple filtering operations
180-
// that could create deep Filter RelNode chains
181-
if (countFilteringOperations(unresolved) >= 2) {
182-
context.enableFilterAccumulation();
183-
try {
184-
unresolved.accept(this, context);
185-
context.flushFilterConditions();
186-
return context.relBuilder.peek();
187-
} finally {
188-
context.disableFilterAccumulation();
189-
}
190-
} else {
191-
return unresolved.accept(this, context);
192-
}
193-
}
179+
// Build the RelNode tree (may contain deep Filter chains)
180+
RelNode relNode = unresolved.accept(this, context);
194181

195-
/**
196-
* Flushes accumulated filter conditions before schema-changing operations. This prevents
197-
* RexInputRef index mismatches that occur when filters reference field indices from the old
198-
* schema.
199-
*/
200-
private void flushFiltersBeforeSchemaChange(CalcitePlanContext context) {
201-
if (context.isFilterAccumulationEnabled() && context.hasPendingFilterConditions()) {
202-
context.flushFilterConditions();
203-
}
182+
// Apply filter merge optimization as post-processing
183+
// This merges consecutive LogicalFilter nodes to prevent OOM with deep chains
184+
FilterMergeVisitor filterMergeVisitor = new FilterMergeVisitor();
185+
return relNode.accept(filterMergeVisitor);
204186
}
205187

206188
@Override
@@ -268,12 +250,7 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
268250
context.relBuilder.filter(ImmutableList.of(v.get().id), condition);
269251
context.popCorrelVar();
270252
} else {
271-
// Use filter accumulation to prevent deep Filter node chains
272-
if (context.isFilterAccumulationEnabled()) {
273-
context.addFilterCondition(condition);
274-
} else {
275-
context.relBuilder.filter(condition);
276-
}
253+
context.relBuilder.filter(condition);
277254
}
278255
return context.relBuilder.peek();
279256
}
@@ -322,20 +299,13 @@ public RelNode visitRegex(Regex node, CalcitePlanContext context) {
322299
regexCondition = context.rexBuilder.makeCall(SqlStdOperatorTable.NOT, regexCondition);
323300
}
324301

325-
// Use filter accumulation to prevent deep Filter node chains
326-
if (context.isFilterAccumulationEnabled()) {
327-
context.addFilterCondition(regexCondition);
328-
} else {
329-
context.relBuilder.filter(regexCondition);
330-
}
302+
context.relBuilder.filter(regexCondition);
331303
return context.relBuilder.peek();
332304
}
333305

334306
public RelNode visitRex(Rex node, CalcitePlanContext context) {
335307
visitChildren(node, context);
336308

337-
flushFiltersBeforeSchemaChange(context);
338-
339309
RexNode fieldRex = rexVisitor.analyze(node.getField(), context);
340310
String patternStr = (String) node.getPattern().getValue();
341311

@@ -420,8 +390,6 @@ private boolean containsSubqueryExpression(Node expr) {
420390
public RelNode visitProject(Project node, CalcitePlanContext context) {
421391
visitChildren(node, context);
422392

423-
flushFiltersBeforeSchemaChange(context);
424-
425393
if (isSingleAllFieldsProject(node)) {
426394
return handleAllFieldsProject(node, context);
427395
}
@@ -736,8 +704,6 @@ public RelNode visitReverse(
736704
public RelNode visitBin(Bin node, CalcitePlanContext context) {
737705
visitChildren(node, context);
738706

739-
flushFiltersBeforeSchemaChange(context);
740-
741707
RexNode fieldExpr = rexVisitor.analyze(node.getField(), context);
742708
String fieldName = BinUtils.extractFieldName(node);
743709

@@ -752,7 +718,6 @@ public RelNode visitBin(Bin node, CalcitePlanContext context) {
752718
@Override
753719
public RelNode visitParse(Parse node, CalcitePlanContext context) {
754720
visitChildren(node, context);
755-
flushFiltersBeforeSchemaChange(context);
756721
buildParseRelNode(node, context);
757722
return context.relBuilder.peek();
758723
}
@@ -900,8 +865,6 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
900865
public RelNode visitEval(Eval node, CalcitePlanContext context) {
901866
visitChildren(node, context);
902867

903-
flushFiltersBeforeSchemaChange(context);
904-
905868
node.getExpressionList()
906869
.forEach(
907870
expr -> {
@@ -1171,9 +1134,6 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
11711134
/** Visits an aggregation for stats command */
11721135
@Override
11731136
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
1174-
// Flush accumulated filter conditions before schema-changing aggregation operations
1175-
flushFiltersBeforeSchemaChange(context);
1176-
11771137
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
11781138
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
11791139
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
@@ -2292,26 +2252,11 @@ private RelNode mergeTableAndResolveColumnConflict(
22922252
@Override
22932253
public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
22942254
List<RelNode> subsearchNodes = new ArrayList<>();
2295-
// Save the current filter accumulation state - we'll process each subsearch independently
2296-
boolean wasFilterAccumulationEnabled = context.isFilterAccumulationEnabled();
22972255

22982256
for (UnresolvedPlan subsearch : node.getSubsearches()) {
22992257
UnresolvedPlan prunedSubSearch = subsearch.accept(new EmptySourcePropagateVisitor(), null);
2300-
2301-
// Temporarily disable filter accumulation so each subsearch gets its own independent
2302-
// lifecycle via analyze(). This prevents filter state from bleeding across branches.
2303-
if (wasFilterAccumulationEnabled) {
2304-
context.disableFilterAccumulation();
2305-
}
2306-
2307-
// Use analyze() to let each subsearch determine its own filter accumulation needs
23082258
analyze(prunedSubSearch, context);
23092259
subsearchNodes.add(context.relBuilder.build());
2310-
2311-
// Restore filter accumulation state for the next iteration
2312-
if (wasFilterAccumulationEnabled) {
2313-
context.enableFilterAccumulation();
2314-
}
23152260
}
23162261

23172262
// Use shared schema merging logic that handles type conflicts via field renaming
@@ -3302,82 +3247,4 @@ private RexNode createOptimizedTransliteration(
33023247
throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e);
33033248
}
33043249
}
3305-
3306-
/**
3307-
* Counts the number of filtering operations in an UnresolvedPlan tree that would create Filter
3308-
* RelNodes. This is used to detect queries with multiple regex/filter operations that could cause
3309-
* deep Filter RelNode chains and memory exhaustion.
3310-
*
3311-
* <p>Stops counting at schema-changing operations (like Aggregation, Project with computed
3312-
* expressions) to avoid enabling filter accumulation across schema boundaries, which would cause
3313-
* RexInputRef index mismatches.
3314-
*
3315-
* @param plan the UnresolvedPlan to analyze
3316-
* @return the count of filtering operations found before the first schema-changing operation
3317-
*/
3318-
private int countFilteringOperations(UnresolvedPlan plan) {
3319-
if (plan == null) {
3320-
return 0;
3321-
}
3322-
3323-
int count = 0;
3324-
3325-
// Count this node if it's a filtering operation
3326-
// BUT: Don't count Filter nodes that contain function calls, as they can cause
3327-
// type mismatches when accumulated and flushed later
3328-
if (plan instanceof Regex) {
3329-
count = 1;
3330-
} else if (plan instanceof Filter) {
3331-
Filter filterNode = (Filter) plan;
3332-
if (!containsFunctionCall(filterNode.getCondition())) {
3333-
count = 1;
3334-
}
3335-
}
3336-
3337-
// Stop counting at schema-changing operations to prevent accumulation across schema boundaries
3338-
// Schema-changing operations include: Aggregation, Eval, Project (with computed expressions),
3339-
// Window, StreamWindow, etc.
3340-
if (plan instanceof Aggregation
3341-
|| plan instanceof Eval
3342-
|| plan instanceof Window
3343-
|| plan instanceof StreamWindow) {
3344-
return count; // Don't recurse into children beyond schema changes
3345-
}
3346-
3347-
// Recursively count filtering operations in children
3348-
if (plan.getChild() != null) {
3349-
for (Node child : plan.getChild()) {
3350-
if (child instanceof UnresolvedPlan) {
3351-
count += countFilteringOperations((UnresolvedPlan) child);
3352-
}
3353-
}
3354-
}
3355-
3356-
return count;
3357-
}
3358-
3359-
/**
3360-
* Checks if an expression contains any function calls. Filter expressions with function calls can
3361-
* cause type mismatches when accumulated and flushed later, so we exclude them from filter
3362-
* accumulation.
3363-
*/
3364-
private boolean containsFunctionCall(UnresolvedExpression expr) {
3365-
if (expr == null) {
3366-
return false;
3367-
}
3368-
3369-
if (expr instanceof org.opensearch.sql.ast.expression.Function) {
3370-
return true;
3371-
}
3372-
3373-
// Check children recursively
3374-
for (Node child : expr.getChild()) {
3375-
if (child instanceof UnresolvedExpression
3376-
&& containsFunctionCall((UnresolvedExpression) child)) {
3377-
return true;
3378-
}
3379-
}
3380-
3381-
return false;
3382-
}
33833250
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import org.apache.calcite.plan.RelOptCluster;
11+
import org.apache.calcite.rel.RelNode;
12+
import org.apache.calcite.rel.RelShuttleImpl;
13+
import org.apache.calcite.rel.logical.LogicalFilter;
14+
import org.apache.calcite.rex.RexBuilder;
15+
import org.apache.calcite.rex.RexNode;
16+
17+
/**
18+
* A RelNode visitor that merges consecutive LogicalFilter nodes into a single filter with combined
19+
* AND conditions. This prevents deep Filter RelNode chains that cause memory exhaustion (OOM) with
20+
* multiple filter operations.
21+
*
22+
* <p>Example transformation:
23+
*
24+
* <pre>
25+
* BEFORE:
26+
* LogicalFilter(age > 30)
27+
* LogicalFilter(age < 40)
28+
* LogicalFilter(balance > 10000)
29+
* TableScan
30+
*
31+
* AFTER:
32+
* LogicalFilter(AND(age > 30, age < 40, balance > 10000))
33+
* TableScan
34+
* </pre>
35+
*
36+
* This is a post-processing optimization that runs after the RelNode tree is constructed by
37+
* CalciteRelNodeVisitor.
38+
*/
39+
public class FilterMergeVisitor extends RelShuttleImpl {
40+
41+
/**
42+
* Visits a LogicalFilter node and merges it with consecutive child LogicalFilter nodes.
43+
*
44+
* @param filter the LogicalFilter node to visit
45+
* @return the merged filter or the original filter if no merging is needed
46+
*/
47+
@Override
48+
public RelNode visit(LogicalFilter filter) {
49+
RelNode newInput = filter.getInput().accept(this);
50+
51+
List<RexNode> conditions = new ArrayList<>();
52+
conditions.add(filter.getCondition());
53+
54+
RelNode current = newInput;
55+
while (current instanceof LogicalFilter) {
56+
LogicalFilter childFilter = (LogicalFilter) current;
57+
conditions.add(childFilter.getCondition());
58+
current = childFilter.getInput();
59+
}
60+
61+
// If we collected multiple conditions, merge them
62+
if (conditions.size() > 1) {
63+
RelOptCluster cluster = filter.getCluster();
64+
RexBuilder rexBuilder = cluster.getRexBuilder();
65+
66+
// Combine all conditions with AND
67+
RexNode combinedCondition =
68+
rexBuilder.makeCall(org.apache.calcite.sql.fun.SqlStdOperatorTable.AND, conditions);
69+
70+
// Simplify the combined condition (e.g., remove redundant TRUE, optimize)
71+
combinedCondition = org.apache.calcite.rex.RexUtil.simplify(rexBuilder, combinedCondition);
72+
73+
// Create a new filter with the combined condition
74+
return LogicalFilter.create(current, combinedCondition);
75+
}
76+
77+
if (newInput != filter.getInput()) {
78+
return filter.copy(filter.getTraitSet(), newInput, filter.getCondition());
79+
}
80+
81+
return filter;
82+
}
83+
}

0 commit comments

Comments
 (0)