2 changes: 2 additions & 0 deletions .github/workflows/iceberg_spark_test.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
2 changes: 2 additions & 0 deletions .github/workflows/pr_build_linux.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
2 changes: 2 additions & 0 deletions .github/workflows/pr_build_macos.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
2 changes: 2 additions & 0 deletions .github/workflows/spark_sql_test.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
17 changes: 16 additions & 1 deletion benchmarks/pyspark/benchmarks/__init__.py
@@ -25,13 +25,24 @@
 from typing import Dict, Type, List
 
 from .base import Benchmark
-from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
+from .shuffle import (
+    ShuffleHashBenchmark,
+    ShuffleRoundRobinBenchmark,
+    ShuffleHashNativeWriteBenchmark,
+    ShuffleHashSparkWriteBenchmark,
+    ShuffleRoundRobinNativeWriteBenchmark,
+    ShuffleRoundRobinSparkWriteBenchmark,
+)
 
 
 # Registry of all available benchmarks
 _BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
     ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
     ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
+    ShuffleHashNativeWriteBenchmark.name(): ShuffleHashNativeWriteBenchmark,
+    ShuffleHashSparkWriteBenchmark.name(): ShuffleHashSparkWriteBenchmark,
+    ShuffleRoundRobinNativeWriteBenchmark.name(): ShuffleRoundRobinNativeWriteBenchmark,
+    ShuffleRoundRobinSparkWriteBenchmark.name(): ShuffleRoundRobinSparkWriteBenchmark,
 }
 
 
@@ -76,4 +87,8 @@ def list_benchmarks() -> List[tuple[str, str]]:
     'list_benchmarks',
     'ShuffleHashBenchmark',
     'ShuffleRoundRobinBenchmark',
+    'ShuffleHashNativeWriteBenchmark',
+    'ShuffleHashSparkWriteBenchmark',
+    'ShuffleRoundRobinNativeWriteBenchmark',
+    'ShuffleRoundRobinSparkWriteBenchmark',
 ]
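Reviewer note: a minimal sketch of how one of the new names resolves through the registry, assuming `benchmarks/pyspark` is on `sys.path`. The PR's own lookup helper sits in the collapsed hunk, so this goes through `_BENCHMARK_REGISTRY` directly; the registry and benchmark names are taken from the diff, the lookup flow itself is illustrative.

```python
# Minimal sketch; assumes benchmarks/pyspark is on sys.path.
# _BENCHMARK_REGISTRY and the benchmark names come from the diff above;
# the lookup flow here is illustrative, not the PR's actual helper.
from benchmarks import _BENCHMARK_REGISTRY

name = "shuffle-hash-native-write"
bench_cls = _BENCHMARK_REGISTRY.get(name)
if bench_cls is None:
    raise ValueError(f"unknown benchmark: {name!r}")

# name() and description() are classmethods on Benchmark subclasses.
print(f"{bench_cls.name()}: {bench_cls.description()}")
```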
26 changes: 26 additions & 0 deletions benchmarks/pyspark/benchmarks/base.py
@@ -55,6 +55,31 @@ def description(cls) -> str:
         """Return a short description of the benchmark."""
         pass
 
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> Dict[str, str]:
+        """
+        Return Spark configurations required by this benchmark.
+
+        These configs are applied when creating the SparkSession.
+        Subclasses can override to specify benchmark-specific configs.
+        Subclasses should call super().get_spark_configs(mode) and update
+        the returned dict to preserve common configs.
+
+        Args:
+            mode: Execution mode (spark, jvm, native)
+
+        Returns:
+            Dictionary of config key -> value
+        """
+        if mode == "spark":
+            return {}
+        # Common configs for all Comet benchmarks (jvm and native modes)
+        return {
+            "spark.comet.enabled": "true",
+            "spark.comet.logFallbackReasons.enabled": "true",
+            "spark.comet.explainFallback.enabled": "true",
+        }
+
     @abstractmethod
     def run(self) -> Dict[str, Any]:
         """
@@ -109,6 +134,7 @@ def _print_spark_config(self):
         print(f"Comet enabled: {conf.get('spark.comet.enabled', 'false')}")
         print(f"Comet shuffle enabled: {conf.get('spark.comet.exec.shuffle.enabled', 'false')}")
         print(f"Comet shuffle mode: {conf.get('spark.comet.shuffle.mode', 'not set')}")
+        print(f"Comet native write: {conf.get('spark.comet.parquet.write.enabled', 'false')}")
         print(f"Spark UI: {self.spark.sparkContext.uiWebUrl}")
 
     def _time_operation(self, operation_fn):
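To make the override contract in the docstring concrete: a hedged sketch of a subclass layering its configs on top of the common Comet configs, plus how a runner might fold the merged dict into a SparkSession builder. `MyBenchmark` and the builder loop are illustrative assumptions, not code from this PR.

```python
# Sketch of the get_spark_configs override contract; assumes
# benchmarks/pyspark is on sys.path. MyBenchmark is hypothetical.
from typing import Dict

from benchmarks.base import Benchmark


class MyBenchmark(Benchmark):
    @classmethod
    def get_spark_configs(cls, mode: str) -> Dict[str, str]:
        # Start from the common configs ({} in spark mode), then layer
        # benchmark-specific keys on top for the Comet modes.
        configs = super().get_spark_configs(mode)
        if mode != "spark":
            configs["spark.comet.parquet.write.enabled"] = "true"
        return configs


# A runner would apply the merged dict when building the session,
# roughly like this (standard PySpark builder API):
# builder = SparkSession.builder.appName("bench")
# for key, value in MyBenchmark.get_spark_configs("native").items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```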
83 changes: 83 additions & 0 deletions benchmarks/pyspark/benchmarks/shuffle.py
@@ -128,3 +128,86 @@ def description(cls) -> str:
     def _repartition(self, df: DataFrame) -> DataFrame:
         """Repartition using round-robin (no partition columns specified)."""
         return df.repartition(self.num_partitions)
+
+
+# Variants with explicit native write configuration
+
+
+class ShuffleHashNativeWriteBenchmark(ShuffleHashBenchmark):
+    """Shuffle benchmark using hash partitioning with Comet native parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-hash-native-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Hash shuffle with Comet native parquet writes enabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
+        if mode != "spark":
+            configs.update({
+                "spark.comet.parquet.write.enabled": "true",
+                "spark.comet.operator.DataWritingCommandExec.allowIncompatible": "true",
+            })
+        return configs
+
+
+class ShuffleHashSparkWriteBenchmark(ShuffleHashBenchmark):
+    """Shuffle benchmark using hash partitioning with Spark parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-hash-spark-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Hash shuffle with Comet native parquet writes disabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
+        configs["spark.comet.parquet.write.enabled"] = "false"
+        return configs
+
+
+class ShuffleRoundRobinNativeWriteBenchmark(ShuffleRoundRobinBenchmark):
+    """Shuffle benchmark using round-robin partitioning with Comet native parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-roundrobin-native-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Round-robin shuffle with Comet native parquet writes enabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
+        if mode != "spark":
+            configs.update({
+                "spark.comet.parquet.write.enabled": "true",
+                "spark.comet.operator.DataWritingCommandExec.allowIncompatible": "true",
+            })
+        return configs
+
+
+class ShuffleRoundRobinSparkWriteBenchmark(ShuffleRoundRobinBenchmark):
+    """Shuffle benchmark using round-robin partitioning with Spark parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-roundrobin-spark-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Round-robin shuffle with Comet native parquet writes disabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
+        configs["spark.comet.parquet.write.enabled"] = "false"
+        return configs
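A quick sanity check of the mode-conditional merge these variants rely on. The expected values are inferred from the code as written, not from an actual run, and the import assumes `benchmarks/pyspark` is on `sys.path`.

```python
from benchmarks.shuffle import ShuffleHashNativeWriteBenchmark

# Plain-Spark baseline: the base class returns {} and, because mode ==
# "spark", the subclass adds nothing.
print(ShuffleHashNativeWriteBenchmark.get_spark_configs("spark"))  # {}

# Comet modes: the three common configs from Benchmark.get_spark_configs
# plus the two native-write overrides layered on by the subclass.
for key, value in sorted(
    ShuffleHashNativeWriteBenchmark.get_spark_configs("native").items()
):
    print(f"{key}={value}")
```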
102 changes: 47 additions & 55 deletions benchmarks/pyspark/run_all_benchmarks.sh
@@ -45,7 +45,7 @@ echo "========================================"
 
 # Run Spark baseline (no Comet)
 echo ""
-echo ">>> Running SPARK shuffle benchmark..."
+echo ">>> Running SPARK shuffle benchmark (baseline)..."
 $SPARK_HOME/bin/spark-submit \
     --master "$SPARK_MASTER" \
     --executor-memory "$EXECUTOR_MEMORY" \
@@ -55,62 +55,54 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.comet.exec.shuffle.enabled=false \
     "$SCRIPT_DIR/run_benchmark.py" \
     --data "$DATA_PATH" \
-    --mode spark
+    --shuffle-mode spark \
+    --benchmark shuffle-hash
 
-# Run Comet JVM shuffle
-echo ""
-echo ">>> Running COMET JVM shuffle benchmark..."
-$SPARK_HOME/bin/spark-submit \
-    --master "$SPARK_MASTER" \
-    --executor-memory "$EXECUTOR_MEMORY" \
-    --jars "$COMET_JAR" \
-    --driver-class-path "$COMET_JAR" \
-    --conf spark.executor.extraClassPath="$COMET_JAR" \
-    --conf spark.eventLog.enabled=true \
-    --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
-    --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g \
-    --conf spark.comet.enabled=true \
-    --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \
-    --conf spark.comet.parquet.write.enabled=true \
-    --conf spark.comet.logFallbackReasons.enabled=true \
-    --conf spark.comet.explainFallback.enabled=true \
-    --conf spark.comet.shuffle.mode=jvm \
-    --conf spark.comet.exec.shuffle.mode=jvm \
-    --conf spark.comet.exec.replaceSortMergeJoin=true \
-    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
-    --conf spark.comet.cast.allowIncompatible=true \
-    "$SCRIPT_DIR/run_benchmark.py" \
-    --data "$DATA_PATH" \
-    --mode jvm
+# Helper function to run a Comet benchmark
+# Usage: run_comet_benchmark <mode> <benchmark_name>
+run_comet_benchmark() {
+    local shuffle_mode=$1
+    local benchmark=$2
 
-# Run Comet Native shuffle
-echo ""
-echo ">>> Running COMET NATIVE shuffle benchmark..."
-$SPARK_HOME/bin/spark-submit \
-    --master "$SPARK_MASTER" \
-    --executor-memory "$EXECUTOR_MEMORY" \
-    --jars "$COMET_JAR" \
-    --driver-class-path "$COMET_JAR" \
-    --conf spark.executor.extraClassPath="$COMET_JAR" \
-    --conf spark.eventLog.enabled=true \
-    --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
-    --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g \
-    --conf spark.comet.enabled=true \
-    --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \
-    --conf spark.comet.parquet.write.enabled=true \
-    --conf spark.comet.logFallbackReasons.enabled=true \
-    --conf spark.comet.explainFallback.enabled=true \
-    --conf spark.comet.exec.shuffle.mode=native \
-    --conf spark.comet.exec.replaceSortMergeJoin=true \
-    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
-    --conf spark.comet.cast.allowIncompatible=true \
-    "$SCRIPT_DIR/run_benchmark.py" \
-    --data "$DATA_PATH" \
-    --mode native
+    echo ""
+    echo ">>> Running COMET $shuffle_mode-shuffle benchmark: $benchmark..."
+
+    $SPARK_HOME/bin/spark-submit \
+        --master "$SPARK_MASTER" \
+        --executor-memory "$EXECUTOR_MEMORY" \
+        --jars "$COMET_JAR" \
+        --driver-class-path "$COMET_JAR" \
+        --conf spark.executor.extraClassPath="$COMET_JAR" \
+        --conf spark.eventLog.enabled=true \
+        --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
+        --conf spark.memory.offHeap.enabled=true \
+        --conf spark.memory.offHeap.size=16g \
+        --conf spark.comet.exec.shuffle.mode="$shuffle_mode" \
+        --conf spark.comet.exec.replaceSortMergeJoin=true \
+        --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+        --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+        --conf spark.comet.cast.allowIncompatible=true \
+        "$SCRIPT_DIR/run_benchmark.py" \
+        --data "$DATA_PATH" \
+        --shuffle-mode "$shuffle_mode" \
+        --benchmark "$benchmark"
+}
+
+# Run Comet JVM shuffle with native writes
+run_comet_benchmark jvm shuffle-hash-native-write
+run_comet_benchmark jvm shuffle-roundrobin-native-write
+
+# Run Comet JVM shuffle with Spark writes
+run_comet_benchmark jvm shuffle-hash-spark-write
+run_comet_benchmark jvm shuffle-roundrobin-spark-write
+
+# Run Comet Native shuffle with native writes
+run_comet_benchmark native shuffle-hash-native-write
+run_comet_benchmark native shuffle-roundrobin-native-write
+
+# Run Comet Native shuffle with Spark writes
+run_comet_benchmark native shuffle-hash-spark-write
+run_comet_benchmark native shuffle-roundrobin-spark-write
 
 echo ""
 echo "========================================"
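The refactored script now runs a 2x4 matrix: shuffle mode (jvm, native) crossed with the four write-path variants. A hedged Python rendering of the same enumeration, for reference only:

```python
from itertools import product

# Mirrors the eight run_comet_benchmark invocations in the script above.
shuffle_modes = ["jvm", "native"]
benchmarks = [
    "shuffle-hash-native-write",
    "shuffle-roundrobin-native-write",
    "shuffle-hash-spark-write",
    "shuffle-roundrobin-spark-write",
]

for mode, bench in product(shuffle_modes, benchmarks):
    print(f"run_comet_benchmark {mode} {bench}")
```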