diff --git a/providers/cncf/kubernetes/docs/changelog.rst b/providers/cncf/kubernetes/docs/changelog.rst index f1de75277f52c..a646d71fb49ba 100644 --- a/providers/cncf/kubernetes/docs/changelog.rst +++ b/providers/cncf/kubernetes/docs/changelog.rst @@ -27,6 +27,18 @@ Changelog --------- +.. note:: + The ``KubernetesExecutor`` now transparently requeues a task whose worker pod fails *before* the + task process starts (node drain, autoscaler scale-down, node boot race, transient image pull + failure, etc.) instead of failing the task on the first pod failure. This is a change in + default behavior, controlled by the new ``[kubernetes_executor] pod_launch_failure_retries`` + option (default ``1``); requeues do not consume a task-level retry. Set it to ``0`` to restore + the previous behavior of failing immediately. Use ``-1`` (unlimited) with caution: a pod that + fails on every launch is requeued forever, and failed pods are not cleaned up under the default + ``delete_worker_pods_on_failure = False``, so they accumulate. The companion + ``pod_launch_failure_excluded_container_reasons`` option (default ``Error``) lists container + reasons that are excluded from the requeue path. + 10.18.1 ....... diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml index d3ed980a15ebe..c57d00c1ee229 100644 --- a/providers/cncf/kubernetes/provider.yaml +++ b/providers/cncf/kubernetes/provider.yaml @@ -468,6 +468,40 @@ config: type: integer example: ~ default: "0" + pod_launch_failure_retries: + description: | + The number of times the executor will transparently requeue a task whose worker pod + failed before the task process started running (for example a node drain, autoscaler + scale-down, node boot race, or transient image pull failure). The task instance is + still in ``queued`` state in these cases, meaning no task code ran, so requeuing does + not consume a task-level retry. 
+ + This changes the previous default behavior: such tasks are now requeued once before + failing instead of failing on the first pod failure. Set this to 0 to restore the + previous behavior (fail immediately, no requeue). + + Use -1 for unlimited requeues, but with caution: a pod that fails on every launch + (for example a misconfigured image that can never be pulled) will be requeued forever, + and with the default ``delete_worker_pods_on_failure = False`` the failed pods are not + cleaned up, so they accumulate. + version_added: 10.19.0 + type: integer + example: ~ + default: "1" + pod_launch_failure_excluded_container_reasons: + description: | + Comma-separated list of container termination reasons that are excluded from the + ``pod_launch_failure_retries`` requeue path even when the task instance is still in + ``queued`` state. Pods that fail with an excluded reason consume a normal task retry + instead of being transparently requeued. The default ``Error`` covers the case where + the container started executing but the worker process exited before writing + ``running`` to the database, which is most likely an Airflow-specific startup error + rather than a transient infrastructure event. Set to an empty value to requeue these + cases as well. 
+ version_added: 10.19.0 + type: string + example: ~ + default: "Error" executors: - airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 7410b9193a86d..2d41aaaa21fc4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -31,6 +31,7 @@ import time from collections import Counter, defaultdict from contextlib import suppress +from dataclasses import dataclass from datetime import datetime, timedelta from queue import Empty, Queue from typing import TYPE_CHECKING, Any @@ -45,6 +46,7 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, POD_EXECUTOR_DONE_KEY, + FailureDetails, KubernetesJob, KubernetesResults, ) @@ -73,6 +75,20 @@ ) +@dataclass +class _PodLaunchAttempt: + """ + Executor-side requeue state for a task whose worker pod may fail before the task process starts. + + ``requeued_for_pod`` records the pod a requeue was last issued for, so the duplicate + ``Failed`` events Kubernetes can emit for a single pod don't each trigger another requeue. 
+ """ + + job: KubernetesJob + attempts: int = 0 + requeued_for_pod: str | None = None + + class KubernetesExecutor(BaseExecutor): """Executor for Kubernetes.""" @@ -113,6 +129,24 @@ def __init__(self, *args, **kwargs): self.task_publish_max_retries = self.conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 ) + self.pod_launch_failure_max_retries = self.conf.getint( + "kubernetes_executor", "pod_launch_failure_retries", fallback=1 + ) + excluded_reasons = self.conf.get( + "kubernetes_executor", "pod_launch_failure_excluded_container_reasons", fallback="Error" + ) + self.pod_launch_failure_excluded_container_reasons = frozenset( + reason.strip() for reason in excluded_reasons.split(",") if reason.strip() + ) + # Per-key state for requeuing pods that fail before the task process starts (job spec, + # requeue count, and the pod a requeue was last issued for), so the failure is never + # observed by the scheduler and no task-level retry is consumed. + # Intentionally in-memory and not persisted (like task_publish_retries): if this scheduler + # dies the state is lost, and adoption by another scheduler is a safe no-op for it -- an + # adopted pod has no entry here, so a pre-execution failure falls through to a normal fail + # instead of requeuing. The orphaned task instance itself is still recovered by the + # scheduler's adopt_or_reset_orphaned_tasks(), which re-queues it with a fresh attempt. 
+ self.pod_launch_attempts: dict[TaskInstanceKey, _PodLaunchAttempt] = {} self.completed: dict[tuple[str, str], KubernetesResults] = {} self.create_pods_after: datetime | None = None @@ -304,9 +338,9 @@ def execute_async( ) self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id) - self.task_queue.put( - KubernetesJob(key, command, kube_executor_config, pod_template_file, coordinator_kube_image) - ) + job = KubernetesJob(key, command, kube_executor_config, pod_template_file, coordinator_kube_image) + self.pod_launch_attempts[key] = _PodLaunchAttempt(job=job) + self.task_queue.put(job) def queue_workload(self, workload: workloads.All, session: Session | None) -> None: from airflow.executors import workloads @@ -545,6 +579,7 @@ def _change_state( if state == ADOPTED: # When the task pod is adopted by another executor, # then remove the task from the current executor running queue. + self.pod_launch_attempts.pop(key, None) try: self.running.remove(key) except KeyError: @@ -568,6 +603,50 @@ def _change_state( self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace) self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace) + if ( + state == TaskInstanceState.FAILED + and self.pod_launch_failure_max_retries != 0 + and self._is_pre_execution_failure( + state, + self._get_task_instance_state(key, session=session), + failure_details, + self.pod_launch_failure_excluded_container_reasons, + ) + ): + attempt = self.pod_launch_attempts.get(key) + if attempt is not None: + if attempt.requeued_for_pod == pod_name: + # Kubernetes can emit several Failed events for one pod; we already requeued + # for this one, so ignore the duplicates instead of requeuing again. 
+ self.log.debug( + "Ignoring duplicate pre-execution failure for already-requeued pod %s/%s", + namespace, + pod_name, + ) + return + if ( + self.pod_launch_failure_max_retries == -1 + or attempt.attempts < self.pod_launch_failure_max_retries + ): + attempt.attempts += 1 + attempt.requeued_for_pod = pod_name + self.log.warning( + "[Try %s of %s] Pod %s/%s for task %s failed before the task process started " + "(container_reason: %s). Requeuing without consuming a task retry.", + attempt.attempts, + self.pod_launch_failure_max_retries, + namespace, + pod_name, + key, + failure_details.get("container_reason") if failure_details else None, + ) + # Leave the key in self.running and do not write to event_buffer: the scheduler + # never observes this failure, so no task-level retry is consumed. + self.task_queue.put(attempt.job) + return + + self.pod_launch_attempts.pop(key, None) + try: self.running.remove(key) except KeyError: @@ -576,17 +655,51 @@ def _change_state( # If we don't have a TI state, look it up from the db. 
event_buffer expects the TI state if state is None: - from airflow.models.taskinstance import TaskInstance - - filter_for_tis = TaskInstance.filter_for_tis([key]) - if filter_for_tis is not None: - state = session.scalar(select(TaskInstance.state).where(filter_for_tis)) - else: - state = None - state = TaskInstanceState(state) if state else None + state = self._get_task_instance_state(key, session=session) self.event_buffer[key] = state, termination_reason + def _get_task_instance_state(self, key: TaskInstanceKey, *, session: Session) -> TaskInstanceState | None: + """Look up the current task instance state from the metadata database.""" + from airflow.models.taskinstance import TaskInstance + + filter_for_tis = TaskInstance.filter_for_tis([key]) + if filter_for_tis is None: + return None + db_state = session.scalar(select(TaskInstance.state).where(filter_for_tis)) + return TaskInstanceState(db_state) if db_state else None + + @staticmethod + def _is_pre_execution_failure( + state: TaskInstanceState | str | None, + ti_state: TaskInstanceState | None, + failure_details: FailureDetails | None, + excluded_container_reasons: frozenset[str], + ) -> bool: + """ + Return ``True`` if a failed pod's task process never started running. + + Both conditions are required: + + - ``state`` is ``FAILED``: the pod actually terminated. + - ``ti_state`` is ``QUEUED``: the task instance never transitioned to ``running``, so no + task code ran. This is the authoritative signal and holds regardless of the specific + container failure reason (node drain, autoscaler scale-down, transient image pull + error, deferrable resume pod killed before ``execute_complete`` started, etc.). + + Pods whose ``container_reason`` is in ``excluded_container_reasons`` are not treated as + pre-execution failures. 
The default exclusion of ``Error`` covers a container that + started executing but whose worker process exited before writing ``running`` to the + database, which is most likely an Airflow-specific startup error. + """ + if state != TaskInstanceState.FAILED or ti_state != TaskInstanceState.QUEUED: + return False + if failure_details: + container_reason = failure_details.get("container_reason") + if container_reason and container_reason in excluded_container_reasons: + return False + return True + def _get_pod_namespace(self, ti: TaskInstance): pod_override = (ti.executor_config or {}).get("pod_override") namespace = None diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index e0cfbecf3b273..580db92e653f8 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -309,6 +309,20 @@ def get_provider_info(): "example": None, "default": "0", }, + "pod_launch_failure_retries": { + "description": "The number of times the executor will transparently requeue a task whose worker pod\nfailed before the task process started running (for example a node drain, autoscaler\nscale-down, node boot race, or transient image pull failure). The task instance is\nstill in ``queued`` state in these cases, meaning no task code ran, so requeuing does\nnot consume a task-level retry.\n\nThis changes the previous default behavior: such tasks are now requeued once before\nfailing instead of failing on the first pod failure. 
Set this to 0 to restore the\nprevious behavior (fail immediately, no requeue).\n\nUse -1 for unlimited requeues, but with caution: a pod that fails on every launch\n(for example a misconfigured image that can never be pulled) will be requeued forever,\nand with the default ``delete_worker_pods_on_failure = False`` the failed pods are not\ncleaned up, so they accumulate.\n", + "version_added": "10.19.0", + "type": "integer", + "example": None, + "default": "1", + }, + "pod_launch_failure_excluded_container_reasons": { + "description": "Comma-separated list of container termination reasons that are excluded from the\n``pod_launch_failure_retries`` requeue path even when the task instance is still in\n``queued`` state. Pods that fail with an excluded reason consume a normal task retry\ninstead of being transparently requeued. The default ``Error`` covers the case where\nthe container started executing but the worker process exited before writing\n``running`` to the database, which is most likely an Airflow-specific startup error\nrather than a transient infrastructure event. 
Set to an empty value to requeue these\ncases as well.\n", + "version_added": "10.19.0", + "type": "string", + "example": None, + "default": "Error", + }, }, }, }, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index d17a16b3ddc02..4652149f556a3 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -36,6 +36,7 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import ( KubernetesExecutor, PodReconciliationError, + _PodLaunchAttempt, ) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, @@ -1222,6 +1223,295 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ finally: executor.end() + @pytest.mark.parametrize( + ("state", "ti_state", "failure_details", "expected"), + [ + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + None, + True, + id="failed-queued-no-details", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "ContainerStatusUnknown"}, + True, + id="failed-queued-node-killed", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "ImagePullBackOff"}, + True, + id="failed-queued-transient-image-pull", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.QUEUED, + {"container_reason": "Error"}, + False, + id="failed-queued-excluded-error-reason", + ), + pytest.param( + TaskInstanceState.FAILED, + TaskInstanceState.RUNNING, + {"container_reason": "ContainerStatusUnknown"}, + False, + id="failed-but-task-was-running", + ), + pytest.param( + TaskInstanceState.SUCCESS, + TaskInstanceState.QUEUED, + None, + False, + id="not-a-failed-pod", + ), + 
pytest.param( + TaskInstanceState.FAILED, + None, + None, + False, + id="ti-state-missing", + ), + ], + ) + def test_is_pre_execution_failure(self, state, ti_state, failure_details, expected): + assert ( + KubernetesExecutor._is_pre_execution_failure( + state, ti_state, failure_details, frozenset({"Error"}) + ) + is expected + ) + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_requeues( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """A pod that fails while the TI is still queued is requeued without reporting a failure.""" + executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 1 + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) + executor.running = {key} + executor.pod_launch_attempts = {key: _PodLaunchAttempt(job=job)} + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown", "exit_code": 137}, + ) + executor._change_state(results) + + # Requeued (job re-put on the queue): the key stays in running, no + # failure is reported, and the attempt is recorded. These together + # are reached only via the requeue branch. 
+ assert executor.pod_launch_attempts[key].attempts == 1 + assert executor.pod_launch_attempts[key].requeued_for_pod == "pod_name" + assert key in executor.running + assert key not in executor.event_buffer + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_exhausts_retries( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """Once the requeue budget is spent, the task is failed normally.""" + executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 1 + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) + executor.running = {key} + # Already requeued once (for a different pod); budget is now spent. 
+ executor.pod_launch_attempts = { + key: _PodLaunchAttempt(job=job, attempts=1, requeued_for_pod="earlier_pod") + } + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown"}, + ) + executor._change_state(results) + + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + assert key not in executor.pod_launch_attempts + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_excluded_reason( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """An excluded container reason consumes a normal task retry instead of requeuing.""" + executor = self.kubernetes_executor + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + executor.running = {key} + executor.pod_launch_attempts = { + key: _PodLaunchAttempt(job=KubernetesJob(key, ["airflow"], None, None)) + } + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "Error", "exit_code": 1}, + ) + executor._change_state(results) + + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_without_job_spec( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """Without a retained job spec the task cannot be requeued and is failed normally.""" + executor = 
self.kubernetes_executor + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + executor.running = {key} + executor.pod_launch_attempts = {} + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown"}, + ) + executor._change_state(results) + + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_pre_execution_failure_disabled_skips_lookup( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """With pod_launch_failure_retries=0 the task fails immediately and the TI-state lookup is skipped.""" + executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 0 + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + executor.running = {key} + executor.pod_launch_attempts = { + key: _PodLaunchAttempt(job=KubernetesJob(key, ["airflow"], None, None)) + } + results = KubernetesResults( + key, + State.FAILED, + "pod_name", + "default", + "resource_version", + {"container_reason": "ContainerStatusUnknown"}, + ) + with mock.patch.object(executor, "_get_task_instance_state") as mock_lookup: + executor._change_state(results) + + mock_lookup.assert_not_called() + assert executor.event_buffer[key][0] == State.FAILED + assert key not in executor.running + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def 
test_change_state_pre_execution_failure_dedupes_repeated_events( + self, mock_get_kube_client, mock_kubernetes_job_watcher, create_task_instance + ): + """Repeated Failed events for one pod requeue once; a new pod requeues again.""" + executor = self.kubernetes_executor + executor.pod_launch_failure_max_retries = 5 + executor.start() + try: + ti = create_task_instance(state=TaskInstanceState.QUEUED) + key = ti.key + job = KubernetesJob(key, ["airflow", "tasks", "run"], None, None) + executor.running = {key} + executor.pod_launch_attempts = {key: _PodLaunchAttempt(job=job)} + + def _failed(pod_name): + return KubernetesResults( + key, + State.FAILED, + pod_name, + "default", + "rv", + {"container_reason": "ContainerStatusUnknown"}, + ) + + # Three Failed events for the same pod -> a single requeue. + executor._change_state(_failed("pod_a")) + executor._change_state(_failed("pod_a")) + executor._change_state(_failed("pod_a")) + assert executor.pod_launch_attempts[key].attempts == 1 + assert executor.pod_launch_attempts[key].requeued_for_pod == "pod_a" + + # A failure of the requeued (distinct) pod requeues again. 
+ executor._change_state(_failed("pod_b")) + assert executor.pod_launch_attempts[key].attempts == 2 + assert executor.pod_launch_attempts[key].requeued_for_pod == "pod_b" + assert key in executor.running + assert key not in executor.event_buffer + finally: + executor.end() + + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_execute_async_retains_job_spec(self, mock_get_kube_client, mock_kubernetes_job_watcher): + """execute_async stashes the job spec so a pre-execution failure can be requeued.""" + executor = self.kubernetes_executor + executor.start() + try: + key = TaskInstanceKey("dag", "task", "run_id", 1, -1) + executor.execute_async( + key=key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config=None, + ) + assert key in executor.pod_launch_attempts + assert executor.pod_launch_attempts[key].job.key == key + finally: + executor.end() + @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")