From 87169b78a536e3a80465d2a3a22b87292c18a263 Mon Sep 17 00:00:00 2001 From: seanmuth Date: Fri, 26 Jun 2026 15:12:11 -0500 Subject: [PATCH 1/2] Requeue KubernetesExecutor tasks whose pod failed before execution started When a worker pod is destroyed before the task process starts (node drain, autoscaler scale-down, node boot race, transient image pull failure), the task instance is still queued and no task code has run. Reporting this to the scheduler as a normal failure consumes a user-configured retry and raises a misleading failure alert for work that never executed. The executor already has the signal to tell this apart from an execution failure, so it now transparently requeues the pod without consuming a task retry, bounded by pod_launch_failure_retries and excluding container reasons in pod_launch_failure_excluded_container_reasons (default Error). --- providers/cncf/kubernetes/provider.yaml | 26 +++ .../executors/kubernetes_executor.py | 103 ++++++++- .../cncf/kubernetes/get_provider_info.py | 14 ++ .../executors/test_kubernetes_executor.py | 217 ++++++++++++++++++ 4 files changed, 351 insertions(+), 9 deletions(-) diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml index d3ed980a15ebe..6ef8b8b41466a 100644 --- a/providers/cncf/kubernetes/provider.yaml +++ b/providers/cncf/kubernetes/provider.yaml @@ -468,6 +468,32 @@ config: type: integer example: ~ default: "0" + pod_launch_failure_retries: + description: | + The number of times the executor will transparently requeue a task whose worker pod + failed before the task process started running (for example a node drain, autoscaler + scale-down, node boot race, or transient image pull failure). The task instance is + still in ``queued`` state in these cases, meaning no task code ran, so requeuing does + not consume a task-level retry. Set to 0 to disable and fail such tasks immediately. + -1 for unlimited times. + version_added: 10.19.0 + type: integer + example: ~ + default: "1" + pod_launch_failure_excluded_container_reasons: + description: | + Comma-separated list of container termination reasons that are excluded from the + ``pod_launch_failure_retries`` requeue path even when the task instance is still in + ``queued`` state. Pods that fail with an excluded reason consume a normal task retry + instead of being transparently requeued. The default ``Error`` covers the case where + the container started executing but the worker process exited before writing + ``running`` to the database, which is most likely an Airflow-specific startup error + rather than a transient infrastructure event. Set to an empty value to requeue these + cases as well. + version_added: 10.19.0 + type: string + example: ~ + default: "Error" executors: - airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 4ddb189390b3c..dfb94f70f7a3f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -45,6 +45,7 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, POD_EXECUTOR_DONE_KEY, + FailureDetails, KubernetesJob, KubernetesResults, ) @@ -113,6 +114,19 @@ def __init__(self, *args, **kwargs): self.task_publish_max_retries = self.conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 ) + self.pod_launch_failure_attempts: Counter[TaskInstanceKey] = Counter() + self.pod_launch_failure_max_retries = self.conf.getint( + "kubernetes_executor", "pod_launch_failure_retries", fallback=1 + ) + excluded_reasons = self.conf.get( + "kubernetes_executor", "pod_launch_failure_excluded_container_reasons", fallback="Error" + ) + self.pod_launch_failure_excluded_container_reasons = frozenset( + reason.strip() for reason in excluded_reasons.split(",") if reason.strip() + ) + # Job specs are retained by key so a pod that fails before the task process starts can be + # requeued without the scheduler observing the failure or consuming a task-level retry. + self.last_known_jobs: dict[TaskInstanceKey, KubernetesJob] = {} self.completed: set[KubernetesResults] = set() self.create_pods_after: datetime | None = None @@ -226,7 +240,9 @@ def execute_async( else: pod_template_file = None self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id) - self.task_queue.put(KubernetesJob(key, command, kube_executor_config, pod_template_file)) + job = KubernetesJob(key, command, kube_executor_config, pod_template_file) + self.last_known_jobs[key] = job + self.task_queue.put(job) def queue_workload(self, workload: workloads.All, session: Session | None) -> None: from airflow.executors import workloads @@ -455,6 +471,7 @@ def _change_state( if state == ADOPTED: # When the task pod is adopted by another executor, # then remove the task from the current executor running queue. + self.last_known_jobs.pop(key, None) try: self.running.remove(key) except KeyError: @@ -462,6 +479,9 @@ def _change_state( return if state == TaskInstanceState.RUNNING: + # The task process started, so any later failure is an execution failure that should + # not be requeued by the pre-execution path below. + self.last_known_jobs.pop(key, None) self.event_buffer[key] = state, None return @@ -478,6 +498,37 @@ def _change_state( self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace) self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace) + if state == TaskInstanceState.FAILED and self._is_pre_execution_failure( + state, + self._get_task_instance_state(key, session=session), + failure_details, + self.pod_launch_failure_excluded_container_reasons, + ): + attempts = self.pod_launch_failure_attempts[key] + job = self.last_known_jobs.get(key) + can_requeue = ( + self.pod_launch_failure_max_retries == -1 or attempts < self.pod_launch_failure_max_retries + ) + if can_requeue and job is not None: + self.pod_launch_failure_attempts[key] = attempts + 1 + self.log.warning( + "[Try %s of %s] Pod %s/%s for task %s failed before the task process started " + "(container_reason: %s). Requeuing without consuming a task retry.", + attempts + 1, + self.pod_launch_failure_max_retries, + namespace, + pod_name, + key, + failure_details.get("container_reason") if failure_details else None, + ) + # Leave the key in self.running and do not write to event_buffer: the scheduler + # never observes this failure, so no task-level retry is consumed. + self.task_queue.put(job) + return + + self.last_known_jobs.pop(key, None) + self.pod_launch_failure_attempts.pop(key, None) + try: self.running.remove(key) except KeyError: @@ -486,17 +537,51 @@ def _change_state( # If we don't have a TI state, look it up from the db. event_buffer expects the TI state if state is None: - from airflow.models.taskinstance import TaskInstance - - filter_for_tis = TaskInstance.filter_for_tis([key]) - if filter_for_tis is not None: - state = session.scalar(select(TaskInstance.state).where(filter_for_tis)) - else: - state = None - state = TaskInstanceState(state) if state else None + state = self._get_task_instance_state(key, session=session) self.event_buffer[key] = state, termination_reason + def _get_task_instance_state(self, key: TaskInstanceKey, *, session: Session) -> TaskInstanceState | None: + """Look up the current task instance state from the metadata database.""" + from airflow.models.taskinstance import TaskInstance + + filter_for_tis = TaskInstance.filter_for_tis([key]) + if filter_for_tis is None: + return None + db_state = session.scalar(select(TaskInstance.state).where(filter_for_tis)) + return TaskInstanceState(db_state) if db_state else None + + @staticmethod + def _is_pre_execution_failure( + state: TaskInstanceState | str | None, + ti_state: TaskInstanceState | None, + failure_details: FailureDetails | None, + excluded_container_reasons: frozenset[str], + ) -> bool: + """ + Return ``True`` if a failed pod's task process never started running. + + Both conditions are required: + + - ``state`` is ``FAILED``: the pod actually terminated. + - ``ti_state`` is ``QUEUED``: the task instance never transitioned to ``running``, so no + task code ran. This is the authoritative signal and holds regardless of the specific + container failure reason (node drain, autoscaler scale-down, transient image pull + error, deferrable resume pod killed before ``execute_complete`` started, etc.). + + Pods whose ``container_reason`` is in ``excluded_container_reasons`` are not treated as + pre-execution failures. The default exclusion of ``Error`` covers a container that + started executing but whose worker process exited before writing ``running`` to the + database, which is most likely an Airflow-specific startup error. + """ + if state != TaskInstanceState.FAILED or ti_state != TaskInstanceState.QUEUED: + return False + if failure_details: + container_reason = failure_details.get("container_reason") + if container_reason and container_reason in excluded_container_reasons: + return False + return True + def _get_pod_namespace(self, ti: TaskInstance): pod_override = (ti.executor_config or {}).get("pod_override") namespace = None diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index e0cfbecf3b273..607ee9f263052 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -309,6 +309,20 @@ def get_provider_info(): "example": None, "default": "0", }, + "pod_launch_failure_retries": { + "description": "The number of times the executor will transparently requeue a task whose worker pod\nfailed before the task process started running (for example a node drain, autoscaler\nscale-down, node boot race, or transient image pull failure). The task instance is\nstill in ``queued`` state in these cases, meaning no task code ran, so requeuing does\nnot consume a task-level retry. Set to 0 to disable and fail such tasks immediately.\n-1 for unlimited times.\n", + "version_added": "10.19.0", + "type": "integer", + "example": None, + "default": "1", + }, + "pod_launch_failure_excluded_container_reasons": { + "description": "Comma-separated list of container termination reasons that are excluded from the\n``pod_launch_failure_retries`` requeue path even when the task instance is still in\n``queued`` state. Pods that fail with an excluded reason consume a normal task retry\ninstead of being transparently requeued. The default ``Error`` covers the case where\nthe container started executing but the worker process exited before writing\n``running`` to the database, which is most likely an Airflow-specific startup error\nrather than a transient infrastructure event. Set to an empty value to requeue these\ncases as well.\n", + "version_added": "10.19.0", + "type": "string", + "example": None, + "default": "Error", + }, }, }, }, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..d8cb34be4b27a 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -39,6 +39,7 @@ ) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, + KubernetesJob, KubernetesResults, KubernetesWatch, ) @@ -1032,6 +1033,222 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ finally: executor.end() + @pytest.mark.parametrize( + ("state", "ti_state", "failure_details", "expected"), + [ + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + None, + True, + id="failed-queued-no-details", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "ContainerStatusUnknown"}, + True, + id="failed-queued-node-killed", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "ImagePullBackOff"}, + True, + id="failed-queued-transient-image-pull", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "Error"}, + False, + id="failed-queued-excluded-error-reason", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.RUNNING, + {"container_reason": "ContainerStatusUnknown"}, + False, + id="failed-but-task-was-running", + ), + pytest.param( + TaskInstanceState.SUCCESS, + TaskInstanceState.QUEUED, + None, + False, + id="not-a-failed-pod", + ), + pytest.param( + TaskInstanceState.FAILED, + None, + None, + False, + id="ti-state-missing", + ), + ], + ) + def test_is_pre_execution_failure(self, state, ti_state, failure_details, expected): + assert ( + KubernetesExecutor._is_pre_execution_failure( + state, ti_state, failure_details, frozenset({"Error"}) + ) + is expected + ) + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_requeues( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """A pod that fails while the TI is still queued is requeued without reporting a failure.""" + executor = self.kubernetes_executor + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) + executor.running = {key} + executor.last_known_jobs = {key: job} + executor.task_queue = mock.MagicMock() + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown", "exit_code": 137}, + ) + executor._change_state(results) + + executor.task_queue.put.assert_called_once_with(job) + assert key in executor.running + assert key not in executor.event_buffer + assert executor.pod_launch_failure_attempts[key] == 1 + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_exhausts_retries( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """Once the requeue budget is spent, the task is failed normally.""" + executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 1 + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) + executor.running = {key} + executor.last_known_jobs = {key: job} + executor.pod_launch_failure_attempts[key] = 1 + executor.task_queue = mock.MagicMock() + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown"}, + ) + executor._change_state(results) + + executor.task_queue.put.assert_not_called() + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + assert key not in executor.last_known_jobs + assert key not in executor.pod_launch_failure_attempts + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_excluded_reason( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """An excluded container reason consumes a normal task retry instead of requeuing.""" + executor = self.kubernetes_executor + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + executor.running = {key} + executor.last_known_jobs = {key: KubernetesJob(key, ["airflow"], None, None)} + executor.task_queue = mock.MagicMock() + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "Error", "exit_code": 1}, + ) + executor._change_state(results) + + executor.task_queue.put.assert_not_called() + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_without_job_spec( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """Without a retained job spec the task cannot be requeued and is failed normally.""" + executor = self.kubernetes_executor + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + executor.running = {key} + executor.last_known_jobs = {} + executor.task_queue = mock.MagicMock() + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown"}, + ) + executor._change_state(results) + + executor.task_queue.put.assert_not_called() + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + finally: + executor.end() + + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_execute_async_retains_job_spec(self, mock_get_kube_client, mock_kubernetes_job_watcher): + """execute_async stashes the job spec so a pre-execution failure can be requeued.""" + executor = self.kubernetes_executor + executor.start() + try: + key = TaskInstanceKey("dag", "task", "run_id", 1, -1) + executor.execute_async( + key=key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config=k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] + ) + ), + ) + assert key in executor.last_known_jobs + assert executor.last_known_jobs[key].key == key + finally: + executor.end() + @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") From 78ec297e052a23f964ddffe0d9d1e6c243f670d0 Mon Sep 17 00:00:00 2001 From: seanmuth Date: Fri, 26 Jun 2026 16:51:04 -0500 Subject: [PATCH 2/2] Fix KubernetesExecutor pre-execution requeue tests hanging in CI The tests replaced executor.task_queue with a MagicMock, so the executor.end() teardown looped forever on get_nowait() instead of raising Empty, hitting the 60s CI timeout. Assert the requeue via observable executor state instead of mocking the queue, and pass a valid executor_config to the stash test so execute_async does not bail before recording the job. --- .../executors/test_kubernetes_executor.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index d8cb34be4b27a..b8eb6ebb5a208 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1103,6 +1103,7 @@ def test_change_state_pre_execution_failure_requeues( ): """A pod that fails while the TI is still queued is requeued without reporting a failure.""" executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 1 executor.start() try: ti = create_task_instance(state=TaskInstanceState.QUEUED) @@ -1110,7 +1111,6 @@ def test_change_state_pre_execution_failure_requeues( job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) executor.running = {key} executor.last_known_jobs = {key: job} - executor.task_queue = mock.MagicMock() results = KubernetesResults( key, State.FAILED, @@ -1121,10 +1121,13 @@ def test_change_state_pre_execution_failure_requeues( ) executor._change_state(results) - executor.task_queue.put.assert_called_once_with(job) + # Requeued (job re-put on the queue): the key stays in running, no + # failure is reported, and the attempt counter is incremented. These + # together are reached only via the requeue branch. + assert executor.pod_launch_failure_attempts[key] == 1 assert key in executor.running assert key not in executor.event_buffer - assert executor.pod_launch_failure_attempts[key] == 1 + assert key in executor.last_known_jobs finally: executor.end() @@ -1145,7 +1148,6 @@ def test_change_state_pre_execution_failure_exhausts_retries( executor.running = {key} executor.last_known_jobs = {key: job} executor.pod_launch_failure_attempts[key] = 1 - executor.task_queue = mock.MagicMock() results = KubernetesResults( key, State.FAILED, @@ -1156,7 +1158,6 @@ def test_change_state_pre_execution_failure_exhausts_retries( ) executor._change_state(results) - executor.task_queue.put.assert_not_called() assert executor.event_buffer[key][0] == State.FAILED assert key not in executor.running assert key not in executor.last_known_jobs @@ -1178,7 +1179,6 @@ def test_change_state_pre_execution_failure_excluded_reason( key = ti.key executor.running = {key} executor.last_known_jobs = {key: KubernetesJob(key, ["airflow"], None, None)} - executor.task_queue = mock.MagicMock() results = KubernetesResults( key, State.FAILED, @@ -1189,7 +1189,6 @@ def test_change_state_pre_execution_failure_excluded_reason( ) executor._change_state(results) - executor.task_queue.put.assert_not_called() assert executor.event_buffer[key][0] == State.FAILED assert key not in executor.running finally: @@ -1209,7 +1208,6 @@ def test_change_state_pre_execution_failure_without_job_spec( key = ti.key executor.running = {key} executor.last_known_jobs = {} - executor.task_queue = mock.MagicMock() results = KubernetesResults( key, State.FAILED, @@ -1220,7 +1218,6 @@ def test_change_state_pre_execution_failure_without_job_spec( ) executor._change_state(results) - executor.task_queue.put.assert_not_called() assert executor.event_buffer[key][0] == State.FAILED assert key not in executor.running finally: @@ -1238,11 +1235,7 @@ def test_execute_async_retains_job_spec(self, mock_get_kube_client, mock_kuberne key=key, queue=None, command=["airflow", "tasks", "run", "true", "some_parameter"], - executor_config=k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] - ) - ), + executor_config=None, ) assert key in executor.last_known_jobs assert executor.last_known_jobs[key].key == key