-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Add AftersynchronizedProcessing Time as continuation trigger #36285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Run Yaml_Xlang_Direct PreCommit |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
Run Yaml_Xlang_Direct PreCommit |
|
Run Prism_Python PreCommit 3.12 |
|
Run Python_Transforms PreCommit 3.9 |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| return any(t.has_ontime_pane() for t in self.triggers) | ||
|
|
||
| def get_continuation_trigger(self): | ||
| return Repeatedly( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain why the continuation trigger of a AfterEach trigger is a repeatedly trigger? @tarun-google
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shunping With continuation trigger concept, we are injecting a new trigger after the GroupBy window. Which gets evaluated every time there is a new pane of data released by first trigger. For example if the initial trigger is AfterProcessingTime(5), which trigger only once after 5 sec. we are adding a new trigger after GroupBy when this trigger happens, which is pass by layer. A lot of our triggers are one time.
But the point with AfterEach(condition1, condition2,..) is it is not a one time trigger. it triggers every time there is a condition met. So, if we just write the continuation trigger AfterAny() then it triggers only once. we want continuation trigger for AfterEach to trigger every time the condition is met, not once.
Reference:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, it's not an exact science. There's no real correctness criteria for continuation trigger except "don't hold up data that is already triggered". And the only reason we don't make all of them Repeatedly(Always) is for the corner case of aligned processing time where the user might be surprised if a downstream aggregation had many more outputs because it fired right away instead of waiting for everything aligned to the same processing time. TBH even then it is sort of meh.
Brings back #34212
support and fallbacks added for AfterSynchronizedProcessingTime in RunnerV2
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.