[kernel-spark] Implement initialOffset() for dsv2 streaming #5498
Conversation
Hello @huan233usc @gengliangwang @tdas @jerrypeng, could you please take a look at this PR? Thanks!
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
huan233usc
left a comment
LGTM
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
Force-pushed from 195a309 to 3763efa
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
  // For the first batch, initialOffset() should be called before latestOffset().
  // if startOffset is null: no data is available to read.
  if (startOffset == null) {
This condition should never happen for DSv2 right?
We should just assert as a sanity check
It could happen:
- we need a way for initialOffset() to indicate "there's no data to read".
- latestOffset() will be called even if initialOffset() returns null: https://github.com/apache/spark/blob/27a4849834406a5bbfb0a0b11ea8b725936baef6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala#L773
> we need a way for initialOffset() to indicate "there's no data to read".

Why would initialOffset() need to indicate that?

> latestOffset() will be called even if initialOffset() returns null

Why does initialOffset() need to return null? I don't understand the use case. Please reference what the Kafka v2 source does:
You are right. Please take a look at the modified code.
- `latestOffset()` might return null if there's nothing to read (e.g. the table is empty).
- DSv2 APIs don't explicitly mention whether `initialOffset()` can return null, but I'm now in agreement with you that we should not return null.
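For reference, a minimal sketch of that convention, assuming a simple version-based offset; `SketchOffset` and `resolveStartingVersion()` are illustrative names, not the PR's actual classes:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

// Illustrative offset type; the real PR uses the kernel-spark offset class.
class SketchOffset extends Offset {
  private final long version;

  SketchOffset(long version) {
    this.version = version;
  }

  long version() {
    return version;
  }

  @Override
  public String json() {
    return "{\"version\":" + version + "}";
  }
}

class InitialOffsetSketch {
  // Assumption: resolves the start version from startingVersion/startingTimestamp
  // options, or from the latest snapshot when neither is set.
  private long resolveStartingVersion() {
    return 0L;
  }

  // initialOffset() always returns a concrete offset, never null; "nothing to read"
  // is signaled later by latestOffset(startOffset, limit) returning null.
  public Offset initialOffset() {
    return new SketchOffset(resolveStartingVersion());
  }
}
```

In DSv1, "no data yet" is expressed by `Source.getOffset` returning `None`; in DSv2 the equivalent signal moves to `latestOffset`, which is the reasoning reflected in the thread above.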
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
Force-pushed from 05dc041 to b777299
Force-pushed from f288551 to 3c2826a
}

if (version < 0) {
  // This shouldn't happen; defensively return null.
Is this valid? The starting version shouldn't be less than 0; that is not a valid starting version for Delta.
Yes, this shouldn't happen -- we validate startingVersion in DeltaOptions.scala before we reach this code. I'm mirroring the logic here in DSv1:
if (version < 0) {
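A hedged sketch of what the mirrored DSv2-side check can look like; `offsetForVersion` is a hypothetical helper and `SketchOffset` is the illustrative offset type from the earlier sketch:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

final class StartingVersionCheckSketch {
  // startingVersion is expected to be validated in DeltaOptions before this point,
  // so the negative branch is purely defensive, mirroring the DSv1 code quoted above.
  static Offset offsetForVersion(long version) {
    if (version < 0) {
      // Not a valid Delta starting version; defensively return null.
      return null;
    }
    return new SketchOffset(version); // illustrative offset type from the earlier sketch
  }
}
```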
Force-pushed from 3c2826a to 9833eaf
…treaming (#5409)

## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5409/files) to review incremental changes.

- [**stack/latestsnapshot2**](#5409) [[Files changed](https://github.com/delta-io/delta/pull/5409/files)]
- [stack/initialoffset2](#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files/1718356813a6b39c80585d36e7aac6c8abc3a6a0..9833eaf816ee2f1dcf94d5d9a47136e69fd26336)]
- [stack/plan1](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/9833eaf816ee2f1dcf94d5d9a47136e69fd26336..90345c732c6bd182c51648a4b875fdce2c14fc63)]
- [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/90345c732c6bd182c51648a4b875fdce2c14fc63..813a49a41719ef4b773caf5438c975c8f77c646b)]
- stack/reader

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We add an implementation of `latestOffset(startOffset, limit)` and `getDefaultReadLimit()` for a complete `SupportsAdmissionControl` implementation. We also refactor a few `DeltaSource.scala` methods, making them static so they can be called from `SparkMicroBatchStream.java`.

## How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

## Does this PR introduce _any_ user-facing changes?

No

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Timothy Wang <[email protected]>
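For context on the `SupportsAdmissionControl` piece in the commit message above, a hedged sketch of a `getDefaultReadLimit()` implementation; the `maxFilesPerTrigger` field is an assumed stand-in for however the rate-limit options are actually threaded through, not code from the PR:

```java
import org.apache.spark.sql.connector.read.streaming.ReadLimit;

class AdmissionControlSketch {
  // Assumed stand-in for a parsed maxFilesPerTrigger option; null means "not set".
  private final Integer maxFilesPerTrigger;

  AdmissionControlSketch(Integer maxFilesPerTrigger) {
    this.maxFilesPerTrigger = maxFilesPerTrigger;
  }

  // Default admission policy used when the engine does not pass an explicit ReadLimit.
  public ReadLimit getDefaultReadLimit() {
    if (maxFilesPerTrigger == null) {
      // No rate-limit option set: admit everything that is available.
      return ReadLimit.allAvailable();
    }
    return ReadLimit.maxFiles(maxFilesPerTrigger);
  }
}
```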
Force-pushed from 9833eaf to 8087d6a
@zikangh could you resolve the conflicts?
Force-pushed from 8087d6a to 63662ce
Force-pushed from 63662ce to e1adc7c
private final boolean shouldValidateOffsets;
private final SparkSession spark;

// Tracks whether this is the first batch for this stream (no checkpointed offset).
Can you write down the assumptions around this? Namely, that this boolean is used with the assumption that the methods will be called in the sequence initialOffset() -> latestOffset() -> ..., and is then set to false.
Done.
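A sketch of how that assumption could be written down on the field; the wording is illustrative rather than the exact comment that was merged:

```java
class FirstBatchTrackingSketch {
  /**
   * Tracks whether this is the first batch for this stream (no checkpointed offset).
   *
   * Assumption: for a fresh stream the engine calls initialOffset() before
   * latestOffset(startOffset, limit). The flag is set to true in initialOffset()
   * and flipped back to false once the first latestOffset() call has been handled.
   */
  private boolean isFirstBatch = false;
}
```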
@ParameterizedTest
@MethodSource("initialOffsetParameters")
public void testInitialOffset_FirstBatchParity(
is this the right way to test this stuff?
Basically... there are other existing tests that already provide coverage for this, right? Is it that we are unable to run those tests because the streaming source v2 is incomplete?
Yes, we have a lot of end-to-end tests that we can enable once we have all the requisite pieces. This is a future PR that enables some of these tests: https://github.com/delta-io/delta/pull/5572/files/154897c75c21697300bd31e851b04147339ce466..f6980981137c5943fc590f0b46c70557adb4d161#diff-5b8b5b3f181cbc43ecdeffe4e814641b3e78801fb46d6aaf74a6b2928ba64791
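A hedged sketch of the DSv1/DSv2 parity-test shape; the parameter values and the two helper methods are hypothetical placeholders, not the actual test fixtures:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class InitialOffsetParitySketchTest {

  // Hypothetical parameter space, e.g. different startingVersion options.
  static Stream<Arguments> initialOffsetParameters() {
    return Stream.of(Arguments.of("0"), Arguments.of("latest"));
  }

  @ParameterizedTest
  @MethodSource("initialOffsetParameters")
  void testInitialOffset_FirstBatchParity(String startingVersion) {
    // Compute the first offset through both code paths and compare serialized forms.
    String dsv1 = dsv1InitialOffsetJson(startingVersion);
    String dsv2 = dsv2InitialOffsetJson(startingVersion);
    assertEquals(dsv1, dsv2);
  }

  // Hypothetical helper: would drive DeltaSource (DSv1) against a test table.
  private String dsv1InitialOffsetJson(String startingVersion) {
    return "{}";
  }

  // Hypothetical helper: would drive SparkMicroBatchStream (DSv2) against the same table.
  private String dsv2InitialOffsetJson(String startingVersion) {
    return "{}";
  }
}
```

The real tests presumably run both sources against the same Delta table; the placeholders here only show the parameterized structure.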
tdas
left a comment
this makes sense. minor questions but LGTM
Force-pushed from 89b26e6 to 4e9c219
Force-pushed from 4e9c219 to 90e1d9b
🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5498/files) to review incremental changes.

- [**stack/initialoffset2**](#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files)]
- [stack/plan1](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]
- [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)]
- stack/snapshot1
- [stack/reader](#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]

Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Description
We finish implementing `initialOffset()` in `SparkMicroBatchStream.java`.

The `initialOffset()` method determines where a streaming query should start reading when there's no checkpointed offset. This is a DSv2-only API.

Details:

- Added an `isFirstBatch` tracking field - a boolean flag to track whether we're processing the first batch (set to true in `initialOffset()`).
- Updated `latestOffset(startOffset, limit)` - it now handles the first batch differently by returning null (not `previousOffset`) when no data is available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion` behavior (sketched below).
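A hedged sketch of the first-batch handling described above, reusing the illustrative `SketchOffset` from the earlier sketch; `fetchLatestVersion()` and the no-new-data condition are hypothetical simplifications:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;

class FirstBatchLatestOffsetSketch {
  // Tracks whether this is the first batch (no checkpointed offset).
  private boolean isFirstBatch = false;

  public Offset initialOffset() {
    isFirstBatch = true;
    return new SketchOffset(0L); // illustrative starting offset
  }

  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    long start = ((SketchOffset) startOffset).version();
    long latest = fetchLatestVersion();
    boolean noNewData = latest <= start; // simplified stand-in for the real check
    Offset result;
    if (noNewData) {
      // First batch: return null (not the previous offset) when no data is available.
      // Later batches: returning the start offset means "no new batch".
      result = isFirstBatch ? null : startOffset;
    } else {
      result = new SketchOffset(latest);
    }
    isFirstBatch = false;
    return result;
  }

  // Hypothetical helper: would read the latest table version from the snapshot/log.
  private long fetchLatestVersion() {
    return 0L;
  }
}
```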
How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?