Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/GenerateDocs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.catalyst.expressions.Cast

import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible}
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{Compatible, Incompatible}

/**
* Utility for generating markdown documentation from the configs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,7 @@ package org.apache.comet.expressions

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}

sealed trait SupportLevel

/** We support this feature with full compatibility with Spark */
case class Compatible(notes: Option[String] = None) extends SupportLevel

/** We support this feature but results can be different from Spark */
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** We do not support this feature */
object Unsupported extends SupportLevel
Comment on lines -25 to -33
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These move into QueryPlanSerde now that they are no longer specific to Cast

import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported}

object CometCast {

Expand Down
55 changes: 45 additions & 10 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -642,14 +642,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
SQLConf.get

def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
handler match {
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
withInfo(
expr,
s"$expr is not fully compatible with Spark. To enable it anyway, set " +
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
handler.getSupportLevel(expr) match {
case Unsupported =>
withInfo(expr, s"$expr is not supported.")
None
case _ =>
case Incompatible(notes) =>
if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
if (notes.isDefined) {
logWarning(
s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
s"but has notes: ${notes.get}")
}
handler.convert(expr, inputs, binding)
} else {
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
withInfo(
expr,
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
s"${CometConf.COMPAT_GUIDE}.")
None
}
case Compatible(notes) =>
if (notes.isDefined) {
logWarning(s"Comet supports $expr but has notes: ${notes.get}")
}
handler.convert(expr, inputs, binding)
}
}
Expand Down Expand Up @@ -2387,6 +2404,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
}

sealed trait SupportLevel

/** We support this feature with full compatibility with Spark */
case class Compatible(notes: Option[String] = None) extends SupportLevel

/** We support this feature but results can be different from Spark */
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** We do not support this feature */
object Unsupported extends SupportLevel

/**
* Trait for providing serialization logic for operators.
*/
Expand Down Expand Up @@ -2424,6 +2452,16 @@ trait CometOperatorSerde[T <: SparkPlan] {
*/
trait CometExpressionSerde[T <: Expression] {

/**
* Determine the support level of the expression based on its attributes.
*
* @param expr
* The Spark expression.
* @return
* Support level (Compatible, Incompatible, or Unsupported).
*/
def getSupportLevel(expr: T): SupportLevel = Compatible(None)

/**
* Convert a Spark expression into a protocol buffer representation that can be passed into
* native code.
Expand Down Expand Up @@ -2474,9 +2512,6 @@ trait CometAggregateExpressionSerde {
conf: SQLConf): Option[ExprOuterClass.AggExpr]
}

/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
trait IncompatExpr {}

/** Serde for scalar function. */
case class CometScalarFunction[T <: Expression](name: String) extends CometExpressionSerde[T] {
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
Expand Down
52 changes: 39 additions & 13 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ object CometArrayRemove extends CometExpressionSerde[ArrayRemove] with CometExpr
}
}

object CometArrayAppend extends CometExpressionSerde[ArrayAppend] with IncompatExpr {
object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {

override def getSupportLevel(expr: ArrayAppend): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayAppend,
inputs: Seq[Attribute],
Expand Down Expand Up @@ -149,7 +152,10 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] {
}
}

object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with IncompatExpr {
object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] {

override def getSupportLevel(expr: ArrayDistinct): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayDistinct,
inputs: Seq[Attribute],
Expand All @@ -162,7 +168,10 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with Incom
}
}

object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] with IncompatExpr {
object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] {

override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayIntersect,
inputs: Seq[Attribute],
Expand Down Expand Up @@ -201,7 +210,10 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] {
}
}

object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr {
object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] {

override def getSupportLevel(expr: ArraysOverlap): SupportLevel = Incompatible(None)

override def convert(
expr: ArraysOverlap,
inputs: Seq[Attribute],
Expand All @@ -218,7 +230,10 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with Incom
}
}

object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatExpr {
object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] {

override def getSupportLevel(expr: ArrayRepeat): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayRepeat,
inputs: Seq[Attribute],
Expand All @@ -232,7 +247,10 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatE
}
}

object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatExpr {
object CometArrayCompact extends CometExpressionSerde[Expression] {

override def getSupportLevel(expr: Expression): SupportLevel = Incompatible(None)

override def convert(
expr: Expression,
inputs: Seq[Attribute],
Expand All @@ -252,10 +270,9 @@ object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatE
}
}

object CometArrayExcept
extends CometExpressionSerde[ArrayExcept]
with CometExprShim
with IncompatExpr {
object CometArrayExcept extends CometExpressionSerde[ArrayExcept] with CometExprShim {

override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(None)

@tailrec
def isTypeSupported(dt: DataType): Boolean = {
Expand Down Expand Up @@ -292,7 +309,10 @@ object CometArrayExcept
}
}

object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr {
object CometArrayJoin extends CometExpressionSerde[ArrayJoin] {

override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayJoin,
inputs: Seq[Attribute],
Expand Down Expand Up @@ -326,7 +346,10 @@ object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr
}
}

object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatExpr {
object CometArrayInsert extends CometExpressionSerde[ArrayInsert] {

override def getSupportLevel(expr: ArrayInsert): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayInsert,
inputs: Seq[Attribute],
Expand Down Expand Up @@ -361,7 +384,10 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatE
}
}

object CometArrayUnion extends CometExpressionSerde[ArrayUnion] with IncompatExpr {
object CometArrayUnion extends CometExpressionSerde[ArrayUnion] {

override def getSupportLevel(expr: ArrayUnion): SupportLevel = Incompatible(None)

override def convert(
expr: ArrayUnion,
inputs: Seq[Attribute],
Expand Down
5 changes: 4 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/unixtime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn

// TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799
// https://github.com/apache/datafusion/issues/16594
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] with IncompatExpr {
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {

override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None)

override def convert(
expr: FromUnixTime,
inputs: Seq[Attribute],
Expand Down
3 changes: 2 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}

import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.Compatible

class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package org.apache.spark.sql

import org.apache.comet.CometConf
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.Compatible
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down
Loading