Skip to content

Conversation

@zikangh
Copy link
Collaborator

@zikangh zikangh commented Dec 8, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR refactors SparkMicroBatchStream.filterDeltaLogs to use lazy iteration instead of eagerly loading all IndexedFile entries into memory.

Added flatMap to CloseableIterator: A new method that transforms each element into an iterator and flattens the results, enabling lazy processing of nested iterables.

The previous implementation loaded all file changes into memory before returning the iterator. With lazy loading, files are processed incrementally as the iterator is consumed, reducing memory pressure for large batches spanning many commits or files.

How was this patch tested?

Existing unit tests for streaming functionality.

Does this PR introduce any user-facing changes?

No

@zikangh zikangh force-pushed the stack/lazy branch 3 times, most recently from 767b3f4 to 0703f57 Compare December 9, 2025 00:20
gengliangwang pushed a commit that referenced this pull request Dec 9, 2025
…for dsv2 streaming (#5499)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5499/files) to
review incremental changes.
- [**stack/plan1**](#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files)]
- [stack/integration](#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
In this PR, we implement the DSv2 methods `planInputPartitions()` and
`createReaderFactory()`.
These are DSv2-only API that will replace DSv1's `getBatch()`

`planInputPartitions()`: Returns physical partitions describing how to
read the data; Called once per micro-batch during planning
`createReaderFactory()`: Returns a factory that creates readers for the
partitions; Each executor uses this factory to read its assigned
partitions


## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
@zikangh zikangh force-pushed the stack/lazy branch 3 times, most recently from 3255888 to 61aed87 Compare December 10, 2025 03:02
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025
…for dsv2 streaming (delta-io#5499)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5499/files) to
review incremental changes.
- [**stack/plan1**](delta-io#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files)]
- [stack/integration](delta-io#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)]
- [stack/integration2](delta-io#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)]
- [stack/reader](delta-io#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)]
- [stack/lazy](delta-io#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)]
- [stack/snapshot](delta-io#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
In this PR, we implement the DSv2 methods `planInputPartitions()` and
`createReaderFactory()`.
These are DSv2-only API that will replace DSv1's `getBatch()`

`planInputPartitions()`: Returns physical partitions describing how to
read the data; Called once per micro-batch during planning
`createReaderFactory()`: Returns a factory that creates readers for the
partitions; Each executor uses this factory to read its assigned
partitions


## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
@zikangh zikangh changed the title lazy iteration [kernel-spark] Implement lazy iteration for streaming file changes for dsv2 streaming Dec 10, 2025
@zikangh zikangh force-pushed the stack/lazy branch 3 times, most recently from f2f4c31 to 38fc2d0 Compare December 11, 2025 20:11
@zikangh zikangh marked this pull request as ready for review December 11, 2025 22:59
* @param <U> The type of elements in the resulting iterator
* @return A flattened {@link CloseableIterator} over all elements from all inner iterators
*/
default <U> CloseableIterator<U> flatMap(Function<T, CloseableIterator<U>> mapper) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you separate kernel changes in an isolated PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@zikangh zikangh changed the title [kernel-spark] Implement lazy iteration for streaming file changes for dsv2 streaming [kernel-spark] Add flatMap to CloseableIterator.java to support lazy loading of delta log changes. Dec 12, 2025
@zikangh zikangh removed the request for review from gengliangwang December 12, 2025 19:24
huan233usc pushed a commit that referenced this pull request Dec 12, 2025
…r dsv2 streaming (#5572)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5572/files) to
review incremental changes.
- [**stack/integration**](#5572)
[[Files changed](https://github.com/delta-io/delta/pull/5572/files)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/6bae6e4ad461ef8ec4d5849c6ab70190b5c8e3c4..86dac519632a5acb21624b24b205d28d5bfb8e40)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/86dac519632a5acb21624b24b205d28d5bfb8e40..236b9425cddde4bf1a6155ff69666f3957b9bed2)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/236b9425cddde4bf1a6155ff69666f3957b9bed2..08f8fe0046d45ebd955d78fa4c6ea8042248d46d)]
- [stack/lazy2](#5686) [[Files
changed](https://github.com/delta-io/delta/pull/5686/files/08f8fe0046d45ebd955d78fa4c6ea8042248d46d..45cb03377364de0d170969cff445bb86068e8557)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/45cb03377364de0d170969cff445bb86068e8557..bf0efa8312401fcee2e5fcc5301cf14df264a5c9)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Implement `deserializeOffset`, `commit`, and `stop` methods in
`SparkMicroBatchStream` to complete the Spark DSv2 streaming API
contract.

1. **`deserializeOffset(json)`** - Called when resuming from a
checkpoint.
2. **`commit(end)`** - Called after each micro-batch completes
successfully.
  3. **`stop()`** - Called during query shutdown.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?

Unit tests

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

No.

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants