-
Notifications
You must be signed in to change notification settings - Fork 181
Feature/mvcombine #5025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feature/mvcombine #5025
Changes from all commits
5ce6849
33c9220
3ee160b
5bc0d98
b49b0e7
a5913af
5ac0f3d
dc4da50
e4150e4
dba85da
6750593
b2a9d9f
6510d2b
a93660b
776fcad
9a83103
c72c45b
26ba22f
469b048
1259270
11fb3e0
22f542c
2771d0b
6dfbf00
612eaa7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.expression.Field; | ||
|
|
||
| /** | ||
| * AST node for the {@code mvcombine} command. | ||
| * | ||
| * <p>Carries the target {@link Field}, a delimiter string, and an optionally attached child plan. | ||
| * A {@code null} delimiter handed to the constructor is stored as a single space. | ||
| */ | ||
| @Getter | ||
| @ToString(callSuper = true) | ||
| @EqualsAndHashCode(callSuper = false) | ||
| public class MvCombine extends UnresolvedPlan { | ||
|
|
||
| /** Target field of the command. */ | ||
| private final Field field; | ||
| /** Delimiter; never {@code null} once constructed. */ | ||
| private final String delim; | ||
| /** Child plan; stays {@code null} until {@link #attach(UnresolvedPlan)} is called. */ | ||
| @Nullable private UnresolvedPlan child; | ||
|
|
||
| /** | ||
| * Creates the node without a child. | ||
| * | ||
| * @param field target field of the command | ||
| * @param delim delimiter to store, or {@code null} to fall back to a single space | ||
| */ | ||
| public MvCombine(Field field, @Nullable String delim) { | ||
| this.field = field; | ||
| if (delim == null) { | ||
| this.delim = " "; | ||
| } else { | ||
| this.delim = delim; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Stores {@code child} as this node's child plan. | ||
| * | ||
| * @param child plan to place under this node | ||
| * @return this node | ||
| */ | ||
| public MvCombine attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the child as a list. | ||
| * | ||
| * @return an empty list when no child is attached, otherwise a single-element list | ||
| */ | ||
| @Override | ||
| public List<UnresolvedPlan> getChild() { | ||
| if (child == null) { | ||
| return ImmutableList.of(); | ||
| } | ||
| return ImmutableList.of(child); | ||
| } | ||
|
|
||
| /** Dispatches to {@code visitMvCombine} on the given visitor and returns its result. */ | ||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitMvCombine(this, context); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,7 @@ | |
| import org.apache.calcite.rel.type.RelDataType; | ||
| import org.apache.calcite.rel.type.RelDataTypeFamily; | ||
| import org.apache.calcite.rel.type.RelDataTypeField; | ||
| import org.apache.calcite.rex.RexBuilder; | ||
| import org.apache.calcite.rex.RexCall; | ||
| import org.apache.calcite.rex.RexCorrelVariable; | ||
| import org.apache.calcite.rex.RexInputRef; | ||
|
|
@@ -121,6 +122,7 @@ | |
| import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; | ||
| import org.opensearch.sql.ast.tree.ML; | ||
| import org.opensearch.sql.ast.tree.Multisearch; | ||
| import org.opensearch.sql.ast.tree.MvCombine; | ||
| import org.opensearch.sql.ast.tree.Paginate; | ||
| import org.opensearch.sql.ast.tree.Parse; | ||
| import org.opensearch.sql.ast.tree.Patterns; | ||
|
|
@@ -3086,6 +3088,191 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { | |
| return context.relBuilder.peek(); | ||
| } | ||
|
|
||
| /** | ||
| * Lowers an {@code mvcombine} command: rows that agree on every field except the target are | ||
| * merged into one row whose target column holds the collected values as an array. | ||
| * | ||
| * <p>Implementation notes: the child plan is visited first; the result is then grouped by every | ||
| * input column other than the target, and the target is gathered with {@code COLLECT} | ||
| * (MULTISET). A final projection puts the columns back in input order and casts the MULTISET to | ||
| * ARRAY so the output type is stable. | ||
| * | ||
| * <p>The {@code delim} carried by {@link MvCombine} is not read by this visitor. | ||
| * | ||
| * @param node mvcombine command to be visited | ||
| * @param context CalcitePlanContext containing the RelBuilder, RexBuilder, and resolution context | ||
| * @return the node on top of the RelBuilder stack after the final projection | ||
| * @throws SemanticCheckException if the target is not a direct field reference, or if the target | ||
| * column is already of ARRAY or MULTISET type | ||
| */ | ||
| @Override | ||
| public RelNode visitMvCombine(MvCombine node, CalcitePlanContext context) { | ||
| // The child has to be lowered before anything else: every later step reads the builder stack. | ||
| visitChildren(node, context); | ||
|
|
||
| final RelBuilder builder = context.relBuilder; | ||
| final RelNode source = builder.peek(); | ||
| final List<String> sourceColumns = source.getRowType().getFieldNames(); | ||
|
|
||
| // Locate the target column; this also rejects non-field targets and array-typed targets. | ||
| final int targetPos = resolveTargetIndex(node.getField(), context); | ||
| final String targetColumn = sourceColumns.get(targetPos); | ||
|
|
||
| // Group on everything but the target, collecting the non-NULL target values per group. | ||
| final List<RexNode> groupingKeys = | ||
| buildGroupExpressionsExcludingTarget(targetPos, sourceColumns, builder); | ||
| performCollectAggregation(builder, targetPos, targetColumn, groupingKeys); | ||
|
|
||
| // Put columns back in input order and turn the collected MULTISET into an ARRAY. | ||
| restoreColumnOrderWithArrayCast( | ||
| builder, context.rexBuilder, source, sourceColumns, targetPos, groupingKeys); | ||
|
|
||
| return builder.peek(); | ||
| } | ||
|
|
||
| /** | ||
| * Maps the mvcombine target expression to the index of an input column. | ||
| * | ||
| * <p>The target is analyzed through {@code rexVisitor}; only a result that is a {@link | ||
| * RexInputRef} (a direct field reference) is accepted, so grouping never happens on a computed | ||
| * expression. | ||
| * | ||
| * <p>Because mvcombine emits {@code ARRAY<T>}, a target column whose type is already ARRAY or | ||
| * MULTISET is rejected as well. The column type is read from the node currently on top of the | ||
| * RelBuilder stack. | ||
| * | ||
| * @param targetField Target field expression from the AST | ||
| * @param context Planning context | ||
| * @return 0-based input field index for the target | ||
| * @throws SemanticCheckException if the target is not a direct field reference or has an array | ||
| * type | ||
| */ | ||
| private int resolveTargetIndex(Field targetField, CalcitePlanContext context) { | ||
| final RexNode resolved = rexVisitor.analyze(targetField, context); | ||
| final boolean directReference = resolved instanceof RexInputRef; | ||
| if (!directReference) { | ||
| throw new SemanticCheckException( | ||
| "mvcombine target must be a direct field reference, but got: " + targetField); | ||
| } | ||
|
|
||
| final int position = ((RexInputRef) resolved).getIndex(); | ||
| final RelDataType columnType = | ||
| context.relBuilder.peek().getRowType().getFieldList().get(position).getType(); | ||
|
|
||
| final org.apache.calcite.sql.type.SqlTypeName typeName = columnType.getSqlTypeName(); | ||
| final boolean alreadyMultivalue = | ||
| typeName == org.apache.calcite.sql.type.SqlTypeName.ARRAY | ||
| || typeName == org.apache.calcite.sql.type.SqlTypeName.MULTISET; | ||
| if (alreadyMultivalue) { | ||
| throw new SemanticCheckException( | ||
| "mvcombine target cannot be an array/multivalue type, but got: " + columnType); | ||
| } | ||
|
|
||
| return position; | ||
| } | ||
|
|
||
| /** | ||
| * Collects one field reference per input column, skipping the mvcombine target column. | ||
| * | ||
| * @param targetIndex Input index of the mvcombine target field | ||
| * @param inputFieldNames Input schema field names; only the size is used, to bound the loop | ||
| * @param relBuilder RelBuilder positioned on the input | ||
| * @return Group-by expressions in input order excluding the target | ||
| */ | ||
| private List<RexNode> buildGroupExpressionsExcludingTarget( | ||
| int targetIndex, List<String> inputFieldNames, RelBuilder relBuilder) { | ||
| final int columnCount = inputFieldNames.size(); | ||
| // Presize for "all columns but one"; Math.max guards the empty-schema case. | ||
| final List<RexNode> keys = new ArrayList<>(Math.max(0, columnCount - 1)); | ||
| for (int column = 0; column < columnCount; column++) { | ||
| if (column != targetIndex) { | ||
| keys.add(relBuilder.field(column)); | ||
| } | ||
| } | ||
| return keys; | ||
| } | ||
|
|
||
| /** | ||
| * Applies the mvcombine aggregation step on the node currently on top of {@code relBuilder}. | ||
| * | ||
| * <p>GROUP BY all non-target fields, and aggregate target values using {@code COLLECT}. {@code | ||
| * COLLECT} produces a MULTISET in Calcite, which we later cast to ARRAY for output. | ||
| * | ||
| * <p>NULL target values are excluded from the collected multivalue list by applying an aggregate | ||
| * filter. This matches typical "combine values" semantics and avoids polluting the result with | ||
| * NULL elements. | ||
| * | ||
| * <p>NOTE(review): a MULTISET is an unordered collection, and nothing in this method fixes the | ||
| * order of the collected values. Confirm whether mvcombine is expected to preserve input order; | ||
| * if so, an ordered aggregate would be needed instead. | ||
| * | ||
| * <p>The aggregate is pushed onto {@code relBuilder}; nothing is returned. | ||
| * | ||
| * @param relBuilder RelBuilder positioned on the input | ||
| * @param targetIndex Target field input index | ||
| * @param targetName Target field output name (preserved as the aggregate's alias) | ||
| * @param groupExprs Group-by expressions (all fields except target) | ||
| */ | ||
| private void performCollectAggregation( | ||
| RelBuilder relBuilder, int targetIndex, String targetName, List<RexNode> groupExprs) { | ||
|
|
||
| final RexNode targetRef = relBuilder.field(targetIndex); | ||
| final RexNode notNullTarget = relBuilder.isNotNull(targetRef); | ||
|
|
||
| final RelBuilder.AggCall aggCall = | ||
| relBuilder | ||
| .aggregateCall(SqlStdOperatorTable.COLLECT, targetRef) | ||
| .filter(notNullTarget) | ||
| .as(targetName); | ||
|
|
||
| relBuilder.aggregate(relBuilder.groupKey(groupExprs), aggCall); | ||
|
Comment on lines
+3205
to
+3217
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: Short answer: COLLECT in Apache Calcite produces a MULTISET, which by SQL standard is an unordered collection — insertion order is not guaranteed or preserved. Calcite’s COLLECT implementation does not provide a defined ordering and historically there has been work / discussion to replace Collect with ordered-aggregation alternatives (ARRAY_AGG / MULTISET_AGG and ORDER BY / WITHIN GROUP) when ordering matters. [1][2][3][4] Sources:
🏁 Script executed: # Search for mvcombine documentation and context in the codebase
rg -i "mvcombine" --type java -B 5 -A 5Repository: opensearch-project/sql Length of output: 50378 🏁 Script executed: # Look for ordering-related comments or workarounds in CalciteRelNodeVisitor
rg -i "order|collect" core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java -B 2 -A 2Repository: opensearch-project/sql Length of output: 10985 COLLECT aggregate does not preserve input order; verify if mvcombine specification requires order preservation. The web search confirms that Apache Calcite's 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| /** | ||
| * Projects the post-aggregate row back into the original column order and converts the collected | ||
| * target from MULTISET to {@code ARRAY<T>}. | ||
| * | ||
| * <p>The aggregate is expected to have produced: | ||
| * | ||
| * <pre> | ||
| * [groupField0, groupField1, ..., groupFieldN, targetAggMultiset] | ||
| * </pre> | ||
| * | ||
| * <p>Walking the original columns in order, each non-target column takes the next group field, | ||
| * while the target slot becomes {@code CAST(targetAggMultiset AS ARRAY<T>)}, where {@code T} is | ||
| * the target column's type in the original input. | ||
| * | ||
| * @param relBuilder RelBuilder positioned on the post-aggregate node | ||
| * @param rexBuilder RexBuilder for explicit casts | ||
| * @param input Original input RelNode (used to derive the target element type) | ||
| * @param inputFieldNames Original input field names (also output field names) | ||
| * @param targetIndex Target field index in the original input | ||
| * @param groupExprs Group-by expressions used during aggregation; only the size is used | ||
| */ | ||
| private void restoreColumnOrderWithArrayCast( | ||
| RelBuilder relBuilder, | ||
| RexBuilder rexBuilder, | ||
| RelNode input, | ||
| List<String> inputFieldNames, | ||
| int targetIndex, | ||
| List<RexNode> groupExprs) { | ||
|
|
||
| final int columnCount = inputFieldNames.size(); | ||
| // The collected target sits right after the group fields in the aggregate output. | ||
| final int collectedPos = groupExprs.size(); | ||
|
|
||
| final RelDataType elementType = input.getRowType().getFieldList().get(targetIndex).getType(); | ||
| final RelDataType arrayType = relBuilder.getTypeFactory().createArrayType(elementType, -1); | ||
|
|
||
| final List<RexNode> outputExprs = new ArrayList<>(columnCount); | ||
| int nextGroupField = 0; | ||
| for (int column = 0; column < columnCount; column++) { | ||
| // Target slot: COLLECT returned a MULTISET, normalize it to ARRAY<T>. Otherwise: next group field. | ||
| final RexNode expr = | ||
| (column == targetIndex) | ||
| ? rexBuilder.makeCast(arrayType, relBuilder.field(collectedPos)) | ||
| : relBuilder.field(nextGroupField++); | ||
| outputExprs.add(expr); | ||
| } | ||
|
|
||
| // Output names are exactly the input names, in input order. | ||
| final List<String> outputNames = new ArrayList<>(inputFieldNames); | ||
|
|
||
| // Force projection to avoid Calcite "identity" short-circuit when only names/types change. | ||
| relBuilder.project(outputExprs, outputNames, /* force= */ true); | ||
| } | ||
|
|
||
| @Override | ||
| public RelNode visitValues(Values values, CalcitePlanContext context) { | ||
| if (values.getValues() == null || values.getValues().isEmpty()) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.