4 changes: 3 additions & 1 deletion build.sbt
@@ -600,9 +600,11 @@ lazy val spark = (project in file("spark-unified"))
// MUST be set BEFORE crossSparkSettings() to avoid overwriting version-specific directories
Test / unmanagedSourceDirectories := {
val sparkDir = (sparkV1 / baseDirectory).value
val unifiedDir = baseDirectory.value
Seq(
sparkDir / "src" / "test" / "scala",
sparkDir / "src" / "test" / "java"
sparkDir / "src" / "test" / "java",
unifiedDir / "src" / "test" / "scala" // Add spark-unified test sources
)
},
Test / unmanagedResourceDirectories := Seq(
@@ -16,6 +16,7 @@

package io.delta.sql

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

@@ -69,6 +70,17 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/
class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {

override def apply(extensions: SparkSessionExtensions): Unit = {
// First apply the base extensions from AbstractDeltaSparkSessionExtension
super.apply(extensions)

// Register the analyzer rule for kernel-based streaming
// This rule replaces V1 (DeltaTableV2) with V2 (SparkTable) for streaming queries
extensions.injectResolutionRule { session =>
new UseKernelForStreamingRule(session)
Collaborator:
The kernel-spark module will be named sparkV2, so let's rename the rule to UseV2ForStreaming.

Contributor Author:
Done.
}
}

/**
* NoOpRule for binary compatibility with Delta 3.3.0
* This class must remain here to satisfy MiMa checks
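A minimal sketch (not part of this diff) of how a session could be configured to exercise the new path; the extension and catalog settings are the standard Delta setup, and the full flag key assumes DeltaSQLConf's usual "spark.databricks.delta." prefix in front of "kernel.streaming.enabled".

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-v2-streaming")
  // Register Delta's session extension, which now also injects the streaming resolution rule.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Opt in to the Kernel-based V2 streaming read path (assumed full key; see DeltaSQLConf below).
  .config("spark.databricks.delta.kernel.streaming.enabled", "true")
  .getOrCreate()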
125 changes: 125 additions & 0 deletions spark-unified/src/main/scala/io/delta/sql/DeltaStreamingAnalyzer.scala
@@ -0,0 +1,125 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, StreamingRelationV2}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import io.delta.kernel.spark.table.SparkTable

/**
* Analyzer rule that converts V1 StreamingRelation to V2 StreamingRelationV2 with Kernel-based
* tables for catalog-managed Delta tables only.
*
* This rule enables Delta to use:
* - V2 (Kernel-based) implementation for streaming reads of catalog-managed tables
* - V1 (DeltaLog-based) implementation for path-based tables, streaming writes, and batch ops
*
* The rule works by:
* 1. Pattern matching on StreamingRelation nodes (V1 streaming sources)
* 2. Checking if the source is a catalog-managed Delta table
* 3. Converting to StreamingRelationV2 with SparkTable (V2)
*
* This approach ensures:
* - Only catalog-managed tables get V2 streaming
* - Path-based tables continue using V1
* - Streaming writes (sinks) continue to use V1
* - Batch operations continue to use V1
*/
class UseKernelForStreamingRule(spark: SparkSession) extends Rule[LogicalPlan] {

// Check if Kernel streaming is enabled via configuration
private def isKernelStreamingEnabled: Boolean = {
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_KERNEL_STREAMING_ENABLED)
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!isKernelStreamingEnabled) {
return plan
}

// Transform StreamingRelation (V1) to StreamingRelationV2 (V2) for catalog-managed tables
plan.transformUp {
case streamingRel @ StreamingRelation(dataSource, sourceName, output)
if isCatalogManagedDeltaTable(dataSource) =>

val catalogTable = dataSource.catalogTable.get
val tableIdent = catalogTable.identifier

logInfo(
s"Converting StreamingRelation to StreamingRelationV2 with SparkTable for " +
s"catalog-managed table: ${tableIdent.unquotedString}")

// Get catalog and identifier
val catalog = spark.sessionState.catalogManager.currentCatalog
val identifier = Identifier.of(
tableIdent.database.toArray,
tableIdent.table
)

// Create V2 table for streaming read
val v2Table = new SparkTable(
identifier,
catalogTable.location.toString,
dataSource.options.asJava
)

// Create ResolvedTable
val resolvedTable = ResolvedTable(
catalog,
identifier,
v2Table,
v2Table.columns.toAttributes
)

// Return StreamingRelationV2 with a V1 fallback.
// Note: v1Relation lets Spark fall back to the V1 source if the V2 table cannot serve the streaming scan.
StreamingRelationV2(
source = Some(dataSource),
sourceName = sourceName,
table = resolvedTable,
extraOptions = dataSource.options,
output = output,
catalog = Some(catalog),
identifier = Some(identifier),
v1Relation = Some(streamingRel)
)

// Don't transform anything else - this preserves:
// - Path-based Delta tables (no catalogTable)
// - Streaming writes (no StreamingRelation wrapper)
// - Batch reads (no StreamingRelation wrapper)
// - Batch writes (no StreamingRelation wrapper)
}
}

/**
* Check if the DataSource is a catalog-managed Delta table.
* We only convert catalog-managed tables to V2, not path-based tables.
*/
private def isCatalogManagedDeltaTable(dataSource: DataSource): Boolean = {
Collaborator:
#5477 will introduce utils to check whether a table is a ccv2 table or a UC ccv2 table. Maybe try to use/patch it for testing?
dataSource.catalogTable.exists { catalogTable =>
// Check if it's a Delta table by looking at the provider
catalogTable.provider.exists(_.equalsIgnoreCase("delta"))
}
}
}
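A quick way to see the rewrite in action, mirroring what the suite below asserts: with the flag on, the analyzed plan of a catalog-table streaming read should contain a StreamingRelationV2 wrapping a kernel SparkTable, with the original StreamingRelation kept as the v1Relation fallback. This assumes a SparkSession named spark (e.g. the one built above); the table name is hypothetical.

import org.apache.spark.sql.catalyst.plans.logical.StreamingRelationV2

val df = spark.readStream.table("some_catalog_table")  // hypothetical catalog-managed Delta table
df.queryExecution.analyzed.foreach {
  // After the rule fires, the V1 StreamingRelation node is replaced by a StreamingRelationV2 node.
  case r: StreamingRelationV2 => println(s"V2 streaming relation over: ${r.table}")
  case _ => // other plan nodes are left untouched
}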
@@ -0,0 +1,196 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.StreamingRelationV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.execution.streaming.{StreamTest, StreamingRelation}
import org.apache.spark.sql.test.SharedSparkSession
import io.delta.kernel.spark.table.SparkTable

class UseKernelForStreamingRuleSuite
extends QueryTest
with SharedSparkSession
with StreamTest
with DeltaSQLCommandTest {

import testImplicits._

// Helper: enable/disable config
def withKernelStreaming[T](enabled: Boolean)(f: => T): T = {
withSQLConf(DeltaSQLConf.DELTA_KERNEL_STREAMING_ENABLED.key -> enabled.toString) {
f
}
}

// Helper: check for V2 in logical plan
def assertUsesV2(df: DataFrame): Unit = {
val plan = df.queryExecution.analyzed
val hasV2 = plan.collectFirst {
case StreamingRelationV2(_, _, table, _, _, _, _, _)
if table.table.isInstanceOf[SparkTable] =>
}.isDefined
assert(hasV2, s"Expected V2 with SparkTable:\n${plan.treeString}")
}

// Helper: check for V1 in logical plan
def assertUsesV1(df: DataFrame): Unit = {
val plan = df.queryExecution.analyzed
val hasV1 = plan.collectFirst {
case _: StreamingRelation =>
}.isDefined
assert(hasV1, s"Expected V1 StreamingRelation:\n${plan.treeString}")
}

test("catalog table logical plan uses V2 when enabled") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")
assertUsesV2(streamDF)
}
}
}

test("catalog table execution with V2 produces correct results") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c')")

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")

val query = streamDF
.writeStream
.format("memory")
.queryName("output")
.start()

query.processAllAvailable()
query.stop()

checkAnswer(
spark.table("output"),
Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
)
}
}
}

test("path-based table uses V1 even when config enabled") {
withTempDir { dir =>
spark.range(5).write.format("delta").save(dir.getCanonicalPath)

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)
assertUsesV1(streamDF)
}
}
}

test("path-based table execution produces correct results") {
withTempDir { dir =>
spark.range(5).toDF("id").write.format("delta").save(dir.getCanonicalPath)

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.format("delta").load(dir.getCanonicalPath)

val query = streamDF
.writeStream
.format("memory")
.queryName("path_output")
.start()

query.processAllAvailable()
query.stop()

checkAnswer(
spark.table("path_output"),
(0 until 5).map(i => Row(i.toLong))
)
}
}
}

test("config disabled - catalog table uses V1") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT) USING delta")
sql("INSERT INTO test_table VALUES (1)")

withKernelStreaming(enabled = false) {
val streamDF = spark.readStream.table("test_table")
assertUsesV1(streamDF)
}
}
}

test("V1 fallback field is populated") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT) USING delta")
sql("INSERT INTO test_table VALUES (1)")

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")
val plan = streamDF.queryExecution.analyzed

val v1Fallback = plan.collectFirst {
case StreamingRelationV2(_, _, _, _, _, _, _, v1Relation) => v1Relation
}

assert(v1Fallback.isDefined, "StreamingRelationV2 should exist")
assert(v1Fallback.get.isDefined, "v1Relation should be populated")
assert(v1Fallback.get.get.isInstanceOf[StreamingRelation],
"v1Relation should contain StreamingRelation")
}
}
}

test("multi-batch streaming with V2 processes correctly") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
sql("INSERT INTO test_table VALUES (1, 'a')")

withKernelStreaming(enabled = true) {
val streamDF = spark.readStream.table("test_table")

val query = streamDF
.writeStream
.format("memory")
.queryName("multi_batch")
.start()

query.processAllAvailable()

// Add more data while streaming
sql("INSERT INTO test_table VALUES (2, 'b'), (3, 'c')")
query.processAllAvailable()

query.stop()

checkAnswer(
spark.table("multi_batch"),
Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
)
}
}
}
}

@@ -2856,6 +2856,22 @@ trait DeltaSQLConfBase
|When enabled, it's decided by a per-command flag.""".stripMargin)
.booleanConf
.createWithDefault(false)

///////////////////
// KERNEL STREAMING
///////////////////

val DELTA_KERNEL_STREAMING_ENABLED =
buildConf("kernel.streaming.enabled")
Collaborator:
How about sparkV2.streaming.enabled?

Contributor Author:
Renamed to v2.streaming.enabled.

.internal()
.doc(
"""When enabled, streaming queries will use the Kernel-based V2 (DSv2) implementation
|with MicroBatchStream support. Batch queries and write operations will continue to use
|the traditional V1 (DeltaLog-based) implementation. This allows gradual rollout of
|Kernel streaming while maintaining compatibility with existing operations.
|""".stripMargin)
.booleanConf
.createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase
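A hedged usage note: DeltaSQLConf.buildConf normally prepends "spark.databricks.delta." to the key, so the flag above should be settable per session roughly as follows (full key assumed, not verified against this branch; spark is an existing SparkSession).

import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Toggle the Kernel-based streaming read path for the current session.
spark.conf.set("spark.databricks.delta.kernel.streaming.enabled", "true")
// Equivalently, when DeltaSQLConf is on the classpath:
spark.conf.set(DeltaSQLConf.DELTA_KERNEL_STREAMING_ENABLED.key, "true")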