Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3d828a8
support streamstats simply
ishaoxy Sep 12, 2025
5f4124c
add some tests
ishaoxy Sep 15, 2025
9f726b8
add UT
ishaoxy Sep 17, 2025
089480b
fix some error
ishaoxy Sep 17, 2025
1228778
fix conflicts
ishaoxy Sep 17, 2025
06e2c6a
add global
ishaoxy Sep 19, 2025
5fd9fe6
implement global
ishaoxy Sep 23, 2025
223b15e
implement reset
ishaoxy Sep 24, 2025
a7d8d48
implement all the arguments
ishaoxy Sep 25, 2025
cd56840
fix test
ishaoxy Sep 26, 2025
4f4376c
add all IT, UT and rst doc
ishaoxy Sep 26, 2025
5eeef46
Merge branch 'main' into streamstats-command
ishaoxy Sep 29, 2025
d80b37d
fix anonymizer test
ishaoxy Sep 29, 2025
da99024
fix doctest
ishaoxy Oct 9, 2025
37a7944
fix conflict
ishaoxy Oct 9, 2025
63b0157
modify doc and IT
ishaoxy Oct 20, 2025
4b4e912
add explainIT
ishaoxy Oct 22, 2025
8208a95
Merge branch 'main' into streamstats-command
ishaoxy Oct 22, 2025
02182cb
fix import
ishaoxy Oct 22, 2025
d12932e
fix typo
ishaoxy Oct 22, 2025
b052817
fix doctest
ishaoxy Oct 22, 2025
ffd4ecf
fix explainIT yaml format
ishaoxy Oct 22, 2025
3cf6c06
fix dc nopushdown explainIT
ishaoxy Oct 23, 2025
2601cac
add explainIT for path2 and path3
ishaoxy Oct 27, 2025
7be0782
typo error
ishaoxy Oct 27, 2025
7324bcc
Merge branch 'main' into streamstats-command
yuancu Oct 30, 2025
09c77a8
handle resort case
ishaoxy Oct 30, 2025
e911fec
fix IT
ishaoxy Oct 31, 2025
82713d2
change row_num
ishaoxy Oct 31, 2025
50df38e
Merge branch 'main' into streamstats-check
ishaoxy Oct 31, 2025
197879a
Rule out aggregator from PPLAggregateMergeRule
ishaoxy Oct 31, 2025
bcc28dc
Rule out aggregator from PPLAggregateMergeRule
ishaoxy Oct 31, 2025
d33854b
fix conflict streamstats row_num name in PlanUtils
ishaoxy Nov 3, 2025
6c5adfa
fix explainIT
ishaoxy Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.sql.ast.tree.Rex;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.StreamWindow;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Timechart;
Expand Down Expand Up @@ -732,6 +733,11 @@ public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
computationsAndTypes.build());
}

@Override
public LogicalPlan visitStreamWindow(StreamWindow node, AnalysisContext context) {
throw getOnlyForCalciteException("Streamstats");
}

@Override
public LogicalPlan visitFlatten(Flatten node, AnalysisContext context) {
throw getOnlyForCalciteException("Flatten");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.sql.ast.tree.Rex;
import org.opensearch.sql.ast.tree.SPath;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.StreamWindow;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Timechart;
Expand Down Expand Up @@ -399,6 +400,10 @@ public T visitWindow(Window window, C context) {
return visitChildren(window, context);
}

public T visitStreamWindow(StreamWindow node, C context) {
return visitChildren(node, context);
}

public T visitJoin(Join node, C context) {
return visitChildren(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class StreamWindow extends UnresolvedPlan {
Comment thread
ishaoxy marked this conversation as resolved.

private final List<UnresolvedExpression> windowFunctionList;
private final List<Argument> argExprList;
@ToString.Exclude private UnresolvedPlan child;

/** StreamWindow Constructor without optional argument. */
public StreamWindow(List<UnresolvedExpression> windowFunctionList) {
this(windowFunctionList, Collections.emptyList());
}

/** StreamWindow Constructor. */
public StreamWindow(List<UnresolvedExpression> windowFunctionList, List<Argument> argExprList) {
this.windowFunctionList = windowFunctionList;
this.argExprList = argExprList;
}

public boolean hasArgument() {
return !argExprList.isEmpty();
}

@Override
public StreamWindow attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitStreamWindow(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.opensearch.sql.ast.tree.SPath;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.StreamWindow;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
Expand Down Expand Up @@ -1117,6 +1118,15 @@ public RelNode visitWindow(Window node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
context.relBuilder.projectPlus(overExpressions);
return context.relBuilder.peek();
}

@Override
public RelNode visitFillNull(FillNull node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down
400 changes: 400 additions & 0 deletions docs/user/ppl/cmd/streamstats.rst

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/user/ppl/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ The query start with search command and then flowing a set of command delimited

- `stats command <cmd/stats.rst>`_

- `streamstats command <cmd/streamstats.rst>`_
Comment thread
ishaoxy marked this conversation as resolved.

- `subquery (aka subsearch) command <cmd/subquery.rst>`_

- `reverse command <cmd/reverse.rst>`_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void init() throws Exception {
}

@Test
public void testEventstat() throws IOException {
public void testEventstats() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -57,7 +57,7 @@ public void testEventstat() throws IOException {
}

@Test
public void testEventstatWithNull() throws IOException {
public void testEventstatsWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -89,7 +89,7 @@ public void testEventstatWithNull() throws IOException {
}

@Test
public void testEventstatBy() throws IOException {
public void testEventstatsBy() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testEventstatBy() throws IOException {
}

@Test
public void testEventstatByWithNull() throws IOException {
public void testEventstatsByWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -166,7 +166,7 @@ public void testEventstatByWithNull() throws IOException {
}

@Test
public void testEventstatBySpan() throws IOException {
public void testEventstatsBySpan() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -183,7 +183,7 @@ public void testEventstatBySpan() throws IOException {
}

@Test
public void testEventstatBySpanWithNull() throws IOException {
public void testEventstatsBySpanWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -202,7 +202,7 @@ public void testEventstatBySpanWithNull() throws IOException {
}

@Test
public void testEventstatByMultiplePartitions1() throws IOException {
public void testEventstatsByMultiplePartitions1() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -219,7 +219,7 @@ public void testEventstatByMultiplePartitions1() throws IOException {
}

@Test
public void testEventstatByMultiplePartitions2() throws IOException {
public void testEventstatsByMultiplePartitions2() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -236,7 +236,7 @@ public void testEventstatByMultiplePartitions2() throws IOException {
}

@Test
public void testEventstatByMultiplePartitionsWithNull1() throws IOException {
public void testEventstatsByMultiplePartitionsWithNull1() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -255,7 +255,7 @@ public void testEventstatByMultiplePartitionsWithNull1() throws IOException {
}

@Test
public void testEventstatByMultiplePartitionsWithNull2() throws IOException {
public void testEventstatsByMultiplePartitionsWithNull2() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -289,7 +289,7 @@ public void testUnsupportedWindowFunctions() {
}

@Test
public void testMultipleEventstat() throws IOException {
public void testMultipleEventstats() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -306,7 +306,7 @@ public void testMultipleEventstat() throws IOException {
}

@Test
public void testMultipleEventstatWithNull() throws IOException {
public void testMultipleEventstatsWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -325,7 +325,7 @@ public void testMultipleEventstatWithNull() throws IOException {
}

@Test
public void testMultipleEventstatWithEval() throws IOException {
public void testMultipleEventstatsWithEval() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -343,7 +343,7 @@ public void testMultipleEventstatWithEval() throws IOException {
}

@Test
public void testEventstatEmptyRows() throws IOException {
public void testEventstatsEmptyRows() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -363,7 +363,7 @@ public void testEventstatEmptyRows() throws IOException {
}

@Test
public void testEventstatVariance() throws IOException {
public void testEventstatsVariance() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -433,7 +433,7 @@ public void testEventstatVariance() throws IOException {
}

@Test
public void testEventstatVarianceWithNull() throws IOException {
public void testEventstatsVarianceWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -496,7 +496,7 @@ public void testEventstatVarianceWithNull() throws IOException {
}

@Test
public void testEventstatVarianceBy() throws IOException {
public void testEventstatsVarianceBy() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -513,7 +513,7 @@ public void testEventstatVarianceBy() throws IOException {
}

@Test
public void testEventstatVarianceBySpan() throws IOException {
public void testEventstatsVarianceBySpan() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -527,7 +527,7 @@ public void testEventstatVarianceBySpan() throws IOException {
}

@Test
public void testEventstatVarianceWithNullBy() throws IOException {
public void testEventstatsVarianceWithNullBy() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -576,7 +576,7 @@ public void testEventstatVarianceWithNullBy() throws IOException {
}

@Test
public void testEventstatDistinctCount() throws IOException {
public void testEventstatsDistinctCount() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -601,7 +601,7 @@ public void testEventstatDistinctCount() throws IOException {
}

@Test
public void testEventstatDistinctCountByCountry() throws IOException {
public void testEventstatsDistinctCountByCountry() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -627,7 +627,7 @@ public void testEventstatDistinctCountByCountry() throws IOException {
}

@Test
public void testEventstatDistinctCountFunction() throws IOException {
public void testEventstatsDistinctCountFunction() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand All @@ -653,7 +653,7 @@ public void testEventstatDistinctCountFunction() throws IOException {
}

@Test
public void testEventstatDistinctCountWithNull() throws IOException {
public void testEventstatsDistinctCountWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down Expand Up @@ -682,7 +682,7 @@ public void testEventstatDistinctCountWithNull() throws IOException {

@Ignore
@Test
public void testEventstatEarliestAndLatest() throws IOException {
public void testEventstatsEarliestAndLatest() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Expand Down
Loading