Make ResumableJobMixin's reconnect core reusable outside execute() #68952
1fanwang wants to merge 1 commit into
TriggerDagRunOperator's durable wait lives in the task runner (it raises DagRunTriggerException and is polled there), not in execute(), so it cannot use ResumableJobMixin and re-implements the persist-and-reconnect logic by hand. Lift the mixin's core into a standalone resume_or_submit() so the same implementation can be driven from the runner now and the triggerer later, instead of duplicated per integration point.
SameerMesiah97
left a comment
So I have scanned the diff for this PR and PR #68936 and here is what I think:
1. I think extracting a helper is a bit premature in this scenario. PR #68936, which references `TriggerDagRunOperator` as a potential secondary consumer for `resume_or_submit`, has not been merged yet, so `ResumableJobMixin` currently remains the sole consumer of this logic. Would it make sense to wait until the second implementation lands and then extract the common pieces based on the concrete duplication?
2. It seems like the common element between `ResumableJobMixin` and `TriggerDagRunOperator` is the recovery decision (reconnect, already succeeded, submit fresh) rather than the entire submit/poll/result workflow. Did you consider extracting only that decision state machine and leaving submission/polling behavior with the caller?
I don't see 1) as too much of an obstacle here, as a similar helper-type PR was merged recently whilst its secondary consumer was sitting in draft (though it was in a provider), but 2) definitely needs to be addressed: the `resume_or_submit` helper you are introducing is quite callback-heavy. This is a code smell that tends to indicate over-abstraction.
I definitely see the value in this new helper assuming PR #68936 lands but I have some reservations regarding the implementation.
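To make point 2 concrete, a decision-only extraction might look roughly like the sketch below. Everything here is hypothetical: `RecoveryAction`, `decide_recovery`, and the status strings are names invented for illustration, and mapping a failed terminal state to "submit fresh" is an assumption rather than something taken from the diff.

```python
from enum import Enum
from typing import Optional


class RecoveryAction(Enum):
    """The three outcomes of the recovery decision (hypothetical names)."""

    RECONNECT = "reconnect"                  # external job still running: resume polling
    ALREADY_SUCCEEDED = "already_succeeded"  # external job finished OK: go straight to the result
    SUBMIT_FRESH = "submit_fresh"            # nothing usable was persisted: submit a new job


def decide_recovery(persisted_id: Optional[str], status: Optional[str]) -> RecoveryAction:
    """Pure decision function; the caller keeps ownership of submit and poll.

    `status` is assumed to be one of "active", "succeeded", or some other
    terminal value such as "failed". These strings are illustrative only.
    """
    if persisted_id is None:
        return RecoveryAction.SUBMIT_FRESH
    if status == "active":
        return RecoveryAction.RECONNECT
    if status == "succeeded":
        return RecoveryAction.ALREADY_SUCCEEDED
    # Assumption: any other terminal state means the old job is unusable.
    return RecoveryAction.SUBMIT_FRESH
```

With this shape the helper takes no callbacks at all; each caller (mixin, task runner, triggerer) branches on the returned action and runs its own submit or poll code.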
Why
`ResumableJobMixin` is `execute()`-coupled: `execute_resumable()` invokes the operator's `submit_job` / `poll_until_complete` / … methods, so only operators whose `execute()` itself does the submit-and-poll can use it. Operators whose wait lives elsewhere can't: on Airflow 3, `TriggerDagRunOperator.execute()` raises `DagRunTriggerException` and the task runner does the submit + poll, so there is no mixin instance to call and the persist-and-reconnect logic ends up re-implemented by hand (see #68936).
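The coupling problem can be illustrated with a toy model. This is not Airflow code: `WaitInRunner` is a hypothetical stand-in for `DagRunTriggerException`, and `TriggerLikeOperator`, `run_task`, and `submit_and_poll` are invented names. The sketch only shows where control flows, assuming the runner catches the exception and performs the wait itself.

```python
from typing import Callable


class WaitInRunner(Exception):
    """Hypothetical stand-in for DagRunTriggerException: carries what to wait on."""

    def __init__(self, run_id: str) -> None:
        super().__init__(run_id)
        self.run_id = run_id


class TriggerLikeOperator:
    """Toy operator whose execute() never polls; it only signals the runner."""

    def execute(self, context: dict) -> None:
        raise WaitInRunner(run_id="child-run-1")


def run_task(operator, context: dict, submit_and_poll: Callable[[str], str]) -> str:
    """Toy task runner: the submit + poll happens here, not in the operator."""
    try:
        operator.execute(context)
    except WaitInRunner as signal:
        # From this point no operator method is involved, so a mixin that
        # wraps execute() has nothing to hook into.
        return submit_and_poll(signal.run_id)
    return "finished inside execute()"
```

Any persist-and-reconnect logic has to live inside `submit_and_poll` (or the runner around it), which is why an instance-bound mixin cannot serve this path.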
What
Extract the mixin's core (persist the external id, then make the active / succeeded / terminal reconnect decision) into a standalone, instance-free `resume_or_submit(...)`. `execute_resumable()` becomes a thin wrapper that binds the operator's six methods to it.
This makes AIP-103 durability a reusable primitive: the same core can be driven from the task
runner (a stacked PR refactors #68936's hand-rolled logic onto it) and, later, from the triggerer —
instead of duplicating the reconnect logic at each integration point.
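As a rough illustration of the shape described above (not the PR's actual code), the sketch below shows an instance-free core driven by six callables plus a thin wrapper that binds an object's methods to it. Only `submit_job`, `poll_until_complete`, `resume_or_submit`, and `execute_resumable` appear in this PR; `load_id`, `save_id`, `get_status`, `get_result`, the status strings, and `SketchOperator` are assumptions made up for the example, and metrics, spans, and logging are omitted.

```python
from typing import Any, Callable, Dict, Optional


def resume_or_submit(
    *,
    load_id: Callable[[], Optional[str]],
    save_id: Callable[[str], None],
    get_status: Callable[[str], str],
    submit_job: Callable[[], str],
    poll_until_complete: Callable[[str], None],
    get_result: Callable[[str], Any],
) -> Any:
    """Instance-free core: reconnect if possible, otherwise submit fresh."""
    job_id = load_id()
    if job_id is not None:
        status = get_status(job_id)
        if status == "succeeded":
            return get_result(job_id)  # finished while we were away
        if status == "active":
            poll_until_complete(job_id)  # reconnect instead of resubmitting
            return get_result(job_id)
        # Assumption: any other terminal state falls through to a fresh submit.
    job_id = submit_job()
    save_id(job_id)  # persist before polling so a crash can reconnect
    poll_until_complete(job_id)
    return get_result(job_id)


class SketchOperator:
    """In-memory stand-in for an operator plus its external system."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}  # persisted external id
        self.jobs: Dict[str, str] = {}   # external job id -> status
        self.submissions = 0
        self.crash_on_poll = False

    def load_id(self) -> Optional[str]:
        return self.store.get("job_id")

    def save_id(self, job_id: str) -> None:
        self.store["job_id"] = job_id

    def get_status(self, job_id: str) -> str:
        return self.jobs[job_id]

    def submit_job(self) -> str:
        self.submissions += 1
        job_id = f"job-{self.submissions}"
        self.jobs[job_id] = "active"
        return job_id

    def poll_until_complete(self, job_id: str) -> None:
        if self.crash_on_poll:
            raise RuntimeError("worker died mid-poll")
        self.jobs[job_id] = "succeeded"

    def get_result(self, job_id: str) -> str:
        return f"result of {job_id}"

    def execute_resumable(self) -> Any:
        """Thin wrapper: bind this object's six methods to the shared core."""
        return resume_or_submit(
            load_id=self.load_id,
            save_id=self.save_id,
            get_status=self.get_status,
            submit_job=self.submit_job,
            poll_until_complete=self.poll_until_complete,
            get_result=self.get_result,
        )
```

Because the core takes plain callables, a task runner or triggerer could pass its own functions without any operator instance. In this toy model, a first attempt that crashes during polling leaves the id persisted, and a retry reconnects to the same job rather than submitting a second one.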
Tests
The mixin's full existing suite passes unchanged — submit/poll, persist-before-poll, the three
retry states, metrics, tracing span attributes, and logging. That unchanged suite is the
behaviour-preservation proof. The shared core is additionally exercised end-to-end by the stacked
`TriggerDagRunOperator` PR's live crash-safety reproduction.
Risk
Pure refactor with no behaviour change. The mixin's public surface and all six override methods are
untouched; `resume_or_submit` carries the same metrics, spans, and log lines as before.
Was generative AI tooling used to co-author this PR?