@@ -16,6 +16,7 @@

package io.delta.sql

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

@@ -69,6 +70,17 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/
class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {

override def apply(extensions: SparkSessionExtensions): Unit = {
// First apply the base extensions from AbstractDeltaSparkSessionExtension
super.apply(extensions)

// Register the analyzer rule for V2 streaming
// This rule replaces V1 (DeltaTableV2) with V2 (SparkTable) for streaming queries
extensions.injectResolutionRule { session =>
new UseV2ForStreamingRule(session)
}
}

/**
* NoOpRule for binary compatibility with Delta 3.3.0
* This class must remain here to satisfy MiMa checks
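For context, here is a minimal sketch of how the injected resolution rule ends up running in a user session. The session setup follows the standard Delta configuration; the spelled-out config key for the new flag is an assumption based on DeltaSQLConf's usual "spark.databricks.delta." prefix.

import org.apache.spark.sql.SparkSession

// Registering DeltaSparkSessionExtension means super.apply() plus the new
// injectResolutionRule call run during analysis of every query in this session.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Assumed full key; opts in to V2 streaming for catalog-managed tables.
  .config("spark.databricks.delta.v2.streaming.enabled", "true")
  .getOrCreate()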
spark-unified/src/main/scala/io/delta/sql/DeltaStreamingAnalyzer.scala (138 additions & 0 deletions)
@@ -0,0 +1,138 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import scala.collection.JavaConverters._

import io.delta.kernel.spark.table.SparkTable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Analyzer rule that converts V1 StreamingRelation to V2 StreamingRelationV2 with V2-based
* tables for catalog-managed Delta tables only.
*
* This rule enables Delta to use:
* - V2 (DataSource V2) implementation for streaming reads of catalog-managed tables
* - V1 (DeltaLog-based) implementation for path-based tables, streaming writes, and batch ops
*
* The rule works by:
* 1. Pattern matching on StreamingRelation nodes (V1 streaming sources)
* 2. Checking if the source is a catalog-managed Delta table
* 3. Converting to StreamingRelationV2 with SparkTable (V2)
*
* This approach ensures:
* - Only catalog-managed tables get V2 streaming
* - Path-based tables continue using V1
* - Streaming writes (sinks) continue to use V1
* - Batch operations continue to use V1
*/
class UseV2ForStreamingRule(spark: SparkSession) extends Rule[LogicalPlan] {

// Check if V2 streaming is enabled via configuration
private def isV2StreamingEnabled: Boolean = {
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_V2_STREAMING_ENABLED)
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!isV2StreamingEnabled) {
return plan
}

// Transform StreamingRelation (V1) to StreamingRelationV2 (V2) for catalog-managed tables
plan.resolveOperatorsDown {
case streamingRel @ StreamingRelation(dataSource, sourceName, output)
if isCatalogManagedDeltaTable(dataSource) &&
isCatalogOwnedTable(spark, dataSource.catalogTable.get) =>

val catalogTable = dataSource.catalogTable.get
val tableIdent = catalogTable.identifier

logInfo(
s"Converting StreamingRelation to StreamingRelationV2 with SparkTable for " +
s"catalog-managed table: ${tableIdent.unquotedString}")

// Get catalog and identifier
val catalog = spark.sessionState.catalogManager.currentCatalog
val identifier = Identifier.of(
tableIdent.database.toArray,
tableIdent.table
)

// Create V2 table for streaming read
val v2Table = new SparkTable(
identifier,
catalogTable.location.toString,
dataSource.options.asJava
)

// Get output attributes from the table schema
val outputAttributes = DataTypeUtils.toAttributes(v2Table.schema())

// Return StreamingRelationV2 with V1 fallback
// Note: v1Relation allows Spark to fall back to V1 if V2 execution fails
StreamingRelationV2(
None, // source: no TableProvider, just the table
sourceName,
v2Table, // table: directly pass SparkTable
new CaseInsensitiveStringMap(dataSource.options.asJava),
outputAttributes, // output attributes
Some(catalog),
Some(identifier),
Some(streamingRel) // v1Relation fallback
)

// Don't transform anything else - this preserves:
// - Path-based Delta tables (no catalogTable)
// - Streaming writes (no StreamingRelation wrapper)
// - Batch reads (no StreamingRelation wrapper)
// - Batch writes (no StreamingRelation wrapper)
}
}

/**
* Check if the DataSource is a catalog-managed Delta table.
* We only convert catalog-managed tables to V2, not path-based tables.
*/
private def isCatalogManagedDeltaTable(dataSource: DataSource): Boolean = {
Collaborator: #5477 will introduce utils to check if a table is a CCV2 table or a UC CCV2 table. Maybe try to use/patch it for testing?

dataSource.catalogTable.exists { catalogTable =>
// Check if it's a Delta table by looking at the provider
catalogTable.provider.exists(_.equalsIgnoreCase("delta"))
}
}

/**
* Check if the table is catalog-owned (CCV2).
*/
private def isCatalogOwnedTable(
spark: SparkSession,
catalogTable: CatalogTable): Boolean = {
// TODO: Implement actual check for catalog-owned tables
// Currently returns true to enable V2 streaming for all catalog-managed tables
true
}
}
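A rough sketch of what the rewrite looks like from the query side, assuming the flag's full key uses the "spark.databricks.delta." prefix and that "events" is a hypothetical catalog-managed Delta table:

spark.conf.set("spark.databricks.delta.v2.streaming.enabled", "true")
val stream = spark.readStream.table("events")

// With the rule active, analysis replaces the V1 StreamingRelation with a
// StreamingRelationV2 whose table is io.delta.kernel.spark.table.SparkTable,
// keeping the original V1 node in v1Relation as a fallback.
stream.queryExecution.analyzed.collect {
  case r: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 =>
    println(s"table=${r.table.getClass.getName}, hasV1Fallback=${r.v1Relation.isDefined}")
}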
@@ -2856,6 +2856,22 @@ trait DeltaSQLConfBase
|When enabled, it's decided by a per-command flag.""".stripMargin)
.booleanConf
.createWithDefault(false)

////////////////
// V2 STREAMING
////////////////

val DELTA_V2_STREAMING_ENABLED =
buildConf("v2.streaming.enabled")
.internal()
.doc(
"""When enabled, streaming queries will use the V2 (DataSource V2) implementation
|with MicroBatchStream support. Batch queries and write operations will continue to use
|the traditional V1 (DeltaLog-based) implementation. This allows gradual rollout of
|V2 streaming while maintaining compatibility with existing operations.
|""".stripMargin)
.booleanConf
.createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase
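A short sketch of the two ways the new entry is typically touched; the spelled-out key is an assumption based on the "spark.databricks.delta." prefix used by other DeltaSQLConf entries:

// Internal read path, as UseV2ForStreamingRule does it:
val enabled = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_V2_STREAMING_ENABLED)

// Per-session toggle; the key-based form avoids hard-coding the prefix.
spark.conf.set(DeltaSQLConf.DELTA_V2_STREAMING_ENABLED.key, "true")
spark.conf.set("spark.databricks.delta.v2.streaming.enabled", "false")  // assumed full key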
@@ -0,0 +1,209 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import io.delta.kernel.spark.table.SparkTable
import io.delta.sql.UseV2ForStreamingRule

import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.SharedSparkSession

class UseV2ForStreamingRuleSuite
extends QueryTest
with SharedSparkSession
with StreamTest
with DeltaSQLCommandTest {

import testImplicits._

// Helper: run code with V2 streaming enabled/disabled
def withV2Streaming[T](enabled: Boolean)(f: => T): T = {
val key = DeltaSQLConf.DELTA_V2_STREAMING_ENABLED.key
val oldValue = spark.conf.getOption(key)
try {
spark.conf.set(key, enabled.toString)
f
} finally {
oldValue match {
case Some(v) => spark.conf.set(key, v)
case None => spark.conf.unset(key)
}
}
}

// Helper: check for V2 in logical plan
def assertUsesV2(df: DataFrame): Unit = {
val plan = df.queryExecution.analyzed
val hasV2 = plan.collectFirst {
case StreamingRelationV2(_, _, table, _, _, _, _, _)
if table.isInstanceOf[SparkTable] =>
}.isDefined
assert(hasV2, s"Expected V2 with SparkTable:\n${plan.treeString}")
}

// Helper: check for V1 in logical plan
def assertUsesV1(df: DataFrame): Unit = {
val plan = df.queryExecution.analyzed
val hasV1 = plan.collectFirst {
case _: StreamingRelation =>
}.isDefined
assert(hasV1, s"Expected V1 StreamingRelation:\n${plan.treeString}")
}

test("catalog table logical plan uses V2 when enabled") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
Collaborator: nit: let's have a beforeAll and afterAll action for the test table creation and drop.

Contributor (author): I think withTable is cleaner?

Collaborator: I mean, we don't need to create the table and insert data in every test case. Anyway, this is a nit.

Collaborator: Can we add this table property? delta.feature.catalogOwned-preview=supported
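For reference, a sketch of how the CREATE TABLE above could be adjusted per that suggestion (not part of the change; assumes the catalogOwned-preview table feature can be enabled via TBLPROPERTIES in this build):

sql("""CREATE TABLE test_table (id INT, value STRING) USING delta
      |TBLPROPERTIES ('delta.feature.catalogOwned-preview' = 'supported')
      |""".stripMargin)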

sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")
assertUsesV2(streamDF)
}
}
}

// TODO: Enable when V2 implements latestOffset()
ignore("catalog table execution with V2 produces correct results") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c')")

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")

val query = streamDF
.writeStream
.format("memory")
.queryName("output")
.start()

query.processAllAvailable()
query.stop()

checkAnswer(
spark.table("output"),
Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
)
}
}
}

test("path-based table uses V1 even when config enabled") {
Collaborator: We need to test the V2 streaming code path with path-based tables as well, so we should probably create a new config for testing only.

Contributor (author): I am not sure how that would work; SparkTable requires an Identifier as a constructor parameter. Unless we want to use a randomly generated identifier?

Collaborator (@gengliangwang, Nov 18, 2025): delta.`tablePath`

withTempDir { dir =>
spark.range(5).write.format("delta").save(dir.getCanonicalPath)

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)
assertUsesV1(streamDF)
}
}
}

test("path-based table execution produces correct results") {
withTempDir { dir =>
spark.range(5).toDF("id").write.format("delta").save(dir.getCanonicalPath)

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)

val query = streamDF
.writeStream
.format("memory")
.queryName("path_output")
.start()

query.processAllAvailable()
query.stop()

checkAnswer(
spark.table("path_output"),
(0 until 5).map(i => Row(i.toLong))
)
}
}
}

test("config disabled - catalog table uses V1") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT) USING delta")
sql("INSERT INTO test_table VALUES (1)")

withV2Streaming(enabled = false) {
val streamDF = spark.readStream.table("test_table")
assertUsesV1(streamDF)
}
}
}

test("V1 fallback field is populated") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT) USING delta")
sql("INSERT INTO test_table VALUES (1)")

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")
val plan = streamDF.queryExecution.analyzed

val v1Fallback = plan.collectFirst {
case StreamingRelationV2(_, _, _, _, _, _, _, v1Relation) => v1Relation
}

assert(v1Fallback.isDefined, "StreamingRelationV2 should exist")
assert(v1Fallback.get.isDefined, "v1Relation should be populated")
assert(v1Fallback.get.get.isInstanceOf[StreamingRelation],
"v1Relation should contain StreamingRelation")
}
}
}

// TODO: Enable when V2 implements latestOffset()
ignore("multi-batch streaming with V2 processes correctly") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
sql("INSERT INTO test_table VALUES (1, 'a')")

withV2Streaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")

val query = streamDF
.writeStream
.format("memory")
.queryName("multi_batch")
.start()

query.processAllAvailable()

// Add more data while streaming
sql("INSERT INTO test_table VALUES (2, 'b'), (3, 'c')")
query.processAllAvailable()

query.stop()

checkAnswer(
spark.table("multi_batch"),
Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
)
}
}
}
}
