Fuse array higher-order functions in Project #15061
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile review
Greptile Summary
This PR introduces
Confidence Score: 5/5
Safe to merge; the fusion is transparent to correctness (determinism and side-effect guards are conservative), resource lifecycle follows established ARM patterns, and GPU/CPU equality is verified by integration tests.
The fusion logic is well-guarded:
sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala: the new GpuArrayHofFusion object is the entire change surface; give particular attention to the unionIntermediate column-index remapping in makeTransformLambdaBatch and the shared-arg lifetime in evaluateFusedGroup.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[GpuProjectExec.project] --> B{GpuArrayHofFusion.project}
B --> C{findFusedGroups}
C --> D[Scan boundExprs extrachof + canFuse]
D --> E{canShareExplode?}
E -->|yes| F[Add to existing group]
E -->|no| G[Start new group]
D --> H{canReorderExpression?}
H -->|no| I[flushGroups - barrier]
C --> J{Any group >= 2 HOFs?}
J -->|no| K[Return None - fallback]
J -->|yes| L[projectWithFusedGroups]
L --> M[evaluateFusedGroup per group]
M --> N[eval arg once]
N --> O[makeExplodedElementBatch with unionIntermediate]
O --> P[shared exploded batch]
P --> Q[foreach transform - makeTransformLambdaBatch]
Q --> R[transform.function.columnarEval]
R --> S[consumeElementResults]
S --> T[fill outputColumns at HOF index]
L --> U[non-HOF exprs: eval normally]
T --> V[ColumnarBatch output]
U --> V
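The grouping step in the flowchart can be sketched roughly as follows. This is a minimal Python model, not the plugin's Scala code: the `Expr` class, its fields, and `find_fused_groups` are hypothetical names invented for illustration, and the rule that a non-reorderable expression is itself left unfused is an assumption. It only shows the shape of the logic: HOFs join a group keyed by their array argument and lambda arity, a non-reorderable expression acts as a barrier that flushes open groups, and only groups with at least two HOFs are fused.

```python
from dataclasses import dataclass
from typing import Hashable, List, Optional


@dataclass
class Expr:
    """Hypothetical stand-in for a bound Project expression."""
    name: str
    array_arg: Optional[Hashable] = None  # None means "not an array HOF"
    arity: int = 1                        # lambda arity, e.g. x -> ... or (x, i) -> ...
    reorderable: bool = True              # False models a non-deterministic expression


def find_fused_groups(exprs: List[Expr]) -> List[List[int]]:
    """Return lists of output indices whose HOFs could share one explode."""
    finished: List[List[int]] = []
    open_groups = {}  # (array_arg, arity) -> indices collected so far

    def flush() -> None:
        # Keep only groups worth fusing (two or more HOFs), then reset.
        finished.extend(g for g in open_groups.values() if len(g) >= 2)
        open_groups.clear()

    for i, e in enumerate(exprs):
        if not e.reorderable:
            flush()  # barrier: nothing may be grouped across this expression
            continue  # assumption: the barrier expression itself is not fused
        if e.array_arg is not None:
            open_groups.setdefault((e.array_arg, e.arity), []).append(i)
    flush()
    return finished


exprs = [
    Expr("transform(a, x -> x + 1)", array_arg="a"),
    Expr("b + 1"),
    Expr("filter(a, x -> x > 0)", array_arg="a"),
    Expr("rand()", reorderable=False),
    Expr("exists(a, x -> x > 5)", array_arg="a"),
    Expr("transform(a, (x, i) -> x + i)", array_arg="a", arity=2),
]
groups = find_fused_groups(exprs)
```

In this model, an empty result corresponds to the "Return None - fallback" branch, where the Project evaluates every expression individually.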
Reviews (6): Last reviewed commit: "address greptile comments"
Pull request overview
This PR adds a fusion path in GPU Project evaluation to share the explode + intermediate projection work across multiple compatible array higher-order functions (HOFs) that read the same array input, improving performance while preserving ordering and correctness via conservative guards.
Changes:
- Introduces GpuArrayTransformFusion to detect fuseable top-level Project outputs (transform/filter/exists and supported array_aggregate) and evaluate them using a shared exploded element batch.
- Refactors element-wise and aggregate HOF implementations to expose reusable “consume element results” helpers for the fused execution path.
- Adds integration tests covering mixed Projects that include multiple array HOFs (including aggregate).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala | Adds fusion planner/executor logic for array HOFs in Projects and refactors HOF result consumption to enable sharing the explode step. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | Hooks the fusion attempt into GpuProjectExec.project, falling back to the existing per-expression evaluation when no fusion applies. |
| integration_tests/src/main/python/higher_order_functions_test.py | Adds a mixed Project integration test combining transform/filter/exists with aggregate. |
| integration_tests/src/main/python/array_test.py | Adds a heterogeneous mixed Project test combining multiple element-wise array HOF outputs. |
Fixes #14711.
Description
This PR fuses compatible array higher-order functions that appear as top-level Project outputs, so multiple expressions over the same array can share the explode and intermediate projection work.
The fused path currently covers transform, filter, exists, and supported aggregate expressions. It groups HOFs by the same array argument and lambda arity, evaluates the shared exploded element batch once, then evaluates each lambda against the shared batch and reconstructs each output independently. The implementation keeps conservative guards for deterministic expressions, side-effect checks, and Project output ordering.
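To make the "explode once, evaluate each lambda, rebuild each output" idea concrete, here is a small pure-Python model. It is a sketch under stated assumptions, not the plugin's GPU implementation: the function names (`explode`, `fused_project`) are hypothetical, arrays are plain Python lists, and null arrays, null elements, and aggregate are left out.

```python
from typing import Any, Callable, List, Tuple


def explode(arrays: List[List[Any]]) -> Tuple[List[Any], List[int]]:
    """Flatten rows into one element list plus row offsets (assumes no null rows)."""
    elements: List[Any] = []
    offsets = [0]
    for row in arrays:
        elements.extend(row)
        offsets.append(len(elements))
    return elements, offsets


def fused_project(arrays: List[List[Any]],
                  hofs: List[Tuple[str, Callable[[Any], Any]]]) -> List[List[Any]]:
    """Evaluate several HOFs over the same array column with a single explode."""
    elements, offsets = explode(arrays)          # shared work, done once
    spans = list(zip(offsets, offsets[1:]))      # (start, end) of each row
    outputs = []
    for kind, fn in hofs:
        results = [fn(x) for x in elements]      # lambda over the shared elements
        if kind == "transform":
            out = [results[s:e] for s, e in spans]
        elif kind == "filter":
            out = [[x for x, keep in zip(elements[s:e], results[s:e]) if keep]
                   for s, e in spans]
        elif kind == "exists":
            out = [any(results[s:e]) for s, e in spans]
        else:
            raise ValueError(f"unsupported HOF kind in this sketch: {kind}")
        outputs.append(out)                      # each output is rebuilt independently
    return outputs


arrays = [[1, 2, 3], [], [4]]
outputs = fused_project(arrays, [
    ("transform", lambda x: x * 10),
    ("filter", lambda x: x % 2 == 1),
    ("exists", lambda x: x > 3),
])
```

The point of the model is that `explode` runs once regardless of how many HOFs read the column; only the per-lambda evaluation and the per-output reconstruction repeat.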
Checklists
Documentation
Testing
(Please provide the names of the existing tests in the PR description.)
Performance