diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index 74badcda5f..90c7eb8de9 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index 8e4dc5124b..544dd30f89 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index f94071dbc7..c62e3c7a75 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 2fe5fefe1a..b6ef22b4b9 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -27,6 +27,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -35,6 +36,7 @@ on:
       - "doc/**"
       - "docs/**"
       - "**.md"
+      - "benchmarks/pyspark/**"
       - "native/core/benches/**"
       - "native/spark-expr/benches/**"
       - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
diff --git a/benchmarks/pyspark/benchmarks/__init__.py b/benchmarks/pyspark/benchmarks/__init__.py
index 7d913a7d6d..e7a912c5a6 100644
--- a/benchmarks/pyspark/benchmarks/__init__.py
+++ b/benchmarks/pyspark/benchmarks/__init__.py
@@ -25,13 +25,24 @@ from typing import Dict, Type, List
 
 from .base import Benchmark
-from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
+from .shuffle import (
+    ShuffleHashBenchmark,
+    ShuffleRoundRobinBenchmark,
+    ShuffleHashNativeWriteBenchmark,
+    ShuffleHashSparkWriteBenchmark,
+    ShuffleRoundRobinNativeWriteBenchmark,
+    ShuffleRoundRobinSparkWriteBenchmark,
+)
 
 # Registry of all available benchmarks
 _BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
     ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
     ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
+    ShuffleHashNativeWriteBenchmark.name(): ShuffleHashNativeWriteBenchmark,
+    ShuffleHashSparkWriteBenchmark.name(): ShuffleHashSparkWriteBenchmark,
+    ShuffleRoundRobinNativeWriteBenchmark.name(): ShuffleRoundRobinNativeWriteBenchmark,
+    ShuffleRoundRobinSparkWriteBenchmark.name(): ShuffleRoundRobinSparkWriteBenchmark,
 }
 
@@ -76,4 +87,8 @@ def list_benchmarks() -> List[tuple[str, str]]:
     'list_benchmarks',
     'ShuffleHashBenchmark',
     'ShuffleRoundRobinBenchmark',
+    'ShuffleHashNativeWriteBenchmark',
+    'ShuffleHashSparkWriteBenchmark',
+    'ShuffleRoundRobinNativeWriteBenchmark',
+    'ShuffleRoundRobinSparkWriteBenchmark',
 ]
diff --git a/benchmarks/pyspark/benchmarks/base.py b/benchmarks/pyspark/benchmarks/base.py
index 7e8e8db5a9..57a494158a 100644
--- a/benchmarks/pyspark/benchmarks/base.py
+++ b/benchmarks/pyspark/benchmarks/base.py
@@ -55,6 +55,31 @@ def description(cls) -> str:
         """Return a short description of the benchmark."""
         pass
 
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> Dict[str, str]:
+        """
+        Return Spark configurations required by this benchmark.
+
+        These configs are applied when creating the SparkSession.
+        Subclasses can override to specify benchmark-specific configs.
+        Subclasses should call super().get_spark_configs(mode) and update
+        the returned dict to preserve common configs.
+
+        Args:
+            mode: Execution mode (spark, jvm, native)
+
+        Returns:
+            Dictionary of config key -> value
+        """
+        if mode == "spark":
+            return {}
+        # Common configs for all Comet benchmarks (jvm and native modes)
+        return {
+            "spark.comet.enabled": "true",
+            "spark.comet.logFallbackReasons.enabled": "true",
+            "spark.comet.explainFallback.enabled": "true",
+        }
+
     @abstractmethod
     def run(self) -> Dict[str, Any]:
         """
@@ -109,6 +134,7 @@ def _print_spark_config(self):
         print(f"Comet enabled: {conf.get('spark.comet.enabled', 'false')}")
         print(f"Comet shuffle enabled: {conf.get('spark.comet.exec.shuffle.enabled', 'false')}")
         print(f"Comet shuffle mode: {conf.get('spark.comet.shuffle.mode', 'not set')}")
+        print(f"Comet native write: {conf.get('spark.comet.parquet.write.enabled', 'false')}")
         print(f"Spark UI: {self.spark.sparkContext.uiWebUrl}")
 
     def _time_operation(self, operation_fn):
diff --git a/benchmarks/pyspark/benchmarks/shuffle.py b/benchmarks/pyspark/benchmarks/shuffle.py
index 0facd2340d..f4853a9806 100644
--- a/benchmarks/pyspark/benchmarks/shuffle.py
+++ b/benchmarks/pyspark/benchmarks/shuffle.py
@@ -128,3 +128,86 @@ def description(cls) -> str:
     def _repartition(self, df: DataFrame) -> DataFrame:
         """Repartition using round-robin (no partition columns specified)."""
         return df.repartition(self.num_partitions)
+
+
+# Variants with explicit native write configuration
+
+
+class ShuffleHashNativeWriteBenchmark(ShuffleHashBenchmark):
+    """Shuffle benchmark using hash partitioning with Comet native parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-hash-native-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Hash shuffle with Comet native parquet writes enabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
+        if mode != "spark":
+            configs.update({
+                "spark.comet.parquet.write.enabled": "true",
+                "spark.comet.operator.DataWritingCommandExec.allowIncompatible": "true",
+            })
+        return configs
+
+
+class ShuffleHashSparkWriteBenchmark(ShuffleHashBenchmark):
+    """Shuffle benchmark using hash partitioning with Spark parquet writes."""
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-hash-spark-write"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Hash shuffle with Comet native parquet writes disabled"
+
+    @classmethod
+    def get_spark_configs(cls, mode: str) -> dict:
+        configs = super().get_spark_configs(mode)
configs["spark.comet.parquet.write.enabled"] = "false" + return configs + + +class ShuffleRoundRobinNativeWriteBenchmark(ShuffleRoundRobinBenchmark): + """Shuffle benchmark using round-robin partitioning with Comet native parquet writes.""" + + @classmethod + def name(cls) -> str: + return "shuffle-roundrobin-native-write" + + @classmethod + def description(cls) -> str: + return "Round-robin shuffle with Comet native parquet writes enabled" + + @classmethod + def get_spark_configs(cls, mode: str) -> dict: + configs = super().get_spark_configs(mode) + if mode != "spark": + configs.update({ + "spark.comet.parquet.write.enabled": "true", + "spark.comet.operator.DataWritingCommandExec.allowIncompatible": "true", + }) + return configs + + +class ShuffleRoundRobinSparkWriteBenchmark(ShuffleRoundRobinBenchmark): + """Shuffle benchmark using round-robin partitioning with Spark parquet writes.""" + + @classmethod + def name(cls) -> str: + return "shuffle-roundrobin-spark-write" + + @classmethod + def description(cls) -> str: + return "Round-robin shuffle with Comet native parquet writes disabled" + + @classmethod + def get_spark_configs(cls, mode: str) -> dict: + configs = super().get_spark_configs(mode) + configs["spark.comet.parquet.write.enabled"] = "false" + return configs diff --git a/benchmarks/pyspark/run_all_benchmarks.sh b/benchmarks/pyspark/run_all_benchmarks.sh index f2bc8f552f..f8bab8e30d 100755 --- a/benchmarks/pyspark/run_all_benchmarks.sh +++ b/benchmarks/pyspark/run_all_benchmarks.sh @@ -45,7 +45,7 @@ echo "========================================" # Run Spark baseline (no Comet) echo "" -echo ">>> Running SPARK shuffle benchmark..." +echo ">>> Running SPARK shuffle benchmark (baseline)..." $SPARK_HOME/bin/spark-submit \ --master "$SPARK_MASTER" \ --executor-memory "$EXECUTOR_MEMORY" \ @@ -55,62 +55,54 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.comet.exec.shuffle.enabled=false \ "$SCRIPT_DIR/run_benchmark.py" \ --data "$DATA_PATH" \ - --mode spark + --shuffle-mode spark \ + --benchmark shuffle-hash -# Run Comet JVM shuffle -echo "" -echo ">>> Running COMET JVM shuffle benchmark..." -$SPARK_HOME/bin/spark-submit \ - --master "$SPARK_MASTER" \ - --executor-memory "$EXECUTOR_MEMORY" \ - --jars "$COMET_JAR" \ - --driver-class-path "$COMET_JAR" \ - --conf spark.executor.extraClassPath="$COMET_JAR" \ - --conf spark.eventLog.enabled=true \ - --conf spark.eventLog.dir="$EVENT_LOG_DIR" \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.comet.enabled=true \ - --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \ - --conf spark.comet.parquet.write.enabled=true \ - --conf spark.comet.logFallbackReasons.enabled=true \ - --conf spark.comet.explainFallback.enabled=true \ - --conf spark.comet.shuffle.mode=jvm \ - --conf spark.comet.exec.shuffle.mode=jvm \ - --conf spark.comet.exec.replaceSortMergeJoin=true \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \ - --conf spark.comet.cast.allowIncompatible=true \ - "$SCRIPT_DIR/run_benchmark.py" \ - --data "$DATA_PATH" \ - --mode jvm +# Helper function to run a Comet benchmark +# Usage: run_comet_benchmark +run_comet_benchmark() { + local shuffle_mode=$1 + local benchmark=$2 -# Run Comet Native shuffle -echo "" -echo ">>> Running COMET NATIVE shuffle benchmark..." 
-$SPARK_HOME/bin/spark-submit \
-    --master "$SPARK_MASTER" \
-    --executor-memory "$EXECUTOR_MEMORY" \
-    --jars "$COMET_JAR" \
-    --driver-class-path "$COMET_JAR" \
-    --conf spark.executor.extraClassPath="$COMET_JAR" \
-    --conf spark.eventLog.enabled=true \
-    --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
-    --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g \
-    --conf spark.comet.enabled=true \
-    --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \
-    --conf spark.comet.parquet.write.enabled=true \
-    --conf spark.comet.logFallbackReasons.enabled=true \
-    --conf spark.comet.explainFallback.enabled=true \
-    --conf spark.comet.exec.shuffle.mode=native \
-    --conf spark.comet.exec.replaceSortMergeJoin=true \
-    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
-    --conf spark.comet.cast.allowIncompatible=true \
-    "$SCRIPT_DIR/run_benchmark.py" \
-    --data "$DATA_PATH" \
-    --mode native
+    echo ""
+    echo ">>> Running COMET $shuffle_mode-shuffle benchmark: $benchmark..."
+
+    $SPARK_HOME/bin/spark-submit \
+        --master "$SPARK_MASTER" \
+        --executor-memory "$EXECUTOR_MEMORY" \
+        --jars "$COMET_JAR" \
+        --driver-class-path "$COMET_JAR" \
+        --conf spark.executor.extraClassPath="$COMET_JAR" \
+        --conf spark.eventLog.enabled=true \
+        --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
+        --conf spark.memory.offHeap.enabled=true \
+        --conf spark.memory.offHeap.size=16g \
+        --conf spark.comet.exec.shuffle.mode="$shuffle_mode" \
+        --conf spark.comet.exec.replaceSortMergeJoin=true \
+        --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+        --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+        --conf spark.comet.cast.allowIncompatible=true \
+        "$SCRIPT_DIR/run_benchmark.py" \
+        --data "$DATA_PATH" \
+        --shuffle-mode "$shuffle_mode" \
+        --benchmark "$benchmark"
+}
+
+# Run Comet JVM shuffle with native writes
+run_comet_benchmark jvm shuffle-hash-native-write
+run_comet_benchmark jvm shuffle-roundrobin-native-write
+
+# Run Comet JVM shuffle with Spark writes
+run_comet_benchmark jvm shuffle-hash-spark-write
+run_comet_benchmark jvm shuffle-roundrobin-spark-write
+
+# Run Comet Native shuffle with native writes
+run_comet_benchmark native shuffle-hash-native-write
+run_comet_benchmark native shuffle-roundrobin-native-write
+
+# Run Comet Native shuffle with Spark writes
+run_comet_benchmark native shuffle-hash-spark-write
+run_comet_benchmark native shuffle-roundrobin-spark-write
 
 echo ""
 echo "========================================"
diff --git a/benchmarks/pyspark/run_benchmark.py b/benchmarks/pyspark/run_benchmark.py
index 6713f0ff21..99b2c99b56 100755
--- a/benchmarks/pyspark/run_benchmark.py
+++ b/benchmarks/pyspark/run_benchmark.py
@@ -38,10 +38,10 @@ def main():
         epilog="""
 Examples:
   # Run hash partitioning shuffle benchmark in Spark mode
-  python run_benchmark.py --data /path/to/data --mode spark --benchmark shuffle-hash
+  python run_benchmark.py --data /path/to/data --shuffle-mode spark --benchmark shuffle-hash
 
   # Run round-robin shuffle benchmark in Comet native mode
-  python run_benchmark.py --data /path/to/data --mode native --benchmark shuffle-roundrobin
+  python run_benchmark.py --data /path/to/data --shuffle-mode native --benchmark shuffle-roundrobin
 
   # List all available benchmarks
   python run_benchmark.py --list-benchmarks
@@ -52,7 +52,7 @@
parquet data" ) parser.add_argument( - "--mode", "-m", + "--shuffle-mode", "-m", choices=["spark", "jvm", "native"], help="Shuffle mode being tested" ) @@ -66,6 +66,11 @@ def main(): action="store_true", help="List all available benchmarks and exit" ) + parser.add_argument( + "--print-configs", + action="store_true", + help="Print benchmark-specific Spark configs and exit (for shell scripts)" + ) args = parser.parse_args() @@ -76,11 +81,25 @@ def main(): print(f" {name:25s} - {description}") return 0 + # Handle --print-configs (requires --benchmark and --shuffle-mode) + if args.print_configs: + if not args.shuffle_mode: + parser.error("--shuffle-mode is required when using --print-configs") + try: + benchmark_cls = get_benchmark(args.benchmark) + except KeyError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + configs = benchmark_cls.get_spark_configs(args.shuffle_mode) + for key, value in configs.items(): + print(f"--conf {key}={value}") + return 0 + # Validate required arguments if not args.data: parser.error("--data is required when running a benchmark") - if not args.mode: - parser.error("--mode is required when running a benchmark") + if not args.shuffle_mode: + parser.error("--shuffle-mode is required when running a benchmark") # Get the benchmark class try: @@ -90,14 +109,18 @@ def main(): print("\nUse --list-benchmarks to see available benchmarks", file=sys.stderr) return 1 - # Create Spark session - spark = SparkSession.builder \ - .appName(f"{benchmark_cls.name()}-{args.mode.upper()}") \ - .getOrCreate() + # Get benchmark-specific configs + benchmark_configs = benchmark_cls.get_spark_configs(args.shuffle_mode) + + # Create Spark session with benchmark-specific configs + builder = SparkSession.builder.appName(f"{benchmark_cls.name()}-{args.shuffle_mode.upper()}") + for key, value in benchmark_configs.items(): + builder = builder.config(key, value) + spark = builder.getOrCreate() try: # Create and run the benchmark - benchmark = benchmark_cls(spark, args.data, args.mode) + benchmark = benchmark_cls(spark, args.data, args.shuffle_mode) results = benchmark.execute_timed() print("\nCheck Spark UI for shuffle sizes and detailed metrics")