Merged
9 changes: 0 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -600,15 +600,6 @@ object CometConf extends ShimCometConf
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan"))

- val COMET_ANSI_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.ansi.enabled")
- .internal()
- .doc(
- "Comet does not respect ANSI mode in most cases and by default will not accelerate " +
- "queries when ansi mode is enabled. Enable this setting to test Comet's experimental " +
- "support for ANSI mode. This should not be used in production.")
- .booleanConf
- .createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT)
-
val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.caseConversion.enabled")
.doc(
@@ -21,5 +21,4 @@ package org.apache.comet.shims

trait ShimCometConf {
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
- protected val COMET_ANSI_MODE_ENABLED_DEFAULT = false
}
@@ -21,5 +21,4 @@ package org.apache.comet.shims

trait ShimCometConf {
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
- protected val COMET_ANSI_MODE_ENABLED_DEFAULT = true
}
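For context on the two shim hunks above, here is a minimal sketch of the pattern, assuming the simplified names below: each Spark-version-specific source set compiles its own ShimCometConf with version-appropriate defaults, and the shared CometConf object mixes the trait in — which is why removing the ANSI default meant touching one trait per Spark version.

```scala
// Minimal sketch of the per-Spark-version shim pattern (simplified; the real
// CometConf defines entries through a ConfigEntry builder, as the first diff
// above shows).

// spark-3.x source set; the spark-4.0 shim sets this to true instead.
trait ShimCometConf {
  protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
}

// Shared code mixes in whichever shim was compiled for the active Spark version.
object CometConf extends ShimCometConf {
  // Hypothetical accessor, for illustration only.
  def schemaEvolutionDefault: Boolean = COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT
}
```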
27 changes: 19 additions & 8 deletions dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
- index d3544881af1..5cc127f064d 100644
+ index d3544881af1..9c174496a4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -881,7 +881,7 @@ index b5b34922694..a72403780c4 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
- index 525d97e4998..8a3e7457618 100644
+ index 525d97e4998..5e04319dd97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -894,6 +894,19 @@ index 525d97e4998..8a3e7457618 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4467,7 +4468,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
- assert(msg.contains(query))
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(msg.contains(query))

Review thread:

Contributor: When ANSI was not enabled, was this passing? If so, is there a way to check whether ANSI is enabled and skip only when both Comet and ANSI are enabled?

Member (author): We were previously falling back to Spark for this test. This Spark test is specifically for testing ANSI errors from cast.

Member (author): I don't recall now why we were falling back to Spark, so I will take another look.

Member (author): For Spark 3.x we previously fell back to Spark for this test because we do not run the Spark SQL tests for 3.x with ENABLE_COMET_ANSI_MODE=true and therefore did not enable spark.comet.ansi.enabled.

+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..51d1ee65422 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -2798,10 +2811,10 @@ index dd55fcfe42c..a1d390c93d0 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- index ed2e309fa07..a1fb4abe681 100644
+ index ed2e309fa07..a5ea58146ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- @@ -74,6 +74,32 @@ trait SharedSparkSessionBase
+ @@ -74,6 +74,31 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2828,7 +2841,6 @@ index ed2e309fa07..a1fb4abe681 100644
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }
conf.set(
Expand Down Expand Up @@ -2920,10 +2932,10 @@ index a902cb3a69e..800a3acbe99 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- index 07361cfdce9..b4d53dbe900 100644
+ index 07361cfdce9..97dab2a3506 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- @@ -55,25 +55,55 @@ object TestHive
+ @@ -55,25 +55,54 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -2987,7 +2999,6 @@ index 07361cfdce9..b4d53dbe900 100644
+ if (a != null && a.toBoolean) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }

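The SharedSparkSession and TestHive hunks above share one pattern: the Spark SQL test harness reads the ENABLE_COMET_ANSI_MODE environment variable and, when it is true, runs the suites with ANSI mode enabled. A reduced sketch, assuming a bare SparkConf in place of the harness plumbing; note that after this PR only Spark's own flag is set, since spark.comet.ansi.enabled no longer exists:

```scala
import org.apache.spark.SparkConf

// Reduced sketch of the test-harness wiring shown in the diff above.
val conf = new SparkConf()
val ansi = System.getenv("ENABLE_COMET_ANSI_MODE")
if (ansi != null && ansi.toBoolean) {
  // Only Spark's ANSI flag remains; the Comet-specific flag was removed by this PR.
  conf.set("spark.sql.ansi.enabled", "true")
}
```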
27 changes: 19 additions & 8 deletions dev/diffs/3.5.6.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
- index 68e2c422a24..fb9c2e88fac 100644
+ index 68e2c422a24..d971894ffe6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -866,7 +866,7 @@ index c26757c9cff..d55775f09d7 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
- index 793a0da6a86..6ccb9d62582 100644
+ index 793a0da6a86..e48e74091cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -879,6 +879,19 @@ index 793a0da6a86..6ccb9d62582 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4497,7 +4498,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
- assert(msg.contains(query))
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(msg.contains(query))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -2770,10 +2783,10 @@ index e937173a590..ca06132102d 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- index ed2e309fa07..a1fb4abe681 100644
+ index ed2e309fa07..a5ea58146ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- @@ -74,6 +74,32 @@ trait SharedSparkSessionBase
+ @@ -74,6 +74,31 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2800,7 +2813,6 @@ index ed2e309fa07..a1fb4abe681 100644
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }
conf.set(
@@ -2935,10 +2947,10 @@ index 6160c3e5f6c..0956d7d9edc 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- index 1d646f40b3e..7f2cdb8f061 100644
+ index 1d646f40b3e..5babe505301 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- @@ -53,25 +53,55 @@ object TestHive
+ @@ -53,25 +53,54 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -3002,7 +3014,6 @@ index 1d646f40b3e..7f2cdb8f061 100644
+ if (a != null && a.toBoolean) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }

29 changes: 20 additions & 9 deletions dev/diffs/4.0.0.diff
@@ -1057,7 +1057,7 @@ index ad424b3a7cc..4ece0117a34 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
- index b3fce19979e..345acb4811a 100644
+ index b3fce19979e..67edf5eb91c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -1086,11 +1086,24 @@ index b3fce19979e..345acb4811a 100644
test("SPARK-39175: Query context of Cast should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
- @@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ @@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(ex.isInstanceOf[SparkNumberFormatException] ||
ex.isInstanceOf[SparkDateTimeException] ||
ex.isInstanceOf[SparkRuntimeException])
- assert(ex.getMessage.contains(query))
+
+ if (!isCometEnabled) {
+ // Comet's error message does not include the original SQL query
+ // https://github.com/apache/datafusion-comet/issues/2215
+ assert(ex.getMessage.contains(query))
+ }
}
}
}
}

test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
@@ -3491,10 +3504,10 @@ index f0f3f94b811..d64e4e54e22 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- index 245219c1756..880406011d9 100644
+ index 245219c1756..7d2ef1b9145 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
- @@ -75,6 +75,32 @@ trait SharedSparkSessionBase
+ @@ -75,6 +75,31 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -3521,7 +3534,6 @@ index 245219c1756..880406011d9 100644
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }
conf.set(
@@ -3630,10 +3642,10 @@ index b67370f6eb9..746b3974b29 100644
override def beforeEach(): Unit = {
super.beforeEach()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- index a394d0b7393..8411da928ab 100644
+ index a394d0b7393..d29b3058897 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
- @@ -53,24 +53,48 @@ object TestHive
+ @@ -53,24 +53,47 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -3690,7 +3702,6 @@ index a394d0b7393..8411da928ab 100644
+ if (a != null && a.toBoolean) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+ }
+
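One hunk above retags the WSCG test with IgnoreComet, the tag the Comet test diffs use to skip tests that cannot yet pass under Comet, carrying a tracking-issue URL. A self-contained sketch of the pattern, with a minimal stand-in for the real IgnoreComet definition, which may differ:

```scala
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Minimal stand-in for Comet's IgnoreComet test tag, for illustration only.
case class IgnoreComet(reason: String) extends Tag("DisableForComet")

class ExampleSuite extends AnyFunSuite {
  // Tagged tests are excluded when the suites run with Comet enabled.
  test(
    "Query context of Cast should be serialized to executors when WSCG is off",
    IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) {
    // original Spark test body
  }
}
```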
17 changes: 13 additions & 4 deletions docs/source/user-guide/compatibility.md
@@ -75,10 +75,19 @@ The `native_datafusion` scan has some additional limitations:

## ANSI Mode

Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
- Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
- specify `spark.comet.ansi.enabled=true` in the Spark configuration. Comet's ANSI support is experimental and should not
- be used in production.
+ Comet will fall back to Spark for the following expressions when ANSI mode is enabled, unless
+ `spark.comet.expression.allowIncompatible=true`.
+
+ - Add
+ - Subtract
+ - Multiply
+ - Divide
+ - IntegralDivide
+ - Remainder
+ - Round
+ - Average
+ - Sum
+ - Cast (in some cases)

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
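
As a quick illustration of the opt-in described above (a sketch only; the listed expressions may produce results that differ from Spark under ANSI mode, so weigh the compatibility notes first):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: with ANSI mode on, the expressions listed above fall back to Spark
// unless the incompatible-expression opt-in is set.
val spark = SparkSession.builder()
  .master("local[*]") // assumption for a local experiment
  .config("spark.sql.ansi.enabled", "true")
  .config("spark.comet.expression.allowIncompatible", "true")
  .getOrCreate()
```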

@@ -130,7 +130,7 @@ object CometCast {
Compatible(Some("Only supports years between 262143 BC and 262142 AD"))
case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") =>
Incompatible(Some(s"Cast will use UTC instead of $timeZoneId"))
case DataTypes.TimestampType if evalMode == "ANSI" =>
case DataTypes.TimestampType if evalMode == CometEvalMode.ANSI =>
Incompatible(Some("ANSI mode not supported"))
case DataTypes.TimestampType =>
// https://github.com/apache/datafusion-comet/issues/328
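The one-line CometCast fix above is worth a note: the old guard compared an eval-mode value to the string "ANSI", and in Scala a cross-type == is always false, so the Incompatible branch could never fire. A self-contained sketch of the pitfall, using a minimal stand-in for Comet's actual CometEvalMode type:

```scala
// Minimal stand-in for Comet's eval-mode type, for illustration only.
sealed trait CometEvalMode
object CometEvalMode {
  case object LEGACY extends CometEvalMode
  case object ANSI extends CometEvalMode
}

val evalMode: CometEvalMode = CometEvalMode.ANSI

// Old guard: comparing an ADT value to a String always yields false
// (recent Scala compilers warn about this), so the branch never ran.
println(evalMode == "ANSI") // false

// Fixed guard: compare against the actual enum value.
println(evalMode == CometEvalMode.ANSI) // true
```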
15 changes: 1 addition & 14 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}

import org.apache.comet.{CometConf, ExtendedExplainInfo}
- import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_EXEC_SHUFFLE_ENABLED}
+ import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -605,19 +605,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}

private def _apply(plan: SparkPlan): SparkPlan = {
- // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
- // enabled.
- if (isANSIEnabled(conf)) {
- if (COMET_ANSI_MODE_ENABLED.get()) {
- if (!isSpark40Plus) {
- logWarning("Using Comet's experimental support for ANSI mode.")
- }
- } else {
- logInfo("Comet extension disabled for ANSI mode")
- return plan
- }
- }

// We shouldn't transform Spark query plan if Comet is not loaded.
if (!isCometLoaded(conf)) return plan

17 changes: 14 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2368,13 +2368,24 @@ object QueryPlanSerde extends Logging with CometExprShim {

sealed trait SupportLevel

- /** We support this feature with full compatibility with Spark */
+ /**
+ * Comet either supports this feature with full compatibility with Spark, or may have known
+ * differences in some specific edge cases that are unlikely to be an issue for most users.
+ *
+ * Any compatibility differences are noted in the
+ * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
+ */
case class Compatible(notes: Option[String] = None) extends SupportLevel

- /** We support this feature but results can be different from Spark */
+ /**
+ * Comet supports this feature but results can be different from Spark.
+ *
+ * Any compatibility differences are noted in the
+ * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
+ */
case class Incompatible(notes: Option[String] = None) extends SupportLevel

- /** We do not support this feature */
+ /** Comet does not support this feature */
object Unsupported extends SupportLevel

/**
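The SupportLevel values documented above are what individual serdes return when asked whether Comet can handle an expression. A hedged sketch of a use site — the dispatch function is hypothetical, but the branch results mirror the CometCast logic shown earlier:

```scala
// The ADT as documented in the diff above.
sealed trait SupportLevel
case class Compatible(notes: Option[String] = None) extends SupportLevel
case class Incompatible(notes: Option[String] = None) extends SupportLevel
object Unsupported extends SupportLevel

// Hypothetical check for casting to timestamp, mirroring CometCast above.
def castToTimestampSupport(timeZoneId: Option[String], ansiMode: Boolean): SupportLevel =
  if (ansiMode) {
    Incompatible(Some("ANSI mode not supported"))
  } else if (timeZoneId.exists(_ != "UTC")) {
    Incompatible(Some(s"Cast will use UTC instead of ${timeZoneId.get}"))
  } else {
    Compatible(Some("Only supports years between 262143 BC and 262142 AD"))
  }
```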
30 changes: 24 additions & 6 deletions spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -139,9 +139,18 @@ object CometAverage extends CometAggregateExpressionSerde {
return None
}

- if (avg.evalMode != EvalMode.LEGACY) {
- withInfo(aggExpr, "Average is only supported in legacy mode")
- return None
+ avg.evalMode match {
+ case EvalMode.ANSI if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
+ withInfo(
+ aggExpr,
+ "ANSI mode is not supported. Set " +
+ s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to allow it anyway")
+ return None
+ case EvalMode.TRY =>
+ withInfo(aggExpr, "TRY mode is not supported")
+ return None
+ case _ =>
+ // supported
}

val child = avg.child
@@ -194,9 +203,18 @@ object CometSum extends CometAggregateExpressionSerde {
return None
}

- if (sum.evalMode != EvalMode.LEGACY) {
- withInfo(aggExpr, "Sum is only supported in legacy mode")
- return None
+ sum.evalMode match {
+ case EvalMode.ANSI if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
+ withInfo(
+ aggExpr,
+ "ANSI mode is not supported. Set " +
+ s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to allow it anyway")
+ return None
+ case EvalMode.TRY =>
+ withInfo(aggExpr, "TRY mode is not supported")
+ return None
+ case _ =>
+ // supported
}

val childExpr = exprToProto(sum.child, inputs, binding)