@@ -88,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
88
88
if is_notebook ():
89
89
cluster_up_down_buttons (self )
90
90
91
+ def get_dynamic_client (self ):
92
+ """Return a dynamic client, optionally mocked in tests."""
93
+ return DynamicClient (get_api_client ())
94
+
95
+ def config_check (self ):
96
+ """Return a dynamic client, optionally mocked in tests."""
97
+ return config_check ()
98
+
91
99
@property
92
100
def _client_headers (self ):
93
101
k8_client = get_api_client ()
@@ -144,7 +152,7 @@ def up(self):
144
152
the Kueue localqueue.
145
153
"""
146
154
# TODO: Add deprecation message in favor of apply()
147
- # print( "WARNING: The up() is planned for deprecation in favor of apply().")
155
+ # print( "WARNING: The up() is planned for deprecation in favor of apply().")
148
156
149
157
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
150
158
self ._throw_for_no_raycluster ()
@@ -182,30 +190,27 @@ def up(self):
182
190
except Exception as e : # pragma: no cover
183
191
return _kube_api_error_handling (e )
184
192
185
-
186
193
def apply (self , force = False ):
187
194
"""
188
195
Applies the Cluster yaml using server-side apply.
189
196
If 'force' is set to True, conflicts will be forced.
190
197
"""
198
+ # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
199
+ self ._throw_for_no_raycluster ()
200
+ namespace = self .config .namespace
191
201
# Ensure Kubernetes configuration is loaded
192
- config_check ()
193
-
194
- # Create a dynamic client for interacting with custom resources
195
- dynamic_client = DynamicClient (get_api_client ())
196
-
197
202
try :
203
+ self .config_check ()
198
204
# Get the RayCluster custom resource definition
199
- api = dynamic_client .resources .get (
200
- api_version = "ray.io/v1" ,
201
- kind = "RayCluster"
205
+ api = self .get_dynamic_client ().resources .get (
206
+ api_version = "ray.io/v1" , kind = "RayCluster"
202
207
)
203
208
except Exception as e :
204
209
raise RuntimeError ("Failed to get RayCluster resource: " + str (e ))
205
210
206
211
# Read the YAML file and parse it into a dictionary
207
212
try :
208
- with open (self .resource_yaml , 'r' ) as f :
213
+ with open (self .resource_yaml , "r" ) as f :
209
214
resource_body = yaml .safe_load (f )
210
215
except FileNotFoundError :
211
216
raise RuntimeError (f"Resource YAML file '{ self .resource_yaml } ' not found." )
@@ -216,15 +221,14 @@ def apply(self, force=False):
216
221
resource_name = resource_body .get ("metadata" , {}).get ("name" )
217
222
if not resource_name :
218
223
raise ValueError ("The resource must have a 'metadata.name' field." )
219
-
220
224
try :
221
225
# Use server-side apply
222
226
resp = api .server_side_apply (
223
227
body = resource_body ,
224
228
name = resource_name ,
225
229
namespace = self .config .namespace ,
226
230
field_manager = "cluster-manager" ,
227
- force_conflicts = force # Allow forcing conflicts if needed
231
+ force_conflicts = force , # Allow forcing conflicts if needed
228
232
)
229
233
print (f"Cluster '{ self .config .name } ' applied successfully." )
230
234
except ApiException as e :
@@ -234,7 +238,9 @@ def apply(self, force=False):
234
238
"To force the patch, set 'force=True' in the apply() method."
235
239
)
236
240
elif e .status == 404 :
237
- print (f"Namespace '{ self .config .namespace } ' or resource '{ resource_name } ' not found. Verify the namespace or CRD." )
241
+ print (
242
+ f"Namespace '{ self .config .namespace } ' or resource '{ resource_name } ' not found. Verify the namespace or CRD."
243
+ )
238
244
else :
239
245
raise RuntimeError (f"Failed to apply cluster: { e .reason } " )
240
246
@@ -266,7 +272,7 @@ def down(self):
266
272
resource_name = self .config .name
267
273
self ._throw_for_no_raycluster ()
268
274
try :
269
- config_check ()
275
+ self . config_check ()
270
276
api_instance = client .CustomObjectsApi (get_api_client ())
271
277
if self .config .appwrapper :
272
278
api_instance .delete_namespaced_custom_object (
0 commit comments