Skip to content

Conversation

@zikangh
Copy link
Collaborator

@zikangh zikangh commented Nov 24, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Implement deserializeOffset, commit, and stop methods in SparkMicroBatchStream to complete the Spark DSv2 streaming API contract.

  1. deserializeOffset(json) - Called when resuming from a checkpoint.
  2. commit(end) - Called after each micro-batch completes successfully.
  3. stop() - Called during query shutdown.

How was this patch tested?

Unit tests

Does this PR introduce any user-facing changes?

No.

@zikangh zikangh changed the title integration tests [WIP] DSv2 streaming integration tests Nov 24, 2025
@zikangh zikangh force-pushed the stack/integration branch 9 times, most recently from 81e998b to 813a49a Compare December 1, 2025 19:11
huan233usc pushed a commit that referenced this pull request Dec 1, 2025
…treaming (#5409)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5409/files) to
review incremental changes.
-
[**stack/latestsnapshot2**](#5409)
[[Files changed](https://github.com/delta-io/delta/pull/5409/files)]
- [stack/initialoffset2](#5498)
[[Files
changed](https://github.com/delta-io/delta/pull/5498/files/1718356813a6b39c80585d36e7aac6c8abc3a6a0..9833eaf816ee2f1dcf94d5d9a47136e69fd26336)]
- [stack/plan1](#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files/9833eaf816ee2f1dcf94d5d9a47136e69fd26336..90345c732c6bd182c51648a4b875fdce2c14fc63)]
- [stack/integration](#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/90345c732c6bd182c51648a4b875fdce2c14fc63..813a49a41719ef4b773caf5438c975c8f77c646b)]
      - stack/reader

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
We add implementation for `latestOffset(startOffset, limit)` and
`getDefaultReadLimit()` for a complete `SupportsAdmissionControl`
implementation.
Also refactored a few `DeltaSource.scala` methods -- we make them static
so we can call them from SparkMicrobatchStream.java.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).



## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No

---------

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Timothy Wang <[email protected]>
TimothyW553 added a commit to TimothyW553/delta that referenced this pull request Dec 2, 2025
…treaming (delta-io#5409)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5409/files) to
review incremental changes.
-
[**stack/latestsnapshot2**](delta-io#5409)
[[Files changed](https://github.com/delta-io/delta/pull/5409/files)]
- [stack/initialoffset2](delta-io#5498)
[[Files
changed](https://github.com/delta-io/delta/pull/5498/files/1718356813a6b39c80585d36e7aac6c8abc3a6a0..9833eaf816ee2f1dcf94d5d9a47136e69fd26336)]
- [stack/plan1](delta-io#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files/9833eaf816ee2f1dcf94d5d9a47136e69fd26336..90345c732c6bd182c51648a4b875fdce2c14fc63)]
- [stack/integration](delta-io#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/90345c732c6bd182c51648a4b875fdce2c14fc63..813a49a41719ef4b773caf5438c975c8f77c646b)]
      - stack/reader

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
We add implementation for `latestOffset(startOffset, limit)` and
`getDefaultReadLimit()` for a complete `SupportsAdmissionControl`
implementation.
Also refactored a few `DeltaSource.scala` methods -- we make them static
so we can call them from SparkMicrobatchStream.java.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).



## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No

---------

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Timothy Wang <[email protected]>
@zikangh zikangh force-pushed the stack/integration branch 2 times, most recently from a9e8d19 to 7012800 Compare December 4, 2025 21:48
@zikangh zikangh force-pushed the stack/integration branch 5 times, most recently from f698098 to 9c2e743 Compare December 5, 2025 00:59
huan233usc pushed a commit that referenced this pull request Dec 5, 2025
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5498/files) to
review incremental changes.
-
[**stack/initialoffset2**](#5498)
[[Files changed](https://github.com/delta-io/delta/pull/5498/files)]
- [stack/plan1](#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]
- [stack/integration](#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)]
      - stack/snapshot1
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
We finish implementing `initialOffset()` in
`SparkMicroBatchStream.java`.

The `initialOffset()` method determines where a streaming query should
start reading when there's no checkpointed offset. This is a DSv2-only
API.

Details: 

- Added `isFirstBatch` tracking field - Boolean flag to track whether
we're processing the first batch (set to true in initialOffset())
- Updated `latestOffset(startOffset, limit)` - Now handles first batch
differently by returning null (not `previousOffset`) when no data is
available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion`
behavior


## How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
@zikangh zikangh force-pushed the stack/integration branch 4 times, most recently from c1795c0 to ee00e4c Compare December 8, 2025 18:27
@zikangh zikangh changed the title [WIP] DSv2 streaming integration tests [kernel-spark] Implement deserializeOffset(), commit(), and stop() for dsv2 streaming Dec 8, 2025
@zikangh zikangh marked this pull request as ready for review December 8, 2025 19:41
harperjiang pushed a commit to harperjiang/delta that referenced this pull request Dec 8, 2025
…#5498)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5498/files) to
review incremental changes.
-
[**stack/initialoffset2**](delta-io#5498)
[[Files changed](https://github.com/delta-io/delta/pull/5498/files)]
- [stack/plan1](delta-io#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]
- [stack/integration](delta-io#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)]
      - stack/snapshot1
- [stack/reader](delta-io#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
We finish implementing `initialOffset()` in
`SparkMicroBatchStream.java`.

The `initialOffset()` method determines where a streaming query should
start reading when there's no checkpointed offset. This is a DSv2-only
API.

Details: 

- Added `isFirstBatch` tracking field - Boolean flag to track whether
we're processing the first batch (set to true in initialOffset())
- Updated `latestOffset(startOffset, limit)` - Now handles first batch
differently by returning null (not `previousOffset`) when no data is
available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion`
behavior


## How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
@zikangh zikangh force-pushed the stack/integration branch 2 times, most recently from cb77bed to 3214326 Compare December 9, 2025 00:20
gengliangwang pushed a commit that referenced this pull request Dec 9, 2025
…for dsv2 streaming (#5499)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5499/files) to
review incremental changes.
- [**stack/plan1**](#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files)]
- [stack/integration](#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
In this PR, we implement the DSv2 methods `planInputPartitions()` and
`createReaderFactory()`.
These are DSv2-only API that will replace DSv1's `getBatch()`

`planInputPartitions()`: Returns physical partitions describing how to
read the data; Called once per micro-batch during planning
`createReaderFactory()`: Returns a factory that creates readers for the
partitions; Each executor uses this factory to read its assigned
partitions


## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
RuntimeException exception =
assertThrows(RuntimeException.class, () -> stream.deserializeOffset(json));

assertTrue(exception.getMessage().contains("unexpected Delta table"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the full error message if it is deterministic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added checks for a fuller error message.

/* reservoirVersion= */ 1L,
/* index= */ 0L,
/* isInitialSnapshot= */ false);
String json = org.apache.spark.sql.delta.util.JsonUtils.mapper().writeValueAsString(offset);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import org.apache.spark.sql.delta.util.JsonUtils?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Collaborator

@huan233usc huan233usc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, with some nits

@zikangh zikangh force-pushed the stack/integration branch 2 times, most recently from 1bff91d to e57e3e8 Compare December 10, 2025 03:02
@zikangh zikangh requested a review from jerrypeng December 10, 2025 03:21
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025
…#5498)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5498/files) to
review incremental changes.
-
[**stack/initialoffset2**](delta-io#5498)
[[Files changed](https://github.com/delta-io/delta/pull/5498/files)]
- [stack/plan1](delta-io#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]
- [stack/integration](delta-io#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)]
      - stack/snapshot1
- [stack/reader](delta-io#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
We finish implementing `initialOffset()` in
`SparkMicroBatchStream.java`.

The `initialOffset()` method determines where a streaming query should
start reading when there's no checkpointed offset. This is a DSv2-only
API.

Details: 

- Added `isFirstBatch` tracking field - Boolean flag to track whether
we're processing the first batch (set to true in initialOffset())
- Updated `latestOffset(startOffset, limit)` - Now handles first batch
differently by returning null (not `previousOffset`) when no data is
available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion`
behavior


## How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025
…for dsv2 streaming (delta-io#5499)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5499/files) to
review incremental changes.
- [**stack/plan1**](delta-io#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files)]
- [stack/integration](delta-io#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)]
- [stack/integration2](delta-io#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)]
- [stack/reader](delta-io#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)]
- [stack/lazy](delta-io#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)]
- [stack/snapshot](delta-io#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
In this PR, we implement the DSv2 methods `planInputPartitions()` and
`createReaderFactory()`.
These are DSv2-only API that will replace DSv1's `getBatch()`

`planInputPartitions()`: Returns physical partitions describing how to
read the data; Called once per micro-batch during planning
`createReaderFactory()`: Returns a factory that creates readers for the
partitions; Each executor uses this factory to read its assigned
partitions


## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
@zikangh zikangh requested a review from tdas December 10, 2025 23:48
@huan233usc huan233usc merged commit bd02c24 into delta-io:master Dec 12, 2025
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants