
Conversation

@vitaliili-db (Contributor)


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This implements Option A from the design doc: using an analyzer rule
to replace V1 (DeltaTableV2) with V2 (SparkTable) for streaming reads only.

Key changes:
- Add the UseKernelForStreamingRule analyzer rule in the spark-unified module (a sketch follows at the end of this description)
- The rule pattern-matches on StreamingRelationV2 to isolate streaming reads
- Add the DELTA_KERNEL_STREAMING_ENABLED config flag (default: false)
- Register the rule in DeltaSparkSessionExtension

Behavior:
- Streaming reads (readStream) → V2 (Kernel-based, MicroBatchStream)
- Streaming writes (writeStream) → V1 (DeltaLog-based)
- Batch reads/writes → V1 (DeltaLog-based)
- MERGE/UPDATE/DELETE → V1 (DeltaLog-based)

This approach:
- Requires zero user-code changes
- Leaves the existing V1 and V2 implementations unchanged
- Enables gradual rollout via a configuration flag
- Provides graceful fallback on errors

How was this patch tested?

Does this PR introduce any user-facing changes?
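Below is a minimal sketch of what UseKernelForStreamingRule could look like. The config key spark.databricks.delta.v2.streaming.enabled and the SparkTable(identifier, path) constructor are assumptions pieced together from the review threads below, not confirmed APIs, and the StreamingRelationV2 pattern assumes the Spark 3.x field layout.

import scala.util.control.NonFatal

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.execution.streaming.StreamingRelationV2
// import io.delta.kernel.spark.SparkTable  // assumed location of the V2 table

class UseKernelForStreamingRule(session: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Bail out unless the rollout flag is on (default: false).
    val enabled = session.conf
      .get("spark.databricks.delta.v2.streaming.enabled", "false").toBoolean
    if (!enabled) return plan

    plan.resolveOperators {
      // Matching only StreamingRelationV2 keeps batch reads/writes and
      // MERGE/UPDATE/DELETE on the V1 path.
      case rel @ StreamingRelationV2(
          _, _, v1Table: DeltaTableV2, _, _, _, Some(ident), _) =>
        try {
          // Assumed constructor; the review thread only confirms it takes an Identifier.
          rel.copy(table = new SparkTable(ident, v1Table.path.toString))
        } catch {
          // Graceful fallback: keep the V1 relation if the V2 table cannot be built.
          case NonFatal(_) => rel
        }
    }
  }
}

The review threads below walk through the registration hook, the config name, and the test coverage.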
// Register the analyzer rule for kernel-based streaming
// This rule replaces V1 (DeltaTableV2) with V2 (SparkTable) for streaming queries
extensions.injectResolutionRule { session =>
  new UseKernelForStreamingRule(session)
}
Collaborator:

The kernel-spark module will be named sparkV2, so let's rename the rule to UseV2ForStreaming.

Contributor Author:

done


val DELTA_KERNEL_STREAMING_ENABLED =
  buildConf("kernel.streaming.enabled")
Collaborator:

how about sparkV2.streaming.enabled

Contributor Author:

renamed to v2.streaming.enabled
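For reference, the renamed flag fleshed out in the DeltaSQLConf style (buildConf prepends the spark.databricks.delta. prefix; the doc string here is mine, not from the patch):

val DELTA_V2_STREAMING_ENABLED =
  buildConf("v2.streaming.enabled")
    .doc("When true, streaming reads of Delta tables resolve to the Kernel-based " +
      "V2 SparkTable instead of the V1 DeltaTableV2. Streaming writes, batch " +
      "reads/writes, and DML continue to use V1.")
    .booleanConf
    .createWithDefault(false)

And a sketch of end-to-end usage once the flag is on (table names and checkpoint path are hypothetical):

spark.conf.set("spark.databricks.delta.v2.streaming.enabled", "true")

// The read side resolves to V2 (Kernel, MicroBatchStream); the write side stays on V1.
val df = spark.readStream.table("events")
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/_chk")
  .toTable("events_copy")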


test("catalog table logical plan uses V2 when enabled") {
withTable("test_table") {
sql("CREATE TABLE test_table (id INT, value STRING) USING delta")
Collaborator:

nit: let's have a beforeAll and afterAll action for the test table creation and drop

Contributor Author:

I think withTable is cleaner?

Collaborator:

I mean, we don't need to create and populate the table in every test case. Anyway, this is a nit.

Collaborator:

Can we add this table property: delta.feature.catalogOwned-preview=supported?

  }
}
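If the reviewer's suggestion is adopted, the setup would add the property at table creation, roughly like this (whether the preview feature can be enabled this way via TBLPROPERTIES is an assumption):

sql("""
  CREATE TABLE test_table (id INT, value STRING) USING delta
  TBLPROPERTIES ('delta.feature.catalogOwned-preview' = 'supported')
""")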

test("path-based table uses V1 even when config enabled") {
Collaborator:

We need to test the V2 streaming code path with path-based tables as well. So we should probably create a new, testing-only config.

Contributor Author:

I am not sure how this would work: SparkTable requires an Identifier as a constructor parameter. Unless we want to use a randomly generated identifier?

@gengliangwang (Collaborator), Nov 18, 2025:
delta.`tablePath`
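That is, synthesize an identifier from the path. A sketch (Identifier.of is the standard DSv2 API; the SparkTable call is the same assumed constructor as above):

import org.apache.spark.sql.connector.catalog.Identifier

val tablePath = "/tmp/delta/events"  // hypothetical path-based table
val ident = Identifier.of(Array("delta"), tablePath)
// new SparkTable(ident, tablePath)  // assumed constructor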

/**
 * Check if the DataSource is a catalog-managed Delta table.
 * We only convert catalog-managed tables to V2, not path-based tables.
 */
private def isCatalogManagedDeltaTable(dataSource: DataSource): Boolean = {
Collaborator:

#5477 will introduce utils to check whether a table is a ccv2 table or a UC ccv2 table. Maybe try to use/patch it for testing?
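Until the #5477 utils land, a minimal placeholder for the check could look like this (assuming a V1 DataSource carries a CatalogTable only when the table is catalog-managed):

private def isCatalogManagedDeltaTable(dataSource: DataSource): Boolean = {
  // Path-based reads leave catalogTable empty, so they keep resolving to V1.
  dataSource.catalogTable.exists(_.provider.exists(_.equalsIgnoreCase("delta")))
}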
