[BUG] eventstats does not push down to OpenSearch (RexOver excluded from aggregation pushdown) #9

@RyanL1997

Query Information

PPL Command/Query:

source=logs-* 
| where `@timestamp` > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
| where severityText = 'ERROR'
| eventstats count() as total
| head 10
| fields severityText, total

Expected Result:
The global eventstats count() should push down to OpenSearch as track_total_hits (the same way stats count() does), so that OpenSearch returns a single count and the coordinator attaches it to each returned row, instead of streaming every matching document to the coordinator just to count them.

Actual Result:
Pushdown does not fire. The scan runs with requestedTotalSize=2147483647 and a _source includes filter, so every matching document streams to the coordinator, where a COUNT() OVER () window function counts them in memory. On a 47B-document index this exceeds query timeouts (the reported "FAIL on PI"), and on smaller indices it returns 0 rows when the streaming exceeds limits ("0 rows on CAPE/WI"). The query returns correct results on tiny test data, but the plan is not viable at production scale.

Dataset Information

Dataset/Schema Type

  • OpenTelemetry (OTEL)

Index Mapping

{
  "index_patterns": ["logs-*"],
  "template": {
    "mappings": {
      "properties": {
        "@timestamp":   { "type": "date" },
        "body":         { "type": "text", "norms": false },
        "severityText": { "type": "keyword" }
      }
    }
  }
}

Sample Data

{ "@timestamp": "2026-05-28T17:44:20.000Z", "severityText": "ERROR", "body": "..." }
{ "@timestamp": "2026-05-28T17:44:21.000Z", "severityText": "INFO",  "body": "..." }
{ "@timestamp": "2026-05-28T17:44:22.000Z", "severityText": "ERROR", "body": "..." }
{ "@timestamp": "2026-05-28T17:44:23.000Z", "severityText": "ERROR", "body": "..." }

Bug Description

Issue Summary:

eventstats (no BY, no partition) — equivalent to a global stats count() for purposes of the count value — produces a COUNT() OVER () window function instead of a flat aggregation, and the existing pushdown rules deliberately skip it.

Explain output for both shapes (Calcite engine, pushdown enabled):

stats count() as total (works, pushed down):

CalciteEnumerableIndexScan(
  PushDownContext=[[FILTER->..., AGGREGATION->...COUNT(), LIMIT->10000]],
  sourceBuilder={"size":0, ..., "track_total_hits":2147483647})

DSL has "size":0 and track_total_hits — the shard returns one number.

eventstats count() as total (fails to push down):

EnumerableLimit(fetch=[10000])
  EnumerableLimit(fetch=[10])
    EnumerableWindow(window#0=[window(aggs [COUNT()])])
      CalciteEnumerableIndexScan(
        PushDownContext=[[PROJECT->[@timestamp, severityText],
                          FILTER->AND(>(@ts,...), =(severityText,'ERROR')),
                          PROJECT->[severityText]]],
        sourceBuilder={"_source":{"includes":["severityText"]},
                       "query":{"bool":{"must":[{"range":...},{"term":{"severityText":"ERROR"}}]}}},
        requestedTotalSize=2147483647)

DSL has _source includes and requestedTotalSize=MAX_INT — every matching document streams up to the coordinator. EnumerableWindow then counts them.

Root cause:
PPL eventstats is parsed in AstBuilder.visitEventstatsCommand (ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java:498) as a Window node, which Calcite lowers to a RexOver. The aggregation pushdown path in AggregateIndexScanRule (opensearch/.../planner/rules/AggregateIndexScanRule.java:122,171) explicitly excludes RexOver from pushdown — windows are treated as opaque coordinator-side computations.

This treatment is correct for windows that need partitioning or ordering. But eventstats <agg> without a BY clause produces an unpartitioned, unordered window — OVER () — which is semantically equivalent to: "compute a single global aggregate, broadcast it to every input row." The first half of that (the global aggregate) is exactly what OpenSearch can do natively in the shard.
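The equivalence can be checked on toy data. The following Python sketch only models the semantics described above; it is not plugin code, and both function names are hypothetical.

```python
def eventstats_count_window(rows):
    """Model COUNT() OVER (): each row counts every row in its (global) window."""
    return [{**row, "total": sum(1 for _ in rows)} for row in rows]


def eventstats_count_rewritten(rows):
    """Model the proposed shape: compute one global count, then broadcast it."""
    total = len(rows)  # the single number a pushed-down count could supply
    return [{**row, "total": total} for row in rows]


rows = [{"severityText": "ERROR"} for _ in range(3)]
assert eventstats_count_window(rows) == eventstats_count_rewritten(rows)
```

Both functions return the same rows; the difference is that the second one needs only a single number from the data source rather than the full row set.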

The result is pathological at scale: instead of one round-trip returning a single number, the coordinator pulls every matching document.
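For the failing query, one possible single-round-trip shape is a search that asks for only the first rows plus an exact hit count. The sketch below is an assumption about what such a request could look like, not what the plugin generates today. The helper names are hypothetical, and the filter is passed in as an opaque query object because the explain output above elides it. It relies only on the standard search parameters size, track_total_hits, and _source, and on the hits.total field of the search response.

```python
def build_pushed_down_request(query, fields, limit):
    """Build a search body that returns `limit` rows plus an exact total."""
    return {
        "size": limit,                 # from `head 10`
        "track_total_hits": True,      # request an exact count, not a lower bound
        "_source": {"includes": list(fields)},
        "query": query,
    }


def rows_from_response(response, alias):
    """Attach the global count to every returned hit (the broadcast step)."""
    total = response["hits"]["total"]
    if total.get("relation") != "eq":
        raise ValueError("total is a lower bound; exact count required")
    return [{**hit["_source"], alias: total["value"]}
            for hit in response["hits"]["hits"]]
```

This shape is only valid when nothing between eventstats and head depends on rows beyond the limit, which holds for the query in this report.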

Steps to Reproduce:

  1. Apply the index template above; index a few sample documents.
  2. Set plugins.calcite.enabled: true, plugins.calcite.pushdown.enabled: true.
  3. Run EXPLAIN on the failing query — observe EnumerableWindow and the absence of an aggregations block in the DSL.
  4. Replace eventstats with stats and re-run EXPLAIN — observe the correct aggregations + track_total_hits shape.
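When comparing the two EXPLAIN outputs in steps 3 and 4, a small check like the following can classify a plan. It is a minimal sketch that assumes the plan text contains the markers shown in this report (EnumerableWindow and an AGGREGATION-> entry in PushDownContext); the function name is hypothetical, and other plugin versions may print different markers.

```python
def pushdown_status(explain_text):
    """Classify an explain plan using the markers seen in this report."""
    has_window = "EnumerableWindow" in explain_text
    has_pushed_agg = "AGGREGATION->" in explain_text
    if has_window:
        return "coordinator-side window"
    if has_pushed_agg:
        return "aggregation pushed down"
    return "unknown"


stats_plan = "CalciteEnumerableIndexScan(PushDownContext=[[FILTER->..., AGGREGATION->...COUNT(), LIMIT->10000]])"
eventstats_plan = "EnumerableWindow(window#0=[window(aggs [COUNT()])])"
print(pushdown_status(stats_plan))
print(pushdown_status(eventstats_plan))
```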

Environment Information

OpenSearch Version: 3.7.0-SNAPSHOT (reproduced on main, commit ec433f4ff).

Additional Details:

  • Calcite engine: plugins.calcite.enabled = true
  • Pushdown: plugins.calcite.pushdown.enabled = true
  • Code locations:
    • ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java:498 (eventstats becomes a Window node)
    • opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java:122,171 (pushdown rule excludes RexOver)

Possible fix:
Add a rule that recognizes RexOver with empty partition/order/frame as a global aggregate and rewrites it to:

Project(input.*, broadcast(globalAgg))
  Aggregate(group=[], aggs=[<agg>])
    <input>

The aggregate is then pushable through the existing AggregateIndexScanRule. The broadcast step on the coordinator amounts to appending one constant column to each row, which is far cheaper than streaming the entire matched set.
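The rewrite can be sketched on a toy plan tree. The node classes and the rewrite_global_window function below are hypothetical stand-ins written in Python for illustration; they are not Calcite's RelNode or RelOptRule API, and a real rule would have to match on RexOver inside a Project or Window operator.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Scan:
    index: str


@dataclass(frozen=True)
class Window:
    input: object
    agg: str
    partition_by: Tuple[str, ...] = ()
    order_by: Tuple[str, ...] = ()
    frame: Optional[str] = None


@dataclass(frozen=True)
class Aggregate:
    input: object
    group: Tuple[str, ...]
    aggs: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    input: object
    exprs: Tuple[str, ...]


def rewrite_global_window(node):
    """Rewrite an OVER () window into a global aggregate plus a broadcast."""
    is_global = (
        isinstance(node, Window)
        and not node.partition_by
        and not node.order_by
        and node.frame is None
    )
    if not is_global:
        return node  # partitioned, ordered, or framed windows are left untouched
    global_agg = Aggregate(input=node.input, group=(), aggs=(node.agg,))
    return Project(input=global_agg, exprs=("input.*", f"broadcast({node.agg})"))


plan = Window(input=Scan("logs-*"), agg="COUNT()")
print(rewrite_global_window(plan))
```

The guard is deliberately strict: any partition, ordering, or explicit frame leaves the plan unchanged, so the existing coordinator-side behavior is preserved for windows this sketch does not cover.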

The same fix applies to eventstats count() by <field>: OVER (PARTITION BY <field>) becomes Aggregate(group=[<field>], aggs=[<agg>]) joined back on <field>. Both shapes are common in observability queries.
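The partitioned case can be modeled the same way. This Python sketch uses hypothetical function names and toy rows shaped like the sample data above; it checks that a per-partition window count matches a grouped count joined back on the partition field.

```python
from collections import Counter


def eventstats_count_by_window(rows, field):
    """Model COUNT() OVER (PARTITION BY field): count the rows sharing each row's key."""
    return [
        {**row, "total": sum(1 for other in rows if other[field] == row[field])}
        for row in rows
    ]


def eventstats_count_by_rewritten(rows, field):
    """Model the rewrite: one grouped aggregate, then a join back on the key."""
    counts = Counter(row[field] for row in rows)  # Aggregate(group=[field])
    return [{**row, "total": counts[row[field]]} for row in rows]


rows = [
    {"severityText": "ERROR"},
    {"severityText": "INFO"},
    {"severityText": "ERROR"},
    {"severityText": "ERROR"},
]
assert eventstats_count_by_window(rows, "severityText") == \
    eventstats_count_by_rewritten(rows, "severityText")
```

One caveat for a real implementation: PARTITION BY places null keys in a single partition, whereas an ordinary equi-join does not match null to null, so the join back would need null-safe comparison to preserve results for rows with a missing field.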
