Reconnect to the running Livy batch on retry instead of resubmitting#68956
Open
1fanwang wants to merge 1 commit into
Open
Reconnect to the running Livy batch on retry instead of resubmitting#689561fanwang wants to merge 1 commit into
1fanwang wants to merge 1 commit into
Conversation
When LivyOperator waits synchronously (deferrable=False with polling_interval > 0) and the worker is lost mid-poll, the retry currently posts a brand-new Livy batch, leaving the original Spark application running and duplicating the work. Subclass ResumableJobMixin so the batch id is persisted before polling and the retry reconnects to the in-flight batch. Deferrable and fire-and-forget (polling_interval=0) paths are unchanged.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
LivyOperatorwaiting synchronously (deferrable=Falsewithpolling_interval > 0) holds theSpark batch on the worker. If the worker is lost mid-poll, the retry posts a brand-new Livy
batch — the original Spark application keeps running and the work is duplicated.
ResumableJobMixin(Airflow 3.3, AIP-103) exists to make exactly this synchronous-wait pathcrash-safe: persist the external job id before polling, and on retry reconnect to the running job
instead of resubmitting. Livy is a clean fit — a synchronous submit-then-poll operator, the same
shape the mixin was built for and the
SparkSubmitOperatoralready uses.What
LivyOperatornow subclassesResumableJobMixinand routes its synchronous-poll path throughexecute_resumable:submit_jobposts the batch and returns its id;get_job_status/is_job_active/is_job_succeededclassify LivyBatchState;poll_until_completereuses the existingpoll_for_termination;get_job_resultpushes theapp_idXCom.task_state_storebefore polling, so a retry reads it back andreconnects to the running batch.
polling_interval=0, nothingto reconnect to) paths are untouched. An Airflow-2 stub keeps the provider importable on 2.x.
Crash-safety is opt-in through the mixin's
durableflag (default on); setdurable=Falseto keepthe always-resubmit behaviour.
Tests
A new
TestLivyOperatorResumablesuite (gated on Airflow 3.3+) covers fresh-submit-persists-before-poll,the three retry decisions (reconnect / return / resubmit) across real
BatchStatevalues, gracefuldegradation without a
task_state_store, anddurable=False. The existingLivyOperatorsuite isunchanged.
End-to-end (live, Breeze)
A real worker crash during the synchronous wait, against an in-memory Livy stand-in that counts
POST /batches. ALivyOperator(durable=True, deferrable=False, polling_interval=3)submits abatch; the worker is
SIGKILLed mid-poll; the scheduler retries. Attempt 2 reads the persistedbatch id back, reconnects to the still-running batch, and finishes it — with no second submit.
Raw
Risk
Only the synchronous-poll path changes; deferrable and fire-and-forget are byte-for-byte the same.
The reconnect logic is the shared mixin core, already covered by its own tests.
Was generative AI tooling used to co-author this PR?