Skip to content

Commit 58fc3ca

Browse files
author
Sameer Mesiah
committed
Add team_name tags to Kubernetes executor metrics
Add the team_name tag to Kubernetes executor metrics to improve per-team observability in multi-team deployments. The tag is omitted when team_name is None. Pass the executor's team_name to AirflowKubernetesScheduler so scheduler-emitted metrics carry the same tag. To stay compatible with older Airflow releases whose executor does not define team_name, the executor sets team_name to None only when the attribute is missing. Update the existing pod-deletion metric tests to verify that metrics are emitted both with and without the team_name tag.
1 parent e9ef38b commit 58fc3ca

3 files changed

Lines changed: 151 additions & 23 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
5454
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
5555
from airflow.providers.common.compat.sdk import Stats, conf
56+
from airflow.utils.helpers import prune_dict
5657
from airflow.utils.log.logging_mixin import remove_escape_codes
5758
from airflow.utils.session import NEW_SESSION, provide_session
5859
from airflow.utils.state import TaskInstanceState
@@ -116,6 +117,10 @@ def __init__(self, *args, **kwargs):
116117
self.completed: set[KubernetesResults] = set()
117118
self.create_pods_after: datetime | None = None
118119

120+
# Maintain compatibility with older Airflow releases that do not define team_name.
121+
if not hasattr(self, "team_name"):
122+
self.team_name = None
123+
119124
def _list_pods(self, query_kwargs):
120125
query_kwargs["header_params"] = {
121126
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
@@ -196,6 +201,7 @@ def start(self) -> None:
196201
result_queue=self.result_queue,
197202
kube_client=self.kube_client,
198203
scheduler_job_id=self.scheduler_job_id,
204+
team_name=self.team_name,
199205
)
200206

201207
def execute_async(
@@ -549,7 +555,10 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
549555
return messages, ["\n".join(log)]
550556

551557
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
552-
with Stats.timer("kubernetes_executor.adopt_task_instances.duration"):
558+
with Stats.timer(
559+
"kubernetes_executor.adopt_task_instances.duration",
560+
tags=prune_dict({"team_name": self.team_name}),
561+
):
553562
# Always flush TIs without queued_by_job_id
554563
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
555564
scheduler_job_ids = {ti.queued_by_job_id for ti in tis}

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
)
4747
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args
4848
from airflow.providers.common.compat.sdk import AirflowException, Stats
49+
from airflow.utils.helpers import prune_dict
4950
from airflow.utils.log.logging_mixin import LoggingMixin
5051
from airflow.utils.state import TaskInstanceState
5152

@@ -474,6 +475,7 @@ def __init__(
474475
result_queue: Queue[KubernetesResults],
475476
kube_client: client.CoreV1Api,
476477
scheduler_job_id: str,
478+
team_name: str | None = None,
477479
):
478480
super().__init__()
479481
self.log.debug("Creating Kubernetes executor")
@@ -486,6 +488,7 @@ def __init__(
486488
self.watcher_queue = self._manager.Queue()
487489
self.scheduler_job_id = scheduler_job_id
488490
self.kube_watchers = self._make_kube_watchers()
491+
self.team_name = team_name
489492

490493
def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
491494
"""Run POD asynchronously."""
@@ -494,18 +497,29 @@ def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
494497

495498
self.log.debug("Pod Creation Request: \n%s", json_pod)
496499
try:
497-
with Stats.timer("kubernetes_executor.pod_creation"):
500+
with Stats.timer(
501+
"kubernetes_executor.pod_creation", tags=prune_dict({"team_name": self.team_name})
502+
):
498503
resp = self.kube_client.create_namespaced_pod(
499504
body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
500505
)
501-
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "200"})
506+
Stats.incr(
507+
"kubernetes_executor.pod_creation_status",
508+
tags=prune_dict({"status": "200", "team_name": self.team_name}),
509+
)
502510
self.log.debug("Pod Creation Response: %s", resp)
503511
except ApiException as e:
504-
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": str(e.status)})
512+
Stats.incr(
513+
"kubernetes_executor.pod_creation_status",
514+
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
515+
)
505516
self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
506517
raise
507518
except Exception as e:
508-
Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "error"})
519+
Stats.incr(
520+
"kubernetes_executor.pod_creation_status",
521+
tags=prune_dict({"status": "error", "team_name": self.team_name}),
522+
)
509523
self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
510524
raise e
511525
return resp
@@ -615,16 +629,24 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
615629
"""Delete Pod from a namespace; does not raise if it does not exist."""
616630
try:
617631
self.log.info("Deleting pod %s in namespace %s", pod_name, namespace)
618-
with Stats.timer("kubernetes_executor.pod_deletion"):
632+
with Stats.timer(
633+
"kubernetes_executor.pod_deletion", tags=prune_dict({"team_name": self.team_name})
634+
):
619635
self.kube_client.delete_namespaced_pod(
620636
pod_name,
621637
namespace,
622638
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
623639
**self.kube_config.kube_client_request_args,
624640
)
625-
Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": "200"})
641+
Stats.incr(
642+
"kubernetes_executor.pod_deletion_status",
643+
tags=prune_dict({"status": "200", "team_name": self.team_name}),
644+
)
626645
except ApiException as e:
627-
Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": str(e.status)})
646+
Stats.incr(
647+
"kubernetes_executor.pod_deletion_status",
648+
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
649+
)
628650
# If the pod is already deleted
629651
if str(e.status) != "404":
630652
raise
@@ -641,30 +663,46 @@ def patch_pod_revoked(self, *, pod_name: str, namespace: str):
641663
namespace,
642664
)
643665
try:
644-
with Stats.timer("kubernetes_executor.pod_patching"):
666+
with Stats.timer(
667+
"kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name})
668+
):
645669
self.kube_client.patch_namespaced_pod(
646670
name=pod_name,
647671
namespace=namespace,
648672
body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
649673
)
650-
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"})
674+
Stats.incr(
675+
"kubernetes_executor.pod_patching_status",
676+
tags=prune_dict({"status": "200", "team_name": self.team_name}),
677+
)
651678
except ApiException as e:
652-
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)})
679+
Stats.incr(
680+
"kubernetes_executor.pod_patching_status",
681+
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
682+
)
653683
self.log.warning("Failed to patch pod %s with pod revoked key.", pod_name, exc_info=True)
654684

655685
def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
656686
"""Add a "done" annotation to ensure we don't continually adopt pods."""
657687
self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace)
658688
try:
659-
with Stats.timer("kubernetes_executor.pod_patching"):
689+
with Stats.timer(
690+
"kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name})
691+
):
660692
self.kube_client.patch_namespaced_pod(
661693
name=pod_name,
662694
namespace=namespace,
663695
body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
664696
)
665-
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"})
697+
Stats.incr(
698+
"kubernetes_executor.pod_patching_status",
699+
tags=prune_dict({"status": "200", "team_name": self.team_name}),
700+
)
666701
except ApiException as e:
667-
Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)})
702+
Stats.incr(
703+
"kubernetes_executor.pod_patching_status",
704+
tags=prune_dict({"status": str(e.status), "team_name": self.team_name}),
705+
)
668706
self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e)
669707

670708
def sync(self) -> None:

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
from airflow.utils.state import State, TaskInstanceState
6767

6868
from tests_common.test_utils.config import conf_vars
69-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
69+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS
7070

7171
try:
7272
# Check whether a module-level function from stats is importable.
@@ -249,49 +249,130 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
249249
@pytest.mark.skipif(
250250
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
251251
)
252+
@pytest.mark.parametrize(
253+
("team_name", "timer_tags", "status_tags"),
254+
[
255+
pytest.param(
256+
None,
257+
{},
258+
{"status": "200"},
259+
id="without_team",
260+
),
261+
pytest.param(
262+
"team_a",
263+
{"team_name": "team_a"},
264+
{"status": "200", "team_name": "team_a"},
265+
id="with_team",
266+
marks=pytest.mark.skipif(
267+
not AIRFLOW_V_3_1_PLUS,
268+
reason="team_name metrics require Airflow 3.1+",
269+
),
270+
),
271+
],
272+
)
252273
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
253274
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
254275
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
255276
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
256277
def test_delete_pod_emits_metrics_on_success(
257-
self, mock_watcher, mock_client, mock_kube_client, mock_stats
278+
self,
279+
mock_watcher,
280+
mock_client,
281+
mock_kube_client,
282+
mock_stats,
283+
team_name,
284+
timer_tags,
285+
status_tags,
258286
):
259287
pod_name = "my-pod-1"
260288
namespace = "my-namespace-1"
261289
mock_kube_client.return_value.delete_namespaced_pod = mock.MagicMock()
262290

263-
kube_executor = KubernetesExecutor()
291+
if AIRFLOW_V_3_1_PLUS:
292+
kube_executor = KubernetesExecutor(team_name=team_name)
293+
else:
294+
kube_executor = KubernetesExecutor()
295+
kube_executor.team_name = team_name
296+
264297
kube_executor.job_id = 1
265298
kube_executor.start()
266299
try:
267300
kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
268-
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
269-
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", tags={"status": "200"})
301+
302+
mock_stats.timer.assert_any_call(
303+
"kubernetes_executor.pod_deletion",
304+
tags=timer_tags,
305+
)
306+
307+
mock_stats.incr.assert_any_call(
308+
"kubernetes_executor.pod_deletion_status",
309+
tags=status_tags,
310+
)
270311
finally:
271312
kube_executor.end()
272313

273314
@pytest.mark.skipif(
274315
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
275316
)
317+
@pytest.mark.parametrize(
318+
("team_name", "timer_tags", "status_tags"),
319+
[
320+
pytest.param(
321+
None,
322+
{},
323+
{"status": "429"},
324+
id="without_team",
325+
),
326+
pytest.param(
327+
"team_a",
328+
{"team_name": "team_a"},
329+
{"status": "429", "team_name": "team_a"},
330+
id="with_team",
331+
marks=pytest.mark.skipif(
332+
not AIRFLOW_V_3_1_PLUS,
333+
reason="team_name metrics require Airflow 3.1+",
334+
),
335+
),
336+
],
337+
)
276338
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
277339
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
278340
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
279341
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
280342
def test_delete_pod_emits_metrics_on_failure(
281-
self, mock_watcher, mock_client, mock_kube_client, mock_stats
343+
self,
344+
mock_watcher,
345+
mock_client,
346+
mock_kube_client,
347+
mock_stats,
348+
team_name,
349+
timer_tags,
350+
status_tags,
282351
):
283352
pod_name = "my-pod-1"
284353
namespace = "my-namespace-2"
285354
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=429)
286355

287-
kube_executor = KubernetesExecutor()
356+
if AIRFLOW_V_3_1_PLUS:
357+
kube_executor = KubernetesExecutor(team_name=team_name)
358+
else:
359+
kube_executor = KubernetesExecutor()
360+
kube_executor.team_name = team_name
361+
288362
kube_executor.job_id = 1
289363
kube_executor.start()
290364
try:
291365
with pytest.raises(ApiException):
292366
kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
293-
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
294-
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", tags={"status": "429"})
367+
368+
mock_stats.timer.assert_any_call(
369+
"kubernetes_executor.pod_deletion",
370+
tags=timer_tags,
371+
)
372+
mock_stats.incr.assert_any_call(
373+
"kubernetes_executor.pod_deletion_status",
374+
tags=status_tags,
375+
)
295376
finally:
296377
kube_executor.end()
297378

0 commit comments

Comments
 (0)