Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3d828a8
support streamstats simply
ishaoxy Sep 12, 2025
5f4124c
add some tests
ishaoxy Sep 15, 2025
9f726b8
add UT
ishaoxy Sep 17, 2025
089480b
fix some error
ishaoxy Sep 17, 2025
1228778
fix conflicts
ishaoxy Sep 17, 2025
06e2c6a
add global
ishaoxy Sep 19, 2025
5fd9fe6
implement global
ishaoxy Sep 23, 2025
223b15e
implement reset
ishaoxy Sep 24, 2025
a7d8d48
implement all the arguments
ishaoxy Sep 25, 2025
cd56840
fix test
ishaoxy Sep 26, 2025
4f4376c
add all IT, UT and rst doc
ishaoxy Sep 26, 2025
5eeef46
Merge branch 'main' into streamstats-command
ishaoxy Sep 29, 2025
d80b37d
fix anonymizer test
ishaoxy Sep 29, 2025
da99024
fix doctest
ishaoxy Oct 9, 2025
37a7944
fix conflict
ishaoxy Oct 9, 2025
63b0157
modify doc and IT
ishaoxy Oct 20, 2025
4b4e912
add explainIT
ishaoxy Oct 22, 2025
8208a95
Merge branch 'main' into streamstats-command
ishaoxy Oct 22, 2025
02182cb
fix import
ishaoxy Oct 22, 2025
d12932e
fix typo
ishaoxy Oct 22, 2025
b052817
fix doctest
ishaoxy Oct 22, 2025
ffd4ecf
fix explainIT yaml format
ishaoxy Oct 22, 2025
3cf6c06
fix dc nopushdown explainIT
ishaoxy Oct 23, 2025
2601cac
add explainIT for path2 and path3
ishaoxy Oct 27, 2025
7be0782
typo error
ishaoxy Oct 27, 2025
7324bcc
Merge branch 'main' into streamstats-command
yuancu Oct 30, 2025
09c77a8
handle resort case
ishaoxy Oct 30, 2025
e911fec
fix IT
ishaoxy Oct 31, 2025
82713d2
change row_num
ishaoxy Oct 31, 2025
50df38e
Merge branch 'main' into streamstats-check
ishaoxy Oct 31, 2025
197879a
Rule out aggregator from PPLAggregateMergeRule
ishaoxy Oct 31, 2025
bcc28dc
Rule out aggregator from PPLAggregateMergeRule
ishaoxy Oct 31, 2025
d33854b
fix conflict streamstats row_num name in PlanUtils
ishaoxy Nov 3, 2025
6c5adfa
fix explainIT
ishaoxy Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_MAIN;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_STREAMSTATS;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_SUBSEARCH;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_TOP_RARE;
import static org.opensearch.sql.calcite.utils.PlanUtils.getRelation;
Expand Down Expand Up @@ -1588,7 +1589,6 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
boolean hasReset = node.getResetBefore() != null || node.getResetAfter() != null;

// Local helper column names
final String SEQ_COL = "__stream_seq__"; // global row number
final String RESET_BEFORE_FLAG_COL = "__reset_before_flag__"; // flag for reset_before
final String RESET_AFTER_FLAG_COL = "__reset_after_flag__"; // flag for reset_after
final String SEGMENT_ID_COL = "__seg_id__"; // segment id
Expand All @@ -1604,53 +1604,64 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
leftWithSeg,
node,
groupList,
SEQ_COL,
ROW_NUMBER_COLUMN_NAME_STREAMSTATS,
SEGMENT_ID_COL,
new String[] {SEQ_COL, RESET_BEFORE_FLAG_COL, RESET_AFTER_FLAG_COL, SEGMENT_ID_COL});
new String[] {
ROW_NUMBER_COLUMN_NAME_STREAMSTATS,
RESET_BEFORE_FLAG_COL,
RESET_AFTER_FLAG_COL,
SEGMENT_ID_COL
});
}

// CASE: global=true + window>0 + has group
if (node.isGlobal() && hasWindow && hasGroup) {
// 1. Add global sequence column for sliding window
RexNode streamSeq =
PlanUtils.makeOver(
context,
BuiltinFunctionName.ROW_NUMBER,
null,
List.of(),
List.of(),
List.of(),
WindowFrame.toCurrentRow());
context.relBuilder.projectPlus(context.relBuilder.alias(streamSeq, SEQ_COL));
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_NAME_STREAMSTATS);
context.relBuilder.projectPlus(streamSeq);
RelNode left = context.relBuilder.build();

// 2. Run correlate + aggregate
return buildStreamWindowJoinPlan(
context, left, node, groupList, SEQ_COL, null, new String[] {SEQ_COL});
context,
left,
node,
groupList,
ROW_NUMBER_COLUMN_NAME_STREAMSTATS,
null,
new String[] {ROW_NUMBER_COLUMN_NAME_STREAMSTATS});
}

// Default
RexNode streamSeq =
PlanUtils.makeOver(
context,
BuiltinFunctionName.ROW_NUMBER,
null,
List.of(),
List.of(),
List.of(),
WindowFrame.toCurrentRow());
context.relBuilder.projectPlus(context.relBuilder.alias(streamSeq, SEQ_COL));
if (hasGroup) {
// only build sequence when there is by condition
RexNode streamSeq =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_NAME_STREAMSTATS);
context.relBuilder.projectPlus(streamSeq);
}

List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
context.relBuilder.projectPlus(overExpressions);

// resort when there is by condition
if (hasGroup) {
context.relBuilder.sort(context.relBuilder.field(SEQ_COL));
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_STREAMSTATS));
context.relBuilder.projectExcept(
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_STREAMSTATS));
Comment on lines +1635 to +1636
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context.relBuilder.projectExcept call used to be outside the hasGroup branch, but it has now been moved inside it. Is that intentional?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this change is just a small optimization. In the default path, the row_num column is now created, sorted on, and dropped only when hasGroup is true. Previously, row_num was created in all cases but was used for sorting only when hasGroup was true. I think this matches the expected behavior and does not introduce any logical changes.

}

context.relBuilder.projectExcept(context.relBuilder.field(SEQ_COL));
return context.relBuilder.peek();
}

Expand Down Expand Up @@ -1719,15 +1730,13 @@ private RelNode buildStreamWindowJoinPlan(
private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) {
// 1. global sequence to define order
RexNode rowNum =
PlanUtils.makeOver(
context,
BuiltinFunctionName.ROW_NUMBER,
null,
List.of(),
List.of(),
List.of(),
WindowFrame.toCurrentRow());
context.relBuilder.projectPlus(context.relBuilder.alias(rowNum, "__stream_seq__"));
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_NAME_STREAMSTATS);
context.relBuilder.projectPlus(rowNum);

// 2. before/after flags
RexNode beforePred =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ public static boolean isDependentField(RexNode node, Collection<RexNode> baseFie
// to transform a field into such a literal
if (node.getKind() == SqlKind.LITERAL) return true;
if (node.getKind() == SqlKind.INPUT_REF && baseFields.contains(node)) return true;
if (node instanceof RexCall && ((RexCall) node).getOperator().isDeterministic()) {
// Use !isAggregator to rule out window functions like row_number()
if (node instanceof RexCall
&& ((RexCall) node).getOperator().isDeterministic()
&& !((RexCall) node).getOperator().isAggregator()) {
return ((RexCall) node)
.getOperands().stream().allMatch(op -> isDependentField(op, baseFields));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public interface PlanUtils {
String ROW_NUMBER_COLUMN_NAME_TOP_RARE = "_row_number_top_rare_";
String ROW_NUMBER_COLUMN_NAME_MAIN = "_row_number_main_";
String ROW_NUMBER_COLUMN_NAME_SUBSEARCH = "_row_number_subsearch_";
String ROW_NUMBER_COLUMN_NAME_STREAMSTATS = "__stream_seq__";

static SpanUnit intervalUnitToSpanUnit(IntervalUnit unit) {
return switch (unit) {
Expand Down
14 changes: 7 additions & 7 deletions docs/user/ppl/cmd/streamstats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ PPL query::
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+-------------+---------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | running_avg | running_sum | running_count |
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+-------------+---------------|
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28.0 | 28 | 1 |
| 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 32.0 | 32 | 1 |
| 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 34.0 | 68 | 2 |
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28.0 | 28 | 1 |
| 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 33.666666666666664 | 101 | 3 |
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+-------------+---------------+

Expand Down Expand Up @@ -180,13 +180,13 @@ PPL query::
+-------+---------+------------+-------+------+-----+-------------+
| name | country | state | month | year | age | running_avg |
|-------+---------+------------+-------+------+-----+-------------|
| Jake | USA | California | 4 | 2023 | 70 | 70.0 |
| Hello | USA | New York | 4 | 2023 | 30 | 50.0 |
| John | Canada | Ontario | 4 | 2023 | 25 | 25.0 |
| Jane | Canada | Quebec | 4 | 2023 | 20 | 22.5 |
| Jim | Canada | B.C | 4 | 2023 | 27 | 23.5 |
| Peter | Canada | B.C | 4 | 2023 | 57 | 42.0 |
| Rick | Canada | B.C | 4 | 2023 | 70 | 63.5 |
| Jake | USA | California | 4 | 2023 | 70 | 70.0 |
| Hello | USA | New York | 4 | 2023 | 30 | 50.0 |
| David | USA | Washington | 4 | 2023 | 40 | 40.0 |
+-------+---------+------------+-------+------+-----+-------------+

Expand All @@ -195,13 +195,13 @@ PPL query::
+-------+---------+------------+-------+------+-----+-------------+
| name | country | state | month | year | age | running_avg |
|-------+---------+------------+-------+------+-----+-------------|
| Jake | USA | California | 4 | 2023 | 70 | 70.0 |
| Hello | USA | New York | 4 | 2023 | 30 | 50.0 |
| John | Canada | Ontario | 4 | 2023 | 25 | 25.0 |
| Jane | Canada | Quebec | 4 | 2023 | 20 | 22.5 |
| Jim | Canada | B.C | 4 | 2023 | 27 | 23.5 |
| Peter | Canada | B.C | 4 | 2023 | 57 | 42.0 |
| Rick | Canada | B.C | 4 | 2023 | 70 | 63.5 |
| Jake | USA | California | 4 | 2023 | 70 | 70.0 |
| Hello | USA | New York | 4 | 2023 | 30 | 50.0 |
| David | USA | Washington | 4 | 2023 | 40 | 35.0 |
+-------+---------+------------+-------+------+-----+-------------+

Expand All @@ -218,12 +218,12 @@ PPL query::
+-------+---------+------------+-------+------+-----+---------+
| name | country | state | month | year | age | avg_age |
|-------+---------+------------+-------+------+-----+---------|
| Jake | USA | California | 4 | 2023 | 70 | null |
| Hello | USA | New York | 4 | 2023 | 30 | 70.0 |
| John | Canada | Ontario | 4 | 2023 | 25 | null |
| Jane | Canada | Quebec | 4 | 2023 | 20 | 25.0 |
| Jim | Canada | B.C | 4 | 2023 | 27 | null |
| Peter | Canada | B.C | 4 | 2023 | 57 | null |
| Rick | Canada | B.C | 4 | 2023 | 70 | null |
| Jake | USA | California | 4 | 2023 | 70 | null |
| Hello | USA | New York | 4 | 2023 | 30 | 70.0 |
| David | USA | Washington | 4 | 2023 | 40 | null |
+-------+---------+------------+-------+------+-----+---------+
Loading
Loading