Skip to content

Commit

Permalink
feat(python-client): add more config for dynamic allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Nov 27, 2024
1 parent 575f92f commit 1f66e25
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 42 deletions.
87 changes: 45 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,48 +157,51 @@ The Python client and the CLI can be configured with environment variables to av
time if you have a common configuration for all your apps. The environment variables are the same for both the client
and the CLI. Here is a list of the available environment variables:

| Environment Variable | Description | Default |
|-------------------------------------------|-------------------------------------------------------|----------------|
| SPARK_ON_K8S_DOCKER_IMAGE | The docker image to use for the spark pods | |
| SPARK_ON_K8S_APP_PATH | The path to the app file | |
| SPARK_ON_K8S_NAMESPACE | The namespace to use | default |
| SPARK_ON_K8S_SERVICE_ACCOUNT | The service account to use | spark |
| SPARK_ON_K8S_SPARK_CONF | The spark configuration to use | {} |
| SPARK_ON_K8S_CLASS_NAME | The class name to use | |
| SPARK_ON_K8S_PACKAGES                      | The maven packages list to add to the classpath       |                |
| SPARK_ON_K8S_APP_ARGUMENTS | The arguments to pass to the app | [] |
| SPARK_ON_K8S_APP_WAITER | The waiter to use to wait for the app to finish | no_wait |
| SPARK_ON_K8S_IMAGE_PULL_POLICY | The image pull policy to use | IfNotPresent |
| SPARK_ON_K8S_UI_REVERSE_PROXY | Whether to use a reverse proxy to access the spark UI | false |
| SPARK_ON_K8S_DRIVER_CPU | The driver CPU | 1 |
| SPARK_ON_K8S_DRIVER_MEMORY | The driver memory | 1024 |
| SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD | The driver memory overhead | 512 |
| SPARK_ON_K8S_EXECUTOR_CPU | The executor CPU | 1 |
| SPARK_ON_K8S_EXECUTOR_MEMORY | The executor memory | 1024 |
| SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD | The executor memory overhead | 512 |
| SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES | The minimum number of executor instances | |
| SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES | The maximum number of executor instances | |
| SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES | The initial number of executor instances | |
| SPARK_ON_K8S_CONFIG_FILE | The path to the config file | |
| SPARK_ON_K8S_CONTEXT | The context to use | |
| SPARK_ON_K8S_CLIENT_CONFIG | The sync Kubernetes client configuration to use | |
| SPARK_ON_K8S_ASYNC_CLIENT_CONFIG | The async Kubernetes client configuration to use | |
| SPARK_ON_K8S_IN_CLUSTER | Whether to use the in cluster Kubernetes config | false |
| SPARK_ON_K8S_API_DEFAULT_NAMESPACE | The default namespace to use for the API | default |
| SPARK_ON_K8S_API_HOST | The host to use for the API | 127.0.0.1 |
| SPARK_ON_K8S_API_PORT | The port to use for the API | 8000 |
| SPARK_ON_K8S_API_WORKERS | The number of workers to use for the API | 4 |
| SPARK_ON_K8S_API_LOG_LEVEL | The log level to use for the API | info |
| SPARK_ON_K8S_API_LIMIT_CONCURRENCY | The limit concurrency to use for the API | 1000 |
| SPARK_ON_K8S_API_SPARK_HISTORY_HOST | The host to use for the spark history server | |
| SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR | The node selector to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR | The node selector to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_LABELS | The labels to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS | The labels to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS | The annotations to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS | The annotations to use for the executor pods | {} |
| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH | The path to the executor pod template | |
| SPARK_ON_K8S_STARTUP_TIMEOUT | The timeout to wait for the app to start in seconds | 0 (no timeout) |
| Environment Variable | Description | Default |
|---------------------------------------------------|-----------------------------------------------------------------|----------------------------------------|
| SPARK_ON_K8S_DOCKER_IMAGE | The docker image to use for the spark pods | |
| SPARK_ON_K8S_APP_PATH | The path to the app file | |
| SPARK_ON_K8S_NAMESPACE | The namespace to use | default |
| SPARK_ON_K8S_SERVICE_ACCOUNT | The service account to use | spark |
| SPARK_ON_K8S_SPARK_CONF | The spark configuration to use | {} |
| SPARK_ON_K8S_CLASS_NAME | The class name to use | |
| SPARK_ON_K8S_PACKAGES                              | The maven packages list to add to the classpath                 |                                        |
| SPARK_ON_K8S_APP_ARGUMENTS | The arguments to pass to the app | [] |
| SPARK_ON_K8S_APP_WAITER | The waiter to use to wait for the app to finish | no_wait |
| SPARK_ON_K8S_IMAGE_PULL_POLICY | The image pull policy to use | IfNotPresent |
| SPARK_ON_K8S_UI_REVERSE_PROXY | Whether to use a reverse proxy to access the spark UI | false |
| SPARK_ON_K8S_DRIVER_CPU | The driver CPU | 1 |
| SPARK_ON_K8S_DRIVER_MEMORY | The driver memory | 1024 |
| SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD | The driver memory overhead | 512 |
| SPARK_ON_K8S_EXECUTOR_CPU | The executor CPU | 1 |
| SPARK_ON_K8S_EXECUTOR_MEMORY | The executor memory | 1024 |
| SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD | The executor memory overhead | 512 |
| SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES | The minimum number of executor instances | |
| SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES | The maximum number of executor instances | |
| SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES | The initial number of executor instances | |
| SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO | The executor allocation ratio | 1 |
| SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT | The scheduler backlog timeout for dynamic allocation | 1s |
| SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT | The sustained scheduler backlog timeout for dynamic allocation | SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT |
| SPARK_ON_K8S_CONFIG_FILE | The path to the config file | |
| SPARK_ON_K8S_CONTEXT | The context to use | |
| SPARK_ON_K8S_CLIENT_CONFIG | The sync Kubernetes client configuration to use | |
| SPARK_ON_K8S_ASYNC_CLIENT_CONFIG | The async Kubernetes client configuration to use | |
| SPARK_ON_K8S_IN_CLUSTER | Whether to use the in cluster Kubernetes config | false |
| SPARK_ON_K8S_API_DEFAULT_NAMESPACE | The default namespace to use for the API | default |
| SPARK_ON_K8S_API_HOST | The host to use for the API | 127.0.0.1 |
| SPARK_ON_K8S_API_PORT | The port to use for the API | 8000 |
| SPARK_ON_K8S_API_WORKERS | The number of workers to use for the API | 4 |
| SPARK_ON_K8S_API_LOG_LEVEL | The log level to use for the API | info |
| SPARK_ON_K8S_API_LIMIT_CONCURRENCY | The limit concurrency to use for the API | 1000 |
| SPARK_ON_K8S_API_SPARK_HISTORY_HOST | The host to use for the spark history server | |
| SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR | The node selector to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR | The node selector to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_LABELS | The labels to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS | The labels to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS | The annotations to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS | The annotations to use for the executor pods | {} |
| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH | The path to the executor pod template | |
| SPARK_ON_K8S_STARTUP_TIMEOUT | The timeout to wait for the app to start in seconds | 0 (no timeout) |

## Examples

Expand Down
23 changes: 23 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,21 @@ class ExecutorInstances:
max: Maximum number of executors. If provided, dynamic allocation is enabled
initial: Initial number of executors. If max and min are not provided, defaults to 2,
dynamic allocation will be disabled and the number of executors will be fixed.
executor_allocation_ratio: Executor allocation ratio to use for dynamic allocation.
Defaults to 1.0
scheduler_backlog_timeout: Scheduler backlog timeout to use for dynamic allocation.
Defaults to "1s"
sustained_scheduler_backlog_timeout: Sustained scheduler backlog timeout to use for
dynamic allocation. Defaults to scheduler_backlog_timeout.
"""

min: int | None = None
max: int | None = None
initial: int | None = None
executor_allocation_ratio: float = 1.0
scheduler_backlog_timeout: str = "1s"
sustained_scheduler_backlog_timeout: str | None = None


class SparkOnK8S(LoggingMixin):
Expand Down Expand Up @@ -239,6 +249,9 @@ def submit_app(
min=Configuration.SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES,
max=Configuration.SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES,
initial=Configuration.SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES,
executor_allocation_ratio=Configuration.SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO,
scheduler_backlog_timeout=Configuration.SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT,
sustained_scheduler_backlog_timeout=Configuration.SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT,
)
if (
executor_instances.min is None
Expand Down Expand Up @@ -321,6 +334,16 @@ def submit_app(
if executor_instances.max is not None:
basic_conf["spark.dynamicAllocation.maxExecutors"] = f"{executor_instances.max}"
basic_conf["spark.dynamicAllocation.initialExecutors"] = f"{executor_instances.initial or 0}"
basic_conf["spark.dynamicAllocation.executorAllocationRatio"] = str(
executor_instances.executor_allocation_ratio
)
basic_conf["spark.dynamicAllocation.schedulerBacklogTimeout"] = (
executor_instances.scheduler_backlog_timeout
)
basic_conf["spark.dynamicAllocation.sustainedSchedulerBacklogTimeout"] = (
executor_instances.sustained_scheduler_backlog_timeout
or executor_instances.scheduler_backlog_timeout
)
else:
basic_conf["spark.executor.instances"] = (
f"{executor_instances.initial if executor_instances.initial is not None else 2}"
Expand Down
5 changes: 5 additions & 0 deletions spark_on_k8s/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class Configuration:
if getenv("SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES")
else None
)
SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO = float(getenv("SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO", 1.0))
SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT = getenv("SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT", "1s")
SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT = getenv(
"SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT", SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT
)
SPARK_ON_K8S_SECRET_ENV_VAR = json.loads(getenv("SPARK_ON_K8S_SECRET_ENV_VAR", "{}"))
SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET = (
getenv("SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET").split(",")
Expand Down
15 changes: 15 additions & 0 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
"spark.dynamicAllocation.maxExecutors=5",
"--conf",
"spark.dynamicAllocation.initialExecutors=5",
"--conf",
"spark.dynamicAllocation.executorAllocationRatio=1.0",
"--conf",
"spark.dynamicAllocation.schedulerBacklogTimeout=1s",
"--conf",
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s",
"local:///opt/spark/work-dir/job.py",
"100000",
]
Expand Down Expand Up @@ -281,6 +287,9 @@ def test_submit_app_with_env_configurations(
os.environ["SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES"] = "2"
os.environ["SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES"] = "5"
os.environ["SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES"] = "5"
os.environ["SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO"] = "0.5"
os.environ["SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT"] = "10s"
os.environ["SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT"] = "20s"
os.environ["SPARK_ON_K8S_SPARK_CONF"] = json.dumps(
{"spark.conf1.key": "spark.conf1.value", "spark.conf2.key": "spark.conf2.value"}
)
Expand Down Expand Up @@ -350,6 +359,12 @@ def test_submit_app_with_env_configurations(
"--conf",
"spark.dynamicAllocation.initialExecutors=5",
"--conf",
"spark.dynamicAllocation.executorAllocationRatio=0.5",
"--conf",
"spark.dynamicAllocation.schedulerBacklogTimeout=10s",
"--conf",
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=20s",
"--conf",
"spark.conf1.key=spark.conf1.value",
"--conf",
"spark.conf2.key=spark.conf2.value",
Expand Down

0 comments on commit 1f66e25

Please sign in to comment.