Refined Kubernetes scheduler implementation #1316
Conversation
Code Review
This pull request introduces the KubernetesScheduler, enabling AReaL to deploy and manage worker groups on Kubernetes using StatefulSets and headless Services. The implementation includes integration with existing trainers, comprehensive documentation in English and Chinese, and new unit and integration tests. The review feedback focuses on improving the robustness of the scheduler by using more specific exception handling for Kubernetes and network requests, and simplifying the code by removing redundant attribute fallbacks.
    try:
        config.load_incluster_config()
    except Exception:
The except Exception: is too broad. It's better to catch the specific exception thrown by load_incluster_config(), which is kubernetes.config.ConfigException. This makes the code's intent clearer and avoids catching unexpected errors.
    - except Exception:
    + except config.ConfigException:
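For reference, a minimal sketch of the narrowed handler; the fallback to a local kubeconfig and the helper name are assumptions for illustration, not the PR's exact code:

```python
from kubernetes import config

def _load_k8s_config() -> None:
    try:
        # Succeeds when running inside a pod with a mounted service account token.
        config.load_incluster_config()
    except config.ConfigException:
        # Assumed fallback: load the local kubeconfig (e.g. ~/.kube/config)
        # when the scheduler runs outside the cluster.
        config.load_kube_config()
```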
            tail_lines=tail_lines,
            timestamps=True,
        )
    except Exception as e:
            namespace=namespace,
            field_selector=f"involvedObject.name={pod_name}",
        ).items
    except Exception as e:
    container_statuses = _obj_get(pod, "status.container_statuses", None)
    if container_statuses is None:
        container_statuses = _obj_get(pod, "status.containerStatuses", []) or []
The official Kubernetes Python client converts API fields from camelCase to snake_case for object attributes. The fallback from container_statuses to containerStatuses is likely unnecessary and can be removed for clarity. A similar simplification can be applied to exit_code vs exitCode on lines 588-602.
    - container_statuses = _obj_get(pod, "status.container_statuses", None)
    - if container_statuses is None:
    -     container_statuses = _obj_get(pod, "status.containerStatuses", []) or []
    + container_statuses = _obj_get(pod, "status.container_statuses", []) or []
    url = f"http://{format_hostport(wi.worker.ip, port)}/health"
    try:
        return requests.get(url, timeout=2.0).status_code == 200
    except Exception:
    except requests.exceptions.Timeout as e:
        last_error = f"Timeout: {e}"
    except requests.exceptions.ConnectionError as e:
        self._check_pods_health(health_role)
        last_error = f"Connection error: {e}"
The exception handling for requests errors can be simplified. requests.exceptions.Timeout and requests.exceptions.ConnectionError are both subclasses of requests.exceptions.RequestException. You can catch the base class to cover all request-related errors.
    - except requests.exceptions.Timeout as e:
    -     last_error = f"Timeout: {e}"
    - except requests.exceptions.ConnectionError as e:
    -     self._check_pods_health(health_role)
    -     last_error = f"Connection error: {e}"
    + except requests.exceptions.RequestException as e:
    +     self._check_pods_health(health_role)
    +     last_error = f"Request error: {e}"
    from kubernetes import client, config
    from kubernetes.client import ApiException
We could make the `kubernetes` package a required dependency in `pyproject.toml`.
    spec: SchedulingSpec | None = None

    class KubernetesClient(Protocol):
I don't think this protocol is mandatory, since we only have one concrete implementation.
The protocol is mainly for testing and dependency injection. Even though we currently have one implementation, using an interface makes it easier to plug in fake/mock Kubernetes clients in tests without depending on the real client.
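As a small illustration of that point (method names here are simplified placeholders, not the PR's actual interface), a structural `Protocol` lets tests inject an in-memory fake without subclassing or a live API server:

```python
from typing import Protocol

class KubernetesClient(Protocol):
    # Hypothetical surface; the real protocol in the PR may differ.
    def create_stateful_set(self, namespace: str, manifest: dict) -> None: ...
    def delete_stateful_set(self, namespace: str, name: str) -> None: ...

class FakeKubernetesClient:
    """In-memory stand-in for unit tests; satisfies KubernetesClient structurally."""

    def __init__(self) -> None:
        self.created: list[tuple[str, dict]] = []

    def create_stateful_set(self, namespace: str, manifest: dict) -> None:
        self.created.append((namespace, manifest))

    def delete_stateful_set(self, namespace: str, name: str) -> None:
        self.created = [
            (ns, m) for ns, m in self.created
            if not (ns == namespace and m.get("metadata", {}).get("name") == name)
        ]
```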
    )

    service = {
        "apiVersion": "v1",
I'm not familiar with k8s, but I usually see API version v2. Why is it hard-coded here? Can we configure it?
Actually, the standard Kubernetes API versions for Service and StatefulSet are still v1, since those are the most stable. I hard-coded the stable API versions intentionally because these resources are part of the long-term stable Kubernetes APIs.
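For context, a trimmed sketch of what those stable API groups look like in manifest form (field values are placeholders, not the PR's actual defaults):

```python
# Headless Service: core API group, which is plain "v1".
service = {
    "apiVersion": "v1",
    "kind": "Service",
    "metadata": {"name": "areal-actor"},  # placeholder name
    "spec": {"clusterIP": "None", "selector": {"app": "areal-actor"}},
}

# StatefulSet: stable under "apps/v1" since Kubernetes 1.9.
stateful_set = {
    "apiVersion": "apps/v1",
    "kind": "StatefulSet",
    "metadata": {"name": "areal-actor"},
    "spec": {"serviceName": "areal-actor", "replicas": 2},  # trimmed; selector/template omitted
}
```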
    actor.scheduling_spec[0].image=ghcr.io/<org>/<image>:<tag> \
    rollout.scheduling_spec[0].image=ghcr.io/<org>/<image>:<tag>
Why is there a [0] index?
Currently the Kubernetes scheduler supports one SchedulingSpec per role, so scheduling_spec[0] is used as the shared pod template for all replicas.
The indexing is kept because it matches the existing AReaL config structure.
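To make the semantics concrete, a runnable sketch of how the per-role list is consumed; the config classes and field names here are simplified stand-ins for illustration, not AReaL's actual config objects:

```python
from dataclasses import dataclass, field

@dataclass
class SchedulingSpec:  # simplified stand-in for AReaL's SchedulingSpec
    image: str = "ghcr.io/<org>/<image>:<tag>"

@dataclass
class RoleConfig:  # hypothetical shape, for illustration only
    replicas: int = 2
    scheduling_spec: list[SchedulingSpec] = field(default_factory=lambda: [SchedulingSpec()])

actor = RoleConfig()
pod_template = actor.scheduling_spec[0]  # single spec shared by every replica of the role
for index in range(actor.replicas):
    print(f"actor-{index} -> {pod_template.image}")
```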
This test is not very helpful. It only tests the initialization and some method calls with mocks. We'd better just run integration tests in real k8s environments and skip them otherwise.
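One way to do that, sketched with pytest (the environment variable and test body are illustrative, not what the PR uses):

```python
import os

import pytest

# Skip the whole module unless a real cluster is explicitly opted into.
pytestmark = pytest.mark.skipif(
    not os.environ.get("AREAL_K8S_INTEGRATION"),
    reason="requires a reachable Kubernetes cluster; set AREAL_K8S_INTEGRATION=1 to run",
)

def test_cluster_is_reachable():
    from kubernetes import client, config

    config.load_kube_config()
    assert client.CoreV1Api().list_node().items  # only meaningful against a live API server
```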
garrett4wade left a comment
Review Findings
1. Unused health_role in create_engine (MEDIUM)
In areal/infra/scheduler/kubernetes.py, create_engine assigns health_role = self._colocated_roles.get(wi.role, wi.role) but never uses it. Both call_engine and async_call_engine use health_role with self._check_pods_health(health_role) during retry loops, so this looks like a copy-paste leftover where the health check was forgotten.
Suggestion: Either remove the dead assignment, or add a self._check_pods_health(health_role) call before the HTTP request for consistency with call_engine.
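A rough sketch of the second option, paraphrased from the finding rather than copied from the PR (surrounding names such as wi, url, payload, and timeout are assumed from context):

```python
# Inside KubernetesScheduler.create_engine (paraphrased fragment, not the PR's exact code):
health_role = self._colocated_roles.get(wi.role, wi.role)
# Surface pod-level failures (crash loops, non-zero exits) before talking to the engine,
# mirroring call_engine / async_call_engine.
self._check_pods_health(health_role)
response = requests.post(url, json=payload, timeout=timeout)
```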
2. Redundant exit code extraction in _check_pods_health (MEDIUM)
The exit code is extracted twice with different defaults:
    exit_code = int(_obj_get(terminated, "exit_code", _obj_get(terminated, "exitCode", 0)))
    if terminated and exit_code != 0:
        exit_code = int(_obj_get(terminated, "exit_code", _obj_get(terminated, "exitCode", -1)))
        raise WorkerFailedError(...)

The second extraction is dead code — if the first returned non-zero, the second returns the identical value. If the first returned 0 (field missing), the branch is skipped. Additionally, a terminated container with no exitCode field silently defaults to 0 (success), which could mask edge cases.
Suggestion: Simplify to a single extraction, and consider defaulting to -1 for terminated containers with missing exit codes:
    if terminated:
        exit_code = int(_obj_get(terminated, "exit_code", _obj_get(terminated, "exitCode", -1)))
        if exit_code != 0:
            raise WorkerFailedError(...)

3. PR title typo (LOW)
"schedular" → "scheduler"
4. Residual risk: No RBAC guidance in docs
The docs mention service account permissions but don't provide a sample ClusterRole/RoleBinding manifest. This is the most common setup friction for K8s integrations — users will need to figure out the exact verbs and resources themselves. Consider adding a minimal RBAC example covering Services, StatefulSets, Pods, pod logs, and pod events.
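A minimal example of the kind of namespace-scoped Role/RoleBinding the docs could show, expressed as Python dicts in the same style as the PR's manifests (resource names, verbs, and the areal-scheduler/areal names are a reasonable guess at what the scheduler touches, not taken from the PR):

```python
role = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "Role",
    "metadata": {"name": "areal-scheduler", "namespace": "areal"},  # placeholder names
    "rules": [
        {   # core-group resources the scheduler reads and manages
            "apiGroups": [""],
            "resources": ["services", "pods", "pods/log", "events"],
            "verbs": ["get", "list", "watch", "create", "delete"],
        },
        {   # StatefulSets live in the apps group
            "apiGroups": ["apps"],
            "resources": ["statefulsets"],
            "verbs": ["get", "list", "watch", "create", "delete", "patch"],
        },
    ],
}

role_binding = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "RoleBinding",
    "metadata": {"name": "areal-scheduler", "namespace": "areal"},
    "subjects": [{"kind": "ServiceAccount", "name": "areal-scheduler", "namespace": "areal"}],
    "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": "areal-scheduler"},
}
```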
Thanks for the detailed review and suggestions; I will address the requested fixes and push the updates. Really appreciate the feedback on both the Kubernetes integration and the scheduler semantics consistency.
Description
This PR adds a Kubernetes-backed scheduler implementation for AReaL using StatefulSet-based worker orchestration.
Related Issue
#Native Kubernetes (K8S) scheduler [CC]
Type of Change
Checklist
- `pre-commit run --all-files`
- `./docs/build_all.sh`
- Up to date with `main`
- `/review-pr` command
- `/create-pr` command

Breaking Change Details (if applicable):
N/A
Additional Context
Design highlights:
- Uses the official Kubernetes Python client; no `kubectl` subprocesses

Need help? Check the Contributing Guide or ask in GitHub Discussions!