From 1f66e25c1448b636a5949e951ff23c08d1ca1e8c Mon Sep 17 00:00:00 2001
From: hussein-awala
Date: Wed, 27 Nov 2024 23:59:23 +0100
Subject: [PATCH] feat(python-client): add more config for dynamic allocation

---
 README.md                           | 87 +++++++++++++++--------------
 spark_on_k8s/client.py              | 23 ++++++++
 spark_on_k8s/utils/configuration.py |  5 ++
 tests/test_spark_client.py          | 15 +++++
 4 files changed, 88 insertions(+), 42 deletions(-)

diff --git a/README.md b/README.md
index b7d7381..b7d6a5a 100644
--- a/README.md
+++ b/README.md
@@ -157,48 +157,51 @@
 The Python client and the CLI can be configured with environment variables to avoid passing them each
 time if you have a common configuration for all your apps. The environment variables are the same for
 both the client and the CLI. Here is a list of the available environment variables:
 
-| Environment Variable                       | Description                                           | Default        |
-|--------------------------------------------|-------------------------------------------------------|----------------|
-| SPARK_ON_K8S_DOCKER_IMAGE                  | The docker image to use for the spark pods            |                |
-| SPARK_ON_K8S_APP_PATH                      | The path to the app file                              |                |
-| SPARK_ON_K8S_NAMESPACE                     | The namespace to use                                  | default        |
-| SPARK_ON_K8S_SERVICE_ACCOUNT               | The service account to use                            | spark          |
-| SPARK_ON_K8S_SPARK_CONF                    | The spark configuration to use                        | {}             |
-| SPARK_ON_K8S_CLASS_NAME                    | The class name to use                                 |                |
-| SPARK_ON_K8S_PACKAGES                      | The maven packages list to add to the classpath       |
-| SPARK_ON_K8S_APP_ARGUMENTS                 | The arguments to pass to the app                      | []             |
-| SPARK_ON_K8S_APP_WAITER                    | The waiter to use to wait for the app to finish       | no_wait        |
-| SPARK_ON_K8S_IMAGE_PULL_POLICY             | The image pull policy to use                          | IfNotPresent   |
-| SPARK_ON_K8S_UI_REVERSE_PROXY              | Whether to use a reverse proxy to access the spark UI | false          |
-| SPARK_ON_K8S_DRIVER_CPU                    | The driver CPU                                        | 1              |
-| SPARK_ON_K8S_DRIVER_MEMORY                 | The driver memory                                     | 1024           |
-| SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD        | The driver memory overhead                            | 512            |
-| SPARK_ON_K8S_EXECUTOR_CPU                  | The executor CPU                                      | 1              |
-| SPARK_ON_K8S_EXECUTOR_MEMORY               | The executor memory                                   | 1024           |
-| SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD      | The executor memory overhead                          | 512            |
-| SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES        | The minimum number of executor instances              |                |
-| SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES        | The maximum number of executor instances              |                |
-| SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES    | The initial number of executor instances              |                |
-| SPARK_ON_K8S_CONFIG_FILE                   | The path to the config file                           |                |
-| SPARK_ON_K8S_CONTEXT                       | The context to use                                    |                |
-| SPARK_ON_K8S_CLIENT_CONFIG                 | The sync Kubernetes client configuration to use       |                |
-| SPARK_ON_K8S_ASYNC_CLIENT_CONFIG           | The async Kubernetes client configuration to use      |                |
-| SPARK_ON_K8S_IN_CLUSTER                    | Whether to use the in cluster Kubernetes config       | false          |
-| SPARK_ON_K8S_API_DEFAULT_NAMESPACE         | The default namespace to use for the API              | default        |
-| SPARK_ON_K8S_API_HOST                      | The host to use for the API                           | 127.0.0.1      |
-| SPARK_ON_K8S_API_PORT                      | The port to use for the API                           | 8000           |
-| SPARK_ON_K8S_API_WORKERS                   | The number of workers to use for the API              | 4              |
-| SPARK_ON_K8S_API_LOG_LEVEL                 | The log level to use for the API                      | info           |
-| SPARK_ON_K8S_API_LIMIT_CONCURRENCY         | The limit concurrency to use for the API              | 1000           |
-| SPARK_ON_K8S_API_SPARK_HISTORY_HOST        | The host to use for the spark history server          |                |
-| SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR    | The node selector to use for the driver pod           | {}             |
-| SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR  | The node selector to use for the executor pods        | {}             |
-| SPARK_ON_K8S_SPARK_DRIVER_LABELS           | The labels to use for the driver pod                  | {}             |
-| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS         | The labels to use for the executor pods               | {}             |
-| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS      | The annotations to use for the driver pod             | {}             |
-| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS    | The annotations to use for the executor pods          | {}             |
-| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH    | The path to the executor pod template                 |                |
-| SPARK_ON_K8S_STARTUP_TIMEOUT               | The timeout to wait for the app to start in seconds   | 0 (no timeout) |
+| Environment Variable                              | Description                                                     | Default                                |
+|---------------------------------------------------|-----------------------------------------------------------------|----------------------------------------|
+| SPARK_ON_K8S_DOCKER_IMAGE                         | The docker image to use for the spark pods                      |                                        |
+| SPARK_ON_K8S_APP_PATH                             | The path to the app file                                        |                                        |
+| SPARK_ON_K8S_NAMESPACE                            | The namespace to use                                            | default                                |
+| SPARK_ON_K8S_SERVICE_ACCOUNT                      | The service account to use                                      | spark                                  |
+| SPARK_ON_K8S_SPARK_CONF                           | The spark configuration to use                                  | {}                                     |
+| SPARK_ON_K8S_CLASS_NAME                           | The class name to use                                           |                                        |
+| SPARK_ON_K8S_PACKAGES                             | The maven packages list to add to the classpath                 |                                        |
+| SPARK_ON_K8S_APP_ARGUMENTS                        | The arguments to pass to the app                                | []                                     |
+| SPARK_ON_K8S_APP_WAITER                           | The waiter to use to wait for the app to finish                 | no_wait                                |
+| SPARK_ON_K8S_IMAGE_PULL_POLICY                    | The image pull policy to use                                    | IfNotPresent                           |
+| SPARK_ON_K8S_UI_REVERSE_PROXY                     | Whether to use a reverse proxy to access the spark UI           | false                                  |
+| SPARK_ON_K8S_DRIVER_CPU                           | The driver CPU                                                  | 1                                      |
+| SPARK_ON_K8S_DRIVER_MEMORY                        | The driver memory                                               | 1024                                   |
+| SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD               | The driver memory overhead                                      | 512                                    |
+| SPARK_ON_K8S_EXECUTOR_CPU                         | The executor CPU                                                | 1                                      |
+| SPARK_ON_K8S_EXECUTOR_MEMORY                      | The executor memory                                             | 1024                                   |
+| SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD             | The executor memory overhead                                    | 512                                    |
+| SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES               | The minimum number of executor instances                        |                                        |
+| SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES               | The maximum number of executor instances                        |                                        |
+| SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES           | The initial number of executor instances                        |                                        |
+| SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO            | The executor allocation ratio                                   | 1                                      |
+| SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT            | The scheduler backlog timeout for dynamic allocation            | 1s                                     |
+| SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT  | The sustained scheduler backlog timeout for dynamic allocation  | SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT |
+| SPARK_ON_K8S_CONFIG_FILE                          | The path to the config file                                     |                                        |
+| SPARK_ON_K8S_CONTEXT                              | The context to use                                              |                                        |
+| SPARK_ON_K8S_CLIENT_CONFIG                        | The sync Kubernetes client configuration to use                 |                                        |
+| SPARK_ON_K8S_ASYNC_CLIENT_CONFIG                  | The async Kubernetes client configuration to use                |                                        |
+| SPARK_ON_K8S_IN_CLUSTER                           | Whether to use the in cluster Kubernetes config                 | false                                  |
+| SPARK_ON_K8S_API_DEFAULT_NAMESPACE                | The default namespace to use for the API                        | default                                |
+| SPARK_ON_K8S_API_HOST                             | The host to use for the API                                     | 127.0.0.1                              |
+| SPARK_ON_K8S_API_PORT                             | The port to use for the API                                     | 8000                                   |
+| SPARK_ON_K8S_API_WORKERS                          | The number of workers to use for the API                        | 4                                      |
+| SPARK_ON_K8S_API_LOG_LEVEL                        | The log level to use for the API                                | info                                   |
+| SPARK_ON_K8S_API_LIMIT_CONCURRENCY                | The limit concurrency to use for the API                        | 1000                                   |
+| SPARK_ON_K8S_API_SPARK_HISTORY_HOST               | The host to use for the spark history server                    |                                        |
+| SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR           | The node selector to use for the driver pod                     | {}                                     |
+| SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR         | The node selector to use for the executor pods                  | {}                                     |
+| SPARK_ON_K8S_SPARK_DRIVER_LABELS                  | The labels to use for the driver pod                            | {}                                     |
+| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS                | The labels to use for the executor pods                         | {}                                     |
+| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS             | The annotations to use for the driver pod                       | {}                                     |
+| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS           | The annotations to use for the executor pods                    | {}                                     |
+| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH           | The path to the executor pod template                           |                                        |
+| SPARK_ON_K8S_STARTUP_TIMEOUT                      | The timeout to wait for the app to start in seconds             | 0 (no timeout)                         |
 
 ## Examples
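A note on the three new variables: below is a minimal sketch of driving them through the environment, mirroring how the test suite sets them with `os.environ`. The values are illustrative only, and the variables must be exported before `Configuration` is first imported, because the class reads the environment at definition time.

```python
import os

# Illustrative values; export these before importing Configuration,
# since the class body reads the environment when it is defined.
os.environ["SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES"] = "2"
os.environ["SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES"] = "10"
os.environ["SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO"] = "0.5"
os.environ["SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT"] = "5s"
# Optional; falls back to SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT when unset.
os.environ["SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT"] = "30s"

from spark_on_k8s.utils.configuration import Configuration

assert Configuration.SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO == 0.5
```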
diff --git a/spark_on_k8s/client.py b/spark_on_k8s/client.py
index b6d776c..31a4204 100644
--- a/spark_on_k8s/client.py
+++ b/spark_on_k8s/client.py
@@ -63,11 +63,21 @@ class ExecutorInstances:
         max: Maximum number of executors. If provided, dynamic allocation is enabled
         initial: Initial number of executors. If max and min are not provided, defaults to 2,
             dynamic allocation will be disabled and the number of executors will be fixed.
+        executor_allocation_ratio: Executor allocation ratio to use for dynamic allocation.
+            Defaults to 1.0
+        scheduler_backlog_timeout: Scheduler backlog timeout to use for dynamic allocation.
+            Defaults to "1s"
+        sustained_scheduler_backlog_timeout: Sustained scheduler backlog timeout to use for
+            dynamic allocation. Defaults to scheduler_backlog_timeout.
+
     """
 
     min: int | None = None
     max: int | None = None
     initial: int | None = None
+    executor_allocation_ratio: float = 1.0
+    scheduler_backlog_timeout: str = "1s"
+    sustained_scheduler_backlog_timeout: str | None = None
 
 
 class SparkOnK8S(LoggingMixin):
@@ -239,6 +249,9 @@ def submit_app(
                 min=Configuration.SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES,
                 max=Configuration.SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES,
                 initial=Configuration.SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES,
+                executor_allocation_ratio=Configuration.SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO,
+                scheduler_backlog_timeout=Configuration.SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT,
+                sustained_scheduler_backlog_timeout=Configuration.SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT,
             )
             if (
                 executor_instances.min is None
@@ -321,6 +334,16 @@ def submit_app(
         if executor_instances.max is not None:
             basic_conf["spark.dynamicAllocation.maxExecutors"] = f"{executor_instances.max}"
             basic_conf["spark.dynamicAllocation.initialExecutors"] = f"{executor_instances.initial or 0}"
+            basic_conf["spark.dynamicAllocation.executorAllocationRatio"] = str(
+                executor_instances.executor_allocation_ratio
+            )
+            basic_conf["spark.dynamicAllocation.schedulerBacklogTimeout"] = (
+                executor_instances.scheduler_backlog_timeout
+            )
+            basic_conf["spark.dynamicAllocation.sustainedSchedulerBacklogTimeout"] = (
+                executor_instances.sustained_scheduler_backlog_timeout
+                or executor_instances.scheduler_backlog_timeout
+            )
         else:
             basic_conf["spark.executor.instances"] = (
                 f"{executor_instances.initial if executor_instances.initial is not None else 2}"
diff --git a/spark_on_k8s/utils/configuration.py b/spark_on_k8s/utils/configuration.py
index 8f4fd56..4d2143b 100644
--- a/spark_on_k8s/utils/configuration.py
+++ b/spark_on_k8s/utils/configuration.py
@@ -42,6 +42,11 @@ class Configuration:
         if getenv("SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES")
         else None
     )
+    SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO = float(getenv("SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO", 1.0))
+    SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT = getenv("SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT", "1s")
+    SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT = getenv(
+        "SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT", SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT
+    )
     SPARK_ON_K8S_SECRET_ENV_VAR = json.loads(getenv("SPARK_ON_K8S_SECRET_ENV_VAR", "{}"))
     SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET = (
         getenv("SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET").split(",")
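For reference, the same knobs from the Python API side: `ExecutorInstances` and its new fields come straight from the hunks above, while the `submit_app` keyword arguments shown (`image`, `app_path`, `namespace`, `executor_instances`) follow the project's README and are assumed here rather than introduced by this patch. As the last `client.py` hunk shows, the new fields only take effect when `max` is set, i.e. when dynamic allocation is enabled.

```python
from spark_on_k8s.client import ExecutorInstances, SparkOnK8S

client = SparkOnK8S()
client.submit_app(
    image="my-registry/my-spark-image:latest",  # hypothetical image
    app_path="local:///opt/spark/work-dir/job.py",
    namespace="spark",
    executor_instances=ExecutorInstances(
        min=0,
        max=10,  # dynamic allocation is enabled because max is set
        executor_allocation_ratio=0.5,
        scheduler_backlog_timeout="5s",
        # Falls back to scheduler_backlog_timeout when left as None.
        sustained_scheduler_backlog_timeout="30s",
    ),
)
```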
getenv("SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT", "1s") + SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT = getenv( + "SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT", SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT + ) SPARK_ON_K8S_SECRET_ENV_VAR = json.loads(getenv("SPARK_ON_K8S_SECRET_ENV_VAR", "{}")) SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET = ( getenv("SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET").split(",") diff --git a/tests/test_spark_client.py b/tests/test_spark_client.py index bdc5c49..b89964b 100644 --- a/tests/test_spark_client.py +++ b/tests/test_spark_client.py @@ -246,6 +246,12 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced "spark.dynamicAllocation.maxExecutors=5", "--conf", "spark.dynamicAllocation.initialExecutors=5", + "--conf", + "spark.dynamicAllocation.executorAllocationRatio=1.0", + "--conf", + "spark.dynamicAllocation.schedulerBacklogTimeout=1s", + "--conf", + "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s", "local:///opt/spark/work-dir/job.py", "100000", ] @@ -281,6 +287,9 @@ def test_submit_app_with_env_configurations( os.environ["SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES"] = "2" os.environ["SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES"] = "5" os.environ["SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES"] = "5" + os.environ["SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO"] = "0.5" + os.environ["SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT"] = "10s" + os.environ["SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT"] = "20s" os.environ["SPARK_ON_K8S_SPARK_CONF"] = json.dumps( {"spark.conf1.key": "spark.conf1.value", "spark.conf2.key": "spark.conf2.value"} ) @@ -350,6 +359,12 @@ def test_submit_app_with_env_configurations( "--conf", "spark.dynamicAllocation.initialExecutors=5", "--conf", + "spark.dynamicAllocation.executorAllocationRatio=0.5", + "--conf", + "spark.dynamicAllocation.schedulerBacklogTimeout=10s", + "--conf", + "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=20s", + "--conf", "spark.conf1.key=spark.conf1.value", "--conf", "spark.conf2.key=spark.conf2.value",