21
21
from time import sleep
22
22
from typing import List , Optional , Tuple , Dict
23
23
24
+ import openshift as oc
25
+ from kubernetes import config
24
26
from ray .job_submission import JobSubmissionClient
27
+ import urllib3
25
28
26
29
from .auth import config_check , api_config_handler
27
30
from ..utils import pretty_print
28
31
from ..utils .generate_yaml import generate_appwrapper
29
32
from ..utils .kube_api_helpers import _kube_api_error_handling
33
+ from ..utils .openshift_oauth import (
34
+ create_openshift_oauth_objects ,
35
+ delete_openshift_oauth_objects ,
36
+ )
30
37
from .config import ClusterConfiguration
31
38
from .model import (
32
39
AppWrapper ,
40
47
import os
41
48
import requests
42
49
50
+ from kubernetes import config
51
+
43
52
44
53
class Cluster :
45
54
"""
@@ -61,6 +70,39 @@ def __init__(self, config: ClusterConfiguration):
61
70
self .config = config
62
71
self .app_wrapper_yaml = self .create_app_wrapper ()
63
72
self .app_wrapper_name = self .app_wrapper_yaml .split ("." )[0 ]
73
+ self ._client = None
74
+
75
+ @property
76
+ def _client_headers (self ):
77
+ k8_client = api_config_handler () or client .ApiClient ()
78
+ return {
79
+ "Authorization" : k8_client .configuration .get_api_key_with_prefix (
80
+ "authorization"
81
+ )
82
+ }
83
+
84
+ @property
85
+ def _client_verify_tls (self ):
86
+ return not self .config .openshift_oauth
87
+
88
+ @property
89
+ def client (self ):
90
+ if self ._client :
91
+ return self ._client
92
+ if self .config .openshift_oauth :
93
+ print (
94
+ api_config_handler ().configuration .get_api_key_with_prefix (
95
+ "authorization"
96
+ )
97
+ )
98
+ self ._client = JobSubmissionClient (
99
+ self .cluster_dashboard_uri (),
100
+ headers = self ._client_headers ,
101
+ verify = self ._client_verify_tls ,
102
+ )
103
+ else :
104
+ self ._client = JobSubmissionClient (self .cluster_dashboard_uri ())
105
+ return self ._client
64
106
65
107
def evaluate_dispatch_priority (self ):
66
108
priority_class = self .config .dispatch_priority
@@ -147,6 +189,7 @@ def create_app_wrapper(self):
147
189
image_pull_secrets = image_pull_secrets ,
148
190
dispatch_priority = dispatch_priority ,
149
191
priority_val = priority_val ,
192
+ openshift_oauth = self .config .openshift_oauth ,
150
193
)
151
194
152
195
# creates a new cluster with the provided or default spec
@@ -156,6 +199,11 @@ def up(self):
156
199
the MCAD queue.
157
200
"""
158
201
namespace = self .config .namespace
202
+ if self .config .openshift_oauth :
203
+ create_openshift_oauth_objects (
204
+ cluster_name = self .config .name , namespace = namespace
205
+ )
206
+
159
207
try :
160
208
config_check ()
161
209
api_instance = client .CustomObjectsApi (api_config_handler ())
@@ -190,6 +238,11 @@ def down(self):
190
238
except Exception as e : # pragma: no cover
191
239
return _kube_api_error_handling (e )
192
240
241
+ if self .config .openshift_oauth :
242
+ delete_openshift_oauth_objects (
243
+ cluster_name = self .config .name , namespace = namespace
244
+ )
245
+
193
246
def status (
194
247
self , print_to_console : bool = True
195
248
) -> Tuple [CodeFlareClusterStatus , bool ]:
@@ -258,7 +311,16 @@ def status(
258
311
return status , ready
259
312
260
313
def is_dashboard_ready (self ) -> bool :
261
- response = requests .get (self .cluster_dashboard_uri (), timeout = 5 )
314
+ try :
315
+ response = requests .get (
316
+ self .cluster_dashboard_uri (),
317
+ headers = self ._client_headers ,
318
+ timeout = 5 ,
319
+ verify = self ._client_verify_tls ,
320
+ )
321
+ except requests .exceptions .SSLError :
322
+ # SSL exception occurs when oauth ingress has been created but cluster is not up
323
+ return False
262
324
if response .status_code == 200 :
263
325
return True
264
326
else :
@@ -330,7 +392,13 @@ def cluster_dashboard_uri(self) -> str:
330
392
return _kube_api_error_handling (e )
331
393
332
394
for route in routes ["items" ]:
333
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ self .config .name } " :
395
+ if route ["metadata" ][
396
+ "name"
397
+ ] == f"ray-dashboard-{ self .config .name } " or route ["metadata" ][
398
+ "name"
399
+ ].startswith (
400
+ f"{ self .config .name } -ingress"
401
+ ):
334
402
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
335
403
return f"{ protocol } ://{ route ['spec' ]['host' ]} "
336
404
return "Dashboard route not available yet, have you run cluster.up()?"
@@ -339,30 +407,24 @@ def list_jobs(self) -> List:
339
407
"""
340
408
This method accesses the head ray node in your cluster and lists the running jobs.
341
409
"""
342
- dashboard_route = self .cluster_dashboard_uri ()
343
- client = JobSubmissionClient (dashboard_route )
344
- return client .list_jobs ()
410
+ return self .client .list_jobs ()
345
411
346
412
def job_status (self , job_id : str ) -> str :
347
413
"""
348
414
This method accesses the head ray node in your cluster and returns the job status for the provided job id.
349
415
"""
350
- dashboard_route = self .cluster_dashboard_uri ()
351
- client = JobSubmissionClient (dashboard_route )
352
- return client .get_job_status (job_id )
416
+ return self .client .get_job_status (job_id )
353
417
354
418
def job_logs (self , job_id : str ) -> str :
355
419
"""
356
420
This method accesses the head ray node in your cluster and returns the logs for the provided job id.
357
421
"""
358
- dashboard_route = self .cluster_dashboard_uri ()
359
- client = JobSubmissionClient (dashboard_route )
360
- return client .get_job_logs (job_id )
422
+ return self .client .get_job_logs (job_id )
361
423
362
424
def torchx_config (
363
425
self , working_dir : str = None , requirements : str = None
364
426
) -> Dict [str , str ]:
365
- dashboard_address = f" { self .cluster_dashboard_uri (). lstrip ( 'http://' ) } "
427
+ dashboard_address = urllib3 . util . parse_url ( self .cluster_dashboard_uri ()). host
366
428
to_return = {
367
429
"cluster_name" : self .config .name ,
368
430
"dashboard_address" : dashboard_address ,
@@ -591,7 +653,7 @@ def _get_app_wrappers(
591
653
592
654
593
655
def _map_to_ray_cluster (rc ) -> Optional [RayCluster ]:
594
- if "status" in rc and " state" in rc ["status" ]:
656
+ if "state" in rc ["status" ]:
595
657
status = RayClusterStatus (rc ["status" ]["state" ].lower ())
596
658
else :
597
659
status = RayClusterStatus .UNKNOWN
@@ -606,7 +668,13 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
606
668
)
607
669
ray_route = None
608
670
for route in routes ["items" ]:
609
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " :
671
+ if route ["metadata" ][
672
+ "name"
673
+ ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " or route ["metadata" ][
674
+ "name"
675
+ ].startswith (
676
+ f"{ rc ['metadata' ]['name' ]} -ingress"
677
+ ):
610
678
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
611
679
ray_route = f"{ protocol } ://{ route ['spec' ]['host' ]} "
612
680
0 commit comments