1515import static org .opensearch .sql .ast .tree .Sort .SortOrder .ASC ;
1616import static org .opensearch .sql .ast .tree .Sort .SortOrder .DESC ;
1717import static org .opensearch .sql .calcite .utils .PlanUtils .ROW_NUMBER_COLUMN_FOR_DEDUP ;
18+ import static org .opensearch .sql .calcite .utils .PlanUtils .ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP ;
1819import static org .opensearch .sql .calcite .utils .PlanUtils .ROW_NUMBER_COLUMN_FOR_MAIN ;
1920import static org .opensearch .sql .calcite .utils .PlanUtils .ROW_NUMBER_COLUMN_FOR_RARE_TOP ;
2021import static org .opensearch .sql .calcite .utils .PlanUtils .ROW_NUMBER_COLUMN_FOR_STREAMSTATS ;
4849import org .apache .calcite .rel .RelNode ;
4950import org .apache .calcite .rel .core .Aggregate ;
5051import org .apache .calcite .rel .core .JoinRelType ;
51- import org .apache .calcite .rel .hint .HintStrategyTable ;
52- import org .apache .calcite .rel .hint .RelHint ;
53- import org .apache .calcite .rel .logical .LogicalAggregate ;
5452import org .apache .calcite .rel .logical .LogicalValues ;
5553import org .apache .calcite .rel .type .RelDataType ;
5654import org .apache .calcite .rel .type .RelDataTypeFamily ;
@@ -1054,7 +1052,7 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
10541052 List <String > intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation (reResolved .getLeft ());
10551053 context .relBuilder .aggregate (
10561054 context .relBuilder .groupKey (reResolved .getLeft ()), reResolved .getRight ());
1057- if (hintBucketNonNull ) addIgnoreNullBucketHintToAggregate (context );
1055+ if (hintBucketNonNull ) PlanUtils . addIgnoreNullBucketHintToAggregate (context . relBuilder );
10581056 // During aggregation, Calcite projects both input dependencies and output group-by fields.
10591057 // When names conflict, Calcite adds numeric suffixes (e.g., "value0").
10601058 // Apply explicit renaming to restore the intended aliases.
@@ -1316,7 +1314,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13161314 : duplicatedFieldNames .stream ()
13171315 .map (a -> (RexNode ) context .relBuilder .field (a ))
13181316 .toList ();
1319- buildDedupNotNull (context , dedupeFields , allowedDuplication );
1317+ buildDedupNotNull (context , dedupeFields , allowedDuplication , true );
13201318 }
13211319 context .relBuilder .join (
13221320 JoinAndLookupUtils .translateJoinType (node .getJoinType ()), joinCondition );
@@ -1372,7 +1370,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13721370 List <RexNode > dedupeFields =
13731371 getRightColumnsInJoinCriteria (context .relBuilder , joinCondition );
13741372
1375- buildDedupNotNull (context , dedupeFields , allowedDuplication );
1373+ buildDedupNotNull (context , dedupeFields , allowedDuplication , true );
13761374 }
13771375 context .relBuilder .join (
13781376 JoinAndLookupUtils .translateJoinType (node .getJoinType ()), joinCondition );
@@ -1537,24 +1535,20 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
15371535 if (keepEmpty ) {
15381536 buildDedupOrNull (context , dedupeFields , allowedDuplication );
15391537 } else {
1540- buildDedupNotNull (context , dedupeFields , allowedDuplication );
1538+ buildDedupNotNull (context , dedupeFields , allowedDuplication , false );
15411539 }
15421540 return context .relBuilder .peek ();
15431541 }
15441542
15451543 private static void buildDedupOrNull (
15461544 CalcitePlanContext context , List <RexNode > dedupeFields , Integer allowedDuplication ) {
15471545 /*
1548- * | dedup 2 a, b keepempty=false
1549- * DropColumns('_row_number_dedup_ )
1550- * +- Filter ('_row_number_dedup_ <= n OR isnull('a) OR isnull('b) )
1551- * +- Window [row_number () windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], [' a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
1546+ * | dedup 2 a, b keepempty=true
1547+ * LogicalProject(... )
1548+ * +- LogicalFilter(condition=[OR(IS NULL(a), IS NULL(b), <=(_row_number_dedup_, 1))] )
1549+ * +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER () OVER (PARTITION BY a, b ORDER BY a, b)])
15521550 * +- ...
15531551 */
1554- // Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
1555- // specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a
1556- // ASC
1557- // NULLS FIRST, 'b ASC NULLS FIRST]
15581552 RexNode rowNumber =
15591553 context
15601554 .relBuilder
@@ -1577,16 +1571,21 @@ private static void buildDedupOrNull(
15771571 }
15781572
15791573 private static void buildDedupNotNull (
1580- CalcitePlanContext context , List <RexNode > dedupeFields , Integer allowedDuplication ) {
1574+ CalcitePlanContext context ,
1575+ List <RexNode > dedupeFields ,
1576+ Integer allowedDuplication ,
1577+ boolean fromJoinMaxOption ) {
15811578 /*
15821579 * | dedup 2 a, b keepempty=false
1583- * DropColumns('_row_number_dedup_ )
1584- * +- Filter ('_row_number_dedup_ <= n )
1585- * +- Window [row_number () windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], [' a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
1586- * +- Filter (isnotnull('a) AND isnotnull('b) )
1587- * +- ...
1580+ * LogicalProject(... )
1581+ * +- LogicalFilter(condition=[<=(_row_number_dedup_, n)]) )
1582+ * +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER () OVER (PARTITION BY a, b ORDER BY a, b)])
1583+ * +- LogicalFilter(condition=[AND(IS NOT NULL(a), IS NOT NULL(b))] )
1584+ * +- ...
15881585 */
15891586 // Filter (isnotnull('a) AND isnotnull('b))
1587+ String rowNumberAlias =
1588+ fromJoinMaxOption ? ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP : ROW_NUMBER_COLUMN_FOR_DEDUP ;
15901589 context .relBuilder .filter (
15911590 context .relBuilder .and (dedupeFields .stream ().map (context .relBuilder ::isNotNull ).toList ()));
15921591 // Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
@@ -1600,15 +1599,15 @@ private static void buildDedupNotNull(
16001599 .partitionBy (dedupeFields )
16011600 .orderBy (dedupeFields )
16021601 .rowsTo (RexWindowBounds .CURRENT_ROW )
1603- .as (ROW_NUMBER_COLUMN_FOR_DEDUP );
1602+ .as (rowNumberAlias );
16041603 context .relBuilder .projectPlus (rowNumber );
1605- RexNode _row_number_dedup_ = context .relBuilder .field (ROW_NUMBER_COLUMN_FOR_DEDUP );
1604+ RexNode rowNumberField = context .relBuilder .field (rowNumberAlias );
16061605 // Filter ('_row_number_dedup_ <= n)
16071606 context .relBuilder .filter (
16081607 context .relBuilder .lessThanOrEqual (
1609- _row_number_dedup_ , context .relBuilder .literal (allowedDuplication )));
1608+ rowNumberField , context .relBuilder .literal (allowedDuplication )));
16101609 // DropColumns('_row_number_dedup_)
1611- context .relBuilder .projectExcept (_row_number_dedup_ );
1610+ context .relBuilder .projectExcept (rowNumberField );
16121611 }
16131612
16141613 @ Override
@@ -2395,25 +2394,6 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
23952394 return context .relBuilder .peek ();
23962395 }
23972396
2398- private static void addIgnoreNullBucketHintToAggregate (CalcitePlanContext context ) {
2399- final RelHint statHits =
2400- RelHint .builder ("stats_args" ).hintOption (Argument .BUCKET_NULLABLE , "false" ).build ();
2401- assert context .relBuilder .peek () instanceof LogicalAggregate
2402- : "Stats hits should be added to LogicalAggregate" ;
2403- context .relBuilder .hints (statHits );
2404- context
2405- .relBuilder
2406- .getCluster ()
2407- .setHintStrategies (
2408- HintStrategyTable .builder ()
2409- .hintStrategy (
2410- "stats_args" ,
2411- (hint , rel ) -> {
2412- return rel instanceof LogicalAggregate ;
2413- })
2414- .build ());
2415- }
2416-
24172397 @ Override
24182398 public RelNode visitTableFunction (TableFunction node , CalcitePlanContext context ) {
24192399 throw new CalciteUnsupportedException ("Table function is unsupported in Calcite" );
0 commit comments