1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -150,6 +150,7 @@ jobs:
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
org.apache.comet.rules.CometScanRuleSuite
org.apache.comet.rules.CometExecRuleSuite
org.apache.comet.rules.CometCBOSuite
org.apache.spark.sql.CometTPCDSQuerySuite
org.apache.spark.sql.CometTPCDSQueryTestSuite
org.apache.spark.sql.CometTPCHQuerySuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -113,6 +113,7 @@ jobs:
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
org.apache.comet.rules.CometScanRuleSuite
org.apache.comet.rules.CometExecRuleSuite
org.apache.comet.rules.CometCBOSuite
org.apache.spark.sql.CometTPCDSQuerySuite
org.apache.spark.sql.CometTPCDSQueryTestSuite
org.apache.spark.sql.CometTPCHQuerySuite
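Both workflow changes register the new CometCBOSuite so it runs in the Linux and macOS CI builds. To run it locally, an invocation along these lines should work, assuming the repository's usual Maven build and the scalatest-maven-plugin's suites property (the module and flags here are a guess, not taken from this PR):

    mvn test -pl spark -Dsuites=org.apache.comet.rules.CometCBOSuite
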
154 changes: 154 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -65,6 +65,7 @@ object CometConf extends ShimCometConf {
private val CATEGORY_SHUFFLE = "shuffle"
private val CATEGORY_TUNING = "tuning"
private val CATEGORY_TESTING = "testing"
private val CATEGORY_CBO = "cbo"

def register(conf: ConfigEntry[_]): Unit = {
assert(conf.category.nonEmpty, s"${conf.key} does not have a category defined")
@@ -772,6 +773,123 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)

// CBO Configuration Options

val COMET_CBO_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.cbo.enabled")
.category(CATEGORY_CBO)
.doc(
"Enable cost-based optimizer to decide Comet vs Spark execution. " +
"When enabled, Comet estimates whether native execution will be faster " +
"and falls back to Spark if not. Note: This only affects operator conversion " +
"(filter, project, aggregate, etc.), not scan conversion which is handled separately.")
.booleanConf
.createWithDefault(false)

val COMET_CBO_EXPLAIN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.cbo.explain.enabled")
.category(CATEGORY_CBO)
.doc("Log CBO decision details for debugging.")
.booleanConf
.createWithDefault(false)
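
// Usage note (illustrative, not part of this change): both flags are plain
// Spark SQL confs, so they can be toggled at runtime, e.g.
//   spark.conf.set("spark.comet.cbo.enabled", "true")
//   spark.conf.set("spark.comet.cbo.explain.enabled", "true")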

val COMET_CBO_SPEEDUP_THRESHOLD: ConfigEntry[Double] = conf("spark.comet.cbo.speedupThreshold")
.category(CATEGORY_CBO)
.doc("Minimum estimated speedup ratio required to use Comet. " +
"Values less than 1.0 allow Comet even when estimated slightly slower.")
.doubleConf
.checkValue(_ > 0, "Threshold must be positive")
.createWithDefault(1.0)
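
// Illustration (not part of this change): assuming the decision compares the
// estimated Spark cost to the estimated Comet cost as a ratio, a threshold of
// 1.2 keeps Comet only when it looks at least 20% faster
// (sparkCost / cometCost >= 1.2), while 0.8 tolerates a ~20% estimated slowdown.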

val COMET_CBO_DEFAULT_ROW_COUNT: ConfigEntry[Long] = conf("spark.comet.cbo.defaultRowCount")
.category(CATEGORY_CBO)
.internal()
.doc("Default row count estimate when statistics unavailable.")
.longConf
.createWithDefault(1000000L)

val COMET_CBO_TRANSITION_COST: ConfigEntry[Double] = conf("spark.comet.cbo.cost.transition")
.category(CATEGORY_CBO)
.internal()
.doc("Cost penalty per row for columnar<->row transitions.")
.doubleConf
.createWithDefault(0.001)
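
// Illustration (not part of this change): with the default of 0.001, a plan
// fragment that forces two columnar<->row transitions over the default
// 1,000,000-row estimate adds roughly 2 * 0.001 * 1000000 = 2000 cost units.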

val COMET_CBO_SCAN_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.scan")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for scan operators in cost calculation.")
.doubleConf
.createWithDefault(1.0)

val COMET_CBO_FILTER_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.filter")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for filter operators in cost calculation.")
.doubleConf
.createWithDefault(0.1)

val COMET_CBO_PROJECT_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.project")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for project operators in cost calculation.")
.doubleConf
.createWithDefault(0.1)

val COMET_CBO_AGGREGATE_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.aggregate")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for aggregate operators in cost calculation.")
.doubleConf
.createWithDefault(2.0)

val COMET_CBO_JOIN_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.join")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for join operators in cost calculation.")
.doubleConf
.createWithDefault(5.0)

val COMET_CBO_SORT_WEIGHT: ConfigEntry[Double] = conf("spark.comet.cbo.weight.sort")
.category(CATEGORY_CBO)
.internal()
.doc("Weight for sort operators in cost calculation.")
.doubleConf
.createWithDefault(1.5)

val COMET_CBO_SCAN_SPEEDUP: ConfigEntry[Double] = conf("spark.comet.cbo.speedup.scan")
.category(CATEGORY_CBO)
.internal()
.doc("Expected speedup factor for Comet scan operators vs Spark.")
.doubleConf
.createWithDefault(2.0)

val COMET_CBO_FILTER_SPEEDUP: ConfigEntry[Double] = conf("spark.comet.cbo.speedup.filter")
.category(CATEGORY_CBO)
.internal()
.doc("Expected speedup factor for Comet filter operators vs Spark.")
.doubleConf
.createWithDefault(3.0)

val COMET_CBO_AGGREGATE_SPEEDUP: ConfigEntry[Double] = conf("spark.comet.cbo.speedup.aggregate")
.category(CATEGORY_CBO)
.internal()
.doc("Expected speedup factor for Comet aggregate operators vs Spark.")
.doubleConf
.createWithDefault(2.5)

val COMET_CBO_JOIN_SPEEDUP: ConfigEntry[Double] = conf("spark.comet.cbo.speedup.join")
.category(CATEGORY_CBO)
.internal()
.doc("Expected speedup factor for Comet join operators vs Spark.")
.doubleConf
.createWithDefault(2.0)

val COMET_CBO_SORT_SPEEDUP: ConfigEntry[Double] = conf("spark.comet.cbo.speedup.sort")
.category(CATEGORY_CBO)
.internal()
.doc("Expected speedup factor for Comet sort operators vs Spark.")
.doubleConf
.createWithDefault(2.0)
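
// Sketch (illustrative, not part of this change): one plausible way the
// weights and speedup factors above could combine. The helpers below are
// hypothetical and not defined anywhere in Comet:
//   def sparkOpCost(weight: Double, rows: Long): Double = weight * rows
//   def cometOpCost(weight: Double, rows: Long, speedup: Double): Double =
//     sparkOpCost(weight, rows) / speedup
// Summing these costs over the plan's operators and comparing the ratio of the
// totals against spark.comet.cbo.speedupThreshold gives the Comet-vs-Spark
// decision described in spark.comet.cbo.enabled.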

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
@@ -822,6 +940,42 @@ object CometConf extends ShimCometConf {
def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
}

// CBO expression cost configuration helpers

/** Config key prefix for CBO expression costs */
val COMET_CBO_EXPR_COST_PREFIX = "spark.comet.cbo.exprCost"

/**
* Get the config key for an expression's cost multiplier. Example:
* spark.comet.cbo.exprCost.AttributeReference
*/
def getExprCostConfigKey(exprName: String): String = {
s"$COMET_CBO_EXPR_COST_PREFIX.$exprName"
}

/**
* Get the config key for an expression class's cost multiplier.
*/
def getExprCostConfigKey(exprClass: Class[_]): String = {
getExprCostConfigKey(exprClass.getSimpleName)
}

/**
* Get the cost multiplier for an expression from config, with fallback to default. A cost
* multiplier < 1.0 means Comet is faster, > 1.0 means Spark is faster.
*/
def getExprCost(exprName: String, defaultCost: Double, conf: SQLConf): Double = {
getDoubleConf(getExprCostConfigKey(exprName), defaultCost, conf)
}

def getDoubleConf(name: String, defaultValue: Double, conf: SQLConf): Double = {
try {
conf.getConfString(name, defaultValue.toString).toDouble
} catch {
case _: NumberFormatException => defaultValue
}
}
}

object ConfigHelpers {
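The expression-cost helpers above enable per-expression tuning without code changes. A minimal sketch of how they compose, using only the methods added in this file plus standard Spark APIs (the SparkSession wiring and the Substring expression name are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.comet.CometConf

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Sets spark.comet.cbo.exprCost.Substring = 1.5, then reads it back
    // (1.0 is the caller-supplied default used when the key is unset).
    spark.conf.set(CometConf.getExprCostConfigKey("Substring"), "1.5")
    val cost = CometConf.getExprCost("Substring", 1.0, spark.sessionState.conf)
    // cost == 1.5; per the doc above, values > 1.0 bias the CBO toward Spark.

Note that getDoubleConf deliberately swallows NumberFormatException, so a malformed value such as "fast" silently falls back to the supplied default instead of failing the query.
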
11 changes: 10 additions & 1 deletion spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -29,13 +29,14 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffl
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

import org.apache.comet.CometExplainInfo.getActualPlan
import org.apache.comet.rules.CometCBOInfo

class ExtendedExplainInfo extends ExtendedExplainGenerator {

override def title: String = "Comet"

def generateExtendedInfo(plan: SparkPlan): String = {
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.get() match {
val baseInfo = CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.get() match {
case CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE =>
// Generates the extended info in a verbose manner, printing each node along with the
// extended information in a tree display.
@@ -47,6 +48,14 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
// Generates the extended info as a list of fallback reasons
getFallbackReasons(plan).mkString("\n").trim
}

// Add CBO info if available
val cboInfo = getActualPlan(plan)
.getTagValue(CometCBOInfo.TAG)
.map(analysis => s"\n${analysis.toExplainString}")
.getOrElse("")

baseInfo + cboInfo
}

def getFallbackReasons(plan: SparkPlan): Seq[String] = {
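With this change, any CBO analysis the optimizer rule attaches to the plan root under CometCBOInfo.TAG is appended to the existing fallback-reason output. A hedged usage sketch; it assumes ExtendedExplainInfo was registered through Spark's extended-explain provider mechanism at session startup, and the table name is illustrative:

    spark.conf.set("spark.comet.cbo.enabled", "true")
    spark.sql("SELECT key, count(*) FROM t GROUP BY key").explain("extended")
    // If the CBO rule tagged the plan, the "Comet" section of the extended
    // explain output now ends with the analysis rendered by toExplainString.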