From f064825e392b66e5cfed5e5fb1b3eabd8b23a609 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 15 Jan 2025 01:01:48 +0100 Subject: [PATCH] fix(python-client): truncate driver pod name and fix app name/id truncation (#119) --- spark_on_k8s/client.py | 7 +- spark_on_k8s/utils/app_manager.py | 17 ++++- spark_on_k8s/utils/warnings.py | 19 ++++++ tests/test_spark_client.py | 105 +++++++++++++++++++++++++++++- 4 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 spark_on_k8s/utils/warnings.py diff --git a/spark_on_k8s/client.py b/spark_on_k8s/client.py index 31a4204..c889509 100644 --- a/spark_on_k8s/client.py +++ b/spark_on_k8s/client.py @@ -313,7 +313,7 @@ def submit_app( "spark.kubernetes.container.image": image, "spark.driver.host": app_id, "spark.driver.port": "7077", - "spark.kubernetes.driver.pod.name": f"{app_id}-driver", + "spark.kubernetes.driver.pod.name": SparkAppManager._get_pod_name(app_id=app_id), "spark.kubernetes.executor.podNamePrefix": app_id, "spark.kubernetes.container.image.pullPolicy": image_pull_policy, "spark.driver.memory": f"{driver_resources.memory}m", @@ -522,8 +522,9 @@ def _parse_app_name_and_id( # All to lowercase app_name = app_name.lower() app_id_suffix_str = app_id_suffix() - if len(app_name) > (63 - len(app_id_suffix_str) + 1): - app_name = app_name[: (63 - len(app_id_suffix_str)) + 1] + # Maximum length for pod labels and service names is 63 characters + if len(app_name) > (63 - len(app_id_suffix_str)): + app_name = app_name[: (63 - len(app_id_suffix_str))] # Replace all non-alphanumeric characters with dashes app_name = re.sub(r"[^0-9a-zA-Z]+", "-", app_name) # Remove leading non-alphabetic characters diff --git a/spark_on_k8s/utils/app_manager.py b/spark_on_k8s/utils/app_manager.py index 4516b3c..7d33290 100644 --- a/spark_on_k8s/utils/app_manager.py +++ b/spark_on_k8s/utils/app_manager.py @@ -11,6 +11,7 @@ from spark_on_k8s.k8s.sync_client import KubernetesClientManager from spark_on_k8s.utils.logging_mixin import LoggingMixin from spark_on_k8s.utils.spark_app_status import SparkAppStatus, get_app_status +from spark_on_k8s.utils.warnings import LongAppNameWarning, WarningCache if TYPE_CHECKING: from spark_on_k8s.utils.types import ConfigMap, ConfigMapSource @@ -42,6 +43,18 @@ def __init__( super().__init__(logger_name=logger_name or "SparkAppManager") self.k8s_client_manager = k8s_client_manager or KubernetesClientManager() + @staticmethod + def _get_pod_name(*, app_id: str) -> str: + if len(app_id) > 56: + WarningCache.warn( + message="The used app name or app id suffix is too long," + " pod name will be truncated and may not be unique", + category=LongAppNameWarning, + stacklevel=2, + warning_id=app_id, + ) + return f"{app_id[:56]}-driver" + def app_status( self, *, @@ -341,7 +354,7 @@ def create_spark_pod_spec( Pod template spec for the Spark application """ pod_metadata = k8s.V1ObjectMeta( - name=f"{app_id}-driver", + name=SparkAppManager._get_pod_name(app_id=app_id), namespace=namespace, labels=SparkAppManager.spark_app_labels( app_name=app_name, @@ -497,7 +510,7 @@ def create_headless_service_object( k8s.V1OwnerReference( api_version="v1", kind="Pod", - name=f"{app_id}-driver", + name=SparkAppManager._get_pod_name(app_id=app_id), uid=pod_owner_uid, ) ] diff --git a/spark_on_k8s/utils/warnings.py b/spark_on_k8s/utils/warnings.py new file mode 100644 index 0000000..7cc6807 --- /dev/null +++ b/spark_on_k8s/utils/warnings.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import warnings + + +class WarningCache: + _warning_ids = set() + + @classmethod + def warn( + cls, *, warning_id: str, message: str, category: type[Warning] = UserWarning, stacklevel: int = 1 + ): + if warning_id not in cls._warning_ids: + cls._warning_ids.add(warning_id) + warnings.warn(message=message, category=category, stacklevel=stacklevel) + + +class LongAppNameWarning(UserWarning): + pass diff --git a/tests/test_spark_client.py b/tests/test_spark_client.py index b89964b..f84ef2c 100644 --- a/tests/test_spark_client.py +++ b/tests/test_spark_client.py @@ -14,6 +14,7 @@ from spark_on_k8s import client as client_module from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S, default_app_id_suffix from spark_on_k8s.utils import configuration as configuration_module +from spark_on_k8s.utils.warnings import LongAppNameWarning FAKE_TIME = datetime.datetime(2024, 1, 14, 12, 12, 31) @@ -50,8 +51,8 @@ class TestSparkOnK8s: pytest.param( "some-very-long-name-which-is-not-allowed-by-k8s-which-is-why-we-need-to-truncate-it", default_app_id_suffix, - "some-very-long-name-which-is-not-allowed-by-k8s-w", - "some-very-long-name-which-is-not-allowed-by-k8s-w-20240114121231", + "some-very-long-name-which-is-not-allowed-by-k8s", + "some-very-long-name-which-is-not-allowed-by-k8s-20240114121231", id="app_name_with_suffix_long", ), pytest.param( @@ -161,6 +162,8 @@ def test_parse_app_name_and_id( """ Test the method _parse_app_name_and_id """ + assert len(expected_app_name) <= 63, "The expected app name is too long" + assert len(expected_app_id) <= 63, "The expected app id is too long" spark_client = SparkOnK8S() actual_app_name, actual_app_id = spark_client._parse_app_name_and_id( app_name=app_name, app_id_suffix=app_id_suffix @@ -256,6 +259,104 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced "100000", ] + @mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client") + @mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod") + @mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service") + @freeze_time(FAKE_TIME) + def test_submit_app_with_long_app_name( + self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_create_client + ): + """Test the method submit_app""" + + expected_app_name = "just-a-very-long-application-name-that-needs-to" + expected_app_id = f"{expected_app_name}-20240114121231" + # A part of the app name is truncated to fit the 63 characters limit + # and a `LongAppNameWarning` warning should be issued + expected_pod_name = "just-a-very-long-application-name-that-needs-to-20240114-driver" + + spark_client = SparkOnK8S() + with pytest.warns( + LongAppNameWarning, + match="The used app name or app id suffix is too long," + " pod name will be truncated and may not be unique", + ): + spark_client.submit_app( + image="pyspark-job", + app_path="local:///opt/spark/work-dir/job.py", + namespace="spark", + service_account="spark", + app_name="just-a-very-long-application-name-that-needs-to-be-truncated", + app_arguments=["100000"], + app_waiter="no_wait", + image_pull_policy="Never", + ui_reverse_proxy=True, + driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024), + executor_instances=ExecutorInstances(min=2, max=5, initial=5), + ) + + created_pod = mock_create_namespaced_pod.call_args[1]["body"] + assert created_pod.metadata.name == expected_pod_name + assert created_pod.metadata.labels["spark-app-name"] == expected_app_name + assert created_pod.metadata.labels["spark-app-id"] == expected_app_id + assert created_pod.metadata.labels["spark-role"] == "driver" + assert created_pod.spec.containers[0].image == "pyspark-job" + assert created_pod.spec.service_account_name == "spark" + assert created_pod.spec.containers[0].args == [ + "driver", + "--master", + "k8s://https://kubernetes.default.svc.cluster.local:443", + "--conf", + f"spark.app.name={expected_app_name}", + "--conf", + f"spark.app.id={expected_app_id}", + "--conf", + "spark.kubernetes.namespace=spark", + "--conf", + "spark.kubernetes.authenticate.driver.serviceAccountName=spark", + "--conf", + "spark.kubernetes.container.image=pyspark-job", + "--conf", + f"spark.driver.host={expected_app_id}", + "--conf", + "spark.driver.port=7077", + "--conf", + f"spark.kubernetes.driver.pod.name={expected_pod_name}", + "--conf", + f"spark.kubernetes.executor.podNamePrefix={expected_app_id}", + "--conf", + "spark.kubernetes.container.image.pullPolicy=Never", + "--conf", + "spark.driver.memory=2048m", + "--conf", + "spark.executor.cores=1", + "--conf", + "spark.executor.memory=1024m", + "--conf", + "spark.executor.memoryOverhead=512m", + "--conf", + f"spark.ui.proxyBase=/webserver/ui/spark/{expected_app_id}", + "--conf", + "spark.ui.proxyRedirectUri=/", + "--conf", + "spark.dynamicAllocation.enabled=true", + "--conf", + "spark.dynamicAllocation.shuffleTracking.enabled=true", + "--conf", + "spark.dynamicAllocation.minExecutors=2", + "--conf", + "spark.dynamicAllocation.maxExecutors=5", + "--conf", + "spark.dynamicAllocation.initialExecutors=5", + "--conf", + "spark.dynamicAllocation.executorAllocationRatio=1.0", + "--conf", + "spark.dynamicAllocation.schedulerBacklogTimeout=1s", + "--conf", + "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s", + "local:///opt/spark/work-dir/job.py", + "100000", + ] + @mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client") @mock.patch("spark_on_k8s.utils.app_manager.SparkAppManager.stream_logs") @mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")