diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 06d926dfc0..72c2bea9e4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -411,61 +411,70 @@ case class CometExecRule(session: SparkSession) * BroadcastQueryStageExec. */ private def convertSubqueryBroadcasts(plan: SparkPlan): SparkPlan = { + // CometIcebergNativeScanExec.runtimeFilters is a top-level constructor field visible to + // productIterator, so transformExpressionsUp rewrites it directly. The wrapped @transient + // originalPlan still holds the pre-rewrite runtimeFilters; we don't sync it here because + // CometIcebergNativeScanExec.serializedPartitionData rebuilds originalPlan from the + // top-level runtimeFilters at serialization time (single source of truth). plan.transformExpressionsUp { case inSub: InSubqueryExec => - inSub.plan match { - case sub: SubqueryBroadcastExec => - sub.child match { - case b: BroadcastExchangeExec => - // The BroadcastExchangeExec child is CometNativeColumnarToRowExec wrapping - // a Comet plan. Strip the row transition to get the columnar Comet plan. - val cometChild = b.child match { - case c2r: CometNativeColumnarToRowExec => c2r.child - case other => other - } - if (cometChild.isInstanceOf[CometNativeExec]) { - logInfo( - "Converting SubqueryBroadcastExec to " + - "CometSubqueryBroadcastExec for DPP exchange reuse") - val cometBroadcast = CometBroadcastExchangeExec(b, b.output, b.mode, cometChild) - val cometSub = CometSubqueryBroadcastExec( - sub.name, - getSubqueryBroadcastExecIndices(sub), - sub.buildKeys, - cometBroadcast) - inSub.withNewPlan(cometSub) - } else { - inSub - } - case _ => inSub - } - case sab: SubqueryAdaptiveBroadcastExec if isSpark35Plus => - // Wrap SABs to prevent Spark's PlanAdaptiveDynamicPruningFilters from - // converting them to Literal.TrueLiteral. Spark's rule pattern-matches for - // BroadcastHashJoinExec, which Comet replaced with CometBroadcastHashJoinExec. - // Without wrapping, DPP is disabled for both Comet native scans and non-Comet - // scans (e.g., V2 BatchScan). CometPlanAdaptiveDynamicPruningFilters - // (queryStageOptimizerRule, 3.5+) unwraps and converts them later. - // - // On Spark 3.4, injectQueryStageOptimizerRule is unavailable. The isSpark35Plus - // guard leaves SABs unwrapped; CometSpark34AqeDppFallbackRule then tags the - // matching BHJ's build broadcast so Spark's rule can match it natively. - assert( - sab.buildKeys.nonEmpty, - s"SubqueryAdaptiveBroadcastExec '${sab.name}' has empty buildKeys") - logInfo( - s"Wrapping SubqueryAdaptiveBroadcastExec '${sab.name}' in " + - "CometSubqueryAdaptiveBroadcastExec to preserve AQE DPP") - val indices = getSubqueryBroadcastIndices(sab) - val wrapped = CometSubqueryAdaptiveBroadcastExec( - sab.name, - indices, - sab.onlyInBroadcast, - sab.buildPlan, - sab.buildKeys, - sab.child) - inSub.withNewPlan(wrapped) - case _ => inSub - } + rewriteInSubqueryPlan(inSub) + } + } + + private def rewriteInSubqueryPlan(inSub: InSubqueryExec): Expression = { + inSub.plan match { + case sub: SubqueryBroadcastExec => + sub.child match { + case b: BroadcastExchangeExec => + // The BroadcastExchangeExec child is CometNativeColumnarToRowExec wrapping + // a Comet plan. Strip the row transition to get the columnar Comet plan. 
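+ // For example, a build side of the shape
+ // BroadcastExchangeExec(CometNativeColumnarToRowExec(<CometNativeExec subtree>)) is rewritten to
+ // CometSubqueryBroadcastExec(CometBroadcastExchangeExec(<CometNativeExec subtree>)), keeping the
+ // broadcast columnar so the DPP subquery can share the join's exchange.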
+ val cometChild = b.child match { + case c2r: CometNativeColumnarToRowExec => c2r.child + case other => other + } + if (cometChild.isInstanceOf[CometNativeExec]) { + logInfo( + "Converting SubqueryBroadcastExec to " + + "CometSubqueryBroadcastExec for DPP exchange reuse") + val cometBroadcast = CometBroadcastExchangeExec(b, b.output, b.mode, cometChild) + val cometSub = CometSubqueryBroadcastExec( + sub.name, + getSubqueryBroadcastExecIndices(sub), + sub.buildKeys, + cometBroadcast) + inSub.withNewPlan(cometSub) + } else { + inSub + } + case _ => inSub + } + case sab: SubqueryAdaptiveBroadcastExec if isSpark35Plus => + // Wrap SABs to prevent Spark's PlanAdaptiveDynamicPruningFilters from + // converting them to Literal.TrueLiteral. Spark's rule pattern-matches for + // BroadcastHashJoinExec, which Comet replaced with CometBroadcastHashJoinExec. + // Without wrapping, DPP is disabled for both Comet native scans and non-Comet + // scans (e.g., V2 BatchScan). CometPlanAdaptiveDynamicPruningFilters + // (queryStageOptimizerRule, 3.5+) unwraps and converts them later. + // + // On Spark 3.4, injectQueryStageOptimizerRule is unavailable. The isSpark35Plus + // guard leaves SABs unwrapped; CometSpark34AqeDppFallbackRule then tags the + // matching BHJ's build broadcast so Spark's rule can match it natively. + assert( + sab.buildKeys.nonEmpty, + s"SubqueryAdaptiveBroadcastExec '${sab.name}' has empty buildKeys") + logInfo( + s"Wrapping SubqueryAdaptiveBroadcastExec '${sab.name}' in " + + "CometSubqueryAdaptiveBroadcastExec to preserve AQE DPP") + val indices = getSubqueryBroadcastIndices(sab) + val wrapped = CometSubqueryAdaptiveBroadcastExec( + sab.name, + indices, + sab.onlyInBroadcast, + sab.buildPlan, + sab.buildKeys, + sab.child) + inSub.withNewPlan(wrapped) + case _ => inSub } } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala index 20207ffa5f..217f8bc314 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, Dynamic import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometNativeScanExec, CometSubqueryAdaptiveBroadcastExec, CometSubqueryBroadcastExec} +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper} import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec @@ -34,7 +34,7 @@ import org.apache.comet.shims.{ShimPrepareExecutedPlan, ShimSubqueryBroadcast} /** * Converts CometSubqueryAdaptiveBroadcastExec (wrapped AQE DPP) to CometSubqueryBroadcastExec - * inside CometNativeScanExec's partitionFilters. + * inside CometNativeScanExec's partitionFilters and CometIcebergNativeScanExec's runtimeFilters. 
* * CometExecRule wraps SubqueryAdaptiveBroadcastExec in CometSubqueryAdaptiveBroadcastExec during * queryStagePreparationRules to prevent Spark's PlanAdaptiveDynamicPruningFilters from replacing @@ -49,6 +49,11 @@ import org.apache.comet.shims.{ShimPrepareExecutedPlan, ShimSubqueryBroadcast} * CometScanExec.partitionFilters are separate InSubqueryExec instances. Both must be converted * because CometScanExec.dynamicallySelectedPartitions evaluates its own partitionFilters. * + * For CometIcebergNativeScanExec, runtimeFilters is a top-level constructor field and + * originalPlan.runtimeFilters mirrors it (sharing the same InSubqueryExec instances). The Iceberg + * case rewrites both in lockstep so the wrapper's expressions tree and the inner BatchScanExec's + * runtime filters stay aligned. + * * @see * PlanAdaptiveDynamicPruningFilters (Spark's equivalent for BroadcastHashJoinExec) * @see @@ -74,12 +79,32 @@ case object CometPlanAdaptiveDynamicPruningFilters case nativeScan: CometNativeScanExec if nativeScan.partitionFilters.exists(hasCometSAB) => logDebug("Converting AQE DPP for CometNativeScanExec") convertNativeScanDPP(nativeScan, plan) - case p: SparkPlan if !p.isInstanceOf[CometNativeScanExec] && hasWrappedSAB(p) => + case icebergScan: CometIcebergNativeScanExec + if icebergScan.runtimeFilters.exists(hasCometSAB) => + logDebug("Converting AQE DPP for CometIcebergNativeScanExec") + convertIcebergScanDPP(icebergScan, plan) + case p: SparkPlan + if !p.isInstanceOf[CometNativeScanExec] + && !p.isInstanceOf[CometIcebergNativeScanExec] + && hasWrappedSAB(p) => logDebug(s"Converting AQE DPP for non-Comet node: ${p.nodeName}") convertNonCometNodeDPP(p, plan) } } + private def convertIcebergScanDPP( + icebergScan: CometIcebergNativeScanExec, + stagePlan: SparkPlan): CometIcebergNativeScanExec = { + val newFilters = icebergScan.runtimeFilters.map(f => convertFilter(f, stagePlan)) + if (newFilters == icebergScan.runtimeFilters) return icebergScan + // Top-level runtimeFilters is the single source of truth. + // CometIcebergNativeScanExec.serializedPartitionData rebuilds originalPlan from the top-level + // field at serialization time, so we don't need to sync originalPlan.runtimeFilters here. + val newScan = icebergScan.copy(runtimeFilters = newFilters) + icebergScan.logicalLink.foreach(newScan.setLogicalLink) + newScan + } + private def convertNativeScanDPP( nativeScan: CometNativeScanExec, stagePlan: SparkPlan): CometNativeScanExec = { @@ -156,6 +181,7 @@ case object CometPlanAdaptiveDynamicPruningFilters case _ => None } } + inSub.plan match { // ReusedSubqueryExec extends BaseSubqueryExec, so unwrap it before dispatching // to `BaseSubqueryExec`. The order is load-bearing: if the general case runs @@ -179,7 +205,7 @@ case object CometPlanAdaptiveDynamicPruningFilters * (correct results, scans all partitions). * * 3. No reusable broadcast + onlyInBroadcast=false: Aggregate SubqueryExec on the build side - * (DPP via separate execution, matching Spark's PlanAdaptiveDynamicPruningFilters lines 68-79). + * (DPP via separate execution, matching Spark's PlanAdaptiveDynamicPruningFilters case 3). */ private def convertSAB( inSub: InSubqueryExec, @@ -215,10 +241,10 @@ case object CometPlanAdaptiveDynamicPruningFilters val canReuse = conf.exchangeReuseEnabled && matchingJoin.isDefined if (canReuse) { - // Case 1: broadcast reuse. Matches Spark's PlanAdaptiveDynamicPruningFilters - // lines 44-64: construct a NEW exchange wrapping adaptivePlan.executedPlan, - // then wrap in a new ASPE. 
AQE's stageCache ensures the broadcast runs once - // via ReusedExchangeExec (same canonical form as the join's exchange). + // Case 1: broadcast reuse. Mirrors Spark's PlanAdaptiveDynamicPruningFilters case 1: + // construct a fresh exchange wrapping the build subtree, then wrap in a new ASPE. + // AQE's stageCache ensures the broadcast runs once via ReusedExchangeExec (same + // canonical form as the join's exchange). val (broadcastChild, isComet) = matchingJoin.get val buildSidePlan = adaptivePlan.executedPlan logDebug( @@ -227,7 +253,7 @@ case object CometPlanAdaptiveDynamicPruningFilters s"${broadcastChild.getClass.getSimpleName}") // Construct the exchange from buildSidePlan (not from the existing exchange), - // matching Spark's PlanAdaptiveDynamicPruningFilters lines 44-48. The existing + // mirroring Spark's PlanAdaptiveDynamicPruningFilters case 1. The existing // exchange may belong to a different plan context (e.g., the main query) with // different attribute IDs than the current SAB's build side (e.g., a scalar // subquery). Using the existing exchange's output/mode would cause schema @@ -274,7 +300,7 @@ case object CometPlanAdaptiveDynamicPruningFilters // Case 3: no reusable broadcast, but the optimizer says DPP is worthwhile // even without broadcast reuse. Create an aggregate SubqueryExec on the build // side to get distinct partition key values for pruning. - // Matches Spark's PlanAdaptiveDynamicPruningFilters lines 68-79. + // Matches Spark's PlanAdaptiveDynamicPruningFilters case 3. val aliases = sab.indices.map(idx => Alias(sab.buildKeys(idx), sab.buildKeys(idx).toString)()) val aggregate = Aggregate(aliases, aliases, sab.buildPlan) @@ -425,6 +451,7 @@ case object CometPlanAdaptiveDynamicPruningFilters * CometNativeScanExec.partitionFilters has CometSubqueryAdaptiveBroadcastExec (wrapped by * CometExecRule). The inner CometScanExec.partitionFilters may have the original * SubqueryAdaptiveBroadcastExec (unwrapped, because CometScanExec is + * * @transient). */ private def hasCometSAB(e: Expression): Boolean = diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 634023de9d..369af88b27 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -985,8 +985,22 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit perPartitionBuilders += partitionBuilder.build() } - case _ => - throw new IllegalStateException("Expected DataSourceRDD from BatchScanExec") + case other if other.getClass.getName == "org.apache.spark.rdd.ParallelCollectionRDD" => + // Spark's BatchScanExec.inputRDD returns sparkContext.parallelize(empty, 1) when + // DPP filtering removes all input partitions. That ParallelCollectionRDD is the only + // non-DataSourceRDD shape its inputRDD produces, so reaching this branch means "DPP + // pruned everything"; emit no per-partition data and let native execution return empty. + // Re-querying scan.toBatch.planInputPartitions() to verify is unreliable because + // Iceberg's Scan state after filter() doesn't always reflect post-DPP partitions on + // a re-call (V2 scan state is one-shot for the materialized inputRDD). Matched by class + // name because ParallelCollectionRDD is private[spark]. 
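+ // (A typed pattern such as `case _: ParallelCollectionRDD =>` would not compile from this
+ // package, since the class is not visible outside org.apache.spark; hence the class-name check.)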
+ logDebug( + "BatchScanExec.inputRDD is ParallelCollectionRDD (DPP pruned all partitions); " + + "skipping per-partition serialization") + case other => + throw new IllegalStateException( + "Expected DataSourceRDD or ParallelCollectionRDD from BatchScanExec, " + + s"got ${other.getClass.getName}") } // Log deduplication summary diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 9e0e12e178..e08cd5b29d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -24,10 +24,9 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, DynamicPruningExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} -import org.apache.spark.sql.execution.{InSubqueryExec, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -38,7 +37,6 @@ import com.google.common.base.Objects import org.apache.comet.iceberg.CometIcebergNativeScanMetadata import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.operator.CometIcebergNativeScan -import org.apache.comet.shims.ShimSubqueryBroadcast /** * Native Iceberg scan operator that delegates file reading to iceberg-rust. @@ -48,139 +46,72 @@ import org.apache.comet.shims.ShimSubqueryBroadcast * serialized to protobuf for the native side to execute using iceberg-rust's FileIO and * ArrowReader. This provides better performance than reading through Spark's abstraction layers. * - * Supports Dynamic Partition Pruning (DPP) by deferring partition serialization to execution - * time. The doPrepare() method waits for DPP subqueries to resolve, then lazy - * serializedPartitionData serializes the DPP-filtered partitions from inputRDD. + * Supports Dynamic Partition Pruning (DPP) via top-level `runtimeFilters` (mirroring Spark's + * `BatchScanExec.runtimeFilters`). Because the field is a constructor parameter, Spark's standard + * `expressions` walk picks up the contained `DynamicPruningExpression(InSubqueryExec(...))`, and + * the standard `prepare -> prepareSubqueries -> waitForSubqueries` lifecycle resolves it. The + * lifecycle is invoked via `CometLeafExec.ensureSubqueriesResolved`, called from + * `CometNativeExec.findAllPlanData` before `commonData` is read. */ case class CometIcebergNativeScanExec( override val nativeOp: Operator, override val output: Seq[Attribute], + runtimeFilters: Seq[Expression], @transient override val originalPlan: BatchScanExec, override val serializedPlanOpt: SerializedPlan, metadataLocation: String, @transient nativeIcebergScanMetadata: CometIcebergNativeScanMetadata) - extends CometLeafExec - with ShimSubqueryBroadcast { + extends CometLeafExec { override val supportsColumnar: Boolean = true override val nodeName: String = "CometIcebergNativeScan" /** - * Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar(). 
+ * Lazy partition serialization, deferred until execution time. Triggered from `commonData` / + * `perPartitionData` (via `CometNativeExec.findAllPlanData`) and from `LazyIcebergMetric.value` + * (via Iceberg planning metrics). Lazy val semantics ensure single evaluation across entry + * points. * - * This follows Spark's convention of preparing subqueries in doPrepare() rather than - * doExecuteColumnar(). While the actual waiting for DPP results happens later in - * serializedPartitionData, calling prepare() here ensures subquery plans are set up before - * execution begins. - */ - override protected def doPrepare(): Unit = { - originalPlan.runtimeFilters.foreach { - case DynamicPruningExpression(e: InSubqueryExec) => - e.plan.prepare() - case _ => - } - super.doPrepare() - } - - /** - * Lazy partition serialization - deferred until execution time for DPP support. - * - * Entry points: This lazy val may be triggered from either doExecuteColumnar() (via - * commonData/perPartitionData) or capturedMetricValues (for Iceberg metrics). Lazy val - * semantics ensure single evaluation regardless of entry point. - * - * DPP (Dynamic Partition Pruning) Flow: - * - * {{{ - * Planning time: - * CometIcebergNativeScanExec created - * - serializedPartitionData not evaluated (lazy) - * - No partition serialization yet - * - * Execution time: - * 1. Spark calls prepare() on the plan tree - * - doPrepare() calls e.plan.prepare() for each DPP filter - * - Subquery plans are set up (but not yet executed) - * - * 2. Spark calls doExecuteColumnar() (or metrics are accessed) - * - Accesses perPartitionData (or capturedMetricValues) - * - Forces serializedPartitionData evaluation (here) - * - Waits for DPP values (updateResult or reflection) - * - Calls serializePartitions with DPP-filtered inputRDD - * - Only matching partitions are serialized - * }}} + * DPP InSubqueryExec values must already be resolved by the time this lazy val runs. + * `CometNativeExec.findAllPlanData` calls `ensureSubqueriesResolved` (which invokes Spark's + * `prepare -> waitForSubqueries`) before reading `commonData`. The `serializePartitions` call + * below reads `originalPlan.runtimeFilters` indirectly through `inputRDD -> filteredPartitions` + * and applies the resolved values to Iceberg's runtime filtering. `originalPlan.runtimeFilters` + * shares the same `InSubqueryExec` instances as the top-level `runtimeFilters` field (enforced + * by every construction site), so values resolved through `waitForSubqueries` are visible on + * both sides. */ @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = { - // Ensure DPP subqueries are resolved before accessing inputRDD. - originalPlan.runtimeFilters.foreach { - case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => - e.plan match { - case sab: SubqueryAdaptiveBroadcastExec => - // SubqueryAdaptiveBroadcastExec.executeCollect() throws, so we call - // child.executeCollect() directly. We use the index from SAB to find the - // right buildKey, then locate that key's column in child.output. - val rows = sab.child.executeCollect() - val indices = getSubqueryBroadcastIndices(sab) - - // SPARK-46946 changed index: Int to indices: Seq[Int] as a preparatory refactor - // for future features (Null Safe Equality DPP, multiple equality predicates). - // Currently indices always has one element. CometScanRule checks for multi-index - // DPP and falls back, so this assertion should never fail. 
- assert( - indices.length == 1, - s"Multi-index DPP not supported: indices=$indices. See SPARK-46946.") - val buildKeyIndex = indices.head - val buildKey = sab.buildKeys(buildKeyIndex) - - // Find column index in child.output by matching buildKey's exprId - val colIndex = buildKey match { - case attr: Attribute => - sab.child.output.indexWhere(_.exprId == attr.exprId) - // DPP may cast partition column to match join key type - case Cast(attr: Attribute, _, _, _) => - sab.child.output.indexWhere(_.exprId == attr.exprId) - case _ => buildKeyIndex - } - if (colIndex < 0) { - throw new IllegalStateException( - s"DPP build key '$buildKey' not found in ${sab.child.output.map(_.name)}") - } - - setInSubqueryResult(e, rows.map(_.get(colIndex, e.child.dataType))) - case _ => - e.updateResult() - } - case _ => - } - - CometIcebergNativeScan.serializePartitions(originalPlan, output, nativeIcebergScanMetadata) - } - - /** - * Sets InSubqueryExec's private result field via reflection. - * - * Reflection is required because: - * - SubqueryAdaptiveBroadcastExec.executeCollect() throws UnsupportedOperationException - * - InSubqueryExec has no public setter for result, only updateResult() which calls - * executeCollect() - * - We can't replace e.plan since it's a val - */ - private def setInSubqueryResult(e: InSubqueryExec, result: Array[_]): Unit = { - val fields = e.getClass.getDeclaredFields - // Field name is mangled by Scala compiler, e.g. "org$apache$...$InSubqueryExec$$result" - val resultField = fields - .find(f => f.getName.endsWith("$result") && !f.getName.contains("Broadcast")) - .getOrElse { - throw new IllegalStateException( - s"Cannot find 'result' field in ${e.getClass.getName}. " + - "Spark version may be incompatible with Comet's DPP implementation.") + // Canonicalized instances set originalPlan = null and are not meant to be executed. + // If we ever reach this lazy val on a canonicalized form, fail loud rather than NPE + // deep inside originalPlan.inputRDD. + assert( + originalPlan != null, + "serializedPartitionData accessed on a canonicalized CometIcebergNativeScanExec; " + + "this lazy val should only execute on non-canonical instances") + // Rebuild originalPlan with the current top-level runtimeFilters before serializing. + // Spark's PlanAdaptiveDynamicPruningFilters and our transformExpressionsUp passes rewrite + // the top-level `runtimeFilters` (visible via productIterator), but `originalPlan` is + // @transient and not touched by transformAllExpressions. serializePartitions reads runtime + // filters via originalPlan.inputRDD -> filteredPartitions, so an out-of-sync originalPlan + // would re-translate the original (unresolved) InSubqueryExec and throw "no subquery + // result". This makes the top-level runtimeFilters the single source of truth at + // serialization time. + val effectiveOriginalPlan = + if (originalPlan.runtimeFilters != runtimeFilters) { + originalPlan.copy(runtimeFilters = runtimeFilters) + } else { + originalPlan } - resultField.setAccessible(true) - resultField.set(e, result) + CometIcebergNativeScan.serializePartitions( + effectiveOriginalPlan, + output, + nativeIcebergScanMetadata) } def commonData: Array[Byte] = serializedPartitionData._1 + def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 // numPartitions for execution - derived from actual DPP-filtered partitions @@ -191,10 +122,6 @@ case class CometIcebergNativeScanExec( override lazy val outputOrdering: Seq[SortOrder] = Nil - // Capture metric VALUES and TYPES (not objects!) 
in a serializable case class - // This survives serialization while SQLMetric objects get reset to 0 - private case class MetricValue(name: String, value: Long, metricType: String) - /** * Maps Iceberg V2 custom metric types to standard Spark metric types for better UI formatting. * @@ -226,22 +153,43 @@ case class CometIcebergNativeScanExec( } /** - * Captures Iceberg planning metrics for display in Spark UI. + * Defers value computation until .value is read. SparkPlanInfo.fromSparkPlan reads the metrics + * map (names, ids, metricType) for SQL UI events at planning time, before AQE's + * queryStageOptimizerRules run. If we triggered serializedPartitionData during map + * construction, DPP would be resolved against an unconverted CometSubqueryAdaptiveBroadcastExec + * and throw at executeCollect(). Lazy value access ensures planning runs only when the value is + * actually needed, by which time CometPlanAdaptiveDynamicPruningFilters has converted the SAB. * - * This lazy val intentionally triggers serializedPartitionData evaluation because Iceberg - * populates metrics during planning (when inputRDD is accessed). Both this and - * doExecuteColumnar() may trigger serializedPartitionData, but lazy val semantics ensure it's - * evaluated only once. + * Overrides merge/reset because executor accumulator updates carry 0 (these are driver-side + * planning metrics) and would zero out the resolved value at end of stage. */ - @transient private lazy val capturedMetricValues: Seq[MetricValue] = { - // Guard against null originalPlan (from doCanonicalize) - if (originalPlan == null) { - Seq.empty - } else { - // Trigger serializedPartitionData to ensure Iceberg planning has run and - // metrics are populated + private class LazyIcebergMetric(metricType: String, metricName: String) + extends SQLMetric(metricType, 0) { + + override def value: Long = { + // Ensure DPP InSubqueryExec values are resolved before serializedPartitionData runs; + // otherwise serializePartitions reads originalPlan.runtimeFilters with empty values + // and inputRDD -> filteredPartitions skips DPP, caching an unfiltered result. + ensureSubqueriesResolved() val _ = serializedPartitionData + originalPlan.metrics.get(metricName).map(_.value).getOrElse(0L) + } + + override def merge(other: AccumulatorV2[Long, Long]): Unit = {} + + override def reset(): Unit = {} + } + /** + * Iceberg planning metrics, declared eagerly from originalPlan.metrics names/types but with + * values resolved lazily via [[LazyIcebergMetric]]. Constructing this map enumerates only the + * metric definitions (scan.supportedCustomMetrics), which is a metadata call that does not + * trigger Iceberg planning. + */ + @transient private lazy val icebergPlanningMetrics: Map[String, LazyIcebergMetric] = { + if (originalPlan == null) { + Map.empty + } else { originalPlan.metrics .filterNot { case (name, _) => // Filter out metrics that are now runtime metrics incremented on the native side @@ -249,49 +197,22 @@ case class CometIcebergNativeScanExec( } .map { case (name, metric) => val mappedType = mapMetricType(name, metric.metricType) - MetricValue(name, metric.value, mappedType) + val lazyMetric = new LazyIcebergMetric(mappedType, name) + sparkContext.register(lazyMetric, name) + name -> lazyMetric } - .toSeq } } - /** - * Immutable SQLMetric for planning metrics that don't change during execution. 
- * - * Regular SQLMetric extends AccumulatorV2, which means when execution completes, accumulator - * updates from executors (which are 0 since they don't update planning metrics) get merged back - * to the driver, overwriting the driver's values with 0. - * - * This class overrides the accumulator methods to make the metric truly immutable once set. - */ - private class ImmutableSQLMetric(metricType: String) extends SQLMetric(metricType, 0) { - - override def merge(other: AccumulatorV2[Long, Long]): Unit = {} - - override def reset(): Unit = {} - } - override lazy val metrics: Map[String, SQLMetric] = { val baseMetrics = Map( "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "bytes_scanned" -> SQLMetrics.createSizeMetric(sparkContext, "number of bytes scanned")) - // Create IMMUTABLE metrics with captured values AND types - // these won't be affected by accumulator merges - val icebergMetrics = capturedMetricValues.map { mv => - // Create the immutable metric with initValue = 0 (Spark 4 requires initValue <= 0) - val metric = new ImmutableSQLMetric(mv.metricType) - // Set the actual value after creation - metric.set(mv.value) - // Register it with SparkContext to assign metadata (name, etc.) - sparkContext.register(metric, mv.name) - mv.name -> metric - }.toMap - // Add num_splits as a runtime metric (incremented on the native side during execution) val numSplitsMetric = SQLMetrics.createMetric(sparkContext, "number of file splits processed") - baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) + baseMetrics ++ icebergPlanningMetrics + ("num_splits" -> numSplitsMetric) } /** Executes using CometExecRDD - planning data is computed lazily on first access. */ @@ -333,6 +254,7 @@ case class CometIcebergNativeScanExec( CometIcebergNativeScanExec( nativeOp, output, + runtimeFilters, originalPlan, newSerializedPlan, metadataLocation, @@ -343,6 +265,9 @@ case class CometIcebergNativeScanExec( CometIcebergNativeScanExec( nativeOp, output.map(QueryPlan.normalizeExpressions(_, output)), + QueryPlan.normalizePredicates( + CometScanUtils.filterUnusedDynamicPruningExpressions(runtimeFilters), + output), null, // Don't need originalPlan for canonicalization SerializedPlan(None), metadataLocation, @@ -356,27 +281,36 @@ case class CometIcebergNativeScanExec( val taskCount = if (hasMeta) nativeIcebergScanMetadata.tasks.size() else 0 val scanDesc = if (originalPlan != null) originalPlan.scan.description() else "canonicalized" // Include runtime filters (DPP) in string representation - val runtimeFiltersStr = if (originalPlan != null && originalPlan.runtimeFilters.nonEmpty) { - s", runtimeFilters=${originalPlan.runtimeFilters.mkString("[", ", ", "]")}" + val runtimeFiltersStr = if (runtimeFilters.nonEmpty) { + s", runtimeFilters=${runtimeFilters.mkString("[", ", ", "]")}" } else { "" } Iterator(output, s"$metadataLocation, $scanDesc$runtimeFiltersStr", taskCount) } + /** + * Equality / hashCode include runtimeFilters so plan-tree walks (e.g. transformUp) detect DPP + * rewrites. Without runtimeFilters in equality, replacing an `InSubqueryExec` produces a new + * instance that compares equal to the old one and the rewrite is silently dropped. + * `originalPlan` (`@transient`) and `nativeIcebergScanMetadata` (`@transient`) are + * intentionally omitted: they're recoverable from `metadataLocation` + the serialized plan and + * including them would over-constrain equality across re-planning. 
+ */ override def equals(obj: Any): Boolean = { obj match { case other: CometIcebergNativeScanExec => this.metadataLocation == other.metadataLocation && this.output == other.output && - this.serializedPlanOpt == other.serializedPlanOpt + this.serializedPlanOpt == other.serializedPlanOpt && + this.runtimeFilters == other.runtimeFilters case _ => false } } override def hashCode(): Int = - Objects.hashCode(metadataLocation, output.asJava, serializedPlanOpt) + Objects.hashCode(metadataLocation, output.asJava, serializedPlanOpt, runtimeFilters) } object CometIcebergNativeScanExec { @@ -392,6 +326,7 @@ object CometIcebergNativeScanExec { val exec = CometIcebergNativeScanExec( nativeOp, scanExec.output, + scanExec.runtimeFilters, scanExec, SerializedPlan(None), metadataLocation, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 45d708aaef..f4f0b1fb74 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.comet.shims.ShimStreamSourceAwareSparkPlan -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.{ScalarSubquery => ExecScalarSubquery} +import org.apache.spark.sql.execution.{ScalarSubquery => ExecScalarSubquery, _} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -79,34 +78,6 @@ case class CometNativeScanExec( override lazy val metadata: Map[String, String] = if (originalPlan != null) originalPlan.metadata else Map.empty - /** - * Prepare subquery plans before execution. - * - * DPP: partitionFilters may contain DynamicPruningExpression(InSubqueryExec(...)) from - * PlanDynamicPruningFilters. - * - * Scalar subquery pushdown (SPARK-43402, Spark 4.0+): dataFilters may contain ScalarSubquery. - * - * serializedPartitionData can be triggered outside the normal prepare() -> executeSubqueries() - * flow (e.g., from a BroadcastExchangeExec thread), so we prepare subquery plans here and - * resolve them explicitly in serializedPartitionData via updateResult(). - */ - override protected def doPrepare(): Unit = { - partitionFilters.foreach { - case DynamicPruningExpression(e: InSubqueryExec) => - e.plan.prepare() - case _ => - } - dataFilters.foreach { f => - f.foreach { - case s: ExecScalarSubquery => - s.plan.prepare() - case _ => - } - } - super.doPrepare() - } - override val nodeName: String = s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" @@ -186,28 +157,16 @@ case class CometNativeScanExec( * partition's files (lazily, as tasks are scheduled). */ @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = { - // Ensure DPP subqueries are resolved before accessing file partitions. - // serializedPartitionData can be triggered from findAllPlanData (via commonData) on a - // different execution path than the standard prepare() -> executeSubqueries() flow - // (e.g., from a BroadcastExchangeExec thread). We must resolve DPP here explicitly. 
- partitionFilters.foreach { - case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => - logDebug(s"Resolving DPP subquery: plan=${e.plan.getClass.getSimpleName}") - try { - e.updateResult() - logDebug("DPP subquery resolved successfully") - } catch { - case ex: Exception => - logError(s"DPP subquery resolution failed: ${ex.getMessage}") - throw ex - } - case _ => - } - // CometNativeScanExec.partitionFilters and CometScanExec.partitionFilters contain - // different InSubqueryExec instances. convertSubqueryBroadcasts replaced the former with - // CometSubqueryBroadcastExec, but the latter still has the original SubqueryBroadcastExec. - // Both need resolution because CometScanExec.dynamicallySelectedPartitions evaluates its - // own partitionFilters. updateResult() is a no-op if already resolved. + // Outer partitionFilters (wrapper) DPP is resolved by Spark's standard + // prepare -> waitForSubqueries lifecycle, triggered explicitly via + // CometLeafExec.ensureSubqueriesResolved called from + // CometNativeExec.findAllPlanData before commonData is read. + // + // Inner scan.partitionFilters holds a SEPARATE InSubqueryExec instance that + // Spark's expressions walk does not see (scan is @transient and not a sibling + // expression). It still needs explicit resolution because + // CometScanExec.dynamicallySelectedPartitions evaluates its own partitionFilters. + // updateResult is a no-op if already resolved. if (scan != null) { scan.partitionFilters.foreach { case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => @@ -272,6 +231,7 @@ case class CometNativeScanExec( } def commonData: Array[Byte] = serializedPartitionData._1 + def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 override def doExecuteColumnar(): RDD[ColumnarBatch] = { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 888df13bac..c63b646797 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -664,15 +664,23 @@ abstract class CometNativeExec extends CometExec { private def findAllPlanData( plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = { plan match { - // Found an Iceberg scan with planning data - case iceberg: CometIcebergNativeScanExec - if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty => - ( - Map(iceberg.metadataLocation -> iceberg.commonData), - Map(iceberg.metadataLocation -> iceberg.perPartitionData)) + case iceberg: CometIcebergNativeScanExec => + // Trigger Spark's standard prepare -> waitForSubqueries lifecycle so DPP + // InSubqueryExec values are resolved before commonData is read. Without this, + // the parent CometNativeExec.executeQuery flow never invokes the scan's + // executeQuery, leaving DPP unresolved and forcing a sync-on-this await inside + // the serializedPartitionData lazy val initializer (a known deadlock surface). 
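+ // Calling this on every findAllPlanData pass is safe: SparkPlan.prepare() is idempotent
+ // (guarded by its `prepared` flag) and waitForSubqueries() drains runningSubqueries,
+ // so repeat ensureSubqueriesResolved() calls are cheap no-ops.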
+ iceberg.ensureSubqueriesResolved() + if (iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty) { + ( + Map(iceberg.metadataLocation -> iceberg.commonData), + Map(iceberg.metadataLocation -> iceberg.perPartitionData)) + } else { + (Map.empty, Map.empty) + } - // Found a NativeScan with planning data case nativeScan: CometNativeScanExec => + nativeScan.ensureSubqueriesResolved() ( Map(nativeScan.sourceKey -> nativeScan.commonData), Map(nativeScan.sourceKey -> nativeScan.perPartitionData)) @@ -767,7 +775,24 @@ abstract class CometNativeExec extends CometExec { } } -abstract class CometLeafExec extends CometNativeExec with LeafExecNode +abstract class CometLeafExec extends CometNativeExec with LeafExecNode { + + /** + * Public bridge to Spark's `prepare()` + `waitForSubqueries()` lifecycle. Comet's parent + * `CometNativeExec` reads scan data via `findAllPlanData -> commonData` rather than invoking + * `executeColumnar` on its scan children, which means Spark's standard `executeQuery` chain + * never fires on the scan and DPP InSubqueryExec values are never resolved through + * `waitForSubqueries`. Calling this method before reading `commonData` restores the standard + * lifecycle: `prepare()` collects subqueries into `runningSubqueries` (via + * `prepareSubqueries`), then `waitForSubqueries()` resolves them via `updateResult()`. After + * this returns, any `e.values()` access on the scan's runtime/partition filter + * `InSubqueryExec`s returns the resolved values directly without further await. + */ + def ensureSubqueriesResolved(): Unit = { + prepare() + waitForSubqueries() + } +} abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index ba593dcaa1..a2d8c25764 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -28,12 +28,16 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.CometListenerBusUtils import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.comet.CometIcebergNativeScanExec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression +import org.apache.spark.sql.comet._ +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.{InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, BroadcastQueryStageExec} +import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, TimestampType} -import org.apache.comet.CometSparkSessionExtensions.isSpark42Plus +import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark42Plus} import org.apache.comet.iceberg.RESTCatalogHelper import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions} @@ -42,7 +46,10 @@ import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions} * * Note: Requires Iceberg dependencies to be added to pom.xml */ -class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { +class CometIcebergNativeSuite + extends CometTestBase + with RESTCatalogHelper + 
with AdaptiveSparkPlanHelper { // Skip these tests if Iceberg is not available in classpath private def icebergAvailable: Boolean = { @@ -2536,7 +2543,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { "spark.sql.catalog.runtime_cat" -> "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.runtime_cat.type" -> "hadoop", "spark.sql.catalog.runtime_cat.warehouse" -> warehouseDir.getAbsolutePath, - "spark.sql.autoBroadcastJoinThreshold" -> "1KB", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { @@ -2601,6 +2608,17 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { val numPartitions = icebergScans.head.numPartitions assert(numPartitions == 1, s"Expected DPP to prune to 1 partition but got $numPartitions") + // Verify AQE DPP used CometSubqueryBroadcastExec with broadcast reuse + if (isSpark35Plus) { + val subqueries = collectIcebergDPPSubqueries(cometPlan) + assert(subqueries.size == 2, s"Expected 2 DPP subqueries but got ${subqueries.size}") + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + } + } + spark.sql("DROP TABLE runtime_cat.db.multi_dpp_fact") } } @@ -2615,7 +2633,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { "spark.sql.catalog.runtime_cat.type" -> "hadoop", "spark.sql.catalog.runtime_cat.warehouse" -> warehouseDir.getAbsolutePath, // Prevent fact table from being broadcast (force dimension to be broadcast) - "spark.sql.autoBroadcastJoinThreshold" -> "1KB", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { @@ -2679,6 +2697,13 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { val numPartitions = icebergScans.head.numPartitions assert(numPartitions == 1, s"Expected DPP to prune to 1 partition but got $numPartitions") + // Verify AQE DPP used CometSubqueryBroadcastExec with broadcast reuse + if (isSpark35Plus) { + val subqueries = collectIcebergDPPSubqueries(cometPlan) + assert(subqueries.nonEmpty, s"Expected DPP subqueries in plan:\n$cometPlan") + assertCsbBroadcastReuse(subqueries, cometPlan) + } + spark.sql("DROP TABLE runtime_cat.db.fact_table") } } @@ -2934,4 +2959,1023 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } } + + // ---- AQE DPP broadcast reuse tests ---- + + private def collectIcebergDPPSubqueries(plan: SparkPlan): Seq[SparkPlan] = { + collect(plan) { case scan: CometIcebergNativeScanExec => scan } + .flatMap(_.runtimeFilters) + .collect { case DynamicPruningExpression(e: InSubqueryExec) => + e.plan + } + } + + /** + * Asserts each subquery is a CometSubqueryBroadcastExec whose ASPE child contains a + * ReusedExchangeExec or BroadcastQueryStageExec, proving AQE stageCache wired the DPP subquery + * to the join's broadcast (no double-execution of the build side). 
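+ * (The non-reuse fallback, case 3 in CometPlanAdaptiveDynamicPruningFilters, produces an
+ * aggregate SubqueryExec instead, so these assertions fail fast if reuse is ever lost.)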
+ */ + private def assertCsbBroadcastReuse(subqueries: Seq[SparkPlan], cometPlan: SparkPlan): Unit = { + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + val csb = sub.asInstanceOf[CometSubqueryBroadcastExec] + assert( + csb.child.isInstanceOf[AdaptiveSparkPlanExec], + s"Expected AdaptiveSparkPlanExec child but got ${csb.child.getClass.getSimpleName}") + val aspe = csb.child.asInstanceOf[AdaptiveSparkPlanExec] + val hasReuse = collect(aspe) { case r: ReusedExchangeExec => r }.nonEmpty || + collect(aspe) { case b: BroadcastQueryStageExec => b }.nonEmpty + assert( + hasReuse, + "DPP subquery's ASPE should contain ReusedExchangeExec or " + + s"BroadcastQueryStageExec for broadcast reuse:\n${cometPlan.treeString}") + } + } + + test("AQE DPP - CometSubqueryBroadcastExec replaces SubqueryAdaptiveBroadcastExec") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.dpp_reuse_fact ( + id BIGINT, data STRING, date DATE + ) USING iceberg PARTITIONED BY (date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.dpp_reuse_fact VALUES + (1, 'a', DATE '1970-01-01'), (2, 'b', DATE '1970-01-02'), + (3, 'c', DATE '1970-01-02'), (4, 'd', DATE '1970-01-03') + """) + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02")))) + .toDF("id", "date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("aqe_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.* FROM aqe_cat.db.dpp_reuse_fact f + |JOIN aqe_dim d ON f.date = d.date AND d.id = 1""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Verify CometSubqueryBroadcastExec replaced SubqueryAdaptiveBroadcastExec + val subqueries = collectIcebergDPPSubqueries(cometPlan) + assert(subqueries.nonEmpty, s"Expected DPP subqueries in plan:\n$cometPlan") + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + } + + // Verify broadcast reuse: subquery child should be an ASPE that contains a + // BroadcastQueryStageExec (via AQE stageCache, possibly wrapped in + // ReusedExchangeExec). Reference equality (eq) on the join's BQS does not + // hold because convertSAB wraps a fresh exchange in a new ASPE; the actual + // reuse manifests as ReusedExchangeExec inside the ASPE's final plan. 
+ assertCsbBroadcastReuse(subqueries, cometPlan) + + // Verify correct results and partition pruning + val icebergScans = collectIcebergNativeScans(cometPlan) + assert(icebergScans.nonEmpty, "Expected CometIcebergNativeScanExec in plan") + assert( + icebergScans.head.numPartitions == 1, + s"Expected DPP to prune to 1 partition but got ${icebergScans.head.numPartitions}") + } + + spark.sql("DROP TABLE aqe_cat.db.dpp_reuse_fact") + } + } + } + + test("AQE DPP - multiple DPP filters reuse same broadcast") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.multi_dpp_reuse ( + id BIGINT, data STRING, date DATE, ts TIMESTAMP + ) USING iceberg PARTITIONED BY (data, bucket(8, id)) + """) + val df = spark + .range(1, 100) + .selectExpr( + "id", + "CAST(DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) AS STRING) as data", + "DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) as date", + "CAST(DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) AS TIMESTAMP) as ts") + df.coalesce(1) + .write + .format("iceberg") + .option("fanout-enabled", "true") + .mode("append") + .saveAsTable("aqe_cat.db.multi_dpp_reuse") + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02"), "1970-01-02"))) + .toDF("id", "date", "data") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("aqe_multi_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.* + |FROM aqe_cat.db.multi_dpp_reuse f + |JOIN aqe_multi_dim d ON f.id = d.id AND f.data = d.data + |WHERE d.date = DATE '1970-01-02'""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Both DPP filters should use CometSubqueryBroadcastExec + val subqueries = collectIcebergDPPSubqueries(cometPlan) + assert(subqueries.size == 2, s"Expected 2 DPP subqueries but got ${subqueries.size}") + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + } + + // Both should reuse the dim broadcast. Each subquery child is an ASPE that + // contains a BroadcastQueryStageExec (or ReusedExchangeExec) - AQE stageCache + // dedupes via canonical form rather than Java reference identity. 
+ assertCsbBroadcastReuse(subqueries, cometPlan) + } + + spark.sql("DROP TABLE aqe_cat.db.multi_dpp_reuse") + } + } + } + + test("AQE DPP - two separate broadcast joins disambiguated by buildKeys") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dim1Dir = new File(warehouseDir, "dim1_parquet") + val dim2Dir = new File(warehouseDir, "dim2_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Fact table partitioned by TWO columns: date and category + spark.sql(""" + CREATE TABLE aqe_cat.db.two_join_fact ( + id BIGINT, date DATE, category STRING, value INT + ) USING iceberg PARTITIONED BY (date, category) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.two_join_fact VALUES + (1, DATE '2024-01-01', 'A', 10), + (2, DATE '2024-01-01', 'B', 20), + (3, DATE '2024-01-02', 'A', 30), + (4, DATE '2024-01-02', 'B', 40), + (5, DATE '2024-01-03', 'A', 50), + (6, DATE '2024-01-03', 'C', 60) + """) + + // Dim1: filters on date + spark + .createDataFrame(Seq((java.sql.Date.valueOf("2024-01-02"), "keep"))) + .toDF("date", "label") + .write + .parquet(dim1Dir.getAbsolutePath) + spark.read.parquet(dim1Dir.getAbsolutePath).createOrReplaceTempView("date_dim") + + // Dim2: filters on category + spark + .createDataFrame(Seq(("A", "keep"))) + .toDF("category", "label") + .write + .parquet(dim2Dir.getAbsolutePath) + spark.read.parquet(dim2Dir.getAbsolutePath).createOrReplaceTempView("cat_dim") + + // Two separate broadcast joins, each creates its own DPP filter + val query = + """SELECT /*+ BROADCAST(d1), BROADCAST(d2) */ f.* + |FROM aqe_cat.db.two_join_fact f + |JOIN date_dim d1 ON f.date = d1.date + |JOIN cat_dim d2 ON f.category = d2.category + |WHERE d1.label = 'keep' AND d2.label = 'keep'""".stripMargin + + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Should have DPP subqueries for both joins + val subqueries = collectIcebergDPPSubqueries(cometPlan) + assert(subqueries.nonEmpty, s"Expected DPP subqueries in plan:\n$cometPlan") + + // Each should be CometSubqueryBroadcastExec with an ASPE child wrapping the + // build-side broadcast (reuse manifests as ReusedExchangeExec inside the ASPE). + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + } + + val subqueryCsbs = subqueries.collect { case csb: CometSubqueryBroadcastExec => csb } + assertCsbBroadcastReuse(subqueryCsbs, cometPlan) + + // buildKeys disambiguation: the two DPP subqueries should not share canonical + // form (different join keys -> different broadcasts). Compare canonicalized + // plans rather than Java reference identity. + if (subqueryCsbs.size >= 2) { + val distinctCanonical = subqueryCsbs.map(_.child.canonicalized).distinct + assert( + distinctCanonical.size == subqueryCsbs.size, + s"Expected ${subqueryCsbs.size} distinct broadcasts (by canonical form) " + + s"but got ${distinctCanonical.size}. 
buildKeys disambiguation may not be working.") + } + + // Verify correct results: date=2024-01-02 AND category=A returns row (3, 2024-01-02, A, 30) + val icebergScans = collectIcebergNativeScans(cometPlan) + assert(icebergScans.nonEmpty, "Expected CometIcebergNativeScanExec in plan") + assert( + icebergScans.head.numPartitions == 1, + s"Expected DPP to prune to 1 partition but got ${icebergScans.head.numPartitions}") + } + + spark.sql("DROP TABLE aqe_cat.db.two_join_fact") + } + } + } + + test("AQE DPP - graceful fallback when broadcast join is not Comet") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true", + // Disable Comet BHJ so the join stays as Spark's BroadcastHashJoinExec. + // The rule cannot find CometBroadcastHashJoinExec and must handle gracefully. + CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.key -> "false", + CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.key -> "false") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.fallback_fact ( + id BIGINT, data STRING, date DATE + ) USING iceberg PARTITIONED BY (date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.fallback_fact VALUES + (1, 'a', DATE '1970-01-01'), (2, 'b', DATE '1970-01-02'), + (3, 'c', DATE '1970-01-02'), (4, 'd', DATE '1970-01-03') + """) + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02")))) + .toDF("id", "date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("fallback_dim") + + // Query should still produce correct results even without Comet BHJ + val query = + """SELECT /*+ BROADCAST(d) */ f.* FROM aqe_cat.db.fallback_fact f + |JOIN fallback_dim d ON f.date = d.date AND d.id = 1 + |ORDER BY f.id""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + assertNoLeftoverCSAB(cometPlan) + + spark.sql("DROP TABLE aqe_cat.db.fallback_fact") + } + } + } + + test("AQE DPP - empty broadcast result prunes all partitions") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.empty_dpp_fact ( + id BIGINT, data STRING, date DATE + ) USING iceberg PARTITIONED BY (date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.empty_dpp_fact VALUES + (1, 'a', DATE '1970-01-01'), (2, 'b', DATE '1970-01-02'), + (3, 'c', DATE '1970-01-03') + """) + + // Dim table with a value that matches NO fact partitions + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("2099-12-31")))) + .toDF("id", "date") + .write + 
.parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("empty_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.* FROM aqe_cat.db.empty_dpp_fact f + |JOIN empty_dim d ON f.date = d.date AND d.id = 1""".stripMargin + + // Should return empty result, DPP prunes all partitions + val result = spark.sql(query).collect() + assert(result.isEmpty, s"Expected empty result but got ${result.length} rows") + + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Verify the rule still converted the SAB + val subqueries = collectIcebergDPPSubqueries(cometPlan) + subqueries.foreach { sub => + assert( + sub.isInstanceOf[CometSubqueryBroadcastExec], + s"Expected CometSubqueryBroadcastExec but got ${sub.getClass.getSimpleName}") + } + } + + spark.sql("DROP TABLE aqe_cat.db.empty_dpp_fact") + } + } + } + + test("AQE DPP - no broadcast join (SMJ) handles SAB gracefully") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + // Disable broadcast to force sort-merge join, no broadcast join for DPP to reuse + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.smj_fact ( + id BIGINT, data STRING, date DATE + ) USING iceberg PARTITIONED BY (date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.smj_fact VALUES + (1, 'a', DATE '1970-01-01'), (2, 'b', DATE '1970-01-02'), + (3, 'c', DATE '1970-01-02'), (4, 'd', DATE '1970-01-03') + """) + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02")))) + .toDF("id", "date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("smj_dim") + + // No BROADCAST hint + threshold=-1 forces SMJ. DPP may still create SABs + // but there is no broadcast join for our rule to find. + val query = + """SELECT f.* FROM aqe_cat.db.smj_fact f + |JOIN smj_dim d ON f.date = d.date + |WHERE d.id = 1 + |ORDER BY f.id""".stripMargin + + // Should produce correct results regardless of DPP path + val (_, cometPlan) = checkSparkAnswer(query) + // Even when no BHJ exists, no CSAB should survive the rule. On 3.5+ the rule + // emits TrueLiteral or aggregate SubqueryExec; on 3.4 the wrapper is never + // created in the first place. A leftover CSAB would crash at runtime. + assertNoLeftoverCSAB(cometPlan) + + spark.sql("DROP TABLE aqe_cat.db.smj_fact") + } + } + } + + // Asserts no leftover CometSubqueryAdaptiveBroadcastExec in the plan. Any survivor + // would throw at execution because doExecute is intentionally unimplemented. 
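+ // The DPP filters live inside scan expressions (runtimeFilters / partitionFilters) rather
+ // than as child plan nodes, hence collectWithSubqueries below instead of a plain collect,
+ // which would not descend into expression-embedded subquery plans and could miss a CSAB.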
+ private def assertNoLeftoverCSAB(plan: SparkPlan): Unit = { + val remaining = collectWithSubqueries(plan) { case s: CometSubqueryAdaptiveBroadcastExec => + s + } + assert( + remaining.isEmpty, + "Expected no unconverted CometSubqueryAdaptiveBroadcastExec, " + + s"found ${remaining.size}:\n${plan.treeString}") + } + + test("AQE DPP - ReuseAdaptiveSubquery wraps CSAB in ReusedSubqueryExec (UNION ALL)") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Two fact tables with identical partition schema; UNION ALL pushes a single + // logical DPP through to both scans. Their SABs share buildPlan and canonicalize + // identically, so ReuseAdaptiveSubquery wraps one in ReusedSubqueryExec. + spark.sql(""" + CREATE TABLE aqe_cat.db.fact1 (id BIGINT, fact_date DATE) + USING iceberg PARTITIONED BY (fact_date) + """) + spark.sql(""" + CREATE TABLE aqe_cat.db.fact2 (id BIGINT, fact_date DATE) + USING iceberg PARTITIONED BY (fact_date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.fact1 VALUES + (1, DATE '2024-01-01'), (2, DATE '2024-01-02'), (3, DATE '2024-01-03') + """) + spark.sql(""" + INSERT INTO aqe_cat.db.fact2 VALUES + (4, DATE '2024-01-01'), (5, DATE '2024-01-02'), (6, DATE '2024-01-03') + """) + + spark + .createDataFrame(Seq((9L, java.sql.Date.valueOf("2024-01-02")))) + .toDF("dim_id", "dim_date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("union_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.id, f.fact_date + |FROM ( + | SELECT id, fact_date FROM aqe_cat.db.fact1 + | UNION ALL + | SELECT id, fact_date FROM aqe_cat.db.fact2 + |) f + |JOIN union_dim d ON f.fact_date = d.dim_date + |WHERE d.dim_id > 7""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + // No CSAB should survive on either version (would crash at execution otherwise). + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Subquery dedup: exactly 1 CometSubqueryBroadcastExec shared across both + // fact scans, plus at least 1 ReusedSubqueryExec pointer. 
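+ // (ReuseAdaptiveSubquery dedupes on the canonicalized subquery plan, so the two SABs,
+ // which share buildPlan and buildKeys, collapse into a single physical
+ // CometSubqueryBroadcastExec plus ReusedSubqueryExec pointers to it.)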
+ val cometSubqueries = collectWithSubqueries(cometPlan) { + case s: CometSubqueryBroadcastExec => s + } + assert( + cometSubqueries.size == 1, + "Expected exactly 1 CometSubqueryBroadcastExec (shared), got " + + s"${cometSubqueries.size}:\n${cometPlan.treeString}") + val reusedCsbs = collectWithSubqueries(cometPlan) { + case r @ ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => r + } + assert( + reusedCsbs.nonEmpty, + "Expected at least one ReusedSubqueryExec(CometSubqueryBroadcastExec):" + + s"\n${cometPlan.treeString}") + } + + spark.sql("DROP TABLE aqe_cat.db.fact1") + spark.sql("DROP TABLE aqe_cat.db.fact2") + } + } + } + + test("AQE DPP - cross-stage scalar subquery with broadcast reuse") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.scalar_fact ( + id BIGINT, value INT, fact_date DATE + ) USING iceberg PARTITIONED BY (fact_date) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.scalar_fact VALUES + (1, 10, DATE '2024-01-01'), (2, 20, DATE '2024-01-02'), + (3, 30, DATE '2024-01-02'), (4, 40, DATE '2024-01-03') + """) + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("2024-01-02"), "US"))) + .toDF("dim_id", "dim_date", "country") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("scalar_dim") + + // Same DPP shape in main query and in an uncorrelated scalar subquery. + // Main: BHJ over fact x dim. Scalar: independent scan x dim with same DPP. + // The scalar subquery is a separate ASPE, so the cross-stage search via + // context.qe.executedPlan AND subqueryCache dedup must both work. + // BROADCAST hints on BOTH the main and scalar joins are intentional. The V1 + // analog (CometExecSuite "AQE DPP: uncorrelated scalar subquery with broadcast + // reuse") relies on `withDppTables` running ANALYZE TABLE to populate row-level + // stats, which lets Spark's optimizer naturally pick BuildRight (dim broadcast) + // in both contexts. Iceberg has no clean ANALYZE-equivalent (its stats come + // from manifest summaries), and Spark's default size estimate for the small + // post-aggregate fact tempts AQE to broadcast fact in the scalar subquery - + // BuildLeft instead of BuildRight - which leaves no dim broadcast for the + // SAB to match against, falling through to TrueLiteral. The hint forces the + // same broadcast direction Spark picks naturally for V1, so the rule and + // subqueryCache deduplication path under test actually fires. 
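+ // Without the hints, AQE could build on the (small-looking) fact side in the scalar
+ // subquery; the SAB would then fall through to TrueLiteral under REUSE_BROADCAST_ONLY=true
+ // and the one-shared-broadcast-plus-ReusedSubqueryExec shape asserted below would not hold.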
+ val query = + """SELECT /*+ BROADCAST(d) */ + | f.fact_date, + | SUM(f.value) AS s, + | (SELECT /*+ BROADCAST(d2) */ SUM(f2.value) + | FROM aqe_cat.db.scalar_fact f2 + | JOIN scalar_dim d2 ON f2.fact_date = d2.dim_date + | WHERE d2.country = 'US') AS total + |FROM aqe_cat.db.scalar_fact f + |JOIN scalar_dim d ON f.fact_date = d.dim_date + |WHERE d.country = 'US' + |GROUP BY 1""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + // Cross-plan dedup is a known 3.4 limitation (each ASPE sees only its own + // plan at prep-rule time, so the scalar subquery's SAB cannot find the main + // query's BHJ). Strict shape checks only on 3.5+. + if (isSpark35Plus) { + val countBroadcasts = collectWithSubqueries(cometPlan) { + case _: CometSubqueryBroadcastExec => 1 + }.sum + val countReused = collectWithSubqueries(cometPlan) { + case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1 + }.sum + + assert( + countBroadcasts == 1, + "Expected 1 CometSubqueryBroadcastExec (shared across plans), " + + s"got $countBroadcasts:\n${cometPlan.treeString}") + assert( + countReused == 1, + s"Expected 1 ReusedSubqueryExec, got $countReused:\n${cometPlan.treeString}") + } + + spark.sql("DROP TABLE aqe_cat.db.scalar_fact") + } + } + } + + test("AQE DPP - exchange reuse disabled falls through to aggregate SubqueryExec") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + // exchangeReuseEnabled=false drops case 1; onlyInBroadcast=false (the default + // when stats favor running anyway) makes the rule pick case 3, not case 2. + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.noreuse_fact ( + id BIGINT, value INT, fact_date DATE + ) USING iceberg PARTITIONED BY (fact_date) + """) + // Larger fact table so PartitionPruning.pruningHasBenefit evaluates DPP + // worth inserting even with REUSE_BROADCAST_ONLY=false. + spark + .range(1, 200) + .selectExpr( + "id", + "cast(id as int) as value", + "DATE_ADD(DATE '2024-01-01', CAST(id % 4 AS INT)) as fact_date") + .write + .format("iceberg") + .mode("append") + .saveAsTable("aqe_cat.db.noreuse_fact") + + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("2024-01-02")))) + .toDF("dim_id", "dim_date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("noreuse_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.* FROM aqe_cat.db.noreuse_fact f + |JOIN noreuse_dim d ON f.fact_date = d.dim_date + |WHERE d.dim_id = 1 + |ORDER BY f.id""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + // With exchange reuse off, no CometSubqueryBroadcastExec is created. + // Case 3 emits an aggregate SubqueryExec (or Spark may inline TrueLiteral). 
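+ // Roughly, the three outcomes (mirroring Spark's PlanAdaptiveDynamicPruningFilters) are:
+ //   case 1: reuse the join's broadcast            -> CometSubqueryBroadcastExec
+ //   case 2: onlyInBroadcast with nothing to reuse -> DynamicPruningExpression(TrueLiteral)
+ //   case 3: run the pruning query independently   -> aggregate wrapped in SubqueryExec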
+ val csbCount = collectWithSubqueries(cometPlan) { case _: CometSubqueryBroadcastExec => + 1 + }.sum + assert( + csbCount == 0, + "Expected 0 CometSubqueryBroadcastExec when exchange reuse is off, " + + s"got $csbCount:\n${cometPlan.treeString}") + + spark.sql("DROP TABLE aqe_cat.db.noreuse_fact") + } + } + } + + test("AQE DPP - broadcast exchange count is 1 across multiple DPP filters") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.exchg_fact ( + id BIGINT, data STRING, fact_date DATE + ) USING iceberg PARTITIONED BY (data, fact_date) + """) + val df = spark + .range(1, 50) + .selectExpr( + "id", + "CAST(DATE_ADD(DATE '2024-01-01', CAST(id % 4 AS INT)) AS STRING) as data", + "DATE_ADD(DATE '2024-01-01', CAST(id % 4 AS INT)) as fact_date") + df.coalesce(1) + .write + .format("iceberg") + .option("fanout-enabled", "true") + .mode("append") + .saveAsTable("aqe_cat.db.exchg_fact") + + spark + .createDataFrame(Seq((1L, "2024-01-02", java.sql.Date.valueOf("2024-01-02")))) + .toDF("dim_id", "data", "dim_date") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("exchg_dim") + + val query = + """SELECT /*+ BROADCAST(d) */ f.* + |FROM aqe_cat.db.exchg_fact f + |JOIN exchg_dim d ON f.data = d.data AND f.fact_date = d.dim_date""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + // Two DPP filters from the same join must not duplicate the dim broadcast. + // Count broadcasts across the whole plan including subquery contexts. + val cometBroadcasts = collectWithSubqueries(cometPlan) { + case b: CometBroadcastExchangeExec => b + } + assert( + cometBroadcasts.size == 1, + "Expected exactly 1 CometBroadcastExchangeExec across whole plan, " + + s"got ${cometBroadcasts.size}:\n${cometPlan.treeString}") + } + + spark.sql("DROP TABLE aqe_cat.db.exchg_fact") + } + } + } + + // SPARK-34637: DPP-side broadcast query stage must be created before the main + // join's broadcast stage. Iceberg is V2 BatchScan, exercising the equivalent of + // Spark's V2 path. 
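+ // If that ordering were violated, the DPP subquery and the main join would each materialize
+ // an independent broadcast stage instead of sharing one through the stage cache, so the
+ // broadcast work would be duplicated rather than reused.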
+ test("AQE DPP - V2 broadcast query stage creation order (SPARK-34637)") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.q34637_fact ( + store_id INT, units_sold INT + ) USING iceberg PARTITIONED BY (store_id) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.q34637_fact VALUES + (1, 70), (1, 70), (15, 70), (15, 70), (2, 30), (3, 40) + """) + + // Mirrors the SPARK-34637 query: WITH clause grouped by store_id, self-joined. + // The DPP-side broadcast stage must be created before the main BHJ's stage, + // otherwise AQE's stageCache cannot dedupe. + val query = + """WITH v AS ( + | SELECT f.store_id FROM aqe_cat.db.q34637_fact f + | WHERE f.units_sold = 70 GROUP BY f.store_id + |) + |SELECT * FROM v v1 JOIN v v2 ON v1.store_id = v2.store_id""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + val cometSubqueries = collectWithSubqueries(cometPlan) { + case s: CometSubqueryBroadcastExec => s + } + assert( + cometSubqueries.nonEmpty, + s"Expected CometSubqueryBroadcastExec for DPP, got 0:\n${cometPlan.treeString}") + } + + spark.sql("DROP TABLE aqe_cat.db.q34637_fact") + } + } + } + + // SPARK-32509 (Iceberg port): DPP filter exists but no matching BHJ + // (AUTO_BROADCASTJOIN_THRESHOLD=-1 forces SMJ). The unused DPP filter degrades to + // `Literal.TrueLiteral`. The invariant under test: the two scans of the same table + // canonicalize identically AFTER DPP degradation, so the fact data is read exactly once. + // This depends on CometIcebergNativeScanExec.doCanonicalize stripping + // DynamicPruningExpression(TrueLiteral) - analogous to FileSourceScanExec.doCanonicalize + // on V1. + // + // The "exactly once" property manifests differently across Spark versions: + // - 3.5+: EnsureRequirements inserts a hash shuffle for SMJ; AQE recognizes the matching + // canonical form on the peer side and emits ReusedExchangeExec -> 1 shuffle, 1 reuse. + // Same shape as V1's CometExecSuite version of this test. + // - 3.4: planner doesn't insert shuffles for this query shape (V2-specific outputPartitioning + // interaction with EnsureRequirements). Result: 0 shuffles. Still "data read once" - just + // via a different mechanism. + // Either shape satisfies the invariant; assertion accepts both. 
+ test("AQE DPP - unused DPP filter and exchange reuse (SPARK-32509)") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.q32509_fact ( + store_id INT, units_sold INT + ) USING iceberg PARTITIONED BY (store_id) + """) + // Match V1's withDppTables shape: 31 rows across many distinct store_ids -> many file + // partitions on Iceberg's PARTITIONED BY (store_id). Multiple partitions ensure SMJ + // sees enough work to need shuffles (Spark's planner can skip shuffles for trivially + // small inputs). Only store_id=15 has units_sold=70, mirroring V1, so the self-join + // WHERE units_sold=70 produces exactly one (15, 15) row. + spark + .range(1, 32) + .selectExpr( + "cast(id as int) as store_id", + "cast(case when id = 15 then 70 else (id * 10) end as int) as units_sold") + .write + .format("iceberg") + .mode("append") + .saveAsTable("aqe_cat.db.q32509_fact") + + val query = + """WITH v1 AS ( + | SELECT f.store_id FROM aqe_cat.db.q32509_fact f WHERE f.units_sold = 70 + |) + |SELECT * FROM v1 a JOIN v1 b ON a.store_id = b.store_id""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + // Accept either shape (see test docstring): single shuffle with reuse, or no + // shuffles at all. Both prove the two scans canonicalize identically after DPP + // degrades to TrueLiteral, so fact data is read exactly once. + val shuffleExchanges = collect(cometPlan) { + case e: ShuffleExchangeExec => e + case e: CometShuffleExchangeExec => e + } + val reusedExchanges = collect(cometPlan) { case r: ReusedExchangeExec => r } + + val singleShuffleWithReuse = + shuffleExchanges.size == 1 && reusedExchanges.size == 1 + val noShuffles = shuffleExchanges.isEmpty && reusedExchanges.isEmpty + assert( + singleShuffleWithReuse || noShuffles, + "Expected fact data read exactly once: either (1 shuffle + 1 ReusedExchange) " + + s"or (0 shuffles), got (${shuffleExchanges.size} shuffles, " + + s"${reusedExchanges.size} reused):\n${cometPlan.treeString}") + + spark.sql("DROP TABLE aqe_cat.db.q32509_fact") + } + } + } + + // Multi-key BHJ where SAB build keys must keep their position so the index lookup + // selects the right DPP value. A bug in convertSAB key handling would either pick + // the wrong column or produce SubqueryExec instead of broadcast reuse. 
+ test("AQE DPP - avoid reordering broadcast join keys") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dim2Dir = new File(warehouseDir, "dim2_parquet") + val dim3Dir = new File(warehouseDir, "dim3_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.large ( + id BIGINT, A BIGINT, B BIGINT + ) USING iceberg PARTITIONED BY (A) + """) + spark.sql(""" + INSERT INTO aqe_cat.db.large + SELECT id, id + 1, id + 2 FROM range(100) + """) + + spark + .createDataFrame((0 until 10).map(i => (i.toLong, (i + 1).toLong, (i + 2).toLong))) + .toDF("id", "C", "D") + .write + .parquet(dim2Dir.getAbsolutePath) + spark.read.parquet(dim2Dir.getAbsolutePath).createOrReplaceTempView("dimTwo") + + spark + .createDataFrame( + (0 until 10).map(i => (i.toLong, (i + 1).toLong, (i + 2).toLong, (i + 3).toLong))) + .toDF("id", "E", "F", "G") + .write + .parquet(dim3Dir.getAbsolutePath) + spark.read.parquet(dim3Dir.getAbsolutePath).createOrReplaceTempView("dimThree") + + // Compose a query that triggers DPP via a BROADCAST join with multi-key + // build side, and a leading SMJ that consumes (A, B) in one order while the + // BHJ joins on (B, A) in another. If our rule reorders buildKeys, the DPP + // value picked from the broadcast row will be wrong or DPP will degrade to + // SubqueryExec. + val query = + """SELECT /*+ BROADCAST(prod) */ fact.id + |FROM aqe_cat.db.large fact + |LEFT JOIN dimTwo dim2 ON fact.A = dim2.C AND fact.B = dim2.D + |JOIN dimThree prod ON fact.B = prod.F AND fact.A = prod.E + |WHERE prod.G > 5""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + + assertNoLeftoverCSAB(cometPlan) + + if (isSpark35Plus) { + val dpExprs = collect(cometPlan) { case s: CometIcebergNativeScanExec => s } + .flatMap(_.runtimeFilters.collect { case d: DynamicPruningExpression => + d.child + }) + val hasSubquery = dpExprs.exists { + case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true + case _ => false + } + val hasBroadcast = dpExprs.exists { + case InSubqueryExec(_, _: CometSubqueryBroadcastExec, _, _, _, _) => true + case _ => false + } + assert(!hasSubquery, s"Should not have SubqueryExec DPP:\n${cometPlan.treeString}") + assert(hasBroadcast, s"Should have broadcast DPP:\n${cometPlan.treeString}") + } + + spark.sql("DROP TABLE aqe_cat.db.large") + } + } + } + + // Join key is non-atomic (struct/array of partition column). Our sabKeyIds extraction + // via references.map(_.exprId) must traverse the wrapper so the BHJ key lookup matches. 
+ test("AQE DPP - non-atomic type (struct/array) join key") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.aqe_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.aqe_cat.type" -> "hadoop", + "spark.sql.catalog.aqe_cat.warehouse" -> warehouseDir.getAbsolutePath, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1KB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE aqe_cat.db.nonatomic_fact ( + date_id INT, store_id INT, units_sold INT + ) USING iceberg PARTITIONED BY (store_id) + """) + spark + .range(100) + .selectExpr( + "cast(id as int) as date_id", + "cast(id % 10 as int) as store_id", + "cast(id * 2 as int) as units_sold") + .write + .format("iceberg") + .mode("append") + .saveAsTable("aqe_cat.db.nonatomic_fact") + + spark + .range(10) + .selectExpr("cast(id as int) as store_id", "cast(id as string) as country") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("nonatomic_dim") + + Seq("struct", "array").foreach { dataType => + val query = + s"""SELECT /*+ BROADCAST(d) */ f.date_id, f.store_id + |FROM aqe_cat.db.nonatomic_fact f + |JOIN nonatomic_dim d + | ON $dataType(f.store_id) = $dataType(d.store_id) + |WHERE d.country = '3'""".stripMargin + val (_, cometPlan) = checkSparkAnswer(query) + assertNoLeftoverCSAB(cometPlan) + } + + spark.sql("DROP TABLE aqe_cat.db.nonatomic_fact") + } + } + } }