Description
Describe the bug
In cases where we support a HashAggregate's aggregate functions, we convert the Partial-stage HashAggregate, execute it in DataFusion, and then use native shuffle to forward the results. The next stage, however, does not materialize right away, because AQE waits for the shuffle and its stats.
Then, if the agg() call contains unsupported expressions wrapping the aggregates themselves, we cannot convert the Final mode.
This causes the Spark HashAggregate to crash as it attempts to access fields that don't exist, and even where they do exist, it expects its own aggregate buffer representation, which we don't have (the error thrown is "Not supported on CometListVector"). I'm not sure why the InputAdapter doesn't always work here; I think this can only be supported in cases where no aggregate buffer exists.
This comment implies the possible issue:
I believe I've seen a few tests that are ignored because of this.
I don't think this is a valid situation; we should not crash because of previously Comet-run operators if they were successful.
Steps to reproduce
Have a groupBy/aggregate that needs either a shuffle or an aggregate buffer, like:
.agg(
  collect_set(col("my_column"))
)
but wrap that collect_set in an unsupported expression, a cast, or something similar.
(I am not sure, but I believe I saw something about similar behaviour that can be created using a Decimal avg, since the Partial aggregate is supported but generates a Sum and a Count, while generating results from that intermediate data is not supported natively; that may no longer be relevant, though.)
For example:
.agg(
  concat(flatten(collect_set(col("my_column"))))
)
can create this behaviour with AQE on.
I am not sure if this is data-type related.
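
For completeness, here is a minimal reproduction sketch. The data, column names, and app name are my own assumptions (not from the original report); `my_column` is assumed to be an array column so that `flatten` applies to the array-of-arrays produced by `collect_set`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CometFinalAggRepro {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark build with the Comet plugin already on the classpath;
    // deployment-specific Comet configs are omitted here.
    val spark = SparkSession.builder()
      .appName("comet-final-agg-repro")
      .config("spark.sql.adaptive.enabled", "true") // AQE on, as described above
      .getOrCreate()
    import spark.implicits._

    // Hypothetical data: my_column is an array so collect_set yields array<array<string>>.
    val df = Seq(
      (1, Seq("a", "b")),
      (1, Seq("b", "c")),
      (2, Seq("d"))
    ).toDF("id", "my_column")

    // The Partial collect_set is Comet-supported, but the concat(flatten(...))
    // wrapper around the aggregate is not, so the Final HashAggregate stays on
    // Spark while its input is already Comet columnar data.
    val result = df
      .groupBy($"id")
      .agg(concat(flatten(collect_set(col("my_column")))).as("combined"))

    result.show()
    spark.stop()
  }
}
```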
Expected behavior
Comet should either not convert a Partial HashAggregate whose Final stage cannot be converted, or, if it already did, it should gracefully execute the Final aggregation and let Spark finish the work without breaking the plan.
Additional context
I believe I have a non-invasive solution: if the result expressions are not supported, we move them into a separate ProjectExec that becomes the parent of the CometHashAggregateExec. The CometHashAggregateExec then has no result expressions (like the Partial stage) and has the grouping and aggregate attributes as its output.
We then convert to rows and run a ProjectExec with the unsupported expressions, ensuring that even if the rest of the stage cannot be run using Comet, we don't break an already running workflow. A rough sketch of the idea is below.
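
To make the shape of the transform concrete, here is an illustrative sketch only. The helpers `isCometSupported`, `convertAggregate`, and `toRowPlan` are placeholders standing in for existing Comet machinery, not real Comet functions; the actual rule would live in Comet's plan conversion:

```scala
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object FinalAggFallbackSketch {
  // Sketch of the proposed rewrite for a Final-mode HashAggregateExec whose
  // result expressions cannot be converted. The three function parameters are
  // hypothetical stand-ins for Comet's support check, operator conversion, and
  // columnar-to-row conversion.
  def rewriteFinalAggregate(
      agg: HashAggregateExec,
      isCometSupported: Seq[NamedExpression] => Boolean,
      convertAggregate: HashAggregateExec => SparkPlan,
      toRowPlan: SparkPlan => SparkPlan): SparkPlan = {
    if (isCometSupported(agg.resultExpressions)) {
      // Everything is supported: convert as usual.
      convertAggregate(agg)
    } else {
      // Strip the unsupported result expressions from the native aggregate,
      // leaving only grouping + aggregate attributes as output (like the
      // Partial stage does)...
      val bareAgg = agg.copy(
        resultExpressions = agg.groupingExpressions ++ agg.aggregateAttributes)
      // ...and evaluate the original result expressions in a plain Spark
      // ProjectExec on top, after converting the columnar output back to rows.
      ProjectExec(agg.resultExpressions, toRowPlan(convertAggregate(bareAgg)))
    }
  }
}
```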
Will open a PR shortly