diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala index d8cc62cf9b..56fe75179c 100644 --- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala +++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.{Compatible, Incompatible} /** * Utility for generating markdown documentation from the configs. diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 337eae11db..fcf22c4a04 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,16 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} -sealed trait SupportLevel - -/** We support this feature with full compatibility with Spark */ -case class Compatible(notes: Option[String] = None) extends SupportLevel - -/** We support this feature but results can be different from Spark */ -case class Incompatible(notes: Option[String] = None) extends SupportLevel - -/** We do not support this feature */ -object Unsupported extends SupportLevel +import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported} object CometCast { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 23cf9d313e..109e5aa940 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -642,14 +642,31 @@ object QueryPlanSerde extends Logging with CometExprShim { SQLConf.get def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { - handler match { - case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => - withInfo( - expr, - s"$expr is not fully compatible with Spark. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") + handler.getSupportLevel(expr) match { + case Unsupported => + withInfo(expr, s"$expr is not supported.") None - case _ => + case Incompatible(notes) => + if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { + if (notes.isDefined) { + logWarning( + s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " + + s"but has notes: ${notes.get}") + } + handler.convert(expr, inputs, binding) + } else { + val optionalNotes = notes.map(str => s" ($str)").getOrElse("") + withInfo( + expr, + s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " + + s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " + + s"${CometConf.COMPAT_GUIDE}.") + None + } + case Compatible(notes) => + if (notes.isDefined) { + logWarning(s"Comet supports $expr but has notes: ${notes.get}") + } handler.convert(expr, inputs, binding) } } @@ -2387,6 +2404,17 @@ object QueryPlanSerde extends Logging with CometExprShim { } } +sealed trait SupportLevel + +/** We support this feature with full compatibility with Spark */ +case class Compatible(notes: Option[String] = None) extends SupportLevel + +/** We support this feature but results can be different from Spark */ +case class Incompatible(notes: Option[String] = None) extends SupportLevel + +/** We do not support this feature */ +object Unsupported extends SupportLevel + /** * Trait for providing serialization logic for operators. */ @@ -2424,6 +2452,16 @@ trait CometOperatorSerde[T <: SparkPlan] { */ trait CometExpressionSerde[T <: Expression] { + /** + * Determine the support level of the expression based on its attributes. + * + * @param expr + * The Spark expression. + * @return + * Support level (Compatible, Incompatible, or Unsupported). + */ + def getSupportLevel(expr: T): SupportLevel = Compatible(None) + /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. @@ -2474,9 +2512,6 @@ trait CometAggregateExpressionSerde { conf: SQLConf): Option[ExprOuterClass.AggExpr] } -/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */ -trait IncompatExpr {} - /** Serde for scalar function. */ case class CometScalarFunction[T <: Expression](name: String) extends CometExpressionSerde[T] { override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2a77d5fa14..d2624ca23c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -93,7 +93,10 @@ object CometArrayRemove extends CometExpressionSerde[ArrayRemove] with CometExpr } } -object CometArrayAppend extends CometExpressionSerde[ArrayAppend] with IncompatExpr { +object CometArrayAppend extends CometExpressionSerde[ArrayAppend] { + + override def getSupportLevel(expr: ArrayAppend): SupportLevel = Incompatible(None) + override def convert( expr: ArrayAppend, inputs: Seq[Attribute], @@ -149,7 +152,10 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] { } } -object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with IncompatExpr { +object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] { + + override def getSupportLevel(expr: ArrayDistinct): SupportLevel = Incompatible(None) + override def convert( expr: ArrayDistinct, inputs: Seq[Attribute], @@ -162,7 +168,10 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with Incom } } -object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] with IncompatExpr { +object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] { + + override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None) + override def convert( expr: ArrayIntersect, inputs: Seq[Attribute], @@ -201,7 +210,10 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] { } } -object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr { +object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { + + override def getSupportLevel(expr: ArraysOverlap): SupportLevel = Incompatible(None) + override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], @@ -218,7 +230,10 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with Incom } } -object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatExpr { +object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] { + + override def getSupportLevel(expr: ArrayRepeat): SupportLevel = Incompatible(None) + override def convert( expr: ArrayRepeat, inputs: Seq[Attribute], @@ -232,7 +247,10 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatE } } -object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatExpr { +object CometArrayCompact extends CometExpressionSerde[Expression] { + + override def getSupportLevel(expr: Expression): SupportLevel = Incompatible(None) + override def convert( expr: Expression, inputs: Seq[Attribute], @@ -252,10 +270,9 @@ object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatE } } -object CometArrayExcept - extends CometExpressionSerde[ArrayExcept] - with CometExprShim - with IncompatExpr { +object CometArrayExcept extends CometExpressionSerde[ArrayExcept] with CometExprShim { + + override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(None) @tailrec def isTypeSupported(dt: DataType): Boolean = { @@ -292,7 +309,10 @@ object CometArrayExcept } } -object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr { +object CometArrayJoin extends CometExpressionSerde[ArrayJoin] { + + override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(None) + override def convert( expr: ArrayJoin, inputs: Seq[Attribute], @@ -326,7 +346,10 @@ object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr } } -object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatExpr { +object CometArrayInsert extends CometExpressionSerde[ArrayInsert] { + + override def getSupportLevel(expr: ArrayInsert): SupportLevel = Incompatible(None) + override def convert( expr: ArrayInsert, inputs: Seq[Attribute], @@ -361,7 +384,10 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatE } } -object CometArrayUnion extends CometExpressionSerde[ArrayUnion] with IncompatExpr { +object CometArrayUnion extends CometExpressionSerde[ArrayUnion] { + + override def getSupportLevel(expr: ArrayUnion): SupportLevel = Incompatible(None) + override def convert( expr: ArrayUnion, inputs: Seq[Attribute], diff --git a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala index 0171d3e1a2..198c7d3101 100644 --- a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala @@ -27,7 +27,10 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn // TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799 // https://github.com/apache/datafusion/issues/16594 -object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] with IncompatExpr { +object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] { + + override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None) + override def convert( expr: FromUnixTime, inputs: Seq[Attribute], diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index fbf38e2e03..8bad71e081 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType} import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala index 7a5af87619..d030106c3e 100644 --- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql import org.apache.comet.CometConf -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.TableIdentifier