diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 4ddb189390b3c..cb0ad053111e9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -53,6 +53,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.common.compat.sdk import Stats, conf +from airflow.utils.helpers import prune_dict from airflow.utils.log.logging_mixin import remove_escape_codes from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState @@ -116,6 +117,10 @@ def __init__(self, *args, **kwargs): self.completed: set[KubernetesResults] = set() self.create_pods_after: datetime | None = None + # Maintain compatibility with older Airflow releases that do not define team_name. + if not hasattr(self, "team_name"): + self.team_name = None + def _list_pods(self, query_kwargs): query_kwargs["header_params"] = { "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" @@ -196,6 +201,7 @@ def start(self) -> None: result_queue=self.result_queue, kube_client=self.kube_client, scheduler_job_id=self.scheduler_job_id, + team_name=self.team_name, ) def execute_async( @@ -549,7 +555,10 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li return messages, ["\n".join(log)] def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: - with Stats.timer("kubernetes_executor.adopt_task_instances.duration"): + with Stats.timer( + "kubernetes_executor.adopt_task_instances.duration", + tags=prune_dict({"team_name": self.team_name}), + ): # Always flush TIs without queued_by_job_id tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] scheduler_job_ids = {ti.queued_by_job_id for ti in tis} diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index af719ada9e4fc..2e460addb5a2f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -46,6 +46,7 @@ ) from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args from airflow.providers.common.compat.sdk import AirflowException, Stats +from airflow.utils.helpers import prune_dict from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -474,6 +475,7 @@ def __init__( result_queue: Queue[KubernetesResults], kube_client: client.CoreV1Api, scheduler_job_id: str, + team_name: str | None = None, ): super().__init__() self.log.debug("Creating Kubernetes executor") @@ -486,6 +488,7 @@ def __init__( self.watcher_queue = self._manager.Queue() self.scheduler_job_id = scheduler_job_id self.kube_watchers = self._make_kube_watchers() + self.team_name = team_name def run_pod_async(self, pod: k8s.V1Pod, **kwargs): """Run POD asynchronously.""" @@ -494,18 +497,29 @@ def run_pod_async(self, pod: k8s.V1Pod, **kwargs): self.log.debug("Pod Creation Request: \n%s", json_pod) try: - with Stats.timer("kubernetes_executor.pod_creation"): + with Stats.timer( + "kubernetes_executor.pod_creation", tags=prune_dict({"team_name": self.team_name}) + ): resp = self.kube_client.create_namespaced_pod( body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs ) - Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "200"}) + Stats.incr( + "kubernetes_executor.pod_creation_status", + tags=prune_dict({"status": "200", "team_name": self.team_name}), + ) self.log.debug("Pod Creation Response: %s", resp) except ApiException as e: - Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": str(e.status)}) + Stats.incr( + "kubernetes_executor.pod_creation_status", + tags=prune_dict({"status": str(e.status), "team_name": self.team_name}), + ) self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod) raise except Exception as e: - Stats.incr("kubernetes_executor.pod_creation_status", tags={"status": "error"}) + Stats.incr( + "kubernetes_executor.pod_creation_status", + tags=prune_dict({"status": "error", "team_name": self.team_name}), + ) self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod) raise e return resp @@ -615,16 +629,24 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" try: self.log.info("Deleting pod %s in namespace %s", pod_name, namespace) - with Stats.timer("kubernetes_executor.pod_deletion"): + with Stats.timer( + "kubernetes_executor.pod_deletion", tags=prune_dict({"team_name": self.team_name}) + ): self.kube_client.delete_namespaced_pod( pod_name, namespace, body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs), **self.kube_config.kube_client_request_args, ) - Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": "200"}) + Stats.incr( + "kubernetes_executor.pod_deletion_status", + tags=prune_dict({"status": "200", "team_name": self.team_name}), + ) except ApiException as e: - Stats.incr("kubernetes_executor.pod_deletion_status", tags={"status": str(e.status)}) + Stats.incr( + "kubernetes_executor.pod_deletion_status", + tags=prune_dict({"status": str(e.status), "team_name": self.team_name}), + ) # If the pod is already deleted if str(e.status) != "404": raise @@ -641,30 +663,46 @@ def patch_pod_revoked(self, *, pod_name: str, namespace: str): namespace, ) try: - with Stats.timer("kubernetes_executor.pod_patching"): + with Stats.timer( + "kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name}) + ): self.kube_client.patch_namespaced_pod( name=pod_name, namespace=namespace, body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}}, ) - Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"}) + Stats.incr( + "kubernetes_executor.pod_patching_status", + tags=prune_dict({"status": "200", "team_name": self.team_name}), + ) except ApiException as e: - Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)}) + Stats.incr( + "kubernetes_executor.pod_patching_status", + tags=prune_dict({"status": str(e.status), "team_name": self.team_name}), + ) self.log.warning("Failed to patch pod %s with pod revoked key.", pod_name, exc_info=True) def patch_pod_executor_done(self, *, pod_name: str, namespace: str): """Add a "done" annotation to ensure we don't continually adopt pods.""" self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace) try: - with Stats.timer("kubernetes_executor.pod_patching"): + with Stats.timer( + "kubernetes_executor.pod_patching", tags=prune_dict({"team_name": self.team_name}) + ): self.kube_client.patch_namespaced_pod( name=pod_name, namespace=namespace, body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}}, ) - Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": "200"}) + Stats.incr( + "kubernetes_executor.pod_patching_status", + tags=prune_dict({"status": "200", "team_name": self.team_name}), + ) except ApiException as e: - Stats.incr("kubernetes_executor.pod_patching_status", tags={"status": str(e.status)}) + Stats.incr( + "kubernetes_executor.pod_patching_status", + tags=prune_dict({"status": str(e.status), "team_name": self.team_name}), + ) self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e) def sync(self) -> None: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..378ab90353260 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -66,7 +66,7 @@ from airflow.utils.state import State, TaskInstanceState from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS try: # Check whether a module-level function from stats is importable. @@ -249,49 +249,375 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param( + None, + {}, + {"status": "200"}, + id="without_team", + ), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "200", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_run_pod_async_emits_metrics_on_success( + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, + ): + pod = mock.MagicMock() + pod.metadata.namespace = "default" + + mock_api_client = mock.Mock() + mock_api_client.sanitize_for_serialization.return_value = {} + + mock_kube_client.return_value.api_client = mock_api_client + mock_kube_client.return_value.create_namespaced_pod = mock.MagicMock() + + kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 + kube_executor.start() + + try: + kube_executor.kube_scheduler.run_pod_async(pod) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_creation", + tags=timer_tags, + ) + + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_creation_status", + tags=status_tags, + ) + finally: + kube_executor.end() + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param( + None, + {}, + {"status": "429"}, + id="without_team", + ), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "429", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_run_pod_async_emits_metrics_on_failure( + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, + ): + pod = mock.MagicMock() + pod.metadata.namespace = "default" + + mock_api_client = mock.Mock() + mock_api_client.sanitize_for_serialization.return_value = {} + + mock_kube_client.return_value.api_client = mock_api_client + mock_kube_client.return_value.create_namespaced_pod.side_effect = ApiException(status=429) + + kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 + kube_executor.start() + + try: + with pytest.raises(ApiException): + kube_executor.kube_scheduler.run_pod_async(pod) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_creation", + tags=timer_tags, + ) + + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_creation_status", + tags=status_tags, + ) + finally: + kube_executor.end() + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param( + None, + {}, + {"status": "200"}, + id="without_team", + ), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "200", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") def test_delete_pod_emits_metrics_on_success( - self, mock_watcher, mock_client, mock_kube_client, mock_stats + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, ): pod_name = "my-pod-1" namespace = "my-namespace-1" mock_kube_client.return_value.delete_namespaced_pod = mock.MagicMock() kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 kube_executor.start() try: kube_executor.kube_scheduler.delete_pod(pod_name, namespace) - mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion") - mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", tags={"status": "200"}) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_deletion", + tags=timer_tags, + ) + + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_deletion_status", + tags=status_tags, + ) finally: kube_executor.end() @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param( + None, + {}, + {"status": "429"}, + id="without_team", + ), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "429", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") def test_delete_pod_emits_metrics_on_failure( - self, mock_watcher, mock_client, mock_kube_client, mock_stats + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, ): pod_name = "my-pod-1" namespace = "my-namespace-2" mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=429) kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 kube_executor.start() try: with pytest.raises(ApiException): kube_executor.kube_scheduler.delete_pod(pod_name, namespace) - mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion") - mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", tags={"status": "429"}) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_deletion", + tags=timer_tags, + ) + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_deletion_status", + tags=status_tags, + ) + finally: + kube_executor.end() + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param(None, {}, {"status": "200"}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "200", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_patch_pod_emits_metrics_on_success( + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, + ): + mock_kube_client.return_value.patch_namespaced_pod = mock.MagicMock() + + kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 + kube_executor.start() + + try: + kube_executor.kube_scheduler.patch_pod_executor_done( + pod_name="pod", + namespace="default", + ) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_patching", + tags=timer_tags, + ) + + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_patching_status", + tags=status_tags, + ) + finally: + kube_executor.end() + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.parametrize( + ("team_name", "timer_tags", "status_tags"), + [ + pytest.param(None, {}, {"status": "429"}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + {"status": "429", "team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_patch_pod_emits_metrics_on_failure( + self, + mock_watcher, + mock_client, + mock_kube_client, + mock_stats, + team_name, + timer_tags, + status_tags, + ): + mock_kube_client.return_value.patch_namespaced_pod.side_effect = ApiException(status=429) + + kube_executor = KubernetesExecutor() + kube_executor.team_name = team_name + kube_executor.job_id = 1 + kube_executor.start() + + try: + kube_executor.kube_scheduler.patch_pod_executor_done( + pod_name="pod", + namespace="default", + ) + + mock_stats.timer.assert_any_call( + "kubernetes_executor.pod_patching", + tags=timer_tags, + ) + + mock_stats.incr.assert_any_call( + "kubernetes_executor.pod_patching_status", + tags=status_tags, + ) finally: kube_executor.end() @@ -1130,6 +1456,22 @@ def test_change_state_failed_pod_deletion( finally: executor.end() + @pytest.mark.parametrize( + ("team_name", "timer_tags"), + [ + pytest.param(None, {}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, + reason="team_name metrics require Airflow 3.1+", + ), + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.Stats.timer") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task" @@ -1138,9 +1480,16 @@ def test_change_state_failed_pod_deletion( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods" ) def test_try_adopt_task_instances( - self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client + self, + mock_adopt_completed_pods, + mock_adopt_launched_task, + mock_kube_dynamic_client, + mock_stats_timer, + team_name, + timer_tags, ): executor = self.kubernetes_executor + executor.team_name = team_name executor.scheduler_job_id = "10" ti_key = annotations_to_key( { @@ -1196,6 +1545,11 @@ def test_try_adopt_task_instances( mock_adopt_completed_pods.assert_called_once() assert reset_tis == [] # This time our return is empty - no TIs to reset + mock_stats_timer.assert_any_call( + "kubernetes_executor.adopt_task_instances.duration", + tags=timer_tags, + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"