From 45c312f550a533925d119a7a02479e1f190fa9c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Jun 2025 08:38:08 -0600 Subject: [PATCH 1/5] Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack --- .../scala/org/apache/comet/CometConf.scala | 7 - dev/diffs/3.4.3.diff | 107 ++++++++++++++-- dev/diffs/3.5.4.diff | 95 ++++++++++++-- dev/diffs/3.5.5.diff | 121 ++++++++++++++++-- dev/diffs/4.0.0-preview1.diff | 109 ++++++++++++++-- .../apache/comet/rules/CometExecRule.scala | 13 +- .../rules/EliminateRedundantTransitions.scala | 7 +- .../sql/comet/CometColumnarToRowExec.scala | 2 + .../spark/sql/CometTPCDSQuerySuite.scala | 1 + .../org/apache/spark/sql/CometTestBase.scala | 1 - .../sql/comet/CometPlanStabilitySuite.scala | 1 - 11 files changed, 393 insertions(+), 71 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9807ebe04b..317303eb7b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -289,13 +289,6 @@ object CometConf extends ShimCometConf { .checkValues(Set("native", "jvm", "auto")) .createWithDefault("auto") - val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] = - conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar") - .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported") - .internal() - .booleanConf - .createWithDefault(false) - val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled") .doc( diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 18e91d9da6..aeb1472c67 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -363,6 +363,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 433b4741979..07148eee480 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} + import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral} + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) => + partitionExpressionsColumns(op.expressions) === 
desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index daef11ae4d6..9f3cc9181f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -386,7 +424,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..cc5224af735 100644 +index f33432ddb6f..1925aac8d97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -417,17 +455,37 @@ index f33432ddb6f..cc5224af735 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1"))) @@ -611,7 +669,7 @@ index 1792b4c32eb..1616e6f39bd 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 7f062bfb899..b347ef905d2 100644 +index 7f062bfb899..0ed85486e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier @@ -707,7 +765,7 @@ index 7f062bfb899..b347ef905d2 100644 // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) } -@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1282,18 +1292,26 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } // Test shuffled hash join @@ -722,6 +780,7 @@ index 7f062bfb899..b347ef905d2 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -735,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -745,7 +804,7 @@ index 7f062bfb899..b347ef905d2 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -760,7 +819,7 @@ index 7f062bfb899..b347ef905d2 100644 } // Test output ordering is not preserved -@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -775,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644 } // Test singe partition -@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -785,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => 
assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -795,7 +854,7 @@ index 7f062bfb899..b347ef905d2 100644 }.size == 1) } } -@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -818,7 +877,7 @@ index 7f062bfb899..b347ef905d2 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } -@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite +@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) } @@ -1280,6 +1339,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff index 32d4d617e8..68e0b8e362 100644 --- a/dev/diffs/3.5.4.diff +++ b/dev/diffs/3.5.4.diff @@ -342,6 +342,44 @@ index 7ee18df3756..64f01a68048 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 47a311c71d5..342e71cfdd4 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import 
org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f32b32ffc5a..447d7c6416e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -365,7 +403,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..19ce507e82b 100644 +index f33432ddb6f..0fa49fb3f0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -422,7 +460,7 @@ index f33432ddb6f..19ce507e82b 100644 - test("avoid reordering broadcast join keys to match input hash partitioning") { + test("avoid reordering broadcast join keys to match input hash partitioning", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { @@ -442,7 +480,7 @@ index f33432ddb6f..19ce507e82b 100644 test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( @@ -482,7 +520,7 @@ index f33432ddb6f..19ce507e82b 100644 - test("SPARK-34637: DPP 
side broadcast query stage is created firstly") { + test("SPARK-34637: DPP side broadcast query stage is created firstly", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( @@ -746,7 +784,7 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 4d256154c85..43f0bebb00c 100644 +index 4d256154c85..66a5473852d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -872,7 +910,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -880,6 +918,7 @@ index 4d256154c85..43f0bebb00c 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -893,7 +932,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -903,7 +942,7 @@ index 4d256154c85..43f0bebb00c 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -918,7 +957,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test output ordering is not preserved -@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -933,7 +972,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test singe partition -@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -943,7 +982,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -953,7 +992,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size == 1) } } -@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -976,6 +1015,16 @@ index 4d256154c85..43f0bebb00c 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet does not support spilling")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index c26757c9cff..d55775f09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1451,6 +1500,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff index 9ca5310876..1fa55686d9 100644 --- a/dev/diffs/3.5.5.diff +++ b/dev/diffs/3.5.5.diff @@ -342,6 +342,44 @@ index 7ee18df3756..64f01a68048 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 47a311c71d5..342e71cfdd4 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f32b32ffc5a..447d7c6416e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -365,7 +403,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..fe9f74ff8f1 100644 +index f33432ddb6f..0e1499a24ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -386,7 +424,37 @@ index f33432ddb6f..fe9f74ff8f1 100644 case _ => Nil } } -@@ -1729,6 +1733,8 @@ abstract class 
DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + + test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + +- "canonicalization and exchange reuse") { ++ "canonicalization and exchange reuse", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = sql( +@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -563,7 +631,7 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 4d256154c85..43f0bebb00c 100644 +index 4d256154c85..66a5473852d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -689,7 +757,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -697,6 +765,7 @@ index 4d256154c85..43f0bebb00c 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -710,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: 
BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -720,7 +789,7 @@ index 4d256154c85..43f0bebb00c 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -735,7 +804,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test output ordering is not preserved -@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -750,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test singe partition -@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -760,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -770,7 +839,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size == 1) } } -@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -793,6 +862,16 @@ index 4d256154c85..43f0bebb00c 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet does not support spilling")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index c26757c9cff..d55775f09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1256,6 +1335,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 2fec4297d8..3fb9b2c025 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -374,6 +374,44 @@ index 760ee802608..b77133ffd37 100644 } assert(exchanges.size == 2) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index e3aff9b36ae..06196517935 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ 
partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 16a493b5290..3f0b70e2d59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -397,7 +435,7 @@ index 16a493b5290..3f0b70e2d59 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..3e6a8632fa6 100644 +index 2c24cc7d570..21d36ebc6f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -428,17 +466,37 @@ index 2c24cc7d570..3e6a8632fa6 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1455,7 +1461,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -448,7 +506,7 @@ index 2c24cc7d570..3e6a8632fa6 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1730,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case 
s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -649,7 +707,7 @@ index 53e47f428c3..a55d8f0c161 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index fcb937d82ba..fafe8e8d08b 100644 +index fcb937d82ba..fc208087a69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -775,7 +833,7 @@ index fcb937d82ba..fafe8e8d08b 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1435,13 +1449,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1435,13 +1449,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -783,6 +841,7 @@ index fcb937d82ba..fafe8e8d08b 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -796,7 +855,7 @@ index fcb937d82ba..fafe8e8d08b 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1489,7 +1509,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1489,7 +1510,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -806,7 +865,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1518,9 +1539,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1518,9 +1540,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -821,7 +880,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } // Test output ordering is not preserved -@@ -1529,9 +1553,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1529,9 +1554,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -836,7 +895,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } // Test singe partition -@@ -1541,7 +1568,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1541,7 +1569,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -846,7 +905,7 @@ index fcb937d82ba..fafe8e8d08b 100644 
checkAnswer(fullJoinDF, Row(100)) } } -@@ -1586,6 +1614,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1586,6 +1615,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -856,7 +915,7 @@ index fcb937d82ba..fafe8e8d08b 100644 }.size == 1) } } -@@ -1630,14 +1661,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1630,14 +1662,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -879,7 +938,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } -@@ -1773,7 +1810,8 @@ class ThreadLeakInSortMergeJoinSuite +@@ -1773,7 +1811,8 @@ class ThreadLeakInSortMergeJoinSuite sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) } @@ -1451,6 +1510,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 966f4e74712..8017e22d7f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c9dd4f17b7..54e6e63649 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.{CometConf, ExtendedExplainInfo} -import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR} +import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED import 
org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -491,16 +491,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { None } - // this is a temporary workaround because some Spark SQL tests fail - // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs - // that we had not previously seen - val tryColumnarNext = - !nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty && - COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf)) - val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) { nativeShuffle - } else if (tryColumnarNext) { + } else { // Columnar shuffle for regular Spark operators (not Comet) and Comet operators // (if configured). // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not @@ -526,8 +519,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } else { None } - } else { - None } if (nativeOrColumnarShuffle.isDefined) { diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index ecc0823d60..a1a96d321b 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.QueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.comet.CometConf @@ -56,7 +57,8 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa override def apply(plan: SparkPlan): SparkPlan = { val newPlan = _apply(plan) if (showTransformations) { - logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan") + // scalastyle:off println + System.err.println(s"EliminateRedundantTransitions:\nINPUT: $plan\nOUTPUT: $newPlan") } newPlan } @@ -64,7 +66,7 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa private def _apply(plan: SparkPlan): SparkPlan = { val eliminatedPlan = plan transformUp { case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec) - if (plan.conf.adaptiveExecutionEnabled) => + if plan.conf.adaptiveExecutionEnabled => shuffleExchangeExec case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => if (sparkToColumnar.child.supportsColumnar) { @@ -112,6 +114,7 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa private def hasCometNativeChild(op: SparkPlan): Boolean = { op match { case c: QueryStageExec => hasCometNativeChild(c.plan) + case c: ReusedExchangeExec => hasCometNativeChild(c.child) case _ => op.exists(_.isInstanceOf[CometPlan]) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala 
index 0391a1c3b3..95e03ca69c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.comet.util.{Utils => CometUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, WritableColumnVector} import org.apache.spark.sql.types._ @@ -172,6 +173,7 @@ case class CometColumnarToRowExec(child: SparkPlan) op match { case b: CometBroadcastExchangeExec => Some(b) case b: BroadcastQueryStageExec => findCometBroadcastExchange(b.plan) + case b: ReusedExchangeExec => findCometBroadcastExchange(b.child) case _ => op.children.collectFirst(Function.unlift(findCometBroadcastExchange)) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 8d084fd75d..0e582ddaba 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -188,6 +188,7 @@ class CometTPCDSQuerySuite conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g") + conf.set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true") conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "15g") diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 0b15def98b..2a7983f0b5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -78,7 +78,6 @@ abstract class CometTestBase conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") - conf.set(CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key, "true") conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true") diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 9f611612f6..cf887a1013 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -272,7 +272,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString, CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key -> "true", CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, 
v2.7.0/q64 SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { From 4026cb687589bdaeee571f97c06c5e51e53fd653 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Jun 2025 08:57:05 -0600 Subject: [PATCH 2/5] revert ignore a test that now passes due to DF 48 upgrade --- dev/diffs/3.4.3.diff | 12 +----------- dev/diffs/3.5.4.diff | 12 +----------- dev/diffs/3.5.5.diff | 12 +----------- dev/diffs/4.0.0-preview1.diff | 12 +----------- 4 files changed, 4 insertions(+), 44 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index aeb1472c67..dda075f02f 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -247,7 +247,7 @@ index cf40e944c09..bdd5be4f462 100644 test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -index 1cc09c3d7fc..b85b53a9688 100644 +index 1cc09c3d7fc..f031fa45c33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkException @@ -268,16 +268,6 @@ index 1cc09c3d7fc..b85b53a9688 100644 } assert(exchangePlans.length == 1) } -@@ -1100,7 +1100,8 @@ class DataFrameAggregateSuite extends QueryTest - } - } - -- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") { -+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) { - withTempView("view") { - val nan1 = java.lang.Float.intBitsToFloat(0x7f800001) - val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 56e9520fdab..917932336df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff index 68e0b8e362..3695871752 100644 --- a/dev/diffs/3.5.4.diff +++ b/dev/diffs/3.5.4.diff @@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644 test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -index 5a8681aed97..db69fde723a 100644 +index 5a8681aed97..da9d25e2eb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand @@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644 } assert(exchangePlans.length == 1) } -@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest - } - } - -- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") { -+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) { - withTempView("view") { - val nan1 = java.lang.Float.intBitsToFloat(0x7f800001) - val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 56e9520fdab..917932336df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff index 1fa55686d9..75928a0773 100644 --- a/dev/diffs/3.5.5.diff +++ b/dev/diffs/3.5.5.diff @@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644 test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -index 5a8681aed97..db69fde723a 100644 +index 5a8681aed97..da9d25e2eb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand @@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644 } assert(exchangePlans.length == 1) } -@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest - } - } - -- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") { -+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) { - withTempView("view") { - val nan1 = java.lang.Float.intBitsToFloat(0x7f800001) - val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 56e9520fdab..917932336df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 3fb9b2c025..a59c85c212 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -268,7 +268,7 @@ index d023fb82185..0f4f03bda6c 100644 withTempView("t0", "t1", "t2") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -index 620ee430cab..f5df9218fc1 100644 +index 620ee430cab..9d383a4bff9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS @@ -289,16 +289,6 @@ index 620ee430cab..f5df9218fc1 100644 } assert(exchangePlans.length == 1) } -@@ -1275,7 +1275,8 @@ class DataFrameAggregateSuite extends QueryTest - } - } - -- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") { -+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) { - withTempView("view") { - val nan1 = java.lang.Float.intBitsToFloat(0x7f800001) - val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index f6fd6b501d7..11870c85d82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala From fadb1f03e0318221616439e3a6af3f284675f606 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Jun 2025 13:40:09 -0600 Subject: [PATCH 3/5] update 3.5.6 diff 
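
This revision of the 3.5.6 diff leans on the same per-test opt-out used in the
other Spark version diffs: tests that cannot yet pass with Comet enabled are
tagged with IgnoreComet plus a tracking issue. A minimal usage sketch follows
(hypothetical suite and test names; it assumes the patched test harness skips
tests carrying the "DisableComet" tag when Comet is active, which is how the
hunks below apply it):

    import org.apache.spark.sql.{IgnoreComet, QueryTest, Row}
    import org.apache.spark.sql.test.SharedSparkSession

    class ExampleSuite extends QueryTest with SharedSparkSession {
      import testImplicits._

      // Runs both on vanilla Spark and with Comet enabled.
      test("basic projection") {
        checkAnswer(Seq(1, 2).toDF("a").selectExpr("a + 1"), Seq(Row(2), Row(3)))
      }

      // Reported as ignored when Comet is enabled (e.g. via ENABLE_COMET).
      test("not yet supported under Comet",
          IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
        assert(spark.version.nonEmpty) // exercised only when Comet is off
      }
    }
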
--- dev/diffs/3.5.6.diff | 184 ++++++++++++++++++++++++++----------------- 1 file changed, 112 insertions(+), 72 deletions(-) diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index 42d5dad00a..0c41772778 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644 test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -index 5a8681aed97..db69fde723a 100644 +index 5a8681aed97..da9d25e2eb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand @@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644 } assert(exchangePlans.length == 1) } -@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest - } - } - -- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") { -+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) { - withTempView("view") { - val nan1 = java.lang.Float.intBitsToFloat(0x7f800001) - val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 56e9520fdab..917932336df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 47a311c71d5..8cbc4912d60 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: 
HashPartitioning, _, ENSURE_REQUIREMENTS, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f32b32ffc5a..447d7c6416e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..fe9f74ff8f1 100644 +index f33432ddb6f..0e1499a24ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -386,7 +414,37 @@ index f33432ddb6f..fe9f74ff8f1 100644 case _ => Nil } } -@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + + test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + +- "canonicalization and exchange reuse") { ++ "canonicalization and exchange reuse", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = sql( +@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -483,57 +541,6 @@ index 93275487f29..01e5c601763 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -new file mode 100644 -index 
00000000000..5691536c114 ---- /dev/null -+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,45 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.spark.sql -+ -+import org.scalactic.source.Position -+import org.scalatest.Tag -+ -+import org.apache.spark.sql.test.SQLTestUtils -+ -+/** -+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). -+ */ -+case class IgnoreComet(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") -+ -+/** -+ * Helper trait that disables Comet for all tests regardless of default config values. -+ */ -+trait IgnoreCometSuite extends SQLTestUtils { -+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -+ (implicit pos: Position): Unit = { -+ if (isCometEnabled) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } -+} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 7af826583bd..3c3def1eb67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -563,7 +570,7 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 4d256154c85..43f0bebb00c 100644 +index 4d256154c85..66a5473852d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -689,7 +696,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -697,6 +704,7 @@ index 4d256154c85..43f0bebb00c 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: 
CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -710,7 +718,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -720,7 +728,7 @@ index 4d256154c85..43f0bebb00c 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -735,7 +743,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test output ordering is not preserved -@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -750,7 +758,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test singe partition -@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -760,7 +768,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -770,7 +778,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size == 1) } } -@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -793,6 +801,16 @@ index 4d256154c85..43f0bebb00c 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet does not support spilling")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index c26757c9cff..d55775f09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1256,6 +1274,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala From e6031fd386b3c3b2dbf1a73a349ce2abc2ea086a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Jun 2025 13:40:26 -0600 Subject: [PATCH 4/5] update 3.5.6 diff --- dev/diffs/3.5.6.diff | 51 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index 0c41772778..6a20e1d7cb 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -541,6 +541,57 @@ index 93275487f29..01e5c601763 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +new file mode 100644 +index 00000000000..5691536c114 +--- /dev/null ++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.spark.sql ++ ++import org.scalactic.source.Position ++import org.scalatest.Tag ++ ++import org.apache.spark.sql.test.SQLTestUtils ++ ++/** ++ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). ++ */ ++case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") ++ ++/** ++ * Helper trait that disables Comet for all tests regardless of default config values. ++ */ ++trait IgnoreCometSuite extends SQLTestUtils { ++ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) ++ (implicit pos: Position): Unit = { ++ if (isCometEnabled) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } ++ } ++} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 7af826583bd..3c3def1eb67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala From 97008a453335177b5958b09b0938c8ae111f7df7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Jun 2025 14:37:26 -0600 Subject: [PATCH 5/5] fix 3.5.6 diff --- dev/diffs/3.5.6.diff | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index 6a20e1d7cb..e3ba0a35c3 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -333,7 +333,7 @@ index 7ee18df3756..64f01a68048 100644 sql( """ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala -index 47a311c71d5..8cbc4912d60 100644 +index 47a311c71d5..342e71cfdd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression @@ -356,7 +356,7 @@ index 47a311c71d5..8cbc4912d60 100644 desiredClusterColumns: Seq[String]): Boolean = plan match { case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns -+ case CometShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false }