diff --git a/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index dfd7028150b..5b0c99c991c 100644 --- a/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -16,6 +16,7 @@ package io.delta.sql +import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule @@ -69,6 +70,17 @@ import org.apache.spark.sql.catalyst.rules.Rule */ class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension { + override def apply(extensions: SparkSessionExtensions): Unit = { + // First apply the base extensions from AbstractDeltaSparkSessionExtension + super.apply(extensions) + + // Register the analyzer rule for V2 streaming + // This rule replaces V1 (DeltaTableV2) with V2 (SparkTable) for streaming queries + extensions.injectResolutionRule { session => + new UseV2ForStreamingRule(session) + } + } + /** * NoOpRule for binary compatibility with Delta 3.3.0 * This class must remain here to satisfy MiMa checks diff --git a/spark-unified/src/main/scala/io/delta/sql/DeltaStreamingAnalyzer.scala b/spark-unified/src/main/scala/io/delta/sql/DeltaStreamingAnalyzer.scala new file mode 100644 index 00000000000..70ebfa487f7 --- /dev/null +++ b/spark-unified/src/main/scala/io/delta/sql/DeltaStreamingAnalyzer.scala @@ -0,0 +1,138 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sql + +import scala.collection.JavaConverters._ + +import io.delta.kernel.spark.table.SparkTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Analyzer rule that converts V1 StreamingRelation to V2 StreamingRelationV2 with V2-based + * tables for catalog-managed Delta tables only. + * + * This rule enables Delta to use: + * - V2 (DataSource V2) implementation for streaming reads of catalog-managed tables + * - V1 (DeltaLog-based) implementation for path-based tables, streaming writes, and batch ops + * + * The rule works by: + * 1. Pattern matching on StreamingRelation nodes (V1 streaming sources) + * 2. Checking if the source is a catalog-managed Delta table + * 3. 
Converting to StreamingRelationV2 with SparkTable (V2)
+ *
+ * This approach ensures:
+ * - Only catalog-managed tables get V2 streaming
+ * - Path-based tables continue using V1
+ * - Streaming writes (sinks) continue to use V1
+ * - Batch operations continue to use V1
+ */
+class UseV2ForStreamingRule(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  // Check if V2 streaming is enabled via configuration
+  private def isV2StreamingEnabled: Boolean = {
+    spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_V2_STREAMING_ENABLED)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!isV2StreamingEnabled) {
+      return plan
+    }
+
+    // Transform StreamingRelation (V1) to StreamingRelationV2 (V2) for catalog-managed tables.
+    // The relation's own output is unused here; the V2 output attributes are derived from
+    // the SparkTable schema below.
+    plan.resolveOperatorsDown {
+      case streamingRel @ StreamingRelation(dataSource, sourceName, _)
+          if isCatalogManagedDeltaTable(dataSource) &&
+            isCatalogOwnedTable(spark, dataSource.catalogTable.get) =>
+
+        val catalogTable = dataSource.catalogTable.get
+        val tableIdent = catalogTable.identifier
+
+        logInfo(
+          s"Converting StreamingRelation to StreamingRelationV2 with SparkTable for " +
+            s"catalog-managed table: ${tableIdent.unquotedString}")
+
+        // Resolve the catalog and identifier. Note that this uses the session's current
+        // catalog, so it assumes the table was resolved against it.
+        val catalog = spark.sessionState.catalogManager.currentCatalog
+        val identifier = Identifier.of(
+          tableIdent.database.toArray,
+          tableIdent.table
+        )
+
+        // Create V2 table for streaming read
+        val v2Table = new SparkTable(
+          identifier,
+          catalogTable.location.toString,
+          dataSource.options.asJava
+        )
+
+        // Get output attributes from the table schema
+        val outputAttributes = DataTypeUtils.toAttributes(v2Table.schema())
+
+        // Return StreamingRelationV2 with V1 fallback
+        // Note: v1Relation allows Spark to fall back to V1 if V2 execution fails
+        StreamingRelationV2(
+          None, // source: no TableProvider, just the table
+          sourceName,
+          v2Table, // table: directly pass SparkTable
+          new CaseInsensitiveStringMap(dataSource.options.asJava),
+          outputAttributes,
+          Some(catalog),
+          Some(identifier),
+          Some(streamingRel) // v1Relation fallback
+        )
+
+      // Don't transform anything else. This preserves:
+      // - Path-based Delta tables (no catalogTable)
+      // - Streaming writes (no StreamingRelation wrapper)
+      // - Batch reads (no StreamingRelation wrapper)
+      // - Batch writes (no StreamingRelation wrapper)
+    }
+  }
+
+  /**
+   * Check if the DataSource is a catalog-managed Delta table.
+   * We only convert catalog-managed tables to V2, not path-based tables.
+   */
+  private def isCatalogManagedDeltaTable(dataSource: DataSource): Boolean = {
+    dataSource.catalogTable.exists { catalogTable =>
+      // Check if it's a Delta table by looking at the provider
+      catalogTable.provider.exists(_.equalsIgnoreCase("delta"))
+    }
+  }
+
+  /**
+   * Check if the table is catalog-owned (CCV2).
+ */ + private def isCatalogOwnedTable( + spark: SparkSession, + catalogTable: CatalogTable): Boolean = { + // TODO: Implement actual check for catalog-owned tables + // Currently returns true to enable V2 streaming for all catalog-managed tables + true + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 571514eb590..6f046690de8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2856,6 +2856,22 @@ trait DeltaSQLConfBase { |When enabled, it's decided by a per-command flag.""".stripMargin) .booleanConf .createWithDefault(false) + + //////////////// + // V2 STREAMING + //////////////// + + val DELTA_V2_STREAMING_ENABLED = + buildConf("v2.streaming.enabled") + .internal() + .doc( + """When enabled, streaming queries will use the V2 (DataSource V2) implementation + |with MicroBatchStream support. Batch queries and write operations will continue to use + |the traditional V1 (DeltaLog-based) implementation. This allows gradual rollout of + |V2 streaming while maintaining compatibility with existing operations. + |""".stripMargin) + .booleanConf + .createWithDefault(false) } object DeltaSQLConf extends DeltaSQLConfBase diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/UseV2ForStreamingRuleSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/UseV2ForStreamingRuleSuite.scala new file mode 100644 index 00000000000..8c02ff1271d --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/UseV2ForStreamingRuleSuite.scala @@ -0,0 +1,209 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.delta
+
+import io.delta.kernel.spark.table.SparkTable
+import io.delta.sql.UseV2ForStreamingRule
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSparkSession
+
+class UseV2ForStreamingRuleSuite
+  extends QueryTest
+  with SharedSparkSession
+  with StreamTest
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  // Helper: run code with V2 streaming enabled/disabled
+  def withV2Streaming[T](enabled: Boolean)(f: => T): T = {
+    val key = DeltaSQLConf.DELTA_V2_STREAMING_ENABLED.key
+    val oldValue = spark.conf.getOption(key)
+    try {
+      spark.conf.set(key, enabled.toString)
+      f
+    } finally {
+      oldValue match {
+        case Some(v) => spark.conf.set(key, v)
+        case None => spark.conf.unset(key)
+      }
+    }
+  }
+
+  // Helper: check for V2 in logical plan
+  def assertUsesV2(df: DataFrame): Unit = {
+    val plan = df.queryExecution.analyzed
+    val hasV2 = plan.collectFirst {
+      case StreamingRelationV2(_, _, table, _, _, _, _, _)
+          if table.isInstanceOf[SparkTable] => ()
+    }.isDefined
+    assert(hasV2, s"Expected V2 with SparkTable:\n${plan.treeString}")
+  }
+
+  // Helper: check for V1 in logical plan
+  def assertUsesV1(df: DataFrame): Unit = {
+    val plan = df.queryExecution.analyzed
+    val hasV1 = plan.collectFirst {
+      case _: StreamingRelation => ()
+    }.isDefined
+    assert(hasV1, s"Expected V1 StreamingRelation:\n${plan.treeString}")
+  }
+
+  test("catalog table logical plan uses V2 when enabled") {
+    withTable("test_table") {
+      sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
+      sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
+
+      withV2Streaming(enabled = true) {
+        val streamDF = spark.readStream.table("test_table")
+        assertUsesV2(streamDF)
+      }
+    }
+  }
+
+  // TODO: Enable when V2 implements latestOffset()
+  ignore("catalog table execution with V2 produces correct results") {
+    withTable("test_table") {
+      sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
+      sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      withV2Streaming(enabled = true) {
+        val streamDF = spark.readStream.table("test_table")
+
+        val query = streamDF
+          .writeStream
+          .format("memory")
+          .queryName("output")
+          .start()
+
+        query.processAllAvailable()
+        query.stop()
+
+        checkAnswer(
+          spark.table("output"),
+          Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
+        )
+      }
+    }
+  }
+
+  test("path-based table uses V1 even when config enabled") {
+    withTempDir { dir =>
+      spark.range(5).write.format("delta").save(dir.getCanonicalPath)
+
+      withV2Streaming(enabled = true) {
+        val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)
+        assertUsesV1(streamDF)
+      }
+    }
+  }
+
+  test("path-based table execution produces correct results") {
+    withTempDir { dir =>
+      spark.range(5).toDF("id").write.format("delta").save(dir.getCanonicalPath)
+
+      withV2Streaming(enabled = true) {
+        val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)
+
+        val query = streamDF
+          .writeStream
+          .format("memory")
+          .queryName("path_output")
+          .start()
+
+        query.processAllAvailable()
+        query.stop()
+
+        checkAnswer(
+          spark.table("path_output"),
+          (0 until 5).map(i => Row(i.toLong))
+        )
+      }
+    }
+  }
+
+  test("config disabled - 
catalog table uses V1") { + withTable("test_table") { + sql("CREATE TABLE test_table (id INT) USING delta") + sql("INSERT INTO test_table VALUES (1)") + + withV2Streaming(enabled = false) { + val streamDF = spark.readStream.table("test_table") + assertUsesV1(streamDF) + } + } + } + + test("V1 fallback field is populated") { + withTable("test_table") { + sql("CREATE TABLE test_table (id INT) USING delta") + sql("INSERT INTO test_table VALUES (1)") + + withV2Streaming(enabled = true) { + val streamDF = spark.readStream.table("test_table") + val plan = streamDF.queryExecution.analyzed + + val v1Fallback = plan.collectFirst { + case StreamingRelationV2(_, _, _, _, _, _, _, v1Relation) => v1Relation + } + + assert(v1Fallback.isDefined, "StreamingRelationV2 should exist") + assert(v1Fallback.get.isDefined, "v1Relation should be populated") + assert(v1Fallback.get.get.isInstanceOf[StreamingRelation], + "v1Relation should contain StreamingRelation") + } + } + } + + // TODO: Enable when V2 implements latestOffset() + ignore("multi-batch streaming with V2 processes correctly") { + withTable("test_table") { + sql("CREATE TABLE test_table (id INT, value STRING) USING delta") + sql("INSERT INTO test_table VALUES (1, 'a')") + + withV2Streaming(enabled = true) { + val streamDF = spark.readStream.table("test_table") + + val query = streamDF + .writeStream + .format("memory") + .queryName("multi_batch") + .start() + + query.processAllAvailable() + + // Add more data while streaming + sql("INSERT INTO test_table VALUES (2, 'b'), (3, 'c')") + query.processAllAvailable() + + query.stop() + + checkAnswer( + spark.table("multi_batch"), + Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")) + ) + } + } + } +} +