7 changes: 0 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -289,13 +289,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("auto")

val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
.doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
119 changes: 95 additions & 24 deletions dev/diffs/3.4.3.diff
@@ -247,7 +247,7 @@ index cf40e944c09..bdd5be4f462 100644

test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 1cc09c3d7fc..b85b53a9688 100644
index 1cc09c3d7fc..f031fa45c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
@@ -268,16 +268,6 @@ index 1cc09c3d7fc..b85b53a9688 100644
}
assert(exchangePlans.length == 1)
}
@@ -1100,7 +1100,8 @@ class DataFrameAggregateSuite extends QueryTest
}
}

- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1824")) {
withTempView("view") {
val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 56e9520fdab..917932336df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -363,6 +353,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644
withTable("tbl") {
sql(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 433b4741979..07148eee480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}

def isShuffleExecByRequirement(
- plan: ShuffleExchangeExec,
+ plan: ShuffleExchangeLike,
desiredClusterColumns: Seq[String]): Boolean = plan match {
case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
case _ => false
}

@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
w.child.exists {
- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
case _ => false
}
case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index daef11ae4d6..9f3cc9181f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -386,7 +414,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..cc5224af735 100644
index f33432ddb6f..1925aac8d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -417,17 +445,37 @@ index f33432ddb6f..cc5224af735 100644
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("avoid reordering broadcast join keys to match input hash partitioning") {
+ test("avoid reordering broadcast join keys to match input hash partitioning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -611,7 +659,7 @@ index 1792b4c32eb..1616e6f39bd 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7f062bfb899..b347ef905d2 100644
index 7f062bfb899..0ed85486e80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@@ -707,7 +755,7 @@ index 7f062bfb899..b347ef905d2 100644
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}
@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1282,18 +1292,26 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}

// Test shuffled hash join
@@ -722,6 +770,7 @@ index 7f062bfb899..b347ef905d2 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true
+ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)

Expand All @@ -735,7 +784,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
@@ -745,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -760,7 +809,7 @@ index 7f062bfb899..b347ef905d2 100644
}

// Test output ordering is not preserved
@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -775,7 +824,7 @@ index 7f062bfb899..b347ef905d2 100644
}

// Test singe partition
@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -785,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(fullJoinDF, Row(100))
}
}
@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -795,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644
}.size == 1)
}
}
@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -818,7 +867,7 @@ index 7f062bfb899..b347ef905d2 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

@@ -1280,6 +1329,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.TestUtils.assertSpilled
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
import org.apache.spark.sql.test.SharedSparkSession

@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession {
Row(1, 3, null) :: Row(2, null, 4) :: Nil)
}

- test("test with low buffer spill threshold") {
+ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) {
val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
nums.createOrReplaceTempView("nums")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala