Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
a454360
feat: add optimized PyArrow UDF execution (CometPythonMapInArrowExec)
andygrove May 6, 2026
84aec84
docs: add PyArrow UDF acceleration user guide page
andygrove May 6, 2026
af98fbb
fix(test): correct PyArrow UDF integration test signatures and assert…
andygrove May 6, 2026
f29cb2f
test: convert PyArrow UDF script to pytest and add CI coverage
andygrove May 6, 2026
f751539
docs: run prettier on pyarrow-udfs user guide page
andygrove May 6, 2026
b14fbfb
style: apply spotless formatting
andygrove May 6, 2026
ca0bbbf
ci: broaden pyarrow_udf_test triggers to match pr_build_linux
andygrove May 6, 2026
55c28c3
ci: restrict GITHUB_TOKEN to contents:read in pyarrow_udf_test
andygrove May 6, 2026
05b1e7a
fix: shim CometPythonMapInArrowExec for cross-version Spark builds
andygrove May 6, 2026
66eb246
ci: switch pyarrow_udf_test container to rust:bookworm
andygrove May 6, 2026
ec6fa78
ci: set PYSPARK_PYTHON to venv python for pyarrow_udf_test
andygrove May 6, 2026
1de2c2f
feat: default-disable PyArrow UDF optimization while experimental
andygrove May 6, 2026
3f68cbe
test: expand PyArrow UDF pytest coverage
andygrove May 6, 2026
e2ca2d2
docs: document PyArrow UDF limitations and AQE explain quirk
andygrove May 6, 2026
f4b5c32
bench: add Python end-to-end benchmark for PyArrow UDF acceleration
andygrove May 6, 2026
3822ed7
fix: propagate isBarrier through CometPythonMapInArrowExec
andygrove May 6, 2026
24dc84b
refactor: address PR review feedback for PyArrow UDF acceleration
andygrove May 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
Expand Down
96 changes: 96 additions & 0 deletions .github/workflows/pyarrow_udf_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PyArrow UDF Tests

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

# Restrict the default GITHUB_TOKEN to read-only repository access.
# Without an explicit permissions block the token gets the full default
# permission set (flagged by CodeQL: "Workflow does not contain permissions").
permissions:
  contents: read

on:
  push:
    branches:
      - main
    paths:
      - "spark/src/main/scala/org/apache/spark/sql/comet/CometPythonMapInArrowExec.scala"
      - "spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala"
      - "spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala"
      - "spark/src/test/resources/pyspark/test_pyarrow_udf.py"
      - ".github/workflows/pyarrow_udf_test.yml"
      - "native/**"
  pull_request:
    paths:
      - "spark/src/main/scala/org/apache/spark/sql/comet/CometPythonMapInArrowExec.scala"
      - "spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala"
      - "spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala"
      - "spark/src/test/resources/pyspark/test_pyarrow_udf.py"
      - ".github/workflows/pyarrow_udf_test.yml"
      - "native/**"
  # Allow manual runs from the Actions tab.
  workflow_dispatch:

env:
  RUST_VERSION: stable
  RUST_BACKTRACE: 1
  RUSTFLAGS: "-Clink-arg=-fuse-ld=bfd"

jobs:
  pyarrow-udf:
    name: PyArrow UDF (Spark 3.5, JDK 17, Python 3.11)
    runs-on: ubuntu-latest
    container:
      image: amd64/rust
      env:
        # JDK 17 module-system opens required by Spark/Arrow off-heap access.
        JAVA_TOOL_OPTIONS: "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"
    steps:
      - uses: actions/checkout@v6

      - name: Setup Rust & Java toolchain
        uses: ./.github/actions/setup-builder
        with:
          rust-version: ${{ env.RUST_VERSION }}
          jdk-version: 17

      - name: Cache Maven dependencies
        uses: actions/cache@v5
        with:
          path: |
            ~/.m2/repository
            /root/.m2/repository
          key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-pyarrow-udf
          restore-keys: |
            ${{ runner.os }}-java-maven-

      - name: Build Comet (release, Spark 3.5 / Scala 2.12)
        run: |
          cd native && cargo build --release
          cd .. && ./mvnw -B -Prelease install -DskipTests -Pspark-3.5 -Pscala-2.12

      - name: Install Python 3.11 and pip
        run: |
          apt-get update
          apt-get install -y --no-install-recommends python3.11 python3.11-venv python3-pip
          python3.11 -m venv /tmp/venv
          /tmp/venv/bin/pip install --upgrade pip
          /tmp/venv/bin/pip install "pyspark==3.5.8" "pyarrow>=14" pandas pytest

      - name: Run PyArrow UDF pytest
        run: |
          jar=$(ls "$PWD"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar \
            | grep -v sources | grep -v tests | head -n1)
          echo "Using $jar"
          # Point the Spark driver and workers at the venv interpreter so the
          # pyarrow/pandas installed above are visible to Python workers.
          export PYSPARK_PYTHON=/tmp/venv/bin/python
          export PYSPARK_DRIVER_PYTHON=/tmp/venv/bin/python
          COMET_JAR="$jar" /tmp/venv/bin/python -m pytest -v \
            spark/src/test/resources/pyspark/test_pyarrow_udf.py

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

/**
 * Enables Comet's optimized execution path for PyArrow-based Python UDFs
 * (`mapInArrow` / `mapInPandas`). When on, Arrow columnar batches are handed
 * to the Python runner directly, skipping Spark's intermediate
 * Arrow-to-Row-to-Arrow round-trip.
 */
val COMET_PYTHON_MAP_IN_ARROW_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.exec.pythonMapInArrow.enabled")
    .category(CATEGORY_EXEC)
    .doc("Whether to enable optimized execution of PyArrow UDFs (mapInArrow/mapInPandas). " +
      "When enabled, Comet passes Arrow columnar data directly to Python UDFs without " +
      "the intermediate Arrow-to-Row-to-Arrow conversion that Spark normally performs.")
    .booleanConf
    .createWithDefault(true)

val COMET_TRACING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.tracing.enabled")
.category(CATEGORY_TUNING)
.doc(s"Enable fine-grained tracing of events and memory usage. $TRACING_GUIDE.")
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ Comet $COMET_VERSION User Guide
Understanding Comet Plans <understanding-comet-plans>
Tuning Guide <tuning>
Metrics Guide <metrics>
PyArrow UDF Acceleration <pyarrow-udfs>
Iceberg Guide <iceberg>
Kubernetes Guide <kubernetes>
134 changes: 134 additions & 0 deletions docs/source/user-guide/latest/pyarrow-udfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# PyArrow UDF Acceleration

Comet can accelerate Python UDFs that use PyArrow-backed batch processing, such as `mapInArrow` and `mapInPandas`.
These APIs are commonly used for ML inference, feature engineering, and data transformation workloads.

## Background

Spark's `mapInArrow` and `mapInPandas` APIs allow users to apply Python functions that operate on Arrow
RecordBatches or Pandas DataFrames. Under the hood, Spark communicates with the Python worker process
using the Arrow IPC format.

Without Comet, the execution path for these UDFs involves unnecessary data conversions:

1. Comet reads data in Arrow columnar format (via CometScan)
2. Spark inserts a ColumnarToRow transition (converts Arrow to UnsafeRow)
3. The Python runner converts those rows back to Arrow to send to Python
4. Python executes the UDF on Arrow batches
5. Results are returned as Arrow and then converted back to rows

Steps 2 and 3 are redundant since the data starts and ends in Arrow format.

## How Comet Optimizes This

When enabled, Comet detects `PythonMapInArrowExec` and `MapInPandasExec` operators in the physical plan
and replaces them with `CometPythonMapInArrowExec`, which:

- Reads Arrow columnar batches directly from the upstream Comet operator
- Feeds them to the Python runner without the expensive UnsafeProjection copy
- Keeps the Python output in columnar format for downstream operators

This eliminates the ColumnarToRow transition and the output row conversion, reducing CPU overhead
and memory allocations.

## Configuration

The optimization is controlled by:

```
spark.comet.exec.pythonMapInArrow.enabled=true (default)
```

It is enabled by default when Comet execution is active.

## Supported APIs

| PySpark API | Spark Plan Node | Supported |
| -------------------------------- | --------------------------- | --------- |
| `df.mapInArrow(func, schema)` | `PythonMapInArrowExec` | Yes |
| `df.mapInPandas(func, schema)` | `MapInPandasExec` | Yes |
| `@pandas_udf` (scalar) | `ArrowEvalPythonExec` | Not yet |
| `df.applyInPandas(func, schema)` | `FlatMapGroupsInPandasExec` | Not yet |

## Example

```python
import pyarrow as pa
from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder \
.config("spark.plugins", "org.apache.spark.CometPlugin") \
.config("spark.comet.enabled", "true") \
.config("spark.comet.exec.enabled", "true") \
.config("spark.comet.exec.pythonMapInArrow.enabled", "true") \
.config("spark.memory.offHeap.enabled", "true") \
.config("spark.memory.offHeap.size", "2g") \
.getOrCreate()

df = spark.read.parquet("data.parquet")

def transform(batches):
    # mapInArrow passes an iterator of pyarrow.RecordBatch and expects an
    # iterator of RecordBatches back, so process each batch and yield it.
    for batch in batches:
        table = batch.to_pandas()
        table["new_col"] = table["value"] * 2
        yield pa.RecordBatch.from_pandas(table)

output_schema = T.StructType([
T.StructField("value", T.DoubleType()),
T.StructField("new_col", T.DoubleType()),
])

result = df.mapInArrow(transform, output_schema)
```

## Verifying the Optimization

Use `explain()` to verify that `CometPythonMapInArrowExec` appears in your plan:

```python
result.explain(mode="extended")
```

You should see:

```
CometPythonMapInArrowExec ...
+- CometNativeExec ...
+- CometScan ...
```

Instead of the unoptimized plan:

```
PythonMapInArrow ...
+- ColumnarToRow
+- CometNativeExec ...
+- CometScan ...
```

## Limitations

- The optimization currently applies only to `mapInArrow` and `mapInPandas`. Scalar pandas UDFs
(`@pandas_udf`) and grouped operations (`applyInPandas`) are not yet supported.
- The internal row-to-Arrow conversion inside the Python runner is still present in this version.
A future optimization will write Arrow batches directly to the Python IPC stream, achieving
near zero-copy data transfer.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package org.apache.comet.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.PythonUDF
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometPythonMapInArrowExec, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.python.{MapInPandasExec, PythonMapInArrowExec}

import org.apache.comet.CometConf

Expand Down Expand Up @@ -98,6 +100,32 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
case CometNativeColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
sparkToColumnar.child
case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child
// Replace MapInBatchExec (PythonMapInArrowExec / MapInPandasExec) that has a
// ColumnarToRow child with CometPythonMapInArrowExec to avoid the unnecessary
// Arrow->Row->Arrow round-trip.
case p: PythonMapInArrowExec if CometConf.COMET_PYTHON_MAP_IN_ARROW_ENABLED.get() =>
extractColumnarChild(p.child)
.map { columnarChild =>
CometPythonMapInArrowExec(
p.func,
p.output,
columnarChild,
p.isBarrier,
p.func.asInstanceOf[PythonUDF].evalType)
}
.getOrElse(p)
case p: MapInPandasExec if CometConf.COMET_PYTHON_MAP_IN_ARROW_ENABLED.get() =>
extractColumnarChild(p.child)
.map { columnarChild =>
CometPythonMapInArrowExec(
p.func,
p.output,
columnarChild,
p.isBarrier,
p.func.asInstanceOf[PythonUDF].evalType)
}
.getOrElse(p)

// Spark adds `RowToColumnar` under Comet columnar shuffle. But it's redundant as the
// shuffle takes row-based input.
case s @ CometShuffleExchangeExec(
Expand Down Expand Up @@ -130,6 +158,18 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
}
}

/**
 * Peels a row-transition wrapper off the given plan and returns the underlying
 * columnar child, if one exists.
 *
 * Recognizes Spark's `ColumnarToRowExec` (only when its child actually reports
 * columnar support) as well as Comet's two columnar-to-row variants. Used to
 * detect and eliminate unnecessary transitions before Python UDF operators.
 */
private def extractColumnarChild(plan: SparkPlan): Option[SparkPlan] =
  PartialFunction.condOpt(plan) {
    case ColumnarToRowExec(columnar) if columnar.supportsColumnar => columnar
    case CometColumnarToRowExec(columnar) => columnar
    case CometNativeColumnarToRowExec(columnar) => columnar
  }

/**
* Creates an appropriate columnar to row transition operator.
*
Expand Down
Loading
Loading