Skip to content

Commit

Permalink
fix(python-client): truncate driver pod name and fix app name/id trun…
Browse files Browse the repository at this point in the history
…cation (#119)
  • Loading branch information
hussein-awala authored Jan 15, 2025
1 parent 59df6f2 commit f064825
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 7 deletions.
7 changes: 4 additions & 3 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def submit_app(
"spark.kubernetes.container.image": image,
"spark.driver.host": app_id,
"spark.driver.port": "7077",
"spark.kubernetes.driver.pod.name": f"{app_id}-driver",
"spark.kubernetes.driver.pod.name": SparkAppManager._get_pod_name(app_id=app_id),
"spark.kubernetes.executor.podNamePrefix": app_id,
"spark.kubernetes.container.image.pullPolicy": image_pull_policy,
"spark.driver.memory": f"{driver_resources.memory}m",
Expand Down Expand Up @@ -522,8 +522,9 @@ def _parse_app_name_and_id(
# All to lowercase
app_name = app_name.lower()
app_id_suffix_str = app_id_suffix()
if len(app_name) > (63 - len(app_id_suffix_str) + 1):
app_name = app_name[: (63 - len(app_id_suffix_str)) + 1]
# Maximum length for pod labels and service names is 63 characters
if len(app_name) > (63 - len(app_id_suffix_str)):
app_name = app_name[: (63 - len(app_id_suffix_str))]
# Replace all non-alphanumeric characters with dashes
app_name = re.sub(r"[^0-9a-zA-Z]+", "-", app_name)
# Remove leading non-alphabetic characters
Expand Down
17 changes: 15 additions & 2 deletions spark_on_k8s/utils/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from spark_on_k8s.k8s.sync_client import KubernetesClientManager
from spark_on_k8s.utils.logging_mixin import LoggingMixin
from spark_on_k8s.utils.spark_app_status import SparkAppStatus, get_app_status
from spark_on_k8s.utils.warnings import LongAppNameWarning, WarningCache

if TYPE_CHECKING:
from spark_on_k8s.utils.types import ConfigMap, ConfigMapSource
Expand Down Expand Up @@ -42,6 +43,18 @@ def __init__(
super().__init__(logger_name=logger_name or "SparkAppManager")
self.k8s_client_manager = k8s_client_manager or KubernetesClientManager()

@staticmethod
def _get_pod_name(*, app_id: str) -> str:
    """Return the driver pod name for *app_id*, truncated to fit k8s limits.

    Kubernetes caps pod names at 63 characters; the ``-driver`` suffix takes
    7, so the app id portion is capped at 56. When truncation happens the pod
    name may no longer be unique, so a ``LongAppNameWarning`` is emitted
    (once per app id, via ``WarningCache``).
    """
    truncated_id = app_id[:56]
    if truncated_id != app_id:
        WarningCache.warn(
            message="The used app name or app id suffix is too long,"
            " pod name will be truncated and may not be unique",
            category=LongAppNameWarning,
            stacklevel=2,
            warning_id=app_id,
        )
    return f"{truncated_id}-driver"

def app_status(
self,
*,
Expand Down Expand Up @@ -341,7 +354,7 @@ def create_spark_pod_spec(
Pod template spec for the Spark application
"""
pod_metadata = k8s.V1ObjectMeta(
name=f"{app_id}-driver",
name=SparkAppManager._get_pod_name(app_id=app_id),
namespace=namespace,
labels=SparkAppManager.spark_app_labels(
app_name=app_name,
Expand Down Expand Up @@ -497,7 +510,7 @@ def create_headless_service_object(
k8s.V1OwnerReference(
api_version="v1",
kind="Pod",
name=f"{app_id}-driver",
name=SparkAppManager._get_pod_name(app_id=app_id),
uid=pod_owner_uid,
)
]
Expand Down
19 changes: 19 additions & 0 deletions spark_on_k8s/utils/warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

import warnings


class WarningCache:
    """Issue each warning at most once per unique ``warning_id``.

    A process-wide set of already-seen ids is kept on the class, so repeated
    calls with the same id are silently ignored.
    """

    # ids of warnings that have already been emitted (shared process-wide)
    _warning_ids = set()

    @classmethod
    def warn(
        cls, *, warning_id: str, message: str, category: type[Warning] = UserWarning, stacklevel: int = 1
    ):
        """Forward to ``warnings.warn`` unless ``warning_id`` was seen before."""
        if warning_id in cls._warning_ids:
            return
        cls._warning_ids.add(warning_id)
        warnings.warn(message=message, category=category, stacklevel=stacklevel)


class LongAppNameWarning(UserWarning):
    """Warns that an app name / app id is long enough to require pod-name truncation."""
105 changes: 103 additions & 2 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from spark_on_k8s import client as client_module
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S, default_app_id_suffix
from spark_on_k8s.utils import configuration as configuration_module
from spark_on_k8s.utils.warnings import LongAppNameWarning

FAKE_TIME = datetime.datetime(2024, 1, 14, 12, 12, 31)

Expand Down Expand Up @@ -50,8 +51,8 @@ class TestSparkOnK8s:
pytest.param(
"some-very-long-name-which-is-not-allowed-by-k8s-which-is-why-we-need-to-truncate-it",
default_app_id_suffix,
"some-very-long-name-which-is-not-allowed-by-k8s-w",
"some-very-long-name-which-is-not-allowed-by-k8s-w-20240114121231",
"some-very-long-name-which-is-not-allowed-by-k8s",
"some-very-long-name-which-is-not-allowed-by-k8s-20240114121231",
id="app_name_with_suffix_long",
),
pytest.param(
Expand Down Expand Up @@ -161,6 +162,8 @@ def test_parse_app_name_and_id(
"""
Test the method _parse_app_name_and_id
"""
assert len(expected_app_name) <= 63, "The expected app name is too long"
assert len(expected_app_id) <= 63, "The expected app id is too long"
spark_client = SparkOnK8S()
actual_app_name, actual_app_id = spark_client._parse_app_name_and_id(
app_name=app_name, app_id_suffix=app_id_suffix
Expand Down Expand Up @@ -256,6 +259,104 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
"100000",
]

# NOTE: mock.patch decorators are applied bottom-up, so the parameter order in
# the signature is the reverse of the decorator order below.
@mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service")
@freeze_time(FAKE_TIME)
def test_submit_app_with_long_app_name(
    self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_create_client
):
    """Submitting an app with an over-long name truncates the driver pod name.

    The app name/id are truncated to fit the 63-character Kubernetes limit,
    and a ``LongAppNameWarning`` is expected because the driver pod name also
    has to be truncated (which may make it non-unique).
    """

    # Time is frozen at FAKE_TIME, so the generated app-id suffix is fixed.
    expected_app_name = "just-a-very-long-application-name-that-needs-to"
    expected_app_id = f"{expected_app_name}-20240114121231"
    # A part of the app name is truncated to fit the 63 characters limit
    # and a `LongAppNameWarning` warning should be issued
    expected_pod_name = "just-a-very-long-application-name-that-needs-to-20240114-driver"

    spark_client = SparkOnK8S()
    # The submit must emit LongAppNameWarning with exactly this message.
    with pytest.warns(
        LongAppNameWarning,
        match="The used app name or app id suffix is too long,"
        " pod name will be truncated and may not be unique",
    ):
        spark_client.submit_app(
            image="pyspark-job",
            app_path="local:///opt/spark/work-dir/job.py",
            namespace="spark",
            service_account="spark",
            app_name="just-a-very-long-application-name-that-needs-to-be-truncated",
            app_arguments=["100000"],
            app_waiter="no_wait",
            image_pull_policy="Never",
            ui_reverse_proxy=True,
            driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024),
            executor_instances=ExecutorInstances(min=2, max=5, initial=5),
        )

    # Inspect the pod body passed to the mocked create_namespaced_pod call.
    created_pod = mock_create_namespaced_pod.call_args[1]["body"]
    assert created_pod.metadata.name == expected_pod_name
    assert created_pod.metadata.labels["spark-app-name"] == expected_app_name
    assert created_pod.metadata.labels["spark-app-id"] == expected_app_id
    assert created_pod.metadata.labels["spark-role"] == "driver"
    assert created_pod.spec.containers[0].image == "pyspark-job"
    assert created_pod.spec.service_account_name == "spark"
    # The full spark-submit arg list: truncated names must appear in the
    # app name/id confs and in spark.kubernetes.driver.pod.name.
    assert created_pod.spec.containers[0].args == [
        "driver",
        "--master",
        "k8s://https://kubernetes.default.svc.cluster.local:443",
        "--conf",
        f"spark.app.name={expected_app_name}",
        "--conf",
        f"spark.app.id={expected_app_id}",
        "--conf",
        "spark.kubernetes.namespace=spark",
        "--conf",
        "spark.kubernetes.authenticate.driver.serviceAccountName=spark",
        "--conf",
        "spark.kubernetes.container.image=pyspark-job",
        "--conf",
        f"spark.driver.host={expected_app_id}",
        "--conf",
        "spark.driver.port=7077",
        "--conf",
        f"spark.kubernetes.driver.pod.name={expected_pod_name}",
        "--conf",
        f"spark.kubernetes.executor.podNamePrefix={expected_app_id}",
        "--conf",
        "spark.kubernetes.container.image.pullPolicy=Never",
        "--conf",
        "spark.driver.memory=2048m",
        "--conf",
        "spark.executor.cores=1",
        "--conf",
        "spark.executor.memory=1024m",
        "--conf",
        "spark.executor.memoryOverhead=512m",
        "--conf",
        f"spark.ui.proxyBase=/webserver/ui/spark/{expected_app_id}",
        "--conf",
        "spark.ui.proxyRedirectUri=/",
        "--conf",
        "spark.dynamicAllocation.enabled=true",
        "--conf",
        "spark.dynamicAllocation.shuffleTracking.enabled=true",
        "--conf",
        "spark.dynamicAllocation.minExecutors=2",
        "--conf",
        "spark.dynamicAllocation.maxExecutors=5",
        "--conf",
        "spark.dynamicAllocation.initialExecutors=5",
        "--conf",
        "spark.dynamicAllocation.executorAllocationRatio=1.0",
        "--conf",
        "spark.dynamicAllocation.schedulerBacklogTimeout=1s",
        "--conf",
        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s",
        "local:///opt/spark/work-dir/job.py",
        "100000",
    ]

@mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client")
@mock.patch("spark_on_k8s.utils.app_manager.SparkAppManager.stream_logs")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
Expand Down

0 comments on commit f064825

Please sign in to comment.