Skip to content

Conversation

@zikangh
Copy link
Collaborator

@zikangh zikangh commented Dec 12, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR refactors SparkMicroBatchStream.filterDeltaLogs to use lazy iteration instead of eagerly loading all IndexedFile entries into memory.

The previous implementation loaded all file changes into memory before returning the iterator. With lazy loading, files are processed incrementally as the iterator is consumed, reducing memory pressure for large batches spanning many commits or files.

How was this patch tested?

Does this PR introduce any user-facing changes?

@zikangh zikangh changed the title Update SparkMicroBatchStream with lazy evaluation [kernel-spark] Implement lazy loading of delta log changes for dsv2 streaming Dec 12, 2025
@zikangh zikangh marked this pull request as ready for review December 12, 2025 18:54
@zikangh zikangh force-pushed the stack/lazy2 branch 2 times, most recently from 9dad057 to 45cb033 Compare December 12, 2025 19:24
huan233usc pushed a commit that referenced this pull request Dec 12, 2025
…r dsv2 streaming (#5572)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5572/files) to
review incremental changes.
- [**stack/integration**](#5572)
[[Files changed](https://github.com/delta-io/delta/pull/5572/files)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/6bae6e4ad461ef8ec4d5849c6ab70190b5c8e3c4..86dac519632a5acb21624b24b205d28d5bfb8e40)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/86dac519632a5acb21624b24b205d28d5bfb8e40..236b9425cddde4bf1a6155ff69666f3957b9bed2)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/236b9425cddde4bf1a6155ff69666f3957b9bed2..08f8fe0046d45ebd955d78fa4c6ea8042248d46d)]
- [stack/lazy2](#5686) [[Files
changed](https://github.com/delta-io/delta/pull/5686/files/08f8fe0046d45ebd955d78fa4c6ea8042248d46d..45cb03377364de0d170969cff445bb86068e8557)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/45cb03377364de0d170969cff445bb86068e8557..bf0efa8312401fcee2e5fcc5301cf14df264a5c9)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Implement `deserializeOffset`, `commit`, and `stop` methods in
`SparkMicroBatchStream` to complete the Spark DSv2 streaming API
contract.

1. **`deserializeOffset(json)`** - Called when resuming from a
checkpoint.
2. **`commit(end)`** - Called after each micro-batch completes
successfully.
  3. **`stop()`** - Called during query shutdown.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?

Unit tests

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

No.

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant