-
Notifications
You must be signed in to change notification settings - Fork 2k
[kernel-spark] Implement availableNow trigger support for dsv2 streaming #5585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| } | ||
|
|
||
| DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset); | ||
| protected Optional<DeltaSourceOffset> latestOffsetInternal( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need a new method here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new method would be better because our logic operates on DeltaSourceOffset. We don't want to retrieve Offset each time and then convert it to DeltaSourceOffset
| return null; | ||
| } | ||
| // TODO(#5318): init trigger available now support | ||
| Optional<DeltaSourceOffset> deltaStartOffset = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to wrap the start offset with Optional? I think it makes subsequent code less readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the optional wrapper
|
|
||
| @ParameterizedTest | ||
| @MethodSource("availableNowParameters") | ||
| public void testAvailableNow( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AvailableNow determines a fixed end point, so let's test the behavior via multiple batches? similar to testLatestOffset_SequentialBatchAdvancement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed test to use sequential batch
20072b3 to
1a4feef
Compare
1a4feef to
d86693f
Compare
|
|
||
| DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset); | ||
| initForTriggerAvailableNowIfNeeded(deltaStartOffset); | ||
| // endOffset is null: no data is available to read for this batch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This has been changed to // Return null when no data is available for this batch.
| /* startVersion= */ 0L, | ||
| /* startIndex= */ BASE_INDEX, | ||
| ReadLimitConfig.maxFiles(1), | ||
| /* numIterations= */ 3, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should use a larger number of iterations for some of these.
huan233usc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with some nits
| * initialize the internal states for AvailableNow if this method is called first time after | ||
| * prepareForTriggerAvailableNow. | ||
| */ | ||
| protected void initForTriggerAvailableNowIfNeeded(DeltaSourceOffset startOffsetOpt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can those methods and lastOffsetForTriggerAvailableNow be private?
| * process up. We may run multiple micro batches, but the query will stop itself when it reaches | ||
| * this offset. | ||
| */ | ||
| protected Optional<DeltaSourceOffset> lastOffsetForTriggerAvailableNow = Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move those variable definition after L76
Which Delta project/connector is this regarding?
Description
We added availableNow trigger support in
SparkMicroBatchStream.java.Also refactored the
latestOffset()function by abstracting out alatestOffsetInternal()method. The internal method takesDeltaSourceOffsetand returnsDeltaSourceOffset, whilelatestOffset()takesOffsetand returnsOffset.How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No