-
Notifications
You must be signed in to change notification settings - Fork 2k
[kernel-spark] Implement latestOffset() with rate limiting for dsv2 streaming #5409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
f602eff to
09ebcbb
Compare
|
Hi @huan233usc, could you please add @huan233usc, @gengliangwang, @jerrypeng, @tdas to the reviewers list? |
| this.streamingHelper = new StreamingHelper(tablePath, hadoopConf); | ||
|
|
||
| // Initialize snapshot at source init to get table ID, similar to DeltaSource.scala | ||
| this.snapshotAtSourceInit = TableManager.loadSnapshot(tablePath).build(engine); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just call streamingHelper.loadLatestSnapshot(), underlying it will call TableManager.loadSnapshot(tablePath).build(engine)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| @Override | ||
| public Offset latestOffset(Offset startOffset, ReadLimit limit) { | ||
| // For the first batch, initialOffset() should be called before latestOffset(). | ||
| // if startOffset is null: no data is available to read. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will there by a case that startOffset is set to null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark will call initialOffset first to obtain this startOffset for batch 0 -- so it could be null when initialOffset() returns null (i.e. table has no data).
| // TODO(#5318): Check read-incompatible schema changes during stream start | ||
| IndexedFile lastFile = lastFileChange.get(); | ||
| return ScalaUtils.toJavaOptional( | ||
| DeltaSource.buildOffsetFromIndexedFile( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeltaSource.buildOffsetFromIndexedFile seems to always return an offset although it returns Option[Offset].
Can we update the signature for DeltaSource.buildOffsetFromIndexedFile? I just want to minimize the interaction with null if it is not really necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
797770c to
8298d06
Compare
8298d06 to
df5e0f1
Compare
8d106d8 to
029a596
Compare
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
Show resolved
Hide resolved
| @Override | ||
| public Offset latestOffset() { | ||
| throw new UnsupportedOperationException("latestOffset is not supported"); | ||
| // TODO(#5318): Implement latestOffset with proper start offset and limit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just throw InvalidStateException with error msg "latestOffset() should not be called - use latestOffset(Offset, ReadLimit) instead"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| CommitRange commitRange; | ||
| try { | ||
| commitRange = builder.build(engine); | ||
| } catch (io.delta.kernel.exceptions.KernelException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| * | ||
| * @throws RuntimeException if an IOException occurs while closing the iterator | ||
| */ | ||
| public static <T> Optional<T> iteratorLast(CloseableIterator<T> iterator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add a simple unit test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just re-use the impl in DeltaSource?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't do that unfortunately -- the kernel & deltaLog CloseableIterator are different classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is specific to streaming in this method? isnt there IteratorUtils where this method is better suited?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this to CloseableIterator directly. It seemed like the best option; second best option might be kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/Utils.java. could you take another look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe define a helper method in kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/Utils.java?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Utils.java does seem better because I don't think it's widely-applicable enough to be in CloseableIterator.
huan233usc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall LGTM -- with some nits to resolve
bfb2188 to
de6b22f
Compare
|
|
||
| override def toString(): String = s"DeltaSource[${deltaLog.dataPath}]" | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have to move this code? Did any of the logic change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed to move object AdmissionLimits to object DeltaSource to make it static. getStartingVersion shouldn't change.
jerrypeng
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some minor comments. Generally looks good to me!
de6b22f to
d69e1e1
Compare
| } | ||
|
|
||
| /** | ||
| * Get the version from a batch. Assumes all rows in the batch have the same version, so it reads |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from a batch of what?
not something in this PR, but this is a general comment for doing better documentation along the way. @zikangh @huan233usc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
tdas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks good.
56f28fb to
8f30033
Compare
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Use this [link](https://github.com/delta-io/delta/pull/5477/files) to review incremental changes. - [**catalogtableutils-ccv2**](delta-io#5477) [[Files changed](https://github.com/delta-io/delta/pull/5477/files)] - [stack/ccv2-catalog-config](delta-io#5520) [[Files changed](https://github.com/delta-io/delta/pull/5520/files/6359268dbee8d1a114e3f66620c6585bc0bdb6eb..4a1d8fa93e56d68b5971fb32970bdeaa5799abdc)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) This PR adds utils for CatalogTable, Scala, and for catalogtable testing. In particular, CatalogTableUtils is used for determining if a table is managed/owned by UC -- which will determine the source of truth for operations. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> - tested locally via `build/sbt -DsparkVersion=master "++ 2.13.16" clean sparkV2/test` - passing CI tests <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> No. <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> --------- Signed-off-by: TimothyW553 <[email protected]> Signed-off-by: Timothy Wang <[email protected]>
de0015d to
1718356
Compare
…treaming (delta-io#5409) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5409/files) to review incremental changes. - [**stack/latestsnapshot2**](delta-io#5409) [[Files changed](https://github.com/delta-io/delta/pull/5409/files)] - [stack/initialoffset2](delta-io#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files/1718356813a6b39c80585d36e7aac6c8abc3a6a0..9833eaf816ee2f1dcf94d5d9a47136e69fd26336)] - [stack/plan1](delta-io#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/9833eaf816ee2f1dcf94d5d9a47136e69fd26336..90345c732c6bd182c51648a4b875fdce2c14fc63)] - [stack/integration](delta-io#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/90345c732c6bd182c51648a4b875fdce2c14fc63..813a49a41719ef4b773caf5438c975c8f77c646b)] - stack/reader --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> We add implementation for `latestOffset(startOffset, limit)` and `getDefaultReadLimit()` for a complete `SupportsAdmissionControl` implementation. Also refactored a few `DeltaSource.scala` methods -- we make them static so we can call them from SparkMicrobatchStream.java. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream). ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No --------- Signed-off-by: TimothyW553 <[email protected]> Signed-off-by: Timothy Wang <[email protected]> Co-authored-by: Claude <[email protected]> Co-authored-by: Timothy Wang <[email protected]>
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
We add implementation for
latestOffset(startOffset, limit)andgetDefaultReadLimit()for a completeSupportsAdmissionControlimplementation.Also refactored a few
DeltaSource.scalamethods -- we make them static so we can call them from SparkMicrobatchStream.java.How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No