Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.common.compat.sdk import Stats, conf
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -116,6 +117,10 @@ def __init__(self, *args, **kwargs):
self.completed: set[KubernetesResults] = set()
self.create_pods_after: datetime | None = None

# Maintain compatibility with older Airflow releases that do not define team_name.
if not hasattr(self, "team_name"):
self.team_name = None

def _list_pods(self, query_kwargs):
query_kwargs["header_params"] = {
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
Expand Down Expand Up @@ -196,6 +201,7 @@ def start(self) -> None:
result_queue=self.result_queue,
kube_client=self.kube_client,
scheduler_job_id=self.scheduler_job_id,
team_name=self.team_name,
)

def execute_async(
Expand Down Expand Up @@ -549,7 +555,10 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
return messages, ["\n".join(log)]

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
with Stats.timer("kubernetes_executor.adopt_task_instances.duration"):
with Stats.timer(
"kubernetes_executor.adopt_task_instances.duration",
tags=prune_dict({"team_name": self.team_name}),
):
# Always flush TIs without queued_by_job_id
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args
from airflow.providers.common.compat.sdk import AirflowException, Stats
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -474,6 +475,7 @@ def __init__(
result_queue: Queue[KubernetesResults],
kube_client: client.CoreV1Api,
scheduler_job_id: str,
team_name: str | None = None,
):
super().__init__()
self.log.debug("Creating Kubernetes executor")
Expand All @@ -486,6 +488,7 @@ def __init__(
self.watcher_queue = self._manager.Queue()
self.scheduler_job_id = scheduler_job_id
self.kube_watchers = self._make_kube_watchers()
self.team_name = team_name

def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
"""Run POD asynchronously."""
Expand All @@ -494,18 +497,29 @@ def run_pod_async(self, pod: k8s.V1Pod, **kwargs):

self.log.debug("Pod Creation Request: \n%s", json_pod)
try:
with Stats.timer("kubernetes_executor.pod_creation"):
with Stats.timer(
"kubernetes_executor.pod_creation", tags=prune_dict({"team_name": self.team_name})
):
resp = self.kube_client.create_namespaced_pod(
body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
)
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "200"})
Stats.incr(
"kubernetes_executor.pod_creation_status",
tags=prune_dict({"status": "200", "team_name": self.team_name}),
)
self.log.debug("Pod Creation Response: %s", resp)
except ApiException as e:
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": str(e.status)})
Stats.incr(
"kubernetes_executor.pod_creation_status",
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
)
self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
raise
except Exception as e:
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "error"})
Stats.incr(
"kubernetes_executor.pod_creation_status",
tags=prune_dict({"status": "error", "team_name": self.team_name}),
)
self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
raise e
return resp
Expand Down Expand Up @@ -615,16 +629,24 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
"""Delete Pod from a namespace; does not raise if it does not exist."""
try:
self.log.info("Deleting pod %s in namespace %s", pod_name, namespace)
with Stats.timer("kubernetes_executor.pod_deletion"):
with Stats.timer(
"kubernetes_executor.pod_deletion", tags=prune_dict({"team_name": self.team_name})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these timers, can you run a manual test and confirm they work as intended? Technically this is a change; even if team_name is None, this will now be called with tags={} where previously it didn't get a tags at all. You may need to do something like tags=prune_dict(...) or None?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just chiming in here as I was doing some testing for amazon and I think we should be covered based on what I found here in the stats.py file:

regular_kw: dict[str, Any] = {**kwargs}
if tags:
regular_kw["tags"] = tags

Still worth running some manual tests but figured I'd share that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ferruzzi

Do you still want me to run a manual test with a StatsD backend? By tracing the codepath, we can deduce that no tag will be passed if the team_name is None (there is a falsy check in the timer function in the stats.py module as @justinpakzad pointed out). So this means there is no behavior change for the default case i.e. no team present.

):
self.kube_client.delete_namespaced_pod(
pod_name,
namespace,
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
**self.kube_config.kube_client_request_args,
)
Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": "200"})
Stats.incr(
"kubernetes_executor.pod_deletion_status",
tags=prune_dict({"status": "200", "team_name": self.team_name}),
)
except ApiException as e:
Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": str(e.status)})
Stats.incr(
"kubernetes_executor.pod_deletion_status",
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
)
# If the pod is already deleted
if str(e.status) != "404":
raise
Expand All @@ -641,30 +663,46 @@ def patch_pod_revoked(self, *, pod_name: str, namespace: str):
namespace,
)
try:
with Stats.timer("kubernetes_executor.pod_patching"):
with Stats.timer(
"kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name})
):
self.kube_client.patch_namespaced_pod(
name=pod_name,
namespace=namespace,
body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
)
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"})
Stats.incr(
"kubernetes_executor.pod_patching_status",
tags=prune_dict({"status": "200", "team_name": self.team_name}),
)
except ApiException as e:
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)})
Stats.incr(
"kubernetes_executor.pod_patching_status",
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
)
self.log.warning("Failed to patch pod %s with pod revoked key.", pod_name, exc_info=True)

def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
"""Add a "done" annotation to ensure we don't continually adopt pods."""
self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace)
try:
with Stats.timer("kubernetes_executor.pod_patching"):
with Stats.timer(
"kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name})
):
self.kube_client.patch_namespaced_pod(
name=pod_name,
namespace=namespace,
body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
)
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"})
Stats.incr(
"kubernetes_executor.pod_patching_status",
tags=prune_dict({"status": "200", "team_name": self.team_name}),
)
except ApiException as e:
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)})
Stats.incr(
"kubernetes_executor.pod_patching_status",
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
)
self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e)

def sync(self) -> None:
Expand Down
Loading
Loading