-
Notifications
You must be signed in to change notification settings - Fork 2k
[kernel-spark] Implement 2-pass delta commit validation algorithm for dsv2 streaming #5638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
5de8d26 to
35731eb
Compare
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5498/files) to review incremental changes. - [**stack/initialoffset2**](#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files)] - [stack/plan1](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] - [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)] - stack/snapshot1 - [stack/reader](#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> We finish implementing `initialOffset()` in `SparkMicroBatchStream.java`. The `initialOffset()` method determines where a streaming query should start reading when there's no checkpointed offset. This is a DSv2-only API. Details: - Added `isFirstBatch` tracking field - Boolean flag to track whether we're processing the first batch (set to true in initialOffset()) - Updated `latestOffset(startOffset, limit)` - Now handles first batch differently by returning null (not `previousOffset`) when no data is available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion` behavior ## How was this patch tested? Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream). <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
c1795c0 to
bea5bee
Compare
…#5498) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5498/files) to review incremental changes. - [**stack/initialoffset2**](delta-io#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files)] - [stack/plan1](delta-io#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] - [stack/integration](delta-io#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)] - stack/snapshot1 - [stack/reader](delta-io#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> We finish implementing `initialOffset()` in `SparkMicroBatchStream.java`. The `initialOffset()` method determines where a streaming query should start reading when there's no checkpointed offset. This is a DSv2-only API. Details: - Added `isFirstBatch` tracking field - Boolean flag to track whether we're processing the first batch (set to true in initialOffset()) - Updated `latestOffset(startOffset, limit)` - Now handles first batch differently by returning null (not `previousOffset`) when no data is available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion` behavior ## How was this patch tested? Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream). <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
9ff3edd to
c8f2557
Compare
…for dsv2 streaming (#5499) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5499/files) to review incremental changes. - [**stack/plan1**](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files)] - [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)] - [stack/integration2](#5652) [[Files changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)] - [stack/reader](#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)] - [stack/lazy](#5650) [[Files changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)] - [stack/snapshot](#5651) [[Files changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> In this PR, we implement the DSv2 methods `planInputPartitions()` and `createReaderFactory()`. These are DSv2-only API that will replace DSv1's `getBatch()` `planInputPartitions()`: Returns physical partitions describing how to read the data; Called once per micro-batch during planning `createReaderFactory()`: Returns a factory that creates readers for the partitions; Each executor uses this factory to read its assigned partitions ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
7703f4a to
46d1634
Compare
46d1634 to
80e2af7
Compare
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
Show resolved
Hide resolved
| output.add(new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), /* addFile= */ null)); | ||
| // TODO(#5319): implement getMetadataOrProtocolChangeIndexedFileIterator. | ||
| long index = 0; | ||
| try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are using try-with-resource which automatically closes resources: https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
| if (removeOpt.isPresent()) { | ||
| RemoveFile removeFile = removeOpt.get(); | ||
| throw (RuntimeException) | ||
| DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFile.getPath(), tablePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a test case for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, let's follow the error message framework and add error class/state in https://github.com/delta-io/delta/blob/master/spark/src/main/resources/error/delta-error-classes.json
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see testGetFileChanges_OnRemoveFile_throwError
delta/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
Line 548 in 3a1eb6c
| public void testGetFileChanges_onRemoveFile_throwError( |
Re: error message framework, we are already using it for this error:
| "DELTA_SOURCE_IGNORE_DELETE" : { |
Going through this file, I did see cases where we are not using the error framework. I left a TODO and will fix that soon.
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
Show resolved
Hide resolved
| throw new RuntimeException("Failed to extract files from commit at version " + version, e); | ||
| } | ||
| // Add END sentinel after data files. | ||
| output.add(new IndexedFile(version, DeltaSourceOffset.END_INDEX(), /* addFile= */ null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could think about how to avoid doing iterator -> list -> iterator but just assemble the iterator in the future(in a separate pr)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The next PR implements proper lazy iterating which gets rid of this arraylist.
| * @param actionSet the set of actions to read (e.g., ADD, REMOVE) | ||
| * @return an iterator over columnar batches containing the requested actions | ||
| */ | ||
| public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove this one if not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
…#5498) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5498/files) to review incremental changes. - [**stack/initialoffset2**](delta-io#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files)] - [stack/plan1](delta-io#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] - [stack/integration](delta-io#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)] - stack/snapshot1 - [stack/reader](delta-io#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> We finish implementing `initialOffset()` in `SparkMicroBatchStream.java`. The `initialOffset()` method determines where a streaming query should start reading when there's no checkpointed offset. This is a DSv2-only API. Details: - Added `isFirstBatch` tracking field - Boolean flag to track whether we're processing the first batch (set to true in initialOffset()) - Updated `latestOffset(startOffset, limit)` - Now handles first batch differently by returning null (not `previousOffset`) when no data is available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion` behavior ## How was this patch tested? Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream). <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
…for dsv2 streaming (delta-io#5499) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5499/files) to review incremental changes. - [**stack/plan1**](delta-io#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files)] - [stack/integration](delta-io#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)] - [stack/integration2](delta-io#5652) [[Files changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)] - [stack/reader](delta-io#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)] - [stack/lazy](delta-io#5650) [[Files changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)] - [stack/snapshot](delta-io#5651) [[Files changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> In this PR, we implement the DSv2 methods `planInputPartitions()` and `createReaderFactory()`. These are DSv2-only API that will replace DSv1's `getBatch()` `planInputPartitions()`: Returns physical partitions describing how to read the data; Called once per micro-batch during planning `createReaderFactory()`: Returns a factory that creates readers for the partitions; Each executor uses this factory to read its assigned partitions ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
80e2af7 to
b7f7730
Compare
5f1ba32 to
1ba5416
Compare
1ba5416 to
9d47ae9
Compare
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
Refactors commit processing in
SparkMicroBatchStream.filterDeltaLogsto use a proper two-pass algorithm; uses the new getCommitActionsFromRangeUnsafe API.dataChange=truedataChange=trueThis provides all-or-nothing semantics and removes the previous assumption that REMOVE actions precede ADD actions within a commit.
How was this patch tested?
Existing tests in
SparkMicroBatchStreamTest.Does this PR introduce any user-facing changes?
No.