|
52 | 52 | import requests
|
53 | 53 |
|
54 | 54 | from kubernetes import config
|
| 55 | +from kubernetes.dynamic import DynamicClient |
| 56 | +from kubernetes import client as k8s_client |
| 57 | +from kubernetes.client.rest import ApiException |
| 58 | + |
55 | 59 | from kubernetes.client.rest import ApiException
|
56 | 60 | import warnings
|
57 | 61 |
|
@@ -176,6 +180,62 @@ def up(self):
|
176 | 180 | except Exception as e: # pragma: no cover
|
177 | 181 | return _kube_api_error_handling(e)
|
178 | 182 |
|
| 183 | + |
| 184 | + def apply(self, force=False): |
| 185 | + """ |
| 186 | + Applies the Cluster yaml using server-side apply. |
| 187 | + If 'force' is set to True, conflicts will be forced. |
| 188 | + """ |
| 189 | + # Ensure Kubernetes configuration is loaded |
| 190 | + config_check() |
| 191 | + |
| 192 | + # Create a dynamic client for interacting with custom resources |
| 193 | + dynamic_client = DynamicClient(get_api_client()) |
| 194 | + |
| 195 | + try: |
| 196 | + # Get the RayCluster custom resource definition |
| 197 | + api = dynamic_client.resources.get( |
| 198 | + api_version="ray.io/v1", |
| 199 | + kind="RayCluster" |
| 200 | + ) |
| 201 | + except Exception as e: |
| 202 | + raise RuntimeError("Failed to get RayCluster resource: " + str(e)) |
| 203 | + |
| 204 | + # Read the YAML file and parse it into a dictionary |
| 205 | + try: |
| 206 | + with open(self.resource_yaml, 'r') as f: |
| 207 | + resource_body = yaml.safe_load(f) |
| 208 | + except FileNotFoundError: |
| 209 | + raise RuntimeError(f"Resource YAML file '{self.resource_yaml}' not found.") |
| 210 | + except yaml.YAMLError as e: |
| 211 | + raise ValueError(f"Failed to parse resource YAML: {e}") |
| 212 | + |
| 213 | + # Extract the resource name from the metadata |
| 214 | + resource_name = resource_body.get("metadata", {}).get("name") |
| 215 | + if not resource_name: |
| 216 | + raise ValueError("The resource must have a 'metadata.name' field.") |
| 217 | + |
| 218 | + try: |
| 219 | + # Use server-side apply |
| 220 | + resp = api.server_side_apply( |
| 221 | + body=resource_body, |
| 222 | + name=resource_name, |
| 223 | + namespace=self.config.namespace, |
| 224 | + field_manager="cluster-manager", |
| 225 | + force_conflicts=force # Allow forcing conflicts if needed |
| 226 | + ) |
| 227 | + print(f"Cluster '{self.config.name}' applied successfully.") |
| 228 | + except ApiException as e: |
| 229 | + if e.status == 403: |
| 230 | + print( |
| 231 | + "Immutable fields detected in the configuration. The cluster cannot be patched normally. " |
| 232 | + "To force the patch, set 'force=True' in the apply() method." |
| 233 | + ) |
| 234 | + elif e.status == 404: |
| 235 | + print(f"Namespace '{self.config.namespace}' or resource '{resource_name}' not found. Verify the namespace or CRD.") |
| 236 | + else: |
| 237 | + raise RuntimeError(f"Failed to apply cluster: {e.reason}") |
| 238 | + |
179 | 239 | def _throw_for_no_raycluster(self):
|
180 | 240 | api_instance = client.CustomObjectsApi(get_api_client())
|
181 | 241 | try:
|
|
0 commit comments