Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions providers/cncf/kubernetes/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,32 @@ config:
type: integer
example: ~
default: "0"
pod_launch_failure_retries:
description: |
The number of times the executor will transparently requeue a task whose worker pod
failed before the task process started running (for example a node drain, autoscaler
scale-down, node boot race, or transient image pull failure). The task instance is
still in ``queued`` state in these cases, meaning no task code ran, so requeuing does
not consume a task-level retry. Set to 0 to disable and fail such tasks immediately.
Set to -1 to requeue an unlimited number of times.
version_added: 10.19.0
type: integer
example: ~
default: "1"
pod_launch_failure_excluded_container_reasons:
description: |
Comma-separated list of container termination reasons that are excluded from the
``pod_launch_failure_retries`` requeue path even when the task instance is still in
``queued`` state. Pods that fail with an excluded reason consume a normal task retry
instead of being transparently requeued. The default ``Error`` covers the case where
the container started executing but the worker process exited before writing
``running`` to the database, which is most likely an Airflow-specific startup error
rather than a transient infrastructure event. Set to an empty value to requeue these
cases as well.
version_added: 10.19.0
type: string
example: ~
default: "Error"

executors:
- airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
FailureDetails,
KubernetesJob,
KubernetesResults,
)
Expand Down Expand Up @@ -113,6 +114,19 @@ def __init__(self, *args, **kwargs):
self.task_publish_max_retries = self.conf.getint(
"kubernetes_executor", "task_publish_max_retries", fallback=0
)
self.pod_launch_failure_attempts: Counter[TaskInstanceKey] = Counter()
self.pod_launch_failure_max_retries = self.conf.getint(
"kubernetes_executor", "pod_launch_failure_retries", fallback=1
)
excluded_reasons = self.conf.get(
"kubernetes_executor", "pod_launch_failure_excluded_container_reasons", fallback="Error"
)
self.pod_launch_failure_excluded_container_reasons = frozenset(
reason.strip() for reason in excluded_reasons.split(",") if reason.strip()
)
# Job specs are retained by key so a pod that fails before the task process starts can be
# requeued without the scheduler observing the failure or consuming a task-level retry.
self.last_known_jobs: dict[TaskInstanceKey, KubernetesJob] = {}
self.completed: set[KubernetesResults] = set()
self.create_pods_after: datetime | None = None

Expand Down Expand Up @@ -226,7 +240,9 @@ def execute_async(
else:
pod_template_file = None
self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
self.task_queue.put(KubernetesJob(key, command, kube_executor_config, pod_template_file))
job = KubernetesJob(key, command, kube_executor_config, pod_template_file)
self.last_known_jobs[key] = job
self.task_queue.put(job)

def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
from airflow.executors import workloads
Expand Down Expand Up @@ -455,13 +471,17 @@ def _change_state(
if state == ADOPTED:
# When the task pod is adopted by another executor,
# then remove the task from the current executor running queue.
self.last_known_jobs.pop(key, None)
try:
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running: %s", key)
return

if state == TaskInstanceState.RUNNING:
# The task process started, so any later failure is an execution failure that should
# not be requeued by the pre-execution path below.
self.last_known_jobs.pop(key, None)
Comment on lines 471 to +484

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should probably also clean pod_launch_failure_attempts?

I wonder if we should wrap the containers into an object so they are always handled together.

self.event_buffer[key] = state, None
return

Expand All @@ -478,6 +498,37 @@ def _change_state(
self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)
self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)

if state == TaskInstanceState.FAILED and self._is_pre_execution_failure(
state,
self._get_task_instance_state(key, session=session),
failure_details,
self.pod_launch_failure_excluded_container_reasons,
):
Comment on lines +501 to +506

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if state == TaskInstanceState.FAILED and self._is_pre_execution_failure(
state,
self._get_task_instance_state(key, session=session),
failure_details,
self.pod_launch_failure_excluded_container_reasons,
):
if (
state == TaskInstanceState.FAILED
and self.pod_launch_failure_max_retries != 0
and self._is_pre_execution_failure(
state,
self._get_task_instance_state(key, session=session),
failure_details,
self.pod_launch_failure_excluded_container_reasons,
)
):

This saves some db queries when no retries are allowed. Not a big deal, I think? (It shouldn’t be too common to explicitly disallow retries.)

attempts = self.pod_launch_failure_attempts[key]
job = self.last_known_jobs.get(key)
can_requeue = (
self.pod_launch_failure_max_retries == -1 or attempts < self.pod_launch_failure_max_retries
)
if can_requeue and job is not None:
self.pod_launch_failure_attempts[key] = attempts + 1
self.log.warning(
"[Try %s of %s] Pod %s/%s for task %s failed before the task process started "
"(container_reason: %s). Requeuing without consuming a task retry.",
attempts + 1,
self.pod_launch_failure_max_retries,
namespace,
pod_name,
key,
failure_details.get("container_reason") if failure_details else None,
)
# Leave the key in self.running and do not write to event_buffer: the scheduler
# never observes this failure, so no task-level retry is consumed.
self.task_queue.put(job)
return

self.last_known_jobs.pop(key, None)
self.pod_launch_failure_attempts.pop(key, None)

try:
self.running.remove(key)
except KeyError:
Expand All @@ -486,17 +537,51 @@ def _change_state(

# If we don't have a TI state, look it up from the db. event_buffer expects the TI state
if state is None:
from airflow.models.taskinstance import TaskInstance

filter_for_tis = TaskInstance.filter_for_tis([key])
if filter_for_tis is not None:
state = session.scalar(select(TaskInstance.state).where(filter_for_tis))
else:
state = None
state = TaskInstanceState(state) if state else None
state = self._get_task_instance_state(key, session=session)

self.event_buffer[key] = state, termination_reason

def _get_task_instance_state(self, key: TaskInstanceKey, *, session: Session) -> TaskInstanceState | None:
    """
    Fetch the state currently stored for the task instance identified by ``key``.

    :param key: key of the task instance whose state should be read
    :param session: session used to execute the lookup query
    :return: the stored state converted to ``TaskInstanceState``, or ``None`` when no
        filter can be built for ``key`` or the query yields an empty value
    """
    from airflow.models.taskinstance import TaskInstance

    ti_filter = TaskInstance.filter_for_tis([key])
    if ti_filter is not None:
        raw_state = session.scalar(select(TaskInstance.state).where(ti_filter))
        # A missing row or a NULL/empty state column both fall through to ``None``.
        if raw_state:
            return TaskInstanceState(raw_state)
    return None

@staticmethod
def _is_pre_execution_failure(
    state: TaskInstanceState | str | None,
    ti_state: TaskInstanceState | None,
    failure_details: FailureDetails | None,
    excluded_container_reasons: frozenset[str],
) -> bool:
    """
    Decide whether a failed pod died before its task process began running.

    The answer is ``True`` only when all of the following hold:

    - ``state`` equals ``FAILED`` (the pod terminated unsuccessfully);
    - ``ti_state`` equals ``QUEUED`` (the task instance was never marked ``running``,
      so no task code was executed);
    - the ``container_reason`` reported in ``failure_details``, if any, is not listed in
      ``excluded_container_reasons``.

    The exclusion list lets operators opt specific termination reasons out of this
    classification. Its intended default, ``Error``, targets containers that started
    but whose worker process exited before recording ``running``, which more likely
    indicates an Airflow startup problem than a transient infrastructure event.

    :param state: state reported for the pod
    :param ti_state: state of the task instance as currently stored
    :param failure_details: optional details about the pod failure; only the
        ``container_reason`` entry is consulted
    :param excluded_container_reasons: container reasons that must never be classified
        as pre-execution failures
    :return: whether the failure happened before the task process started
    """
    pod_failed = state == TaskInstanceState.FAILED
    still_queued = ti_state == TaskInstanceState.QUEUED
    if not (pod_failed and still_queued):
        return False

    # Missing details or a missing/empty reason can never match the exclusion list.
    reason = failure_details.get("container_reason") if failure_details else None
    return not (reason and reason in excluded_container_reasons)

def _get_pod_namespace(self, ti: TaskInstance):
pod_override = (ti.executor_config or {}).get("pod_override")
namespace = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,20 @@ def get_provider_info():
"example": None,
"default": "0",
},
"pod_launch_failure_retries": {
"description": "The number of times the executor will transparently requeue a task whose worker pod\nfailed before the task process started running (for example a node drain, autoscaler\nscale-down, node boot race, or transient image pull failure). The task instance is\nstill in ``queued`` state in these cases, meaning no task code ran, so requeuing does\nnot consume a task-level retry. Set to 0 to disable and fail such tasks immediately.\n-1 for unlimited times.\n",
"version_added": "10.19.0",
"type": "integer",
"example": None,
"default": "1",
},
"pod_launch_failure_excluded_container_reasons": {
"description": "Comma-separated list of container termination reasons that are excluded from the\n``pod_launch_failure_retries`` requeue path even when the task instance is still in\n``queued`` state. Pods that fail with an excluded reason consume a normal task retry\ninstead of being transparently requeued. The default ``Error`` covers the case where\nthe container started executing but the worker process exited before writing\n``running`` to the database, which is most likely an Airflow-specific startup error\nrather than a transient infrastructure event. Set to an empty value to requeue these\ncases as well.\n",
"version_added": "10.19.0",
"type": "string",
"example": None,
"default": "Error",
},
},
},
},
Expand Down
Loading
Loading