diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 65056aecbad..abdf1132cc6 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -80,6 +80,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -534,6 +535,11 @@ public LogicalPlan visitAddColTotals(AddColTotals node, AnalysisContext context) throw getOnlyForCalciteException("addcoltotals"); } + @Override + public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) { + throw getOnlyForCalciteException("mvcombine"); + } + /** Build {@link ParseExpression} to context and skip to child nodes. 
*/ @Override public LogicalPlan visitParse(Parse node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index a6ef5e7547a..3ba78e73e89 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -68,6 +68,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -461,4 +462,8 @@ public T visitAddTotals(AddTotals node, C context) { public T visitAddColTotals(AddColTotals node, C context) { return visitChildren(node, context); } + + public T visitMvCombine(MvCombine node, C context) { + return visitChildren(node, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index bf54d2ffd89..8b129c6267a 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -62,6 +62,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.MinSpanBin; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -468,6 +469,14 @@ public static List defaultDedupArgs() { argument("consecutive", booleanLiteral(false))); } + public static MvCombine mvcombine(Field field) { + return new MvCombine(field, null); + } + + public static MvCombine mvcombine(Field field, String delim) { + return new MvCombine(field, delim); + } + public static List sortOptions() { return 
exprList(argument("desc", booleanLiteral(false))); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/MvCombine.java b/core/src/main/java/org/opensearch/sql/ast/tree/MvCombine.java new file mode 100644 index 00000000000..ba94aa10978 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/MvCombine.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; + +@Getter +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class MvCombine extends UnresolvedPlan { + + private final Field field; + private final String delim; + @Nullable private UnresolvedPlan child; + + public MvCombine(Field field, @Nullable String delim) { + this.field = field; + this.delim = (delim == null) ? " " : delim; + } + + public MvCombine attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return child == null ? 
ImmutableList.of() : ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitMvCombine(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f1bc5fd6a0d..e521eabc158 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -53,6 +53,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFamily; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexInputRef; @@ -121,6 +122,7 @@ import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -3086,6 +3088,191 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { return context.relBuilder.peek(); } + /** + * mvcombine command visitor to collapse rows that are identical on all fields except the target + * field, and combine the target field values into a multivalue (array) field. + * + *

Implementation notes:Groups by all input fields except the target field. Aggregates target + * values using {@code COLLECT} (MULTISET). Casts the aggregation result from MULTISET to ARRAY + * for a stable multivalue output type. Preserves the original output column order. + * + * @param node mvcombine command to be visited + * @param context CalcitePlanContext containing the RelBuilder, RexBuilder, and resolution context + * @return RelNode representing collapsed records with the target combined into a multivalue array + * @throws SemanticCheckException if the mvcombine target is not a direct field reference + */ + @Override + public RelNode visitMvCombine(MvCombine node, CalcitePlanContext context) { + // 1) Lower the child plan first so the RelBuilder has the input schema on the stack. + visitChildren(node, context); + + final RelBuilder relBuilder = context.relBuilder; + final RexBuilder rexBuilder = context.rexBuilder; + + final RelNode input = relBuilder.peek(); + final List inputFieldNames = input.getRowType().getFieldNames(); + + // 2) Resolve the mvcombine target to an input column index (must be a direct field reference). + final Field targetField = node.getField(); + final int targetIndex = resolveTargetIndex(targetField, context); + final String targetName = inputFieldNames.get(targetIndex); + + // 3) Group by all fields except the target. + final List groupExprs = + buildGroupExpressionsExcludingTarget(targetIndex, inputFieldNames, relBuilder); + + // 4) Aggregate target values using COLLECT, filtering out NULLs. + performCollectAggregation(relBuilder, targetIndex, targetName, groupExprs); + + // 5) Restore original output column order, and cast COLLECT's MULTISET output to ARRAY. + restoreColumnOrderWithArrayCast( + relBuilder, rexBuilder, input, inputFieldNames, targetIndex, groupExprs); + + return relBuilder.peek(); + } + + /** + * Resolves the mvcombine target expression to an input field index. + * + *

mvcombine requires the target to be a direct field reference (RexInputRef). This keeps the + * command semantics predictable and avoids accidental grouping on computed expressions. + * + *

The target must also be a scalar-ish field. mvcombine outputs ARRAY<T>, so the input + * target cannot already be an ARRAY or MULTISET. + * + * @param targetField Target field expression from the AST + * @param context Planning context + * @return 0-based input field index for the target + * @throws SemanticCheckException if the target is not a direct field reference or has an array + * type + */ + private int resolveTargetIndex(Field targetField, CalcitePlanContext context) { + final RexNode targetRex = rexVisitor.analyze(targetField, context); + if (!(targetRex instanceof RexInputRef)) { + throw new SemanticCheckException( + "mvcombine target must be a direct field reference, but got: " + targetField); + } + + final int index = ((RexInputRef) targetRex).getIndex(); + + final RelDataType fieldType = + context.relBuilder.peek().getRowType().getFieldList().get(index).getType(); + + if (fieldType.getSqlTypeName() == org.apache.calcite.sql.type.SqlTypeName.ARRAY + || fieldType.getSqlTypeName() == org.apache.calcite.sql.type.SqlTypeName.MULTISET) { + throw new SemanticCheckException( + "mvcombine target cannot be an array/multivalue type, but got: " + fieldType); + } + + return index; + } + + /** + * Builds group-by expressions for mvcombine: all input fields except the target field. + * + * @param targetIndex Input index of the mvcombine target field + * @param inputFieldNames Input schema field names (for sizing/ordering) + * @param relBuilder RelBuilder positioned on the input + * @return Group-by expressions in input order excluding the target + */ + private List buildGroupExpressionsExcludingTarget( + int targetIndex, List inputFieldNames, RelBuilder relBuilder) { + final List groupExprs = new ArrayList<>(Math.max(0, inputFieldNames.size() - 1)); + for (int i = 0; i < inputFieldNames.size(); i++) { + if (i == targetIndex) { + continue; + } + groupExprs.add(relBuilder.field(i)); + } + return groupExprs; + } + + /** + * Applies mvcombine aggregation: + * + *

GROUP BY all non-target fields, and aggregate target values using {@code COLLECT}. {@code + * COLLECT} produces a MULTISET in Calcite, which we later cast to ARRAY for output. + * + *

NULL target values are excluded from the collected multivalue list by applying an aggregate + * filter. This matches typical "combine values" semantics and avoids polluting the result with + * NULL elements. + * + * @param relBuilder RelBuilder positioned on the input + * @param targetIndex Target field input index + * @param targetName Target field output name (preserved) + * @param groupExprs Group-by expressions (all fields except target) + */ + private void performCollectAggregation( + RelBuilder relBuilder, int targetIndex, String targetName, List groupExprs) { + + final RexNode targetRef = relBuilder.field(targetIndex); + final RexNode notNullTarget = relBuilder.isNotNull(targetRef); + + final RelBuilder.AggCall aggCall = + relBuilder + .aggregateCall(SqlStdOperatorTable.COLLECT, targetRef) + .filter(notNullTarget) + .as(targetName); + + relBuilder.aggregate(relBuilder.groupKey(groupExprs), aggCall); + } + + /** + * Restores the original output column order after the aggregate step and converts the collected + * target from MULTISET to ARRAY<T>. + * + *

After aggregation, the schema is: + * + *

+   *   [groupField0, groupField1, ..., groupFieldN, targetAggMultiset]
+   * 
+ * + *

This method projects fields back to the original input order, replacing the original target + * slot with {@code CAST(targetAggMultiset AS ARRAY<T>)}. + * + * @param relBuilder RelBuilder positioned on the post-aggregate node + * @param rexBuilder RexBuilder for explicit casts + * @param input Original input RelNode (used to derive the target element type) + * @param inputFieldNames Original input field names (also output field names) + * @param targetIndex Target field index in the original input + * @param groupExprs Group-by expressions used during aggregation + */ + private void restoreColumnOrderWithArrayCast( + RelBuilder relBuilder, + RexBuilder rexBuilder, + RelNode input, + List inputFieldNames, + int targetIndex, + List groupExprs) { + + // Post-aggregate: group fields come first, and the collected target is appended at the end. + final int collectedTargetPos = groupExprs.size(); + + final RelDataType targetElemType = input.getRowType().getFieldList().get(targetIndex).getType(); + final RelDataType targetArrayType = + relBuilder.getTypeFactory().createArrayType(targetElemType, -1); + + final List projections = new ArrayList<>(inputFieldNames.size()); + final List projectionNames = new ArrayList<>(inputFieldNames.size()); + + int groupPos = 0; + for (int i = 0; i < inputFieldNames.size(); i++) { + projectionNames.add(inputFieldNames.get(i)); + + if (i == targetIndex) { + // COLLECT returns MULTISET; normalize output to ARRAY. + final RexNode multisetRef = relBuilder.field(collectedTargetPos); + projections.add(rexBuilder.makeCast(targetArrayType, multisetRef)); + } else { + projections.add(relBuilder.field(groupPos)); + groupPos++; + } + } + + // Force projection to avoid Calcite "identity" short-circuit when only names/types change. 
+ relBuilder.project(projections, projectionNames, /* force= */ true); + } + @Override public RelNode visitValues(Values values, CalcitePlanContext context) { if (values.getValues() == null || values.getValues().isEmpty()) { diff --git a/docs/category.json b/docs/category.json index 094768d1e6f..03a9630af13 100644 --- a/docs/category.json +++ b/docs/category.json @@ -36,6 +36,7 @@ "user/ppl/cmd/sort.md", "user/ppl/cmd/spath.md", "user/ppl/cmd/stats.md", + "user/ppl/cmd/mvcombine.md", "user/ppl/cmd/streamstats.md", "user/ppl/cmd/subquery.md", "user/ppl/cmd/syntax.md", diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index e959a69c8b6..e4f55ef1b3e 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 23/23 + fetched rows / total rows = 24/24 +----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -48,6 +48,7 @@ SQL query:: | docTestCluster | null | events_many_hosts | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | events_null | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nested | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nyc_taxi | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | 
occupation | BASE TABLE | null | null | null | null | null | null | diff --git a/docs/user/ppl/cmd/mvcombine.md b/docs/user/ppl/cmd/mvcombine.md new file mode 100644 index 00000000000..e816de88d17 --- /dev/null +++ b/docs/user/ppl/cmd/mvcombine.md @@ -0,0 +1,146 @@ +# mvcombine + +## Description + +The `mvcombine` command groups rows that are identical across all fields except a specified target field, and combines the values of that target field into a multivalue (array) field. All other fields in the input rows are preserved as group keys in the output. + +`mvcombine` is a transforming command: it consumes a set of input results and produces a new result set with reduced cardinality. + +### Key behaviors + +- Rows are grouped by **all fields currently in the pipeline except the target field**. +- One output row is produced per group. +- The target field becomes a **multivalue field** containing the combined values from all rows in the group. +- Rows where the target field is missing or null do **not** contribute a value to the combined multivalue output. +- The default output is a multivalue representation (array). + +Delimiter handling (`delim`) is parsed but has **no effect on the output** unless the `nomv` command is used. Since `nomv` is not yet implemented in OpenSearch, `delim` is currently inert and does not affect execution. + +--- + +## Syntax + +mvcombine [delim=] + +### Arguments + +- **field** (required) + The name of the field whose values are combined into a multivalue field. + +- **delim** (optional) + A string delimiter for rendering a single-value representation of the combined field. + This option has no observable effect unless `nomv` is used. + +--- + +## Semantics + +Given a set of input rows: + +- All rows that have identical values for every field **except** the target field are grouped together. +- The target field must be a single-valued (scalar) field. +- The output schema preserves the original field order. 
+- The target field is returned as a multivalue (array) field. + +--- + +## Example 1: Basic mvcombine + +Given the following input rows: + +```text +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"10"} +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"20"} +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"30"} +``` +The following query collapses the three rows into a single row, and combines packets_str into a multivalue field: + +```ppl +source=mvcombine_data +| where ip='10.0.0.1' and bytes=100 and tags='t1' +| fields ip, bytes, tags, packets_str +| mvcombine packets_str +``` + +Expected output: +```text +fetched rows / total rows = 1/1 ++----------+-------+------+-------------+ +| ip | bytes | tags | packets_str | +|----------+-------+------+-------------| +| 10.0.0.1 | 100 | t1 | [10,20,30] | ++----------+-------+------+-------------+ +``` + +## Example 2: Multiple groups + +Given a dataset mvcombine with the following data: +```text +{"ip":"10.0.0.7","bytes":700,"tags":"t7","packets_str":"1"} +{"ip":"10.0.0.7","bytes":700,"tags":"t7","packets_str":"2"} +{"ip":"10.0.0.8","bytes":700,"tags":"t7","packets_str":"9"} +``` + +The following query produces one output row per group key: +```ppl +source=mvcombine_data +| where bytes=700 and tags='t7' +| fields ip, bytes, tags, packets_str +| sort ip, packets_str +| mvcombine packets_str +| sort ip +``` + +Expected output: +```text +fetched rows / total rows = 2/2 ++----------+-------+------+-------------+ +| ip | bytes | tags | packets_str | +|----------+-------+------+-------------| +| 10.0.0.7 | 700 | t7 | [1,2] | +| 10.0.0.8 | 700 | t7 | [9] | ++----------+-------+------+-------------+ +``` + +## Example 3: Missing target field in some rows + +Rows missing the target field do not contribute a value to the combined output. 
+ +Given a dataset mvcombine with the following data: +```text +{"ip":"10.0.0.3","bytes":300,"tags":"t3","packets_str":"5"} +{"ip":"10.0.0.3","bytes":300,"tags":"t3"} +{"ip":"10.0.0.3","bytes":300,"tags":"t3","letters":"a"} +``` + +The following query collapses the group and preserves the non-missing value: +```ppl +source=mvcombine_data +| where ip='10.0.0.3' and bytes=300 and tags='t3' +| fields ip, bytes, tags, packets_str +| mvcombine packets_str +``` + +Expected output: +```text +fetched rows / total rows = 1/1 ++----------+-------+------+-------------+ +| ip | bytes | tags | packets_str | +|----------+-------+------+-------------| +| 10.0.0.3 | 300 | t3 | [5] | ++----------+-------+------+-------------+ +``` + +## Example 4: Error when field does not exist + +If the specified field does not exist in the current schema, mvcombine returns an error. +```ppl +source=mvcombine_data +| mvcombine does_not_exist +``` + +Expected output: +```text +{'reason': 'Invalid Query', 'details': 'Field [does_not_exist] not found.', 'type': 'IllegalArgumentException'} +Error: Query returned no data +``` \ No newline at end of file diff --git a/docs/user/ppl/index.md b/docs/user/ppl/index.md index 30ad7159182..2472a921d85 100644 --- a/docs/user/ppl/index.md +++ b/docs/user/ppl/index.md @@ -78,8 +78,10 @@ source=accounts | [describe command](cmd/describe.md) | 2.1 | stable (since 2.1) | Query the metadata of an index. | | [explain command](cmd/explain.md) | 3.1 | stable (since 3.1) | Explain the plan of query. | | [show datasources command](cmd/showdatasources.md) | 2.4 | stable (since 2.4) | Query datasources configured in the PPL engine. | - | [addtotals command](cmd/addtotals.md) | 3.4 | stable (since 3.4) | Adds row and column values and appends a totals column and row. | - | [addcoltotals command](cmd/addcoltotals.md) | 3.4 | stable (since 3.4) | Adds column values and appends a totals row. 
| +| [addtotals command](cmd/addtotals.md) | 3.4 | stable (since 3.4) | Adds row and column values and appends a totals column and row. | +| [addcoltotals command](cmd/addcoltotals.md) | 3.4 | stable (since 3.4) | Adds column values and appends a totals row. | +| [mvcombine command](cmd/mvcombine.md) | 3.4 | stable (since 3.4) | Combines values of a specified field across rows identical on all other fields. | + - [Syntax](cmd/syntax.md) - PPL query structure and command syntax formatting * **Functions** diff --git a/doctest/test_data/mvcombine.json b/doctest/test_data/mvcombine.json new file mode 100644 index 00000000000..60c08ebd8a9 --- /dev/null +++ b/doctest/test_data/mvcombine.json @@ -0,0 +1,18 @@ +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"10"} +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"20"} +{"ip":"10.0.0.1","bytes":100,"tags":"t1","packets_str":"30"} +{"ip":"10.0.0.2","bytes":200,"tags":"t2","packets_str":"7"} +{"ip":"10.0.0.3","bytes":300,"tags":"t3","packets_str":"5"} +{"ip":"10.0.0.3","bytes":300,"tags":"t3"} +{"ip":"10.0.0.3","bytes":300,"tags":"t3","letters":"a"} +{"ip":"10.0.0.7","bytes":700,"tags":"t7","packets_str":"1"} +{"ip":"10.0.0.7","bytes":700,"tags":"t7","packets_str":"2"} +{"ip":"10.0.0.8","bytes":700,"tags":"t7","packets_str":"9"} +{"ip":"10.0.0.9","bytes":900,"tags":"t9","packets_str":"1"} +{"ip":"10.0.0.9","bytes":900,"tags":"t9","packets_str":"2"} +{"ip":"10.0.0.9","bytes":900,"tags":"t9","packets_str":"3"} +{"ip":"10.0.0.5","bytes":500,"tags":"t5","packets_str":"dup"} +{"ip":"10.0.0.5","bytes":500,"tags":"t5","packets_str":"dup"} +{"ip":"10.0.0.5","bytes":500,"tags":"t5","packets_str":"x"} +{"ip":"10.0.0.6","bytes":600,"tags":"t6","packets_str":""} +{"ip":"10.0.0.6","bytes":600,"tags":"t6","packets_str":"z"} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index e57c41d6827..6283252065f 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -57,7 +57,8 @@ 'otellogs': 'otellogs.json', 
'time_data': 'time_test_data.json', 'time_data2': 'time_test_data2.json', - 'time_test': 'time_test.json' + 'time_test': 'time_test.json', + 'mvcombine_data': 'mvcombine.json', } DEBUG_MODE = os.environ.get('DOCTEST_DEBUG', 'false').lower() == 'true' diff --git a/doctest/test_mapping/mvcombine.json b/doctest/test_mapping/mvcombine.json new file mode 100644 index 00000000000..06e00747e1f --- /dev/null +++ b/doctest/test_mapping/mvcombine.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "case": { "type": "keyword" }, + "ip": { "type": "ip" }, + "bytes": { "type": "long" }, + "tags": { "type": "keyword" }, + "packets_str": { "type": "keyword" }, + "letters": { "type": "keyword" } + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java index 22a6f6b5916..c03746bba65 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java @@ -106,7 +106,8 @@ CalciteTrendlineCommandIT.class, CalciteVisualizationFormatIT.class, CalciteWhereCommandIT.class, - CalcitePPLTpchIT.class + CalcitePPLTpchIT.class, + CalciteMvCombineCommandIT.class }) public class CalciteNoPushdownIT { private static boolean wasPushdownEnabled; diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index c57b33ab6d8..6c21d223d97 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -2444,4 +2444,16 @@ public void testAggFilterOnNestedFields() throws IOException { "source=%s | stats count(eval(author.name < 'K')) as george_and_jk", TEST_INDEX_CASCADED_NESTED))); } + + @Test + public void testExplainMvCombine() throws 
IOException { + String query = + "source=opensearch-sql_test_index_account " + + "| fields state, city, age " + + "| mvcombine age delim=','"; + + String actual = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_mvcombine.yaml"); + assertYamlEqualsIgnoreId(expected, actual); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvCombineCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvCombineCommandIT.java new file mode 100644 index 00000000000..7f2ab06752f --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvCombineCommandIT.java @@ -0,0 +1,258 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteMvCombineCommandIT extends PPLIntegTestCase { + + private static final String INDEX = Index.MVCOMBINE.getName(); + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.MVCOMBINE); + } + + // --------------------------- + // Sanity (precondition) + // --------------------------- + + @Test + public void testSanity_datasetIsLoaded() throws IOException { + JSONObject result = executeQuery("source=" + INDEX + " | head 5"); + int rows = 
result.getJSONArray("datarows").length(); + Assertions.assertTrue(rows > 0, "Expected MVCOMBINE dataset to have rows, got 0"); + } + + // --------------------------- + // Happy path (core mvcombine) + // --------------------------- + + @Test + public void testMvCombine_basicGroupCollapsesToOneRow() throws IOException { + String q = + "source=" + + INDEX + + " | where ip='10.0.0.1' and bytes=100 and tags='t1'" + + " | fields ip, bytes, tags, packets_str" + + " | mvcombine packets_str"; + + JSONObject result = executeQuery(q); + verifyNumOfRows(result, 1); + + JSONArray row = result.getJSONArray("datarows").getJSONArray(0); + Assertions.assertEquals("10.0.0.1", row.getString(0)); + Assertions.assertEquals("100", String.valueOf(row.get(1))); + Assertions.assertEquals("t1", row.getString(2)); + + List mv = toStringListDropNulls(row.get(3)); + Assertions.assertTrue(mv.contains("10"), "Expected packets_str to include 10, got " + mv); + Assertions.assertTrue(mv.contains("20"), "Expected packets_str to include 20, got " + mv); + Assertions.assertTrue(mv.contains("30"), "Expected packets_str to include 30, got " + mv); + } + + @Test + public void testMvCombine_singleRowGroupStaysSingleRow() throws IOException { + // NOTE: Keep output minimal + deterministic to safely verify schema + datarows + String q = + "source=" + + INDEX + + " | where ip='10.0.0.2' and bytes=200 and tags='t2'" + + " | fields ip, tags, packets_str" + + " | mvcombine packets_str"; + + JSONObject result = executeQuery(q); + + verifySchema( + result, + schema("ip", null, "string"), + schema("tags", null, "string"), + schema("packets_str", null, "array")); + + verifyDataRows(result, rows("10.0.0.2", "t2", new JSONArray().put("7"))); + } + + @Test + public void testMvCombine_missingTargetWithinGroup_collapses_nonNullPreserved() + throws IOException { + String q = + "source=" + + INDEX + + " | where ip='10.0.0.3' and bytes=300 and tags='t3'" + + " | fields ip, bytes, tags, packets_str" + + " | mvcombine 
packets_str"; + + JSONObject result = executeQuery(q); + verifyNumOfRows(result, 1); + + JSONArray row = result.getJSONArray("datarows").getJSONArray(0); + List mv = toStringListKeepNulls(row.get(3)); + + Assertions.assertTrue(mv.contains("5"), "Expected packets_str to include 5, got " + mv); + } + + // --------------------------- + // Multi-group behavior + // --------------------------- + + @Test + public void testMvCombine_multipleGroups_producesOneRowPerGroupKey() throws IOException { + String base = + "source=" + + INDEX + + " | where (ip='10.0.0.7' or ip='10.0.0.8') and bytes=700 and tags='t7'" + + " | fields ip, bytes, tags, packets_str"; + + JSONObject before = executeQuery(base); + int beforeRows = before.getJSONArray("datarows").length(); + Assertions.assertTrue(beforeRows >= 1, "Expected dataset rows for multi-group test, got 0"); + + JSONObject result = executeQuery(base + " | mvcombine packets_str | sort ip"); + int outRows = result.getJSONArray("datarows").length(); + Assertions.assertEquals( + 2, outRows, "Expected 2 groups (10.0.0.7 and 10.0.0.8), got " + outRows); + + JSONArray r0 = result.getJSONArray("datarows").getJSONArray(0); + JSONArray r1 = result.getJSONArray("datarows").getJSONArray(1); + + String ip0 = r0.getString(0); + String ip1 = r1.getString(0); + + if ("10.0.0.7".equals(ip0)) { + List mv0 = toStringListDropNulls(r0.get(3)); + Assertions.assertTrue( + mv0.contains("1") && mv0.contains("2"), + "Expected 10.0.0.7 to include 1 and 2, got " + mv0); + + List mv1 = toStringListDropNulls(r1.get(3)); + Assertions.assertTrue(mv1.contains("9"), "Expected 10.0.0.8 to include 9, got " + mv1); + } else { + List mv0 = toStringListDropNulls(r0.get(3)); + Assertions.assertTrue(mv0.contains("9"), "Expected 10.0.0.8 to include 9, got " + mv0); + + List mv1 = toStringListDropNulls(r1.get(3)); + Assertions.assertTrue( + mv1.contains("1") && mv1.contains("2"), + "Expected 10.0.0.7 to include 1 and 2, got " + mv1); + } + } + + // 
--------------------------- + // delim: Splunk-compatible command input + output shape + // --------------------------- + + @Test + public void testMvCombine_delim_shouldNotChangeMvShape_ifSupported_elseSyntaxRejected() + throws Exception { + String base = + "source=" + + INDEX + + " | where ip='10.0.0.9' and bytes=900 and tags='t9'" + + " | fields ip, bytes, tags, packets_str"; + + // Splunk-style: options before the field + String q = base + " | mvcombine delim='|' packets_str"; + + try { + JSONObject result = executeQuery(q); + verifyNumOfRows(result, 1); + + Object cell = result.getJSONArray("datarows").getJSONArray(0).get(3); + Assertions.assertTrue( + cell instanceof JSONArray, + "Expected multivalue array (delim should not coerce to string), got: " + cell); + + // Optional sanity: values exist (order not guaranteed) + List mv = toStringListDropNulls(cell); + Assertions.assertTrue(mv.contains("1"), "Expected MV to include 1, got: " + mv); + Assertions.assertTrue(mv.contains("2"), "Expected MV to include 2, got: " + mv); + Assertions.assertTrue(mv.contains("3"), "Expected MV to include 3, got: " + mv); + } catch (ResponseException e) { + Assertions.assertTrue( + isSyntaxBadRequest(e), + "Expected syntax rejection if delim unsupported, got: " + e.getMessage()); + } + } + + // --------------------------- + // Edge case / error semantics + // --------------------------- + + @Test + public void testMvCombine_missingField_shouldReturn4xx() throws IOException { + try { + executeQuery("source=" + INDEX + " | mvcombine does_not_exist"); + Assertions.fail("Expected ResponseException was not thrown"); + } catch (ResponseException e) { + int status = e.getResponse().getStatusLine().getStatusCode(); + Assertions.assertTrue( + status >= 400 && status < 500, + "Expected 4xx for missing field, got " + status + " msg=" + e.getMessage()); + } + } + + // --------------------------- + // Helpers + // --------------------------- + + private static boolean 
isSyntaxBadRequest(ResponseException e) { + int status = e.getResponse().getStatusLine().getStatusCode(); + if (status != 400) return false; + + String msg = e.getMessage(); + if (msg == null) return false; + + return msg.contains("SyntaxCheckException") + || msg.contains("Invalid Query") + || msg.contains("parsing_exception") + || msg.contains("ParseException"); + } + + /** JSONArray -> list (nulls preserved), scalar -> singleton list, null -> empty list. */ + private static List toStringListKeepNulls(Object cell) { + if (cell == null || cell == JSONObject.NULL) { + return Collections.emptyList(); + } + if (cell instanceof JSONArray arr) { + List out = new ArrayList<>(); + for (int i = 0; i < arr.length(); i++) { + Object v = arr.get(i); + out.add(v == JSONObject.NULL ? null : String.valueOf(v)); + } + return out; + } + return List.of(String.valueOf(cell)); + } + + /** Same as above but drops null entries. */ + private static List toStringListDropNulls(Object cell) { + List all = toStringListKeepNulls(cell); + if (all.isEmpty()) return all; + + List out = new ArrayList<>(); + for (String v : all) { + if (v != null) out.add(v); + } + return out; + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index d9b76f757f9..4e143951bfa 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -926,6 +926,11 @@ public enum Index { "time_data", getMappingFile("time_test_data_index_mapping.json"), "src/test/resources/time_test_data.json"), + MVCOMBINE( + "test_index_mvcombine", + "_doc", + getMappingFile("mvcombine_index_mapping.json"), + "src/test/resources/mvcombine.json"), TIME_TEST_DATA_WITH_NULL( TestsConstants.TEST_INDEX_TIME_DATE_NULL, "time_data_with_null", diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java 
b/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java index 15f3c508b14..45948539c27 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java @@ -214,4 +214,18 @@ private void verifyQuery(JSONObject result) throws IOException { assertThat(error.getString("type"), equalTo("UnsupportedOperationException")); } } + + @Test + public void testMvCombineUnsupportedInV2() throws IOException { + JSONObject result; + try { + result = + executeQuery( + String.format( + "source=%s | fields state, city, age | mvcombine age", TEST_INDEX_BANK)); + } catch (ResponseException e) { + result = new JSONObject(TestUtils.getResponseBody(e.getResponse())); + } + verifyQuery(result); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java index 7ee90dc4640..05e18520837 100644 --- a/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java @@ -287,4 +287,26 @@ public void testCrossClusterAppend() throws IOException { disableCalcite(); } + + /** CrossClusterSearchIT Test for mvcombine. 
*/ + @Test + public void testCrossClusterMvcombine() throws IOException { + enableCalcite(); + + JSONObject result = + executeQuery( + String.format( + "search source=%s | where firstname='Hattie' or firstname='Nanette' " + + "| fields firstname, age | mvcombine age", + TEST_INDEX_BANK_REMOTE)); + + verifyColumn(result, columnName("firstname"), columnName("age")); + + verifyDataRows( + result, + rows("Hattie", new org.json.JSONArray().put(36)), + rows("Nanette", new org.json.JSONArray().put(28))); + + disableCalcite(); + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml new file mode 100644 index 00000000000..768fa9ef384 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(state=[$0], city=[$1], age=[CAST($2):BIGINT ARRAY NOT NULL]) + LogicalAggregate(group=[{0, 1}], age=[COLLECT($2) FILTER $3]) + LogicalProject(state=[$7], city=[$5], age=[$8], $f3=[IS NOT NULL($8)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t2):BIGINT ARRAY NOT NULL], proj#0..1=[{exprs}], age=[$t3]) + EnumerableAggregate(group=[{0, 1}], age=[COLLECT($2) FILTER $3]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[IS NOT NULL($t2)], proj#0..3=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[state, city, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["state","city","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml 
b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml new file mode 100644 index 00000000000..29a8aa3deac --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(state=[$0], city=[$1], age=[CAST($2):BIGINT ARRAY NOT NULL]) + LogicalAggregate(group=[{0, 1}], age=[COLLECT($2) FILTER $3]) + LogicalProject(state=[$7], city=[$5], age=[$8], $f3=[IS NOT NULL($8)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t2):BIGINT ARRAY NOT NULL], proj#0..1=[{exprs}], age=[$t3]) + EnumerableAggregate(group=[{0, 1}], age=[COLLECT($2) FILTER $3]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t8)], state=[$t7], city=[$t5], age=[$t8], $f3=[$t17]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/mvcombine.json b/integ-test/src/test/resources/mvcombine.json new file mode 100644 index 00000000000..315aafb532c --- /dev/null +++ b/integ-test/src/test/resources/mvcombine.json @@ -0,0 +1,42 @@ +{ "index": { "_index": "test_index_mvcombine", "_id": "1" } } +{ "ip": "10.0.0.1", "bytes": 100, "tags": "t1", "packets_str": "10" } +{ "index": { "_index": "test_index_mvcombine", "_id": "2" } } +{ "ip": "10.0.0.1", "bytes": 100, "tags": "t1", "packets_str": "20" } +{ "index": { "_index": "test_index_mvcombine", "_id": "3" } } +{ "ip": "10.0.0.1", "bytes": 100, "tags": "t1", "packets_str": "30" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "4" } } +{ "ip": "10.0.0.2", "bytes": 200, "tags": "t2", "packets_str": "7" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "5" } } +{ "ip": "10.0.0.3", "bytes": 300, "tags": "t3", "packets_str": "5" } +{ 
"index": { "_index": "test_index_mvcombine", "_id": "6" } } +{ "ip": "10.0.0.3", "bytes": 300, "tags": "t3" } +{ "index": { "_index": "test_index_mvcombine", "_id": "7" } } +{ "ip": "10.0.0.3", "bytes": 300, "tags": "t3", "letters": "a" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "16" } } +{ "ip": "10.0.0.7", "bytes": 700, "tags": "t7", "packets_str": "1" } +{ "index": { "_index": "test_index_mvcombine", "_id": "17" } } +{ "ip": "10.0.0.7", "bytes": 700, "tags": "t7", "packets_str": "2" } +{ "index": { "_index": "test_index_mvcombine", "_id": "18" } } +{ "ip": "10.0.0.8", "bytes": 700, "tags": "t7", "packets_str": "9" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "19" } } +{ "ip": "10.0.0.9", "bytes": 900, "tags": "t9", "packets_str": "1" } +{ "index": { "_index": "test_index_mvcombine", "_id": "20" } } +{ "ip": "10.0.0.9", "bytes": 900, "tags": "t9", "packets_str": "2" } +{ "index": { "_index": "test_index_mvcombine", "_id": "21" } } +{ "ip": "10.0.0.9", "bytes": 900, "tags": "t9", "packets_str": "3" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "11" } } +{ "ip": "10.0.0.5", "bytes": 500, "tags": "t5", "packets_str": "dup" } +{ "index": { "_index": "test_index_mvcombine", "_id": "12" } } +{ "ip": "10.0.0.5", "bytes": 500, "tags": "t5", "packets_str": "dup" } +{ "index": { "_index": "test_index_mvcombine", "_id": "13" } } +{ "ip": "10.0.0.5", "bytes": 500, "tags": "t5", "packets_str": "x" } + +{ "index": { "_index": "test_index_mvcombine", "_id": "14" } } +{ "ip": "10.0.0.6", "bytes": 600, "tags": "t6", "packets_str": "" } +{ "index": { "_index": "test_index_mvcombine", "_id": "15" } } +{ "ip": "10.0.0.6", "bytes": 600, "tags": "t6", "packets_str": "z" } diff --git a/integ-test/src/test/resources/mvcombine_index_mapping.json b/integ-test/src/test/resources/mvcombine_index_mapping.json new file mode 100644 index 00000000000..0c008faf2f1 --- /dev/null +++ b/integ-test/src/test/resources/mvcombine_index_mapping.json @@ -0,0 
+1,13 @@ +{ + "mappings": { + "properties": { + "case": { "type": "keyword" }, + "ip": { "type": "ip" }, + "bytes": { "type": "long" }, + "tags": { "type": "keyword" }, + + "packets_str": { "type": "keyword" }, + "letters": { "type": "keyword" } + } + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 71162e81bd8..7b431dae9ff 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -70,6 +70,8 @@ LABEL: 'LABEL'; SHOW_NUMBERED_TOKEN: 'SHOW_NUMBERED_TOKEN'; AGGREGATION: 'AGGREGATION'; APPENDPIPE: 'APPENDPIPE'; +MVCOMBINE: 'MVCOMBINE'; + //Native JOIN KEYWORDS JOIN: 'JOIN'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index e4500ee1e6f..58ee45bd663 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -88,6 +88,7 @@ commands | rexCommand | appendPipeCommand | replaceCommand + | mvcombineCommand ; commandName @@ -131,6 +132,7 @@ commandName | REX | APPENDPIPE | REPLACE + | MVCOMBINE ; searchCommand @@ -531,6 +533,11 @@ expandCommand : EXPAND fieldExpression (AS alias = qualifiedName)? ; +mvcombineCommand + : MVCOMBINE fieldExpression (DELIM EQUAL stringLiteral)? + ; + + flattenCommand : FLATTEN fieldExpression (AS aliases = identifierSeq)? 
; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7ac29faf4c9..cf2ff0c6453 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -91,6 +91,7 @@ import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -868,6 +869,18 @@ public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContex return new Expand(fieldExpression, alias); } + /** mvcombine command. */ + @Override + public UnresolvedPlan visitMvcombineCommand(OpenSearchPPLParser.MvcombineCommandContext ctx) { + Field field = (Field) internalVisitExpression(ctx.fieldExpression()); + + String delim = null; + if (ctx.DELIM() != null) { + delim = StringUtils.unquoteText(getTextInQuery(ctx.stringLiteral())); + } + return new MvCombine(field, delim); + } + @Override public UnresolvedPlan visitGrokCommand(OpenSearchPPLParser.GrokCommandContext ctx) { UnresolvedExpression sourceField = internalVisitExpression(ctx.source_field); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index a1e31e896dc..1ffab94b4a1 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -82,6 +82,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvCombine; import org.opensearch.sql.ast.tree.Parse; import 
org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -463,6 +464,14 @@ public String visitExpand(Expand node, String context) { return StringUtils.format("%s | expand %s", child, field); } + @Override + public String visitMvCombine(MvCombine node, String context) { + String child = node.getChild().getFirst().accept(this, context); + String field = visitExpression(node.getField()); + + return StringUtils.format("%s | mvcombine delim=%s %s", child, MASK_LITERAL, field); + } + /** Build {@link LogicalSort}. */ @Override public String visitSort(Sort node, String context) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java new file mode 100644 index 00000000000..524f5f7972c --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java @@ -0,0 +1,276 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import static org.junit.Assert.assertThrows; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import 
org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLMvCombineTest extends CalcitePPLAbstractTest { + + public CalcitePPLMvCombineTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + ImmutableList rows = + ImmutableList.of( + // existing "basic" + new Object[] {"basic", "A", 10}, + new Object[] {"basic", "A", 20}, + new Object[] {"basic", "B", 60}, + new Object[] {"basic", "A", 30}, + + // NULL target values case (Splunk-style: nulls do NOT contribute to mv) + new Object[] {"nulls", "A", null}, + new Object[] {"nulls", "A", 10}, + new Object[] {"nulls", "B", null}, + + // single-row case + new Object[] {"single", "Z", 5}); + + schema.add("MVCOMBINE_DATA", new MvCombineDataTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testMvCombineBasic() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[CAST($2):INTEGER ARRAY NOT NULL])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[COLLECT($2) FILTER $3])\n" + + " 
LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'basic')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, CAST(COLLECT(`packets`) FILTER (WHERE `packets` IS NOT NULL) AS" + + " ARRAY< INTEGER >) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'basic'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineWithNullTargetValues() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"nulls\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[CAST($2):INTEGER ARRAY NOT NULL])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[COLLECT($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'nulls')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, CAST(COLLECT(`packets`) FILTER (WHERE `packets` IS NOT NULL) AS" + + " ARRAY< INTEGER >) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'nulls'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineWithDelimOption_SplunkSyntaxOrder() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine packets delim='|' " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], 
ip=[$1], packets=[CAST($2):INTEGER ARRAY NOT NULL])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[COLLECT($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'basic')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, CAST(COLLECT(`packets`) FILTER (WHERE `packets` IS NOT NULL) AS" + + " ARRAY< INTEGER >) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'basic'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineNonExistentField() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine does_not_exist"; + + Exception ex = assertThrows(Exception.class, () -> getRelNode(ppl)); + + String msg = String.valueOf(ex.getMessage()); + org.junit.Assert.assertTrue( + "Expected error message to mention missing field. 
Actual: " + msg, + msg.toLowerCase().contains("does_not_exist") || msg.toLowerCase().contains("field")); + } + + @Test + public void testMvCombineSingleRow() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"single\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[CAST($2):INTEGER ARRAY NOT NULL])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[COLLECT($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'single')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testMvCombineEmptyResult() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"no_such_case\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[CAST($2):INTEGER ARRAY NOT NULL])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[COLLECT($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'no_such_case')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + } + + // ======================================================================== + // Custom ScannableTable for deterministic mvcombine planning tests + // ======================================================================== + + @RequiredArgsConstructor + static class MvCombineDataTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + 
.builder() + .add("case", SqlTypeName.VARCHAR) + .nullable(true) + .add("ip", SqlTypeName.VARCHAR) + .nullable(true) + .add("packets", SqlTypeName.INTEGER) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 2fd08988f6b..23e40ac0487 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -1010,4 +1010,17 @@ public void testMvfind() { "source=t | eval result=mvfind(array('apple', 'banana', 'apricot'), 'ban.*') | fields" + " result")); } + + @Test + public void testMvcombineCommand() { + assertEquals( + "source=table | mvcombine delim=*** identifier", anonymize("source=t | mvcombine age")); + } + + @Test + public void testMvcombineCommandWithDelim() { + assertEquals( + "source=table | mvcombine delim=*** identifier", + anonymize("source=t | mvcombine age delim=','")); + } }