9 changes: 0 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -600,15 +600,6 @@ object CometConf extends ShimCometConf {
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan"))

val COMET_ANSI_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.ansi.enabled")
.internal()
.doc(
"Comet does not respect ANSI mode in most cases and by default will not accelerate " +
"queries when ansi mode is enabled. Enable this setting to test Comet's experimental " +
"support for ANSI mode. This should not be used in production.")
.booleanConf
.createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT)

val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.caseConversion.enabled")
.doc(
@@ -21,5 +21,4 @@ package org.apache.comet.shims

trait ShimCometConf {
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
protected val COMET_ANSI_MODE_ENABLED_DEFAULT = false
}
@@ -21,5 +21,4 @@ package org.apache.comet.shims

trait ShimCometConf {
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
protected val COMET_ANSI_MODE_ENABLED_DEFAULT = true
}
17 changes: 15 additions & 2 deletions dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..5cc127f064d 100644
index d3544881af1..9c174496a4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -881,7 +881,7 @@ index b5b34922694..a72403780c4 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 525d97e4998..8a3e7457618 100644
index 525d97e4998..5e04319dd97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -894,6 +894,19 @@ index 525d97e4998..8a3e7457618 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4467,7 +4468,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
- assert(msg.contains(query))
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(msg.contains(query))
Contributor:
When ANSI was not enabled, was this test passing?
If so, is there a way to check whether ANSI is enabled, and skip the assertion only when both Comet and ANSI are enabled?

Member Author:

We were previously falling back to Spark for this test. This Spark test is specifically for testing ANSI errors from cast.

Member Author:

I don't recall now why we were falling back to Spark, so I will take another look.

Member Author:

For Spark 3.x, we previously fell back to Spark for this test because we do not run the Spark SQL tests for 3.x with ENABLE_COMET_ANSI_MODE=true, and therefore did not enable spark.comet.ansi.enabled.
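
For reference, a minimal sketch of the guard the reviewer describes, assuming the isCometEnabled helper that the patched Spark test harness already provides (it is used in this diff) and reading the ANSI flag from the session's SQLConf:

  // Hypothetical guard: skip the message assertion only when both Comet
  // and ANSI mode are enabled, i.e. only when the error comes from Comet.
  val ansiEnabled = spark.sessionState.conf.ansiEnabled
  if (!(isCometEnabled && ansiEnabled)) {
    assert(msg.contains(query))
  }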

+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..51d1ee65422 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
17 changes: 15 additions & 2 deletions dev/diffs/3.5.6.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index 68e2c422a24..fb9c2e88fac 100644
index 68e2c422a24..d971894ffe6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -866,7 +866,7 @@ index c26757c9cff..d55775f09d7 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 793a0da6a86..6ccb9d62582 100644
index 793a0da6a86..e48e74091cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -879,6 +879,19 @@ index 793a0da6a86..6ccb9d62582 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4497,7 +4498,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
- assert(msg.contains(query))
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(msg.contains(query))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
19 changes: 16 additions & 3 deletions dev/diffs/4.0.0.diff
@@ -1057,7 +1057,7 @@ index ad424b3a7cc..4ece0117a34 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b3fce19979e..345acb4811a 100644
index b3fce19979e..67edf5eb91c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -1086,11 +1086,24 @@ index b3fce19979e..345acb4811a 100644
test("SPARK-39175: Query context of Cast should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(ex.isInstanceOf[SparkNumberFormatException] ||
ex.isInstanceOf[SparkDateTimeException] ||
ex.isInstanceOf[SparkRuntimeException])
- assert(ex.getMessage.contains(query))
+
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(ex.getMessage.contains(query))
+ }
}
}
}
}

test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
@@ -130,7 +130,7 @@ object CometCast {
Compatible(Some("Only supports years between 262143 BC and 262142 AD"))
case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") =>
Incompatible(Some(s"Cast will use UTC instead of $timeZoneId"))
case DataTypes.TimestampType if evalMode == "ANSI" =>
case DataTypes.TimestampType if evalMode == CometEvalMode.ANSI =>
Incompatible(Some("ANSI mode not supported"))
case DataTypes.TimestampType =>
// https://github.com/apache/datafusion-comet/issues/328
15 changes: 1 addition & 14 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}

import org.apache.comet.{CometConf, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_EXEC_SHUFFLE_ENABLED}
import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -605,19 +605,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}

private def _apply(plan: SparkPlan): SparkPlan = {
// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
if (COMET_ANSI_MODE_ENABLED.get()) {
if (!isSpark40Plus) {
logWarning("Using Comet's experimental support for ANSI mode.")
}
} else {
logInfo("Comet extension disabled for ANSI mode")
return plan
}
}

// We shouldn't transform Spark query plan if Comet is not loaded.
if (!isCometLoaded(conf)) return plan

@@ -2368,13 +2368,13 @@ object QueryPlanSerde extends Logging with CometExprShim {

sealed trait SupportLevel

/** We support this feature with full compatibility with Spark */
/** Comet supports this feature with full (or close enough) compatibility with Spark */
case class Compatible(notes: Option[String] = None) extends SupportLevel

/** We support this feature but results can be different from Spark */
/** Comet supports this feature but results can be different from Spark */
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** We do not support this feature */
/** Comet does not support this feature */
object Unsupported extends SupportLevel

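To make the reworded semantics concrete, a hedged sketch of how a support level could be consumed, assuming the COMET_EXPR_ALLOW_INCOMPATIBLE config that the updated tests use; the real dispatch in QueryPlanSerde has more cases:

  // Sketch: Compatible expressions are always eligible, Incompatible ones
  // only when the user opts in, and Unsupported ones never.
  def canAccelerate(level: SupportLevel): Boolean = level match {
    case Compatible(_) => true
    case Incompatible(_) => CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()
    case Unsupported => false
  }
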
62 changes: 62 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -86,6 +86,15 @@ trait MathBase {
}

object CometAdd extends CometExpressionSerde[Add] with MathBase {

override def getSupportLevel(expr: Add): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: Add,
inputs: Seq[Attribute],
@@ -107,6 +116,15 @@ object CometAdd extends CometExpressionSerde[Add] with MathBase {
}

object CometSubtract extends CometExpressionSerde[Subtract] with MathBase {

override def getSupportLevel(expr: Subtract): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: Subtract,
inputs: Seq[Attribute],
@@ -128,6 +146,15 @@ object CometSubtract extends CometExpressionSerde[Subtract] with MathBase {
}

object CometMultiply extends CometExpressionSerde[Multiply] with MathBase {

override def getSupportLevel(expr: Multiply): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: Multiply,
inputs: Seq[Attribute],
@@ -149,6 +176,15 @@ object CometMultiply extends CometExpressionSerde[Multiply] with MathBase {
}

object CometDivide extends CometExpressionSerde[Divide] with MathBase {

override def getSupportLevel(expr: Divide): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: Divide,
inputs: Seq[Attribute],
@@ -174,6 +210,15 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase {
}

object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with MathBase {

override def getSupportLevel(expr: IntegralDivide): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: IntegralDivide,
inputs: Seq[Attribute],
@@ -237,6 +282,15 @@ object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with Mat
}

object CometRemainder extends CometExpressionSerde[Remainder] with MathBase {

override def getSupportLevel(expr: Remainder): SupportLevel = {
if (expr.evalMode == EvalMode.ANSI) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
expr: Remainder,
inputs: Seq[Attribute],
@@ -264,6 +318,14 @@ object CometRemainder extends CometExpressionSerde[Remainder] with MathBase {

object CometRound extends CometExpressionSerde[Round] {

override def getSupportLevel(expr: Round): SupportLevel = {
if (expr.ansiEnabled) {
Incompatible(Some("ANSI mode is not supported"))
} else {
Compatible(None)
}
}

override def convert(
r: Round,
inputs: Seq[Attribute],
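Taken together, these overrides mean that under ANSI mode the arithmetic expressions report Incompatible, so Comet falls back to Spark unless the user opts in. A hypothetical session-level illustration, assuming the gating sketched earlier:

  // "spark.sql.ansi.enabled" is Spark's ANSI flag (SQLConf.ANSI_ENABLED).
  spark.conf.set("spark.sql.ansi.enabled", "true")
  // Without this opt-in, Add/Subtract/Multiply/... fall back to Spark.
  spark.conf.set(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key, "true")
  spark.sql("SELECT a + b FROM testData2").collect()
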
4 changes: 2 additions & 2 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1217,8 +1217,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
if (testAnsi) {
// with ANSI enabled, we should produce the same exception as Spark
withSQLConf(
(SQLConf.ANSI_ENABLED.key, "true"),
(CometConf.COMET_ANSI_MODE_ENABLED.key, "true")) {
SQLConf.ANSI_ENABLED.key -> "true",
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {

// cast() should throw exception on invalid inputs when ansi mode is enabled
val df = data.withColumn("converted", col("a").cast(toType))
10 changes: 5 additions & 5 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1334,7 +1334,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

withSQLConf(
SQLConf.ANSI_ENABLED.key -> "true",
CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") {
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
testAbsAnsiOverflow(Seq((Byte.MaxValue, Byte.MinValue)))
testAbsAnsiOverflow(Seq((Short.MaxValue, Short.MinValue)))
testAbsAnsiOverflow(Seq((Int.MaxValue, Int.MinValue)))
@@ -1944,7 +1944,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
def withAnsiMode(enabled: Boolean)(f: => Unit): Unit = {
withSQLConf(
SQLConf.ANSI_ENABLED.key -> enabled.toString,
CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString,
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> enabled.toString,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true")(f)
}
@@ -2098,7 +2098,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
def withAnsiMode(enabled: Boolean)(f: => Unit): Unit = {
withSQLConf(
SQLConf.ANSI_ENABLED.key -> enabled.toString,
CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString,
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> enabled.toString,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true")(f)
}
@@ -2161,7 +2161,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf(
"spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding",
SQLConf.ANSI_ENABLED.key -> "true",
CometConf.COMET_ANSI_MODE_ENABLED.key -> "true",
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
for (n <- Seq("2147483647", "-2147483648")) {
@@ -2672,7 +2672,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

Seq(true, false).foreach { ansiEnabled =>
withSQLConf(
CometConf.COMET_ANSI_MODE_ENABLED.key -> "true",
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString(),
// Prevent the optimizer from collapsing an extract value of a create array
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName) {
@@ -273,6 +273,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", // needed for ANSI mode
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
val qe = sql(queryString).queryExecution