diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 24cef144c97..a9b445472a9 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -80,6 +80,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -713,6 +714,11 @@ public LogicalPlan visitExpand(Expand expand, AnalysisContext context) { throw getOnlyForCalciteException("Expand"); } + @Override + public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) { + throw getOnlyForCalciteException("MvExpand"); + } + /** Build {@link LogicalTrendline} for Trendline command. */ @Override public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index a6ef5e7547a..6dd42980879 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -68,6 +68,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -461,4 +462,8 @@ public T visitAddTotals(AddTotals node, C context) { public T visitAddColTotals(AddColTotals node, C context) { return visitChildren(node, context); } + + public T visitMvExpand(MvExpand node, C context) { + return visitChildren(node, context); + } } 
diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index bf54d2ffd89..c7ea873e43d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -62,6 +62,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.MinSpanBin; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -136,6 +137,10 @@ public Expand expand(UnresolvedPlan input, Field field, String alias) { return new Expand(field, alias).attach(input); } + public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) { + return new MvExpand(field, limit).attach(input); + } + public static UnresolvedPlan projectWithArg( UnresolvedPlan input, List argList, UnresolvedExpression... projectList) { return new Project(Arrays.asList(projectList), argList).attach(input); diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java b/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java new file mode 100644 index 00000000000..29dc89c541b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; + +/** AST node representing the {@code mvexpand} PPL command: {@code mvexpand [limit=N]}. 
*/ +@ToString +@EqualsAndHashCode(callSuper = false) +public class MvExpand extends UnresolvedPlan { + + private UnresolvedPlan child; + @Getter private final Field field; + @Getter @Nullable private final Integer limit; + + public MvExpand(Field field, @Nullable Integer limit) { + this.field = field; + this.limit = limit; + } + + @Override + public MvExpand attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitMvExpand(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 19dce3e3609..4857d1bb29a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -51,6 +51,7 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeFamily; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; @@ -122,6 +123,7 @@ import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -846,7 +848,11 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) { .toList(); context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall); buildExpandRelNode( - 
context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context); + context.relBuilder.field(node.getAlias()), + node.getAlias(), + node.getAlias(), + null, + context); flattenParsedPattern( node.getAlias(), context.relBuilder.field(node.getAlias()), @@ -3111,11 +3117,82 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context); String alias = expand.getAlias(); - buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context); + buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context); return context.relBuilder.peek(); } + /** + * MVExpand command visitor. + * + *

For Calcite remote planning, mvexpand shares the same expansion mechanics as {@link Expand}: + * it unnests the target multivalue field and joins back to the original relation. The additional + * mvexpand semantics (such as an optional per-document limit) are surfaced via the MVExpand AST + * node but reuse the same underlying RelBuilder pipeline as expand at this layer. + * + * @param mvExpand MVExpand command to be visited + * @param context CalcitePlanContext containing the RelBuilder and other context + * @return RelNode representing records with the expanded multi-value field + */ + /** + * MVExpand command visitor. + * + *

For Calcite remote planning, mvexpand reuses the same expansion mechanics as {@link Expand}: + * it unnests the target multivalue field and joins back to the original relation. + * mvexpand-specific semantics (such as an optional per-document limit) are carried by the {@link + * MvExpand} AST node and applied via the limit parameter passed into the shared expansion + * builder. + * + *

Missing-field behavior: if the target field does not exist in the input schema, mvexpand + * produces no rows while keeping the output schema stable. + * + * @param mvExpand MVExpand command to be visited + * @param context CalcitePlanContext containing the RelBuilder and other context + * @return RelNode representing records with the expanded multi-value field + */ + @Override + public RelNode visitMvExpand(MvExpand mvExpand, CalcitePlanContext context) { + visitChildren(mvExpand, context); + + final RelBuilder relBuilder = context.relBuilder; + final Field field = mvExpand.getField(); + final String fieldName = field.getField().toString(); + + // Missing-field: produce no rows (but keep schema stable). + final RelDataType inputType = relBuilder.peek().getRowType(); + final RelDataTypeField inputField = + inputType.getField(fieldName, /*caseSensitive*/ false, /*elideRecord*/ false); + if (inputField == null) { + return buildEmptyResultWithStableSchema(relBuilder, fieldName); + } + + // Resolve field ref using rexVisitor for consistent semantics (same as expand). + final RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(field, context); + + // Enforce ARRAY type before UNNEST so we return SemanticCheckException. 
+ final SqlTypeName actual = arrayFieldRex.getType().getSqlTypeName(); + if (actual != SqlTypeName.ARRAY) { + throw new SemanticCheckException( + String.format( + "Cannot expand field '%s': expected ARRAY type but found %s", fieldName, actual)); + } + + buildExpandRelNode(arrayFieldRex, fieldName, fieldName, mvExpand.getLimit(), context); + return relBuilder.peek(); + } + + private static RelNode buildEmptyResultWithStableSchema(RelBuilder relBuilder, String fieldName) { + final RelDataTypeFactory typeFactory = relBuilder.getTypeFactory(); + final RelDataType arrayAny = + typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.ANY), -1); + + relBuilder.projectPlus( + List.of(relBuilder.alias(relBuilder.getRexBuilder().makeNullLiteral(arrayAny), fieldName))); + + relBuilder.filter(relBuilder.literal(false)); + return relBuilder.peek(); + } + @Override public RelNode visitValues(Values values, CalcitePlanContext context) { if (values.getValues() == null || values.getValues().isEmpty()) { @@ -3360,7 +3437,11 @@ private void flattenParsedPattern( } private void buildExpandRelNode( - RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) { + RexInputRef arrayFieldRex, + String arrayFieldName, + String alias, + @Nullable Integer perDocLimit, + CalcitePlanContext context) { // 3. Capture the outer row in a CorrelationId Holder correlVariable = Holder.empty(); context.relBuilder.variable(correlVariable::set); @@ -3375,14 +3456,17 @@ private void buildExpandRelNode( RelNode leftNode = context.relBuilder.build(); // 5. 
Build join right node and expand the array field using uncollect - RelNode rightNode = - context - .relBuilder - // fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter - .push(LogicalValues.createOneRow(context.relBuilder.getCluster())) - .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)) - .uncollect(List.of(), false) - .build(); + context + .relBuilder + // fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter + .push(LogicalValues.createOneRow(context.relBuilder.getCluster())) + .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)) + .uncollect(List.of(), false); + + if (perDocLimit != null) { + context.relBuilder.limit(0, perDocLimit); + } + RelNode rightNode = context.relBuilder.build(); // 6. Perform a nested-loop join (correlate) between the original table and the expanded // array field. diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index 205f3a0f2e1..95d09033458 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -1108,6 +1108,14 @@ void populate() { OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.INTEGER) .or(OperandTypes.family(SqlTypeFamily.MAP, SqlTypeFamily.ANY)), false)); + // Allow using INTERNAL_ITEM when the element type is unknown/undefined at planning time. + // Some datasets (or Calcite's type inference) may give the element an UNDEFINED type. + // Accept a "ignore" first-argument family so INTERNAL_ITEM(elem, 'key') can still be planned + // and resolved at runtime (fallback semantics handled at execution side). 
- Used in MVEXPAND + registerOperator( + INTERNAL_ITEM, + SqlStdOperatorTable.ITEM, + PPLTypeChecker.family(SqlTypeFamily.IGNORE, SqlTypeFamily.CHARACTER)); registerOperator( XOR, SqlStdOperatorTable.NOT_EQUALS, diff --git a/docs/category.json b/docs/category.json index 094768d1e6f..886939df8aa 100644 --- a/docs/category.json +++ b/docs/category.json @@ -29,6 +29,7 @@ "user/ppl/cmd/regex.md", "user/ppl/cmd/rename.md", "user/ppl/cmd/multisearch.md", + "user/ppl/cmd/mvexpand.md", "user/ppl/cmd/replace.md", "user/ppl/cmd/rex.md", "user/ppl/cmd/search.md", @@ -79,4 +80,4 @@ "bash_settings": [ "user/ppl/admin/settings.md" ] -} +} \ No newline at end of file diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index e959a69c8b6..645e65997c2 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 23/23 + fetched rows / total rows = 24/24 +----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -48,6 +48,7 @@ SQL query:: | docTestCluster | null | events_many_hosts | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | events_null | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | mvexpand_logs | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nested | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | 
nyc_taxi | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | occupation | BASE TABLE | null | null | null | null | null | null | @@ -120,3 +121,4 @@ SQL query:: | docTestCluster | null | accounts | firstname | null | text | null | null | null | 10 | 2 | null | null | null | null | null | 1 | | null | null | null | null | NO | | | docTestCluster | null | accounts | lastname | null | text | null | null | null | 10 | 2 | null | null | null | null | null | 10 | | null | null | null | null | NO | | +----------------+-------------+------------+-------------+-----------+-----------+-------------+---------------+----------------+----------------+----------+---------+------------+---------------+------------------+-------------------+------------------+-------------+---------------+--------------+-------------+------------------+------------------+--------------------+ + diff --git a/docs/user/ppl/cmd/mvexpand.md b/docs/user/ppl/cmd/mvexpand.md new file mode 100644 index 00000000000..101b04c5c4c --- /dev/null +++ b/docs/user/ppl/cmd/mvexpand.md @@ -0,0 +1,143 @@ +# mvexpand + +## Description +The `mvexpand` command expands each value in a multivalue (array) field into a separate row. For each document, every element in the specified array field is returned as a new row. + + +## Syntax +``` +mvexpand <field> [limit=<int>] +``` + +- `<field>`: The multivalue (array) field to expand. (Required) +- `limit`: Maximum number of values per document to expand. (Optional) + + +### Output field naming +After `mvexpand`, the expanded value remains under the same field name (for example, `tags` or `ids`). +If the array contains objects, you can reference subfields (for example, `skills.name`). + + +## Examples + +### Example 1: Basic Expansion (single document) +Input document (case "basic") contains three tag values. 
+ +PPL query: +```ppl +source=people +| eval tags = array('error', 'warning', 'info') +| fields tags +| head 1 +| mvexpand tags +| fields tags +``` + +Expected output: +```text +fetched rows / total rows = 3/3 ++---------+ +| tags | +|---------| +| error | +| warning | +| info | ++---------+ +``` + +### Example 2: Expansion with Limit +Input document (case "ids") contains an array of integers; expand and apply limit. + +PPL query: +```ppl +source=people +| eval ids = array(1, 2, 3, 4, 5) +| fields ids +| head 1 +| mvexpand ids limit=3 +| fields ids +``` + +Expected output: +```text +fetched rows / total rows = 3/3 ++-----+ +| ids | +|-----| +| 1 | +| 2 | +| 3 | ++-----+ +``` + +### Example 3: Empty Expansion +This example demonstrates that mvexpand produces no rows when there are no matching input rows. + +PPL query: +```ppl +source=people +| eval tags = array('dummy') +| where false +| fields tags +| head 1 +| mvexpand tags +| fields tags +``` + +Expected output: +```text +fetched rows / total rows = 0/0 ++------+ +| tags | +|------| ++------+ +``` + +### Example 4: Single-value array (case "single") +Single-element array should expand to one row. + +PPL query: +```ppl +source=people +| eval tags = array('error') +| fields tags +| head 1 +| mvexpand tags +| fields tags +``` + +Expected output: +```text +fetched rows / total rows = 1/1 ++-------+ +| tags | +|-------| +| error | ++-------+ +``` + +### Example 5: Missing Field +If the field is missing in the document (case "missing"), no rows are produced. + +PPL query: +```ppl +source=people +| eval some_field = 'x' +| fields some_field +| head 1 +| mvexpand tags +| fields tags +``` + +Expected output: +```text +fetched rows / total rows = 0/0 ++------+ +| tags | +|------| ++------+ +``` + +## Notes about these doctests +- The examples above generate deterministic multivalue fields using `eval` + `array()` so doctests are stable. 
+- All examples run against a single source index (`people`) and use `head 1` to keep output predictable. \ No newline at end of file diff --git a/docs/user/ppl/index.md b/docs/user/ppl/index.md index 30ad7159182..1e55f0e456d 100644 --- a/docs/user/ppl/index.md +++ b/docs/user/ppl/index.md @@ -46,6 +46,7 @@ source=accounts | [replace command](cmd/replace.md) | 3.4 | experimental (since 3.4) | Replace text in one or more fields in the search result | | [fillnull command](cmd/fillnull.md) | 3.0 | experimental (since 3.0) | Fill null with provided value in one or more fields in the search result. | | [expand command](cmd/expand.md) | 3.1 | experimental (since 3.1) | Transform a single document into multiple documents by expanding a nested array field. | +| [mvexpand command](cmd/mvexpand.md) | 3.4 | experimental (since 3.4) | Expand a multi-valued field into separate documents (one per value). | | [flatten command](cmd/flatten.md) | 3.1 | experimental (since 3.1) | Flatten a struct or an object field into separate fields in a document. | | [table command](cmd/table.md) | 3.3 | experimental (since 3.3) | Keep or remove fields from the search result using enhanced syntax options. | | [stats command](cmd/stats.md) | 1.0 | stable (since 1.0) | Calculate aggregation from search results. | @@ -78,8 +79,9 @@ source=accounts | [describe command](cmd/describe.md) | 2.1 | stable (since 2.1) | Query the metadata of an index. | | [explain command](cmd/explain.md) | 3.1 | stable (since 3.1) | Explain the plan of query. | | [show datasources command](cmd/showdatasources.md) | 2.4 | stable (since 2.4) | Query datasources configured in the PPL engine. | - | [addtotals command](cmd/addtotals.md) | 3.4 | stable (since 3.4) | Adds row and column values and appends a totals column and row. | - | [addcoltotals command](cmd/addcoltotals.md) | 3.4 | stable (since 3.4) | Adds column values and appends a totals row. 
| +| [addtotals command](cmd/addtotals.md) | 3.4 | stable (since 3.4) | Adds row and column values and appends a totals column and row. | +| [addcoltotals command](cmd/addcoltotals.md) | 3.4 | stable (since 3.4) | Adds column values and appends a totals row. | + - [Syntax](cmd/syntax.md) - PPL query structure and command syntax formatting * **Functions** diff --git a/doctest/test_data/mvexpand_logs.json b/doctest/test_data/mvexpand_logs.json new file mode 100644 index 00000000000..01f08ed009f --- /dev/null +++ b/doctest/test_data/mvexpand_logs.json @@ -0,0 +1,6 @@ +{"case":"basic","tags":[{"value":"error"},{"value":"warning"},{"value":"info"}]} +{"case":"empty","tags":[]} +{"case":"null","tags":null} +{"case":"single","tags":[{"value":"error"}]} +{"case":"ids","ids":[{"value":1},{"value":2},{"value":3},{"value":4},{"value":5}]} +{"case":"missing","other":[1,2]} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index e57c41d6827..11560a44687 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -57,7 +57,8 @@ 'otellogs': 'otellogs.json', 'time_data': 'time_test_data.json', 'time_data2': 'time_test_data2.json', - 'time_test': 'time_test.json' + 'time_test': 'time_test.json', + 'mvexpand_logs': 'mvexpand_logs.json', } DEBUG_MODE = os.environ.get('DOCTEST_DEBUG', 'false').lower() == 'true' diff --git a/doctest/test_mapping/mvexpand_logs.json b/doctest/test_mapping/mvexpand_logs.json new file mode 100644 index 00000000000..eea16ae9b5f --- /dev/null +++ b/doctest/test_mapping/mvexpand_logs.json @@ -0,0 +1,24 @@ +{ + "mappings": { + "properties": { + "case": { + "type": "keyword" + }, + "tags": { + "type": "nested", + "properties": { + "value": { "type": "keyword" } + } + }, + "ids": { + "type": "nested", + "properties": { + "value": { "type": "integer" } + } + }, + "other": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java 
b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java index c254fb47c44..72f3d8d4063 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java @@ -34,6 +34,7 @@ CalciteDedupCommandIT.class, CalciteDescribeCommandIT.class, CalciteExpandCommandIT.class, + CalciteMvExpandCommandIT.class, CalciteFieldsCommandIT.class, CalciteFillNullCommandIT.class, CalciteFlattenCommandIT.class, diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index f00fbfec75b..7a403b3a3b7 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -44,6 +44,7 @@ public void init() throws Exception { loadIndex(Index.WORKER); loadIndex(Index.WORK_INFORMATION); loadIndex(Index.WEBLOG); + loadIndex(Index.MVEXPAND_EDGE_CASES); loadIndex(Index.DATA_TYPE_ALIAS); } @@ -314,6 +315,14 @@ public void testExplainMultisearchTimestampInterleaving() throws IOException { assertYamlEqualsIgnoreId(expected, result); } + @Test + public void testMvexpandExplain() throws IOException { + // mvexpand explain plan validation + String expected = loadExpectedPlan("explain_mvexpand.yaml"); + explainQueryYaml( + "source=mvexpand_edge_cases | eval skills_arr = array(1, 2, 3) | mvexpand skills_arr"); + } + // Only for Calcite @Test public void testExplainIsBlank() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java new file mode 100644 index 00000000000..cd1de285c26 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvExpandCommandIT.java @@ -0,0 
+1,347 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteMvExpandCommandIT extends PPLIntegTestCase { + + private static final String INDEX = Index.MVEXPAND_EDGE_CASES.getName(); + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + deleteIndexIfExists(INDEX); + + // Single shared mapping for ALL cases (no extra indices) + // - skills: nested (mvexpand target) + // - skills_not_array: keyword (semantic error test) + // - skills_int: integer (semantic error test) + final String mapping = + "{ \"mappings\": { \"properties\": { " + + "\"username\": { \"type\": \"keyword\" }," + + "\"skills\": { \"type\": \"nested\" }," + + "\"skills_not_array\": { \"type\": \"keyword\" }," + + "\"skills_int\": { \"type\": \"integer\" }" + + "} } }"; + + createIndex(INDEX, mapping); + + bulkInsert( + INDEX, + "{\"username\":\"happy\",\"skills\":[{\"name\":\"python\"},{\"name\":\"java\"},{\"name\":\"sql\"}]}", + "{\"username\":\"single\",\"skills\":[{\"name\":\"go\"}]}", + "{\"username\":\"empty\",\"skills\":[]}", + "{\"username\":\"nullskills\",\"skills\":null}", + "{\"username\":\"noskills\"}", + "{\"username\":\"partial\",\"skills\":[{\"name\":\"kotlin\"},{\"level\":\"intern\"},{\"name\":null}]}", + 
"{\"username\":\"mixed_shapes\",\"skills\":[{\"name\":\"elixir\",\"meta\":{\"years\":3}},{\"name\":\"haskell\"}]}", + "{\"username\":\"duplicate\",\"skills\":[{\"name\":\"dup\"},{\"name\":\"dup\"}]}", + "{\"username\":\"complex\",\"skills\":[{\"name\":\"ml\",\"level\":\"expert\"},{\"name\":\"ai\"},{\"level\":\"novice\"}]}", + "{\"username\":\"large\",\"skills\":[" + + "{\"name\":\"s1\"},{\"name\":\"s2\"},{\"name\":\"s3\"},{\"name\":\"s4\"},{\"name\":\"s5\"}," + + "{\"name\":\"s6\"},{\"name\":\"s7\"},{\"name\":\"s8\"},{\"name\":\"s9\"},{\"name\":\"s10\"}" + + "]}", + "{\"username\":\"hetero_types\",\"skills\":[{\"level\":\"senior\"},{\"level\":3}]}", + "{\"username\":\"u1\",\"skills_not_array\":\"scala\"}", + "{\"username\":\"u_int\",\"skills_int\":5}", + "{\"username\":\"limituser\",\"skills\":[{\"name\":\"a\"},{\"name\":\"b\"},{\"name\":\"c\"},{\"name\":\"d\"},{\"name\":\"e\"}]}"); + + refreshIndex(INDEX); + } + + @Test + public void testMvexpandSingleElement() throws Exception { + String q1 = + String.format( + "source=%s | mvexpand skills | where username='single' | fields username, skills", + INDEX); + JSONObject r1 = executeQuery(q1); + + assertSingleRowNestedFieldEquals(r1, "skills", "name", "go"); + + String q2 = + String.format( + "source=%s | mvexpand skills | where username='single' | fields username, skills.name", + INDEX); + JSONObject r2 = executeQuery(q2); + verifyDataRows(r2, rows("single", "go")); + } + + /** + * Asserts the result has exactly one row and that the given column is a MAP/object containing + * nestedKey=nestedValue. 
+ */ + private static void assertSingleRowNestedFieldEquals( + JSONObject result, String mapColumn, String nestedKey, String expectedValue) { + var dataRows = result.getJSONArray("datarows"); + Assertions.assertEquals(1, dataRows.length(), "Expected exactly one row"); + + var schema = result.getJSONArray("schema"); + + int mapIdx = -1; + for (int i = 0; i < schema.length(); i++) { + if (mapColumn.equals(schema.getJSONObject(i).getString("name"))) { + mapIdx = i; + break; + } + } + Assertions.assertTrue(mapIdx >= 0, "Column not found in schema: " + mapColumn); + + var row0 = dataRows.getJSONArray(0); + var skillsObj = row0.getJSONObject(mapIdx); // this is the MAP/object + Assertions.assertEquals(expectedValue, skillsObj.optString(nestedKey, null)); + } + + @Test + public void testMvexpandEmptyArray() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='empty' | fields username, skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); + } + + @Test + public void testMvexpandNullArray() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='nullskills' | fields username," + + " skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); + } + + @Test + public void testMvexpandNoArrayField() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='noskills' | fields username," + + " skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result); + } + + @Test + public void testMvexpandDuplicate() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='duplicate' | fields username," + + " skills.name | sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result, rows("duplicate", "dup"), rows("duplicate", "dup")); + } + + @Test + public void 
testMvexpandHappyMultipleElements() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='happy' | fields username, skills.name |" + + " sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result, rows("happy", "java"), rows("happy", "python"), rows("happy", "sql")); + } + + @Test + public void testMvexpandPartialElementMissingName() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='partial' | fields username, skills.name" + + " | sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows( + result, + rows("partial", "kotlin"), + rows("partial", (String) null), + rows("partial", (String) null)); + } + + @Test + public void testMvexpandMixedShapesKeepsAllElements() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='mixed_shapes' | fields username," + + " skills.name | sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyDataRows(result, rows("mixed_shapes", "elixir"), rows("mixed_shapes", "haskell")); + } + + @Test + public void testMvexpandFlattenedSchemaPresence() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='complex' | fields username," + + " skills.level, skills.name", + INDEX); + JSONObject result = executeQuery(query); + + verifySchema( + result, + schema("username", "string"), + schema("skills.level", "string"), + schema("skills.name", "string")); + + verifyDataRows( + result, + rows("complex", "expert", "ml"), + rows("complex", (String) null, "ai"), + rows("complex", "novice", (String) null)); + } + + @Test + public void testMvexpandOnNonArrayFieldMapping() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills_not_array | where username='u1' | fields username," + + " skills_not_array", + INDEX); + + ResponseException ex = 
assertThrows(ResponseException.class, () -> executeQuery(query)); + Assertions.assertTrue( + ex.getMessage() + .contains( + "Cannot expand field 'skills_not_array': expected ARRAY type but found VARCHAR")); + } + + @Test + public void testMvexpandMissingFieldReturnsEmpty() throws Exception { + // single-index version: username='noskills' doc has no "skills" field at all + String query = + String.format( + "source=%s | mvexpand skills | where username='noskills' | fields username, skills", + INDEX); + + JSONObject result = executeQuery(query); + verifyDataRows(result); + } + + @Test + public void testMvexpandLimitParameter() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills limit=3 | where username='limituser' | fields username," + + " skills.name", + INDEX); + JSONObject result = executeQuery(query); + verifyNumOfRows(result, 3); + verifyDataRows(result, rows("limituser", "a"), rows("limituser", "b"), rows("limituser", "c")); + } + + @Test + public void testMvexpandTypeInferenceForHeterogeneousSubfields() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='hetero_types' | fields username," + + " skills.level", + INDEX); + JSONObject result = executeQuery(query); + + verifyDataRows(result, rows("hetero_types", "senior"), rows("hetero_types", "3")); + } + + @Test + public void testMvexpandLargeArrayElements() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills | where username='large' | fields username, skills.name |" + + " sort skills.name", + INDEX); + JSONObject result = executeQuery(query); + + verifyNumOfRows(result, 10); + + verifyDataRows( + result, + rows("large", "s1"), + rows("large", "s2"), + rows("large", "s3"), + rows("large", "s4"), + rows("large", "s5"), + rows("large", "s6"), + rows("large", "s7"), + rows("large", "s8"), + rows("large", "s9"), + rows("large", "s10")); + } + + @Test + public void 
testMvexpandOnIntegerFieldMappingThrowsSemantic() throws Exception { + String query = + String.format( + "source=%s | mvexpand skills_int | where username='u_int' | fields username," + + " skills_int", + INDEX); + + ResponseException ex = assertThrows(ResponseException.class, () -> executeQuery(query)); + Assertions.assertTrue( + ex.getMessage().contains("Cannot expand field") || ex.getMessage().contains("Semantic"), + "Expected semantic error for non-array field, got: " + ex.getMessage()); + } + + private static void createIndex(String index, String mappingJson) throws IOException { + Request request = new Request("PUT", "/" + index); + request.setJsonEntity(mappingJson); + PPLIntegTestCase.adminClient().performRequest(request); + } + + private static void deleteIndexIfExists(String index) throws IOException { + try { + Request request = new Request("DELETE", "/" + index); + PPLIntegTestCase.adminClient().performRequest(request); + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + } + + private static void bulkInsert(String index, String... 
docs) throws IOException { + StringBuilder bulk = new StringBuilder(); + int nextAutoId = 1; + for (String doc : docs) { + String id; + String json; + if (doc.contains("|")) { + String[] parts = doc.split("\\|", 2); + id = parts[0]; + json = parts[1]; + } else { + id = String.valueOf(nextAutoId++); + json = doc; + } + bulk.append("{\"index\":{\"_id\":").append(id).append("}}\n"); + bulk.append(json).append("\n"); + } + Request request = new Request("POST", "/" + index + "/_bulk?refresh=true"); + request.setJsonEntity(bulk.toString()); + PPLIntegTestCase.adminClient().performRequest(request); + } + + private static void refreshIndex(String index) throws IOException { + Request request = new Request("POST", "/" + index + "/_refresh"); + PPLIntegTestCase.adminClient().performRequest(request); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 50ee11b765a..e2c162c7a76 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -699,6 +699,11 @@ public enum Index { "_doc", getNestedSimpleIndexMapping(), "src/test/resources/nested_simple.json"), + MVEXPAND_EDGE_CASES( + "mvexpand_edge_cases", + "mvexpand_edge_cases", + getMappingFile("mvexpand_edge_cases_mapping.json"), + "src/test/resources/mvexpand_edge_cases.json"), DEEP_NESTED( TestsConstants.TEST_INDEX_DEEP_NESTED, "_doc", diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml new file mode 100644 index 00000000000..3aba9e30986 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvexpand.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(username=[$0], 
skills_arr=[$1]) + LogicalUnnest + LogicalProject(username=[$0], skills_arr=[$1]) + CalciteLogicalIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], proj#0..1=[{exprs}]) + EnumerableUnnest + EnumerableCalc(expr#0..2=[{inputs}], proj#0..1=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml new file mode 100644 index 00000000000..3aba9e30986 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvexpand.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(username=[$0], skills_arr=[$1]) + LogicalUnnest + LogicalProject(username=[$0], skills_arr=[$1]) + CalciteLogicalIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], proj#0..1=[{exprs}]) + EnumerableUnnest + EnumerableCalc(expr#0..2=[{inputs}], proj#0..1=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, mvexpand_edge_cases]]) diff --git a/integ-test/src/test/resources/mvexpand_edge_cases.json b/integ-test/src/test/resources/mvexpand_edge_cases.json new file mode 100644 index 00000000000..662769d89b8 --- /dev/null +++ b/integ-test/src/test/resources/mvexpand_edge_cases.json @@ -0,0 +1,18 @@ +{"index":{}} +{"username":"happy","skills":[{"name":"python"},{"name":"java"},{"name":"sql"}]} +{"index":{}} +{"username":"single","skills":[{"name":"go"}]} +{"index":{}} +{"username":"empty","skills":[]} +{"index":{}} +{"username":"nullskills","skills":null} +{"index":{}} +{"username":"noskills"} +{"index":{}} +{"username":"missingattr","skills":[{"name":"c"},{"level":"advanced"}]} +{"index":{}} 
+{"username":"complex","skills":[{"name":"ml","level":"expert"},{"name":"ai"},{"level":"novice"}]} +{"index":{}} +{"username":"duplicate","skills":[{"name":"dup"},{"name":"dup"}]} +{"index":{}} +{"username":"large","skills":[{"name":"s1"},{"name":"s2"},{"name":"s3"},{"name":"s4"},{"name":"s5"},{"name":"s6"},{"name":"s7"},{"name":"s8"},{"name":"s9"},{"name":"s10"}]} diff --git a/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json b/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json new file mode 100644 index 00000000000..0d33eb4914c --- /dev/null +++ b/integ-test/src/test/resources/mvexpand_edge_cases_mapping.json @@ -0,0 +1,8 @@ +{ + "mappings": { + "properties": { + "username": { "type": "keyword" }, + "skills": { "type": "nested" } + } + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 71162e81bd8..cd58d920938 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -54,6 +54,7 @@ ADDCOLTOTALS: 'ADDCOLTOTALS'; ROW: 'ROW'; COL: 'COL'; EXPAND: 'EXPAND'; +MVEXPAND: 'MVEXPAND'; SIMPLE_PATTERN: 'SIMPLE_PATTERN'; BRAIN: 'BRAIN'; VARIABLE_COUNT_THRESHOLD: 'VARIABLE_COUNT_THRESHOLD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 7045796a03c..ad6c2343a2b 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -80,6 +80,7 @@ commands | addcoltotalsCommand | appendCommand | expandCommand + | mvexpandCommand | flattenCommand | reverseCommand | regexCommand @@ -118,6 +119,7 @@ commandName | ML | FILLNULL | EXPAND + | MVEXPAND | FLATTEN | TRENDLINE | TIMECHART @@ -531,6 +533,10 @@ expandCommand : EXPAND fieldExpression (AS alias = qualifiedName)? ; +mvexpandCommand + : MVEXPAND fieldExpression (LIMIT EQUAL INTEGER_LITERAL)? + ; + flattenCommand : FLATTEN fieldExpression (AS aliases = identifierSeq)? 
; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 3f4f3049365..4badd758659 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -91,6 +91,7 @@ import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -868,6 +869,14 @@ public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContex return new Expand(fieldExpression, alias); } + @Override + public UnresolvedPlan visitMvexpandCommand(OpenSearchPPLParser.MvexpandCommandContext ctx) { + Field field = (Field) expressionBuilder.visit(ctx.fieldExpression()); + Integer limit = + ctx.INTEGER_LITERAL() != null ? 
Integer.parseInt(ctx.INTEGER_LITERAL().getText()) : null; + return new MvExpand(field, limit); + } + @Override public UnresolvedPlan visitGrokCommand(OpenSearchPPLParser.GrokCommandContext ctx) { UnresolvedExpression sourceField = internalVisitExpression(ctx.source_field); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 7d04ad8e6ad..db301e276be 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -82,6 +82,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.MinSpanBin; import org.opensearch.sql.ast.tree.Multisearch; +import org.opensearch.sql.ast.tree.MvExpand; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -654,6 +655,16 @@ public String visitAppend(Append node, String context) { return StringUtils.format("%s | append [%s ]", child, subsearch); } + @Override + public String visitMvExpand(MvExpand node, String context) { + String child = node.getChild().get(0).accept(this, context); + String field = MASK_COLUMN; // Always anonymize field names + if (node.getLimit() != null) { + return StringUtils.format("%s | mvexpand %s limit=%s", child, field, MASK_LITERAL); + } + return StringUtils.format("%s | mvexpand %s", child, field); + } + @Override public String visitMultisearch(Multisearch node, String context) { List anonymizedSubsearches = new ArrayList<>(); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java new file mode 100644 index 00000000000..01b1bdf52db --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvExpandTest.java @@ -0,0 +1,185 @@ +/* + * Copyright OpenSearch 
Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.Arrays; +import java.util.List; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Assert; +import org.junit.Test; + +public class CalcitePPLMvExpandTest extends CalcitePPLAbstractTest { + + public CalcitePPLMvExpandTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + /** + * There is no existing table with arrays. We create one for test purpose. + * + *

This mirrors CalcitePPLExpandTest.TableWithArray. + */ + public static class TableWithArray implements Table { + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("DEPTNO", SqlTypeName.INTEGER) + .add( + "EMPNOS", + factory.createArrayType(factory.createSqlType(SqlTypeName.INTEGER), -1)) + .build(); + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... 
schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + // Add an empty table with name DEPT for test purpose + schema.add("DEPT", new TableWithArray()); + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testMvExpandBasic() { + String ppl = "source=DEPT | mvexpand EMPNOS"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(DEPTNO=[$0], EMPNOS=[$2])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + " Uncollect\n" + + " LogicalProject(EMPNOS=[$cor0.EMPNOS])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `$cor0`.`DEPTNO`, `t00`.`EMPNOS`\n" + + "FROM `scott`.`DEPT` `$cor0`,\n" + + "LATERAL UNNEST((SELECT `$cor0`.`EMPNOS`\n" + + "FROM (VALUES (0)) `t` (`ZERO`))) `t00` (`EMPNOS`)"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvExpandWithLimitParameter() { + String ppl = "source=DEPT | mvexpand EMPNOS limit=2"; + RelNode root = getRelNode(ppl); + + assertContains(root, "LogicalCorrelate"); + assertContains(root, "Uncollect"); + + assertAnyContains(root, "fetch=", "LIMIT", "RowNumber", "Window"); + } + + @Test + public void testMvExpandProjectNested() { + String ppl = "source=DEPT | mvexpand EMPNOS | fields DEPTNO, EMPNOS"; + RelNode root = getRelNode(ppl); + + assertContains(root, "LogicalCorrelate"); + assertContains(root, "Uncollect"); + assertContains(root, "LogicalProject"); + } + + @Test + public void testMvExpandEmptyOrNullArray() { + RelNode root = getRelNode("source=DEPT | where isnull(EMPNOS) | mvexpand EMPNOS"); + assertContains(root, 
"LogicalCorrelate"); + assertContains(root, "Uncollect"); + } + + @Test + public void testMvExpandWithDuplicates() { + RelNode root = getRelNode("source=DEPT | where DEPTNO in (10, 10, 20) | mvexpand EMPNOS"); + assertContains(root, "LogicalCorrelate"); + assertContains(root, "Uncollect"); + } + + @Test + public void testMvExpandLargeArray() { + RelNode root = getRelNode("source=DEPT | where DEPTNO = 999 | mvexpand EMPNOS"); + assertContains(root, "LogicalCorrelate"); + assertContains(root, "Uncollect"); + } + + @Test + public void testMvExpandPrimitiveArray() { + RelNode root = getRelNode("source=DEPT | mvexpand EMPNOS"); + assertContains(root, "LogicalCorrelate"); + assertContains(root, "Uncollect"); + } + + private static void assertContains(RelNode root, String token) { + String plan = root.explain(); + Assert.assertTrue( + "Expected plan to contain [" + token + "] but got:\n" + plan, plan.contains(token)); + } + + private static void assertAnyContains(RelNode root, String... tokens) { + String plan = root.explain(); + for (String token : tokens) { + if (plan.contains(token)) { + return; + } + } + Assert.fail( + "Expected plan to contain one of " + Arrays.toString(tokens) + " but got:\n" + plan); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 2fd08988f6b..3ef3f5fd8c6 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -872,6 +872,18 @@ public void testMvzip() { "source=t | eval result=mvzip(array('a', 'b'), array('x', 'y'), '|') | fields result")); } + @Test + public void testMvexpandCommand() { + assertEquals("source=table | mvexpand identifier", anonymize("source=t | mvexpand skills")); + } + + @Test + public void testMvexpandCommandWithLimit() { + assertEquals( + "source=table | mvexpand 
identifier limit=***", + anonymize("source=t | mvexpand skills limit=5")); + } + @Test public void testSplit() { // Test split with delimiter