diff --git a/clients/python-client/.gitignore b/clients/python-client/.gitignore
new file mode 100644
index 00000000000..f2b07cb0b37
--- /dev/null
+++ b/clients/python-client/.gitignore
@@ -0,0 +1,35 @@
+
+
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+
+# Distribution / packaging
+bin/
+build/
+develop-eggs/
+dist/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+.tox/
+htmlcov
+.coverage
+.cache
+nosetests.xml
+coverage.xml
\ No newline at end of file
diff --git a/clients/python-client/README.md b/clients/python-client/README.md
new file mode 100644
index 00000000000..5f2bc049701
--- /dev/null
+++ b/clients/python-client/README.md
@@ -0,0 +1,116 @@
+# Overview
+
+This Python client library provides APIs to manage `raycluster` resources from your Python application.
+
+## Prerequisites
+
+It is assumed that your k8s cluster is already set up. Your kubectl configuration is expected to be in `~/.kube/config` if you are running the code directly from your terminal.
+
+It is also expected that the `kuberay operator` is installed. [Installation instructions are here.](https://github.com/ray-project/kuberay#quick-start)
+
+## Usage
+
+The API can be used at multiple levels, with increasing levels of complexity.
+
+### director
+
+This is the easiest way to use the API; it creates rayclusters with predefined cluster sizes.
+
+```python
+my_kuberay_api = kuberay_cluster_api.RayClusterApi()
+
+my_cluster_director = kuberay_cluster_builder.Director()
+
+cluster0 = my_cluster_director.build_small_cluster(name="new-cluster0")
+
+if cluster0:
+    my_kuberay_api.create_ray_cluster(body=cluster0)
+```
+
+The director creates the cluster definition, and the cluster_api acts as the HTTP client sending the create (POST) request to the k8s api-server.
+
+### cluster_builder
+
+The builder allows you to build the cluster piece by piece, so you can customize more of the values in the cluster definition.
+
+```python
+cluster1 = (
+    my_cluster_builder.build_meta(name="new-cluster1")
+    .build_head()
+    .build_worker(group_name="workers", replicas=3)
+    .get_cluster()
+)
+
+if not my_cluster_builder.succeeded:
+    return
+
+my_kuberay_api.create_ray_cluster(body=cluster1)
+```
+
+### cluster_utils
+
+cluster_utils gives you even more options to modify your cluster definition: add/remove worker groups, change the replicas in a worker group, duplicate a worker group, etc.
+
+```python
+my_Cluster_utils = kuberay_cluster_utils.ClusterUtils()
+
+cluster_to_patch, succeeded = my_Cluster_utils.update_worker_group_replicas(
+    cluster2, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
+)
+
+if succeeded:
+    my_kuberay_api.patch_ray_cluster(
+        name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
+    )
+```
+
+### cluster_api
+
+Finally, the cluster_api is what you always use to apply your cluster changes in k8s. You can use it with raw `JSON` if you wish; the director/cluster_builder/cluster_utils are just tools that shield the user from working with raw `JSON`.
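+
+For example, here is a minimal sketch of creating a cluster from a raw definition passed as a Python dict (trimmed from `examples/use-raw-with-api.py`; adjust the name, image, and resources to your needs):
+
+```python
+my_kuberay_api = kuberay_cluster_api.RayClusterApi()
+
+# raw RayCluster definition, equivalent to the JSON/YAML you would give kubectl
+cluster_body = {
+    "apiVersion": "ray.io/v1alpha1",
+    "kind": "RayCluster",
+    "metadata": {"name": "raycluster-mini-raw"},
+    "spec": {
+        "rayVersion": "2.2.0",
+        "headGroupSpec": {
+            "rayStartParams": {"dashboard-host": "0.0.0.0", "block": "true"},
+            "template": {
+                "spec": {
+                    "containers": [
+                        {"name": "ray-head", "image": "rayproject/ray:2.2.0"}
+                    ]
+                }
+            },
+        },
+    },
+}
+
+# sends the create (POST) request to the k8s api-server
+my_kuberay_api.create_ray_cluster(body=cluster_body)
+```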
+
+## Code Organization
+
+clients/
+└── python-client
+    ├── README.md
+    ├── examples
+    │   ├── complete-example.py
+    │   ├── use-builder.py
+    │   ├── use-director.py
+    │   ├── use-raw-with-api.py
+    │   └── use-utils.py
+    ├── python_client
+    │   ├── LICENSE
+    │   ├── __init__.py
+    │   ├── constants.py
+    │   ├── kuberay_cluster_api.py
+    │   ├── pyproject.toml
+    │   ├── setup.cfg
+    │   └── utils
+    │       ├── __init__.py
+    │       ├── kuberay_cluster_builder.py
+    │       └── kuberay_cluster_utils.py
+    └── python_client_test
+        ├── README.md
+        ├── test_director.py
+        └── test_utils.py
+
+## For developers
+
+Make sure you have setuptools installed:
+
+`pip install -U pip setuptools`
+
+### Run the pip command
+
+From the directory `path/to/kuberay/clients/python-client/python_client`:
+
+`pip install -e .`
+
+### To uninstall the module, run
+
+`pip uninstall python-client`
+
+### For testing, run
+
+`python -m unittest discover 'path/to/kuberay/clients/python-client/python_client_test/'`
diff --git a/clients/python-client/examples/complete-example.py b/clients/python-client/examples/complete-example.py
new file mode 100644
index 00000000000..40d0ba8273f
--- /dev/null
+++ b/clients/python-client/examples/complete-example.py
@@ -0,0 +1,121 @@
+import sys
+import os
+from os import path
+
+
+"""
+in case you are working directly with the source, and don't wish to
+install the module with pip install, you can directly import the packages by uncommenting the following code.
+"""
+
+"""
+sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
+sibling_dirs = [
+    d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))
+]
+for sibling_dir in sibling_dirs:
+    sys.path.append(os.path.join(parent_dir, sibling_dir))
+"""
+
+import kuberay_cluster_api
+
+from utils import kuberay_cluster_utils, kuberay_cluster_builder
+
+
+def main():
+
+    print("starting cluster handler...")
+    my_kuberay_api = kuberay_cluster_api.RayClusterApi()
+
+    my_cluster_director = kuberay_cluster_builder.Director()
+
+    my_cluster_builder = kuberay_cluster_builder.ClusterBuilder()
+
+    my_Cluster_utils = kuberay_cluster_utils.ClusterUtils()
+
+    cluster0 = my_cluster_director.build_small_cluster(name="new-cluster0")
+
+    if cluster0:
+        my_kuberay_api.create_ray_cluster(body=cluster0)
+
+    cluster1 = (
+        my_cluster_builder.build_meta(name="new-cluster1")
+        .build_head()
+        .build_worker(group_name="workers")
+        .get_cluster()
+    )
+
+    if not my_cluster_builder.succeeded:
+        print("error building the cluster, aborting...")
+        return
+    my_kuberay_api.create_ray_cluster(body=cluster1)
+
+    cluster2 = (
+        my_cluster_builder.build_meta(name="new-cluster2")
+        .build_head()
+        .build_worker(group_name="workers")
+        .get_cluster()
+    )
+
+    if not my_cluster_builder.succeeded:
+        print("error building the cluster, aborting...")
+        return
+
+    my_kuberay_api.create_ray_cluster(body=cluster2)
+
+    cluster_to_patch, succeeded = my_Cluster_utils.update_worker_group_replicas(
+        cluster2, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
+    )
+
+    if succeeded:
+        print(
+            "trying to patch raycluster = {}".format(
+                cluster_to_patch["metadata"]["name"]
+            )
+        )
+        my_kuberay_api.patch_ray_cluster(
+            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
+        )
+
+    cluster_to_patch, succeeded = my_Cluster_utils.duplicate_worker_group(
+        cluster1, group_name="workers", new_group_name="new-workers"
+    )
+    if succeeded:
+        print(
+            "trying to 
patch raycluster = {}".format( + cluster_to_patch["metadata"]["name"] + ) + ) + my_kuberay_api.patch_ray_cluster( + name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch + ) + + kube_ray_list = my_kuberay_api.list_ray_clusters(k8s_namespace="default") + if "items" in kube_ray_list: + line = "-" * 72 + print(line) + print("{:<63s}{:>2s}".format("Name", "Namespace")) + print(line) + for cluster in kube_ray_list["items"]: + print( + "{:<63s}{:>2s}".format( + cluster["metadata"]["name"], + cluster["metadata"]["namespace"], + ) + ) + print(line) + + if "items" in kube_ray_list: + for cluster in kube_ray_list["items"]: + print("deleting raycluster = {}".format(cluster["metadata"]["name"])) + my_kuberay_api.delete_ray_cluster( + name=cluster["metadata"]["name"], + k8s_namespace=cluster["metadata"]["namespace"], + ) + + +if __name__ == "__main__": + main() diff --git a/clients/python-client/examples/use-builder.py b/clients/python-client/examples/use-builder.py new file mode 100644 index 00000000000..c2d52c3b6e5 --- /dev/null +++ b/clients/python-client/examples/use-builder.py @@ -0,0 +1,76 @@ +import sys +import os +from os import path +import json + + +""" +in case you are working directly with the source, and don't wish to +install the module with pip install, you can directly import the packages by uncommenting the following code. +""" + +""" +sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) +sibling_dirs = [ + d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d)) +] +for sibling_dir in sibling_dirs: + sys.path.append(os.path.join(parent_dir, sibling_dir)) +""" + +import kuberay_cluster_api + +from utils import kuberay_cluster_builder + + +def main(): + + print("starting cluster handler...") + my_kuberay_api = kuberay_cluster_api.RayClusterApi() + + my_cluster_builder = kuberay_cluster_builder.ClusterBuilder() + + cluster1 = ( + my_cluster_builder.build_meta(name="new-cluster1") + .build_head() + .build_worker(group_name="workers") + .get_cluster() + ) + + if not my_cluster_builder.succeeded: + print("error building the cluster, aborting...") + return + + print("creating raycluster = {}".format(cluster1["metadata"]["name"])) + my_kuberay_api.create_ray_cluster(body=cluster1) + + # the rest of the code is simply to list and cleanup the created cluster + kube_ray_list = my_kuberay_api.list_ray_clusters(k8s_namespace="default") + if "items" in kube_ray_list: + line = "-" * 72 + print(line) + print("{:<63s}{:>2s}".format("Name", "Namespace")) + print(line) + for cluster in kube_ray_list["items"]: + print( + "{:<63s}{:>2s}".format( + cluster["metadata"]["name"], + cluster["metadata"]["namespace"], + ) + ) + print(line) + + if "items" in kube_ray_list: + for cluster in kube_ray_list["items"]: + print("deleting raycluster = {}".format(cluster["metadata"]["name"])) + my_kuberay_api.delete_ray_cluster( + name=cluster["metadata"]["name"], + k8s_namespace=cluster["metadata"]["namespace"], + ) + + +if __name__ == "__main__": + main() diff --git a/clients/python-client/examples/use-director.py b/clients/python-client/examples/use-director.py new file mode 100644 index 00000000000..124e619c6a6 --- /dev/null +++ b/clients/python-client/examples/use-director.py @@ -0,0 +1,87 @@ +import sys +import os +from os import path +import json +import time + +""" +in case you are working directly with the source, and don't wish to 
+install the module with pip install, you can directly import the packages by uncommenting the following code. +""" + +""" +sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) +sibling_dirs = [ + d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d)) +] +for sibling_dir in sibling_dirs: + sys.path.append(os.path.join(parent_dir, sibling_dir)) + +""" +import kuberay_cluster_api + +from utils import kuberay_cluster_builder + + +def wait(duration: int = 5, step_name: str = "next"): + print("waiting for {} seconds before {} step".format(duration, step_name)) + for i in range(duration, 0, -1): + sys.stdout.write(str(i) + " ") + sys.stdout.flush() + time.sleep(1) + print() + + +def main(): + + print("starting cluster handler...") + + my_kube_ray_api = kuberay_cluster_api.RayClusterApi() + + my_cluster_director = kuberay_cluster_builder.Director() + + # building the raycluster representation + cluster_body = my_cluster_director.build_small_cluster( + name="new-small-cluster", k8s_namespace="default" + ) + + # creating the raycluster in k8s + if cluster_body: + print("creating the cluster...") + my_kube_ray_api.create_ray_cluster(body=cluster_body) + + # now the cluster should be created. + # the rest of the code is simply to fetch, print and cleanup the created cluster + + print("fetching the cluster...") + # fetching the raycluster from k8s api-server + kube_ray_cluster = my_kube_ray_api.get_ray_cluster( + name=cluster_body["metadata"]["name"], k8s_namespace="default" + ) + + if kube_ray_cluster: + print( + "try: kubectl -n {} get raycluster {} -oyaml".format( + kube_ray_cluster["metadata"]["namespace"], + kube_ray_cluster["metadata"]["name"], + ) + ) + wait(step_name="print created cluster in JSON") + print("printing the raycluster JSON representation...") + json_formatted_str = json.dumps(kube_ray_cluster, indent=2) + print(json_formatted_str) + + wait(step_name="cleaning up") + print("deleting raycluster {}.".format(kube_ray_cluster["metadata"]["name"])) + + my_kube_ray_api.delete_ray_cluster( + name=kube_ray_cluster["metadata"]["name"], + k8s_namespace=kube_ray_cluster["metadata"]["namespace"], + ) + + +if __name__ == "__main__": + main() diff --git a/clients/python-client/examples/use-raw-with-api.py b/clients/python-client/examples/use-raw-with-api.py new file mode 100644 index 00000000000..6dd265c1622 --- /dev/null +++ b/clients/python-client/examples/use-raw-with-api.py @@ -0,0 +1,204 @@ +import json +from os import path +import os +import sys + + +""" +in case you are working directly with the source, and don't wish to +install the module with pip install, you can directly import the packages by uncommenting the following code. 
+""" + +""" +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) +sibling_dirs = [ + d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d)) +] +for sibling_dir in sibling_dirs: + sys.path.append(os.path.join(parent_dir, sibling_dir)) +""" +import kuberay_cluster_api + +cluster_body: dict = { + "apiVersion": "ray.io/v1alpha1", + "kind": "RayCluster", + "metadata": { + "labels": {"controller-tools.k8s.io": "1.0"}, + "name": "raycluster-mini-raw", + }, + "spec": { + "rayVersion": "2.2.0", + "headGroupSpec": { + "rayStartParams": { + "dashboard-host": "0.0.0.0", + "num-cpus": "1", + "block": "true", + }, + "template": { + "spec": { + "containers": [ + { + "name": "ray-head", + "image": "rayproject/ray:2.2.0", + "resources": { + "limits": {"cpu": 1, "memory": "2Gi"}, + "requests": {"cpu": "500m", "memory": "2Gi"}, + }, + "ports": [ + {"containerPort": 6379, "name": "gcs-server"}, + {"containerPort": 8265, "name": "dashboard"}, + {"containerPort": 10001, "name": "client"}, + ], + } + ] + } + }, + }, + }, +} + + +cluster_body2: dict = { + "apiVersion": "ray.io/v1alpha1", + "kind": "RayCluster", + "metadata": { + "labels": {"controller-tools.k8s.io": "1.0"}, + "name": "raycluster-complete-raw", + }, + "spec": { + "rayVersion": "2.2.0", + "headGroupSpec": { + "serviceType": "ClusterIP", + "rayStartParams": {"dashboard-host": "0.0.0.0", "block": "true"}, + "template": { + "metadata": {"labels": {}}, + "spec": { + "containers": [ + { + "name": "ray-head", + "image": "rayproject/ray:2.2.0", + "ports": [ + {"containerPort": 6379, "name": "gcs"}, + {"containerPort": 8265, "name": "dashboard"}, + {"containerPort": 10001, "name": "client"}, + ], + "lifecycle": { + "preStop": { + "exec": {"command": ["/bin/sh", "-c", "ray stop"]} + } + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + "resources": { + "limits": {"cpu": "1", "memory": "2G"}, + "requests": {"cpu": "500m", "memory": "2G"}, + }, + } + ], + "volumes": [{"name": "ray-logs", "emptyDir": {}}], + }, + }, + }, + "workerGroupSpecs": [ + { + "replicas": 1, + "minReplicas": 1, + "maxReplicas": 10, + "groupName": "small-group", + "rayStartParams": {"block": "true"}, + "template": { + "spec": { + "containers": [ + { + "name": "ray-worker", + "image": "rayproject/ray:2.2.0", + "lifecycle": { + "preStop": { + "exec": { + "command": ["/bin/sh", "-c", "ray stop"] + } + } + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + "resources": { + "limits": {"cpu": "1", "memory": "1G"}, + "requests": {"cpu": "500m", "memory": "1G"}, + }, + } + ], + "initContainers": [ + { + "name": "init", + "image": "busybox:1.28", + "command": [ + "sh", + "-c", + "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for K8s Service $RAY_IP; sleep 2; done", + ], + } + ], + "volumes": [{"name": "ray-logs", "emptyDir": {}}], + } + }, + } + ], + }, +} + + +def main(): + + print("starting cluster handler...") + + my_kube_ray_api = kuberay_cluster_api.RayClusterApi() + + my_kube_ray_api.create_ray_cluster(body=cluster_body) + + my_kube_ray_api.create_ray_cluster(body=cluster_body2) + + # the rest of the code is simply to fetch, print and cleanup the created cluster + kube_ray_cluster = my_kube_ray_api.get_ray_cluster( + name=cluster_body["metadata"]["name"], k8s_namespace="default" + ) + + if kube_ray_cluster: + print("printing the raycluster json representation...") + 
        json_formatted_str = json.dumps(kube_ray_cluster, indent=2)
+        print(json_formatted_str)
+
+        print(
+            "try: kubectl -n default get raycluster {} -oyaml".format(
+                kube_ray_cluster["metadata"]["name"]
+            )
+        )
+    # the rest of the code is simply to list and cleanup the created cluster
+    kube_ray_list = my_kube_ray_api.list_ray_clusters(k8s_namespace="default")
+    if "items" in kube_ray_list:
+        line = "-" * 72
+        print(line)
+        print("{:<63s}{:>2s}".format("Name", "Namespace"))
+        print(line)
+        for cluster in kube_ray_list["items"]:
+            print(
+                "{:<63s}{:>2s}".format(
+                    cluster["metadata"]["name"],
+                    cluster["metadata"]["namespace"],
+                )
+            )
+        print(line)
+
+    if "items" in kube_ray_list:
+        for cluster in kube_ray_list["items"]:
+            print("deleting raycluster = {}".format(cluster["metadata"]["name"]))
+            my_kube_ray_api.delete_ray_cluster(
+                name=cluster["metadata"]["name"],
+                k8s_namespace=cluster["metadata"]["namespace"],
+            )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/clients/python-client/examples/use-utils.py b/clients/python-client/examples/use-utils.py
new file mode 100644
index 00000000000..4f27446134b
--- /dev/null
+++ b/clients/python-client/examples/use-utils.py
@@ -0,0 +1,107 @@
+import sys
+import os
+from os import path
+import json
+
+
+"""
+in case you are working directly with the source, and don't wish to
+install the module with pip install, you can directly import the packages by uncommenting the following code.
+"""
+
+"""
+sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
+sibling_dirs = [
+    d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))
+]
+for sibling_dir in sibling_dirs:
+    sys.path.append(os.path.join(parent_dir, sibling_dir))
+"""
+
+import kuberay_cluster_api
+
+from utils import kuberay_cluster_utils, kuberay_cluster_builder
+
+
+def main():
+
+    print("starting cluster handler...")
+    my_kuberay_api = kuberay_cluster_api.RayClusterApi()
+
+    my_cluster_builder = kuberay_cluster_builder.ClusterBuilder()
+
+    my_Cluster_utils = kuberay_cluster_utils.ClusterUtils()
+
+    cluster1 = (
+        my_cluster_builder.build_meta(name="new-cluster1")
+        .build_head()
+        .build_worker(group_name="workers")
+        .get_cluster()
+    )
+
+    if not my_cluster_builder.succeeded:
+        print("error building the cluster, aborting...")
+        return
+
+    print("creating raycluster = {}".format(cluster1["metadata"]["name"]))
+    my_kuberay_api.create_ray_cluster(body=cluster1)
+
+    cluster_to_patch, succeeded = my_Cluster_utils.update_worker_group_replicas(
+        cluster1, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
+    )
+
+    if succeeded:
+        print(
+            "trying to patch raycluster = {}".format(
+                cluster_to_patch["metadata"]["name"]
+            )
+        )
+        my_kuberay_api.patch_ray_cluster(
+            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
+        )
+
+    cluster_to_patch, succeeded = my_Cluster_utils.duplicate_worker_group(
+        cluster1, group_name="workers", new_group_name="duplicate-workers"
+    )
+    if succeeded:
+        print(
+            "trying to patch raycluster = {}".format(
+                cluster_to_patch["metadata"]["name"]
+            )
+        )
+        my_kuberay_api.patch_ray_cluster(
+            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
+        )
+
+    # the rest of the code is simply to list and cleanup the created cluster
+    kube_ray_list = my_kuberay_api.list_ray_clusters(k8s_namespace="default")
+    if "items" in kube_ray_list:
+        line = "-" * 72
print(line) + print("{:<63s}{:>2s}".format("Name", "Namespace")) + print(line) + for cluster in kube_ray_list["items"]: + print( + "{:<63s}{:>2s}".format( + cluster["metadata"]["name"], + cluster["metadata"]["namespace"], + ) + ) + print(line) + + if "items" in kube_ray_list: + for cluster in kube_ray_list["items"]: + print("deleting raycluster = {}".format(cluster["metadata"]["name"])) + my_kuberay_api.delete_ray_cluster( + name=cluster["metadata"]["name"], + k8s_namespace=cluster["metadata"]["namespace"], + ) + json_formatted_str = json.dumps(cluster, indent=2) + print(json_formatted_str) + + +if __name__ == "__main__": + main() diff --git a/clients/python-client/python_client/LICENSE b/clients/python-client/python_client/LICENSE new file mode 100644 index 00000000000..1dcfa84a3fb --- /dev/null +++ b/clients/python-client/python_client/LICENSE @@ -0,0 +1,272 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +-------------------------------------------------------------------------------- + +Code in python/ray/rllib/{evolution_strategies, dqn} adapted from +https://github.com/openai (MIT License) + +Copyright (c) 2016 OpenAI (http://openai.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +-------------------------------------------------------------------------------- + +Code in python/ray/rllib/impala/vtrace.py from +https://github.com/deepmind/scalable_agent + +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +-------------------------------------------------------------------------------- +Code in python/ray/rllib/ars is adapted from https://github.com/modestyachts/ARS + +Copyright (c) 2018, ARS contributors (Horia Mania, Aurelia Guy, Benjamin Recht) +All rights reserved. 
+ +Redistribution and use of ARS in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/clients/python-client/python_client/__init__.py b/clients/python-client/python_client/__init__.py new file mode 100644 index 00000000000..6a9beea82f6 --- /dev/null +++ b/clients/python-client/python_client/__init__.py @@ -0,0 +1 @@ +__version__ = "0.4.0" diff --git a/clients/python-client/python_client/constants.py b/clients/python-client/python_client/constants.py new file mode 100644 index 00000000000..005fa9c48cb --- /dev/null +++ b/clients/python-client/python_client/constants.py @@ -0,0 +1,11 @@ +# Declares the constants that are used by the client +import logging + +# Group, Version, Plural +GROUP = "ray.io" +VERSION = "v1alpha1" +PLURAL = "rayclusters" +KIND = "RayCluster" + +# log level +LOGLEVEL = logging.INFO diff --git a/clients/python-client/python_client/kuberay_cluster_api.py b/clients/python-client/python_client/kuberay_cluster_api.py new file mode 100644 index 00000000000..b9a40d35100 --- /dev/null +++ b/clients/python-client/python_client/kuberay_cluster_api.py @@ -0,0 +1,196 @@ +""" +Set of APIs to manage rayclusters. +""" +__copyright__ = "Copyright 2021, Microsoft Corp." + +import copy +import logging +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from typing import Any, Dict, List, Optional +import constants + + +log = logging.getLogger(__name__) +if logging.getLevelName(log.level) == "NOTSET": + logging.basicConfig(format="%(asctime)s %(message)s", level=constants.LOGLEVEL) + + +class RayClusterApi: + """ + RayClusterApi provides APIs to list, get, create, build, update, delete rayclusters. 
+ + Methods: + - list_ray_clusters(k8s_namespace: str = "default", async_req: bool = False) -> Any: + - get_ray_cluster(name: str, k8s_namespace: str = "default") -> Any: + - create_ray_cluster(body: Any, k8s_namespace: str = "default") -> Any: + - delete_ray_cluster(name: str, k8s_namespace: str = "default") -> bool: + - patch_ray_cluster(name: str, ray_patch: Any, k8s_namespace: str = "default") -> Any: + """ + + # initial config to setup the kube client + def __init__(self): + # loading the config + self.kube_config: Optional[Any] = config.load_kube_config() + self.api = client.CustomObjectsApi() + + def __del__(self): + self.api = None + self.kube_config = None + + def list_ray_clusters( + self, k8s_namespace: str = "default", async_req: bool = False + ) -> Any: + """List Ray clusters in a given namespace. + + Parameters: + - k8s_namespace (str, optional): The namespace in which to list the Ray clusters. Defaults to "default". + - async_req (bool, optional): Whether to make the request asynchronously. Defaults to False. + + Returns: + Any: The custom resource for Ray clusters in the specified namespace, or None if not found. + + Raises: + ApiException: If there was an error fetching the custom resource. + """ + try: + resource: Any = self.api.list_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + namespace=k8s_namespace, + async_req=async_req, + ) + if "items" in resource: + return resource + return None + except ApiException as e: + if e.status == 404: + log.error("raycluster resource is not found. error = {}".format(e)) + return None + else: + log.error("error fetching custom resource: {}".format(e)) + return None + + def get_ray_cluster(self, name: str, k8s_namespace: str = "default") -> Any: + """Get a specific Ray cluster in a given namespace. + + Parameters: + - name (str): The name of the Ray cluster custom resource. Defaults to "". + - k8s_namespace (str, optional): The namespace in which to retrieve the Ray cluster. Defaults to "default". + + Returns: + Any: The custom resource for the specified Ray cluster, or None if not found. + + Raises: + ApiException: If there was an error fetching the custom resource. + """ + try: + resource: Any = self.api.get_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 404: + log.error("raycluster resource is not found. error = {}".format(e)) + return None + else: + log.error("error fetching custom resource: {}".format(e)) + return None + + def create_ray_cluster(self, body: Any, k8s_namespace: str = "default") -> Any: + """Create a new Ray cluster custom resource. + + Parameters: + - body (Any): The data of the custom resource to create. + - k8s_namespace (str, optional): The namespace in which to create the custom resource. Defaults to "default". + + Returns: + Any: The created custom resource, or None if it already exists or there was an error. + """ + try: + resource: Any = self.api.create_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + body=body, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 409: + log.error( + "raycluster resource already exists. 
error = {}".format(e.reason) + ) + return None + else: + log.error("error creating custom resource: {}".format(e)) + return None + + def delete_ray_cluster(self, name: str, k8s_namespace: str = "default") -> bool: + """Delete a Ray cluster custom resource. + + Parameters: + - name (str): The name of the Ray cluster custom resource to delete. + - k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default". + + Returns: + Any: The deleted custom resource, or None if already deleted or there was an error. + """ + try: + resource: Any = self.api.delete_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 404: + log.error( + "raycluster custom resource already deleted. error = {}".format( + e.reason + ) + ) + return None + else: + log.error( + "error deleting the raycluster custom resource: {}".format(e.reason) + ) + return None + + def patch_ray_cluster( + self, name: str, ray_patch: Any, k8s_namespace: str = "default" + ) -> Any: + """Patch an existing Ray cluster custom resource. + + Parameters: + - name (str): The name of the Ray cluster custom resource to be patched. + - ray_patch (Any): The patch data for the Ray cluster. + - k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default". + + Returns: + bool: True if the patch was successful, False otherwise. + """ + try: + # we patch the existing raycluster with the new config + self.api.patch_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + body=ray_patch, + namespace=k8s_namespace, + ) + except ApiException as e: + log.error("raycluster `{}` failed to patch, with error: {}".format(name, e)) + return False + else: + log.info("raycluster `%s` is patched successfully", name) + + return True diff --git a/clients/python-client/python_client/pyproject.toml b/clients/python-client/python_client/pyproject.toml new file mode 100755 index 00000000000..9787c3bdf00 --- /dev/null +++ b/clients/python-client/python_client/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/clients/python-client/python_client/setup.cfg b/clients/python-client/python_client/setup.cfg new file mode 100755 index 00000000000..18426ab21be --- /dev/null +++ b/clients/python-client/python_client/setup.cfg @@ -0,0 +1,22 @@ +[metadata] +name = python_client +version = 0.4.0 +author = Ali Kanso +description = A Kuberay python client library to create/delete/update clusters +long_description = file: README.md +keywords = kuberay, ray, python-kuberay +license = Apache License 2.0 +url = https://github.com/ray-project/kuberay/ +classifiers = + Programming Language :: Python :: 3 + +[options] +packages = find: +python_requires = >=3.6.5 + +[options.packages.find] +exclude = + examples* + tools* + docs* + python_client_test.tests* \ No newline at end of file diff --git a/clients/python-client/python_client/utils/__init__.py b/clients/python-client/python_client/utils/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/clients/python-client/python_client/utils/kuberay_cluster_builder.py b/clients/python-client/python_client/utils/kuberay_cluster_builder.py new file mode 100644 index 00000000000..88956080b05 --- /dev/null +++ 
b/clients/python-client/python_client/utils/kuberay_cluster_builder.py @@ -0,0 +1,314 @@ +""" +Set of helper methods to manage rayclusters. Requires Python 3.9 and higher +""" + +import constants +import copy +import logging +import math +from typing import Any +from abc import ABCMeta, abstractmethod +from utils import kuberay_cluster_utils + + +log = logging.getLogger(__name__) +if logging.getLevelName(log.level) == "NOTSET": + logging.basicConfig(format="%(asctime)s %(message)s", level=constants.LOGLEVEL) + + +class IClusterBuilder(metaclass=ABCMeta): + """ + IClusterBuilder is an interface for building a cluster. + + The class defines abstract methods for building the metadata, head pod, worker groups, and retrieving the built cluster. + """ + + @staticmethod + @abstractmethod + def build_meta(): + "builds the cluster metadata" + + @staticmethod + @abstractmethod + def build_head(): + "builds the head pod" + + @staticmethod + @abstractmethod + def build_worker(): + "builds a worker group" + + @staticmethod + @abstractmethod + def get_cluster(): + "Returns the built cluster" + + +# Concrete implementation of the builder interface +class ClusterBuilder(IClusterBuilder): + """ + ClusterBuilder implements the abstract methods of IClusterBuilder to build a cluster. + """ + + def __init__(self): + self.cluster: dict[str, Any] = {} + self.succeeded: bool = False + self.cluster_utils = kuberay_cluster_utils.ClusterUtils() + + def build_meta( + self, + name: str, + k8s_namespace: str = "default", + labels: dict = None, + ray_version: str = "2.2.0", + ): + """Builds the metadata and ray version of the cluster. + + Parameters: + - name (str): The name of the cluster. + - k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default". + - labels (dict, optional): A dictionary of key-value pairs to add as labels to the cluster. Defaults to None. + - ray_version (str, optional): The version of Ray to use for the cluster. Defaults to "2.2.0". + """ + self.cluster = self.cluster_utils.populate_meta( + cluster=self.cluster, + name=name, + k8s_namespace=k8s_namespace, + labels=labels, + ray_version=ray_version, + ) + return self + + def build_head( + self, + ray_image: str = "rayproject/ray:2.2.0", + service_type: str = "ClusterIP", + cpu_requests: str = "1", + memory_requests: str = "1G", + cpu_limits: str = "2", + memory_limits: str = "2G", + ray_start_params: dict = { + "block": "true", + "dashboard-host": "0.0.0.0", + }, + ): + """Build head node of the ray cluster. + + Parameters: + - ray_image (str): Docker image for the head node. Default value is "rayproject/ray:2.2.0". + - service_type (str): Service type of the head node. Default value is "ClusterIP". + - cpu_requests (str): CPU requests for the head node. Default value is "1". + - memory_requests (str): Memory requests for the head node. Default value is "1G". + - cpu_limits (str): CPU limits for the head node. Default value is "2". + - memory_limits (str): Memory limits for the head node. Default value is "2G". + - ray_start_params (dict): Dictionary of start parameters for the head node. + Default values are "block": "true" and "dashboard-host": "0.0.0.0". 
+ """ + self.cluster, self.succeeded = self.cluster_utils.populate_ray_head( + self.cluster, + ray_image=ray_image, + service_type=service_type, + cpu_requests=cpu_requests, + memory_requests=memory_requests, + cpu_limits=cpu_limits, + memory_limits=memory_limits, + ray_start_params=ray_start_params, + ) + return self + + def build_worker( + self, + group_name: str, + ray_image: str = "rayproject/ray:2.2.0", + ray_command: Any = ["/bin/bash", "-lc"], + init_image: str = "busybox:1.28", + cpu_requests: str = "1", + memory_requests: str = "1G", + cpu_limits: str = "2", + memory_limits: str = "2G", + replicas: int = 1, + min_replicas: int = -1, + max_replicas: int = -1, + ray_start_params: dict = { + "block": "true", + }, + ): + """Build worker specifications of the cluster. + + This function sets the worker configuration of the cluster, including the Docker image, CPU and memory requirements, number of replicas, and other parameters. + + Parameters: + - group_name (str): name of the worker group. + - ray_image (str, optional): Docker image for the Ray process. Default is "rayproject/ray:2.2.0". + - ray_command (Any, optional): Command to run in the Docker container. Default is ["/bin/bash", "-lc"]. + - init_image (str, optional): Docker image for the init container. Default is "busybox:1.28". + - cpu_requests (str, optional): CPU requests for the worker pods. Default is "1". + - memory_requests (str, optional): Memory requests for the worker pods. Default is "1G". + - cpu_limits (str, optional): CPU limits for the worker pods. Default is "2". + - memory_limits (str, optional): Memory limits for the worker pods. Default is "2G". + - replicas (int, optional): Number of worker pods to run. Default is 1. + - min_replicas (int, optional): Minimum number of worker pods to run. Default is -1. + - max_replicas (int, optional): Maximum number of worker pods to run. Default is -1. + - ray_start_params (dict, optional): Additional parameters to pass to the ray start command. Default is {"block": "true"}. + """ + if min_replicas < 0: + min_replicas = int(math.ceil(replicas / 2)) + if max_replicas < 0: + max_replicas = int(replicas * 3) + + if "spec" in self.cluster.keys(): + if "workerGroupSpecs" not in self.cluster.keys(): + log.info( + "setting the workerGroupSpecs for group_name {}".format(group_name) + ) + self.cluster["spec"]["workerGroupSpecs"] = [] + else: + log.error( + "error creating custom resource: {meta}, the spec section is missing, did you run build_head()?".format( + self.cluster["metadata"] + ) + ) + self.succeeded = False + return self + + worker_group, self.succeeded = self.cluster_utils.populate_worker_group( + self.cluster, + group_name, + ray_image, + ray_command, + init_image, + cpu_requests, + memory_requests, + cpu_limits, + memory_limits, + replicas, + min_replicas, + max_replicas, + ray_start_params, + ) + + if self.succeeded: + self.cluster["spec"]["workerGroupSpecs"].append(worker_group) + return self + + def get_cluster(self): + cluster = copy.deepcopy(self.cluster) + return cluster + + +class Director: + def __init__(self): + self.cluster_builder = ClusterBuilder() + + def build_basic_cluster(self, name: str, k8s_namespace: str = "default") -> dict: + """Builds a basic cluster with the given name and k8s_namespace parameters. + + Parameters: + - name (str): The name of the cluster. + - k8s_namespace (str, optional): The kubernetes namespace for the cluster, with a default value of "default". + + Returns: + dict: The basic cluster as a dictionary. 
+ """ + cluster: dict = ( + self.cluster_builder.build_meta(name=name, k8s_namespace=k8s_namespace) + .build_head() + .get_cluster() + ) + + if self.cluster_builder.succeeded: + return cluster + return None + + def build_small_cluster(self, name: str, k8s_namespace: str = "default") -> dict: + """Builds a small cluster with the given name and k8s_namespace parameters with 1 workergroup, + the workgroup has 1 replica with 2 cpu and 2G memory limits + + Parameters: + - name (str): The name of the cluster. + - k8s_namespace (str, optional): The kubernetes namespace for the cluster, with a default value of "default". + + Returns: + dict: The small cluster as a dictionary. + """ + cluster: dict = ( + self.cluster_builder.build_meta(name=name, k8s_namespace=k8s_namespace) + .build_head() + .build_worker( + group_name="{}-workers".format(name), + replicas=1, + min_replicas=0, + max_replicas=2, + cpu_requests="1", + memory_requests="1G", + cpu_limits="2", + memory_limits="2G", + ) + .get_cluster() + ) + + if self.cluster_builder.succeeded: + return cluster + return None + + def build_medium_cluster(self, name: str, k8s_namespace: str = "default") -> dict: + """Builds a medium cluster with the given name and k8s_namespace parameters with 1 workergroup, + the workgroup has 3 replicas with 4 cpu and 4G memory limits + + Parameters: + - name (str): The name of the cluster. + - k8s_namespace (str, optional): The kubernetes namespace for the cluster, with a default value of "default". + + Returns: + dict: The small cluster as a dictionary. + """ + cluster: dict = ( + self.cluster_builder.build_meta(name=name, k8s_namespace=k8s_namespace) + .build_head() + .build_worker( + group_name="{}-workers".format(name), + replicas=3, + min_replicas=0, + max_replicas=6, + cpu_requests="2", + memory_requests="2G", + cpu_limits="4", + memory_limits="4G", + ) + .get_cluster() + ) + + if self.cluster_builder.succeeded: + return cluster + return None + + def build_large_cluster(self, name: str, k8s_namespace: str = "default") -> dict: + """Builds a medium cluster with the given name and k8s_namespace parameters. with 1 workergroup, + the workgroup has 6 replicas with 6 cpu and 6G memory limits + + Parameters: + - name (str): The name of the cluster. + - k8s_namespace (str, optional): The kubernetes namespace for the cluster, with a default value of "default". + + Returns: + dict: The small cluster as a dictionary. + """ + cluster: dict = ( + self.cluster_builder.build_meta(name=name, k8s_namespace=k8s_namespace) + .build_head() + .build_worker( + group_name="{}-workers".format(name), + replicas=6, + min_replicas=0, + max_replicas=12, + cpu_requests="3", + memory_requests="4G", + cpu_limits="6", + memory_limits="8G", + ) + .get_cluster() + ) + + if self.cluster_builder.succeeded: + return cluster + return None diff --git a/clients/python-client/python_client/utils/kuberay_cluster_utils.py b/clients/python-client/python_client/utils/kuberay_cluster_utils.py new file mode 100644 index 00000000000..1f3deeb4d84 --- /dev/null +++ b/clients/python-client/python_client/utils/kuberay_cluster_utils.py @@ -0,0 +1,487 @@ +""" +Set of helper methods to manage rayclusters. 
Requires Python 3.6 and higher +""" + +import constants +import logging +import copy +import re +from typing import Any, Tuple + + +log = logging.getLogger(__name__) +if logging.getLevelName(log.level) == "NOTSET": + logging.basicConfig(format="%(asctime)s %(message)s", level=constants.LOGLEVEL) + +""" +ClusterUtils contains methods to facilitate modifying/populating the config of a raycluster +""" + + +class ClusterUtils: + """ + ClusterUtils - Utility class for populating cluster information + + Methods: + - populate_meta(cluster: dict, name: str, k8s_namespace: str, labels: dict, ray_version: str) -> dict: + - populate_ray_head(cluster: dict, ray_image: str,service_type: str, cpu_requests: str, memory_requests: str, cpu_limits: str, memory_limits: str, ray_start_params: dict) -> Tuple[dict, bool]: + - populate_worker_group(cluster: dict, group_name: str, ray_image: str, ray_command: Any, init_image: str, cpu_requests: str, memory_requests: str, cpu_limits: str, memory_limits: str, replicas: int, min_replicas: int, max_replicas: int, ray_start_params: dict) -> Tuple[dict, bool]: + - update_worker_group_replicas(cluster: dict, group_name: str, max_replicas: int, min_replicas: int, replicas: int) -> Tuple[dict, bool]: + """ + + def populate_meta( + self, + cluster: dict, + name: str, + k8s_namespace: str, + labels: dict, + ray_version: str, + ) -> dict: + """Populate the metadata and ray version of the cluster. + + Parameters: + - cluster (dict): A dictionary representing a cluster. + - name (str): The name of the cluster. + - k8s_namespace (str): The namespace of the cluster. + - labels (dict): A dictionary of labels to be applied to the cluster. + - ray_version (str): The version of Ray to use in the cluster. + + Returns: + dict: The updated cluster dictionary with metadata and ray version populated. + """ + + assert self.is_valid_name(name) + + cluster["apiVersion"] = "{group}/{version}".format( + group=constants.GROUP, version=constants.VERSION + ) + cluster["kind"] = constants.KIND + cluster["metadata"] = { + "name": name, + "namespace": k8s_namespace, + "labels": labels, + } + cluster["spec"] = {"rayVersion": ray_version} + return cluster + + def populate_ray_head( + self, + cluster: dict, + ray_image: str, + service_type: str, + cpu_requests: str, + memory_requests: str, + cpu_limits: str, + memory_limits: str, + ray_start_params: dict, + ) -> Tuple[dict, bool]: + """Populate the ray head specs of the cluster + Parameters: + - cluster (dict): The dictionary representation of the cluster. + - ray_image (str): The name of the ray image to use for the head node. + - service_type (str): The type of service to run for the head node. + - cpu_requests (str): The CPU resource requests for the head node. + - memory_requests (str): The memory resource requests for the head node. + - cpu_limits (str): The CPU resource limits for the head node. + - memory_limits (str): The memory resource limits for the head node. + - ray_start_params (dict): The parameters for starting the Ray cluster. + + Returns: + - Tuple (dict, bool): The updated cluster, and a boolean indicating whether the update was successful. + """ + # validate arguments + try: + arguments = locals() + for k, v in arguments.items(): + assert v + except AssertionError as e: + log.error( + "error creating ray head, the parameters are not fully defined. 
{} = {}".format( + k, v + ) + ) + return cluster, False + + # make sure metadata exists + if "spec" in cluster.keys(): + if "headGroupSpec" not in cluster.keys(): + log.info( + "setting the headGroupSpec for cluster {}".format( + cluster["metadata"]["name"] + ) + ) + cluster["spec"]["headGroupSpec"] = [] + else: + log.error("error creating ray head, the spec and/or metadata is not define") + return cluster, False + + # populate headGroupSpec + cluster["spec"]["headGroupSpec"] = { + "serviceType": service_type, + "rayStartParams": ray_start_params, + "template": { + "spec": { + "containers": [ + { + "image": ray_image, + "name": "ray-head", + "ports": [ + { + "containerPort": 6379, + "name": "gcs-server", + "protocol": "TCP", + }, + { + "containerPort": 8265, + "name": "dashboard", + "protocol": "TCP", + }, + { + "containerPort": 10001, + "name": "client", + "protocol": "TCP", + }, + ], + "resources": { + "requests": { + "cpu": cpu_requests, + "memory": memory_requests, + }, + "limits": {"cpu": cpu_limits, "memory": memory_limits}, + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + } + ], + "volumes": [{"emptyDir": {}, "name": "ray-logs"}], + } + }, + } + + return cluster, True + + def populate_worker_group( + self, + cluster: dict, + group_name: str, + ray_image: str, + ray_command: Any, + init_image: str, + cpu_requests: str, + memory_requests: str, + cpu_limits: str, + memory_limits: str, + replicas: int, + min_replicas: int, + max_replicas: int, + ray_start_params: dict, + ) -> Tuple[dict, bool]: + """Populate the worker group specification in the cluster dictionary. + + Parameters: + - cluster (dict): Dictionary representing the cluster spec. + - group_name (str): The name of the worker group. + - ray_image (str): The image to use for the Ray worker containers. + - ray_command (Any): The command to run in the Ray worker containers. + - init_image (str): The init container image to use. + - cpu_requests (str): The requested CPU resources for the worker containers. + - memory_requests (str): The requested memory resources for the worker containers. + - cpu_limits (str): The limit on CPU resources for the worker containers. + - memory_limits (str): The limit on memory resources for the worker containers. + - replicas (int): The desired number of replicas for the worker group. + - min_replicas (int): The minimum number of replicas for the worker group. + - max_replicas (int): The maximum number of replicas for the worker group. + - ray_start_params (dict): The parameters to pass to the Ray worker start command. + + Returns: + - Tuple[dict, bool]: A tuple of the cluster specification and a boolean indicating + whether the worker group was successfully populated. + """ + # validate arguments + try: + arguments = locals() + for k, v in arguments.items(): + if k != "min_replicas": + assert v + except AssertionError as e: + log.error( + "error populating worker group, the parameters are not fully defined. 
{} = {}".format( + k, v + ) + ) + return cluster, False + + assert self.is_valid_name(group_name) + assert max_replicas >= min_replicas + + worker_group: dict[str, Any] = { + "groupName": group_name, + "maxReplicas": max_replicas, + "minReplicas": min_replicas, + "rayStartParams": ray_start_params, + "replicas": replicas, + "template": { + "spec": { + "containers": [ + { + "image": ray_image, + "command": ray_command, + "lifecycle": { + "preStop": { + "exec": {"command": ["/bin/sh", "-c", "ray stop"]} + } + }, + "name": "ray-worker", + "resources": { + "requests": { + "cpu": cpu_requests, + "memory": memory_requests, + }, + "limits": { + "cpu": cpu_limits, + "memory": memory_limits, + }, + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + } + ], + "initContainers": [ + { + "command": [ + "sh", + "-c", + "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for K8s Service $RAY_IP; sleep 2; done", + ], + "image": init_image, + "name": "init", + } + ], + "volumes": [{"emptyDir": {}, "name": "ray-logs"}], + } + }, + } + + return worker_group, True + + def update_worker_group_replicas( + self, + cluster: dict, + group_name: str, + max_replicas: int, + min_replicas: int, + replicas: int, + ) -> Tuple[dict, bool]: + """Update the number of replicas for a worker group in the cluster. + + Parameters: + - cluster (dict): The cluster to update. + - group_name (str): The name of the worker group to update. + - max_replicas (int): The maximum number of replicas for the worker group. + - min_replicas (int): The minimum number of replicas for the worker group. + - replicas (int): The desired number of replicas for the worker group. + + Returns: + Tuple[dict, bool]: A tuple containing the updated cluster and a flag indicating whether the update was successful. + """ + try: + arguments = locals() + for k, v in arguments.items(): + if k != "min_replicas": + assert v + except AssertionError as e: + log.error( + "error updating worker group, the parameters are not fully defined. {} = {}".format( + k, v + ) + ) + return cluster, False + + assert cluster["spec"]["workerGroupSpecs"] + assert max_replicas >= min_replicas + + for i in range(len(cluster["spec"]["workerGroupSpecs"])): + if cluster["spec"]["workerGroupSpecs"][i]["groupName"] == group_name: + + cluster["spec"]["workerGroupSpecs"][i]["maxReplicas"] = max_replicas + cluster["spec"]["workerGroupSpecs"][i]["minReplicas"] = min_replicas + cluster["spec"]["workerGroupSpecs"][i]["replicas"] = replicas + return cluster, True + + return cluster, False + + def update_worker_group_resources( + self, + cluster: dict, + group_name: str, + cpu_requests: str, + memory_requests: str, + cpu_limits: str, + memory_limits: str, + container_name="unspecified", + ) -> Tuple[dict, bool]: + """Update the resources for a worker group pods in the cluster. + + Parameters: + - cluster (dict): The cluster to update. + - group_name (str): The name of the worker group to update. + - cpu_requests (str): CPU requests for the worker pods. + - memory_requests (str): Memory requests for the worker pods. + - cpu_limits (str): CPU limits for the worker pods. + - memory_limits (str): Memory limits for the worker pods. + + Returns: + Tuple[dict, bool]: A tuple containing the updated cluster and a flag indicating whether the update was successful. 
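+
+        Example (an illustrative sketch; the group name, container name, and resource
+        values below are assumptions, not defaults of this library):
+
+            cluster, ok = cluster_utils.update_worker_group_resources(
+                cluster,
+                group_name="workers",
+                cpu_requests="1",
+                memory_requests="2G",
+                cpu_limits="2",
+                memory_limits="4G",
+                container_name="all_containers",  # or "unspecified" for the first container, or an exact container name
+            )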
+ """ + try: + arguments = locals() + for k, v in arguments.items(): + if k != "min_replicas": + assert v + except AssertionError as e: + log.error( + "error updating worker group, the parameters are not fully defined. {} = {}".format( + k, v + ) + ) + return cluster, False + + assert cluster["spec"]["workerGroupSpecs"] + + worker_groups = cluster["spec"]["workerGroupSpecs"] + + def add_values(group_index: int, container_index: int): + worker_groups[group_index]["template"]["spec"]["containers"][ + container_index + ]["resources"]["requests"]["cpu"] = cpu_requests + worker_groups[group_index]["template"]["spec"]["containers"][ + container_index + ]["resources"]["requests"]["memory"] = memory_requests + worker_groups[group_index]["template"]["spec"]["containers"][ + container_index + ]["resources"]["limits"]["cpu"] = cpu_limits + worker_groups[group_index]["template"]["spec"]["containers"][ + container_index + ]["resources"]["limits"]["memory"] = memory_limits + + for group_index, worker_group in enumerate(worker_groups): + if worker_group["groupName"] != group_name: + continue + + containers = worker_group["template"]["spec"]["containers"] + container_names = [container["name"] for container in containers] + + if len(containers) == 0: + log.error( + f"error updating container resources, the worker group {group_name} has no containers" + ) + return cluster, False + + if container_name == "unspecified": + add_values(group_index, 0) + return cluster, True + elif container_name == "all_containers": + for container_index in range(len(containers)): + add_values(group_index, container_index) + return cluster, True + elif container_name in container_names: + container_index = container_names.index(container_name) + add_values(group_index, container_index) + return cluster, True + + return cluster, False + + def duplicate_worker_group( + self, + cluster: dict, + group_name: str, + new_group_name: str, + ) -> Tuple[dict, bool]: + """Duplicate a worker group in the cluster. + + Parameters: + - cluster (dict): The cluster definition. + - group_name (str): The name of the worker group to be duplicated. + - new_group_name (str): The name for the duplicated worker group. + + Returns: + Tuple[dict, bool]: A tuple containing the updated cluster definition and a boolean indicating the success of the operation. + """ + try: + arguments = locals() + for k, v in arguments.items(): + assert v + except AssertionError as e: + log.error( + f"error duplicating worker group, the parameters are not fully defined. {k} = {v}" + ) + return cluster, False + assert self.is_valid_name(new_group_name) + assert cluster["spec"]["workerGroupSpecs"] + + worker_groups = cluster["spec"]["workerGroupSpecs"] + for _, worker_group in enumerate(worker_groups): + if worker_group["groupName"] == group_name: + duplicate_group = copy.deepcopy(worker_group) + duplicate_group["groupName"] = new_group_name + worker_groups.append(duplicate_group) + return cluster, True + + log.error( + f"error duplicating worker group, no match was found for {group_name}" + ) + return cluster, False + + def delete_worker_group( + self, + cluster: dict, + group_name: str, + ) -> Tuple[dict, bool]: + """Deletes a worker group in the cluster. + + Parameters: + - cluster (dict): The cluster definition. + - group_name (str): The name of the worker group to be duplicated. + + Returns: + Tuple[dict, bool]: A tuple containing the updated cluster definition and a boolean indicating the success of the operation. 
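+
+        Example (an illustrative sketch; "workers" is an assumed group name):
+
+            cluster, ok = cluster_utils.delete_worker_group(cluster, group_name="workers")
+            # ok is False (and the cluster is returned unchanged) if no group named "workers" exists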
+        """
+        try:
+            arguments = locals()
+            for k, v in arguments.items():
+                assert v
+        except AssertionError as e:
+            log.error(
+                f"error deleting worker group, the parameters are not fully defined. {k} = {v}"
+            )
+            return cluster, False
+
+        assert cluster["spec"]["workerGroupSpecs"]
+
+        worker_groups = cluster["spec"]["workerGroupSpecs"]
+        first_or_none = next((x for x in worker_groups if x["groupName"] == group_name), None)
+        if first_or_none:
+            worker_groups.remove(first_or_none)
+            return cluster, True
+
+        log.error(
+            f"error removing worker group, no match was found for {group_name}"
+        )
+        return cluster, False
+
+    def is_valid_name(self, name: str) -> bool:
+        msg = "The name must be 63 characters or less, begin and end with an alphanumeric character, and contain only dashes, dots, and alphanumerics."
+        if len(name) > 63 or not bool(re.match("^[a-z0-9]([-.]*[a-z0-9])+$", name)):
+            log.error(msg)
+            return False
+        return True
+
+    def is_valid_label(self, name: str) -> bool:
+        msg = "The label name must be 63 characters or less, begin and end with an alphanumeric character, and contain only dashes, underscores, dots, and alphanumerics."
+        if len(name) > 63 or not bool(re.match("^[a-z0-9]([-._]*[a-z0-9])+$", name)):
+            log.error(msg)
+            return False
+        return True
+
diff --git a/clients/python-client/python_client_test/README.md b/clients/python-client/python_client_test/README.md
new file mode 100644
index 00000000000..e2adb918aed
--- /dev/null
+++ b/clients/python-client/python_client_test/README.md
@@ -0,0 +1,43 @@
+# Overview
+
+## For developers
+
+Make sure you have installed setuptools:
+
+`pip install -U pip setuptools`
+
+**Run the pip command**
+
+from the directory `path/to/kuberay/clients/python-client/python_client`
+
+`pip install -e .`
+
+**To uninstall the module run**
+
+`pip uninstall python-client`
+
+## For testing run
+
+`python -m unittest discover 'path/to/kuberay/clients/python_client_test/'`
+
+### Coverage report
+
+__Install prerequisites__
+
+`sudo apt install libsqlite3-dev`
+
+`pip3 install db-sqlite3`
+
+`pyenv install 3.6.5` # or your Python version
+
+`pip install coverage`
+
+__To gather data__
+`python -m coverage run -m unittest`
+
+__To generate a coverage report__
+`python -m coverage report`
+
+__To generate the test coverage report in HTML format__
+`python -m coverage html`
diff --git a/clients/python-client/python_client_test/test_director.py b/clients/python-client/python_client_test/test_director.py
new file mode 100644
index 00000000000..30afb3d8350
--- /dev/null
+++ b/clients/python-client/python_client_test/test_director.py
@@ -0,0 +1,122 @@
+import unittest
+from utils import kuberay_cluster_builder
+
+
+class TestDirector(unittest.TestCase):
+    def __init__(self, methodName: str = ...)
-> None: + super().__init__(methodName) + self.director = kuberay_cluster_builder.Director() + + def test_build_basic_cluster(self): + cluster = self.director.build_basic_cluster(name="basic-cluster") + # testing meta + actual = cluster["metadata"]["name"] + expected = "basic-cluster" + self.assertEqual(actual, expected) + + actual = cluster["metadata"]["namespace"] + expected = "default" + self.assertEqual(actual, expected) + + # testing the head pod + actual = cluster["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["cpu"] + expected = "1" + self.assertEqual(actual, expected) + + + def test_build_small_cluster(self): + cluster = self.director.build_small_cluster(name="small-cluster") + # testing meta + actual = cluster["metadata"]["name"] + expected = "small-cluster" + self.assertEqual(actual, expected) + + actual = cluster["metadata"]["namespace"] + expected = "default" + self.assertEqual(actual, expected) + + # testing the head pod + actual = cluster["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["cpu"] + expected = "1" + self.assertEqual(actual, expected) + + # testing the workergroup + actual = cluster["spec"]["workerGroupSpecs"][0]["replicas"] + expected = 1 + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"] + expected = "1" + self.assertEqual(actual, expected) + + def test_build_medium_cluster(self): + cluster = self.director.build_medium_cluster(name="medium-cluster") + # testing meta + actual = cluster["metadata"]["name"] + expected = "medium-cluster" + self.assertEqual(actual, expected) + + actual = cluster["metadata"]["namespace"] + expected = "default" + self.assertEqual(actual, expected) + + # testing the head pod + actual = cluster["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["cpu"] + expected = "1" + self.assertEqual(actual, expected) + + # testing the workergroup + actual = cluster["spec"]["workerGroupSpecs"][0]["replicas"] + expected = 3 + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["groupName"] + expected = "medium-cluster-workers" + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"] + expected = "2" + self.assertEqual(actual, expected) + + def test_build_large_cluster(self): + cluster = self.director.build_large_cluster(name="large-cluster") + # testing meta + actual = cluster["metadata"]["name"] + expected = "large-cluster" + self.assertEqual(actual, expected) + + actual = cluster["metadata"]["namespace"] + expected = "default" + self.assertEqual(actual, expected) + + # testing the head pod + actual = cluster["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["cpu"] + expected = "1" + self.assertEqual(actual, expected) + + # testing the workergroup + actual = cluster["spec"]["workerGroupSpecs"][0]["replicas"] + expected = 6 + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["groupName"] + expected = "large-cluster-workers" + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"] + expected = "3" + self.assertEqual(actual, expected) diff --git 
a/clients/python-client/python_client_test/test_utils.py b/clients/python-client/python_client_test/test_utils.py new file mode 100644 index 00000000000..baa50e7bd17 --- /dev/null +++ b/clients/python-client/python_client_test/test_utils.py @@ -0,0 +1,284 @@ +import unittest +import copy +import re +from utils import kuberay_cluster_utils, kuberay_cluster_builder + + + +test_cluster_body: dict = { + "apiVersion": "ray.io/v1alpha1", + "kind": "RayCluster", + "metadata": { + "labels": {"controller-tools.k8s.io": "1.0"}, + "name": "raycluster-complete-raw", + }, + "spec": { + "rayVersion": "2.2.0", + "headGroupSpec": { + "serviceType": "ClusterIP", + "rayStartParams": {"dashboard-host": "0.0.0.0", "block": "true"}, + "template": { + "metadata": {"labels": {}}, + "spec": { + "containers": [ + { + "name": "ray-head", + "image": "rayproject/ray:2.2.0", + "ports": [ + {"containerPort": 6379, "name": "gcs"}, + {"containerPort": 8265, "name": "dashboard"}, + {"containerPort": 10001, "name": "client"}, + ], + "lifecycle": { + "preStop": { + "exec": {"command": ["/bin/sh", "-c", "ray stop"]} + } + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + "resources": { + "limits": {"cpu": "1", "memory": "2G"}, + "requests": {"cpu": "500m", "memory": "2G"}, + }, + } + ], + "volumes": [{"name": "ray-logs", "emptyDir": {}}], + }, + }, + }, + "workerGroupSpecs": [ + { + "replicas": 1, + "minReplicas": 1, + "maxReplicas": 10, + "groupName": "small-group", + "rayStartParams": {"block": "true"}, + "template": { + "spec": { + "containers": [ + { + "name": "ray-worker", + "image": "rayproject/ray:2.2.0", + "lifecycle": { + "preStop": { + "exec": { + "command": ["/bin/sh", "-c", "ray stop"] + } + } + }, + "volumeMounts": [ + {"mountPath": "/tmp/ray", "name": "ray-logs"} + ], + "resources": { + "limits": {"cpu": "1", "memory": "1G"}, + "requests": {"cpu": "500m", "memory": "1G"}, + }, + }, + { + "name": "side-car", + "image": "rayproject/ray:2.2.0", + "resources": { + "limits": {"cpu": "1", "memory": "1G"}, + "requests": {"cpu": "500m", "memory": "1G"}, + }, + } + ], + "initContainers": [ + { + "name": "init", + "image": "busybox:1.28", + "command": [ + "sh", + "-c", + "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for K8s Service $RAY_IP; sleep 2; done", + ], + } + ], + "volumes": [{"name": "ray-logs", "emptyDir": {}}], + } + }, + } + ], + }, +} + +class TestUtils(unittest.TestCase): + def __init__(self, methodName: str = ...) 
-> None: + super().__init__(methodName) + self.director = kuberay_cluster_builder.Director() + self.utils = kuberay_cluster_utils.ClusterUtils() + + def test_update_worker_group_replicas(self): + cluster = self.director.build_small_cluster(name="small-cluster") + + actual = cluster["metadata"]["name"] + expected = "small-cluster" + self.assertEqual(actual, expected) + + cluster, succeeded = self.utils.update_worker_group_replicas( + cluster, + group_name="small-cluster-workers", + max_replicas=10, + min_replicas=1, + replicas=5, + ) + + self.assertEqual(succeeded, True) + + # testing the workergroup + actual = cluster["spec"]["workerGroupSpecs"][0]["replicas"] + expected = 5 + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["maxReplicas"] + expected = 10 + self.assertEqual(actual, expected) + + actual = cluster["spec"]["workerGroupSpecs"][0]["minReplicas"] + expected = 1 + self.assertEqual(actual, expected) + + def test_update_worker_group_resources(self): + cluster: dict = copy.deepcopy(test_cluster_body) + actual = cluster["metadata"]["name"] + expected = "raycluster-complete-raw" + self.assertEqual(actual, expected) + + cluster, succeeded = self.utils.update_worker_group_resources( + cluster, + group_name="small-group", + cpu_requests = "3", + memory_requests = "5G", + cpu_limits = "5", + memory_limits = "10G", + container_name = "unspecified", + ) + self.assertEqual(succeeded, True) + self.assertEqual(cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0]["resources"]["requests"]["cpu"], "3") + self.assertEqual(cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][1]["resources"]["requests"]["cpu"], "500m") + + cluster, succeeded = self.utils.update_worker_group_resources( + cluster, + group_name="small-group", + cpu_requests = "4", + memory_requests = "5G", + cpu_limits = "5", + memory_limits = "10G", + container_name = "side-car", + ) + self.assertEqual(succeeded, True) + self.assertEqual(cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][1]["resources"]["requests"]["cpu"], "4") + + + cluster, succeeded = self.utils.update_worker_group_resources( + cluster, + group_name="small-group", + cpu_requests = "4", + memory_requests = "15G", + cpu_limits = "5", + memory_limits = "25G", + container_name = "all_containers", + ) + self.assertEqual(succeeded, True) + self.assertEqual(cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][1]["resources"]["requests"]["memory"], "15G") + + cluster, succeeded = self.utils.update_worker_group_resources( + cluster, + group_name="small-group", + cpu_requests = "4", + memory_requests = "15G", + cpu_limits = "5", + memory_limits = "25G", + container_name = "wrong_name", + ) + self.assertEqual(succeeded, False) + + # missing parameter test + with self.assertRaises(TypeError): + cluster, succeeded = self.utils.update_worker_group_resources( + cluster, + group_name="small-group", + cpu_requests = "4", + ) + + def test_duplicate_worker_group(self): + cluster = self.director.build_small_cluster(name="small-cluster") + actual = cluster["metadata"]["name"] + expected = "small-cluster" + self.assertEqual(actual, expected) + + cluster, succeeded = self.utils.duplicate_worker_group( + cluster, + group_name="small-cluster-workers", + new_group_name="new-small-group-workers", + ) + self.assertEqual(succeeded, True) + self.assertEqual(cluster["spec"]["workerGroupSpecs"][1]["groupName"], "new-small-group-workers") + 
self.assertEqual(cluster["spec"]["workerGroupSpecs"][1]["template"]["spec"]["containers"][0]["resources"]["requests"]["cpu"], "1") + + # missing parameter test + with self.assertRaises(TypeError): + cluster, succeeded = self.utils.duplicate_worker_group( + cluster, + group_name="small-cluster-workers", + ) + + + def test_delete_worker_group(self): + cluster = self.director.build_small_cluster(name="small-cluster") + actual = cluster["metadata"]["name"] + expected = "small-cluster" + self.assertEqual(actual, expected) + + cluster, succeeded = self.utils.delete_worker_group( + cluster, + group_name="small-cluster-workers", + ) + self.assertEqual(succeeded, True) + self.assertEqual(len(cluster["spec"]["workerGroupSpecs"]),0) + + # deleting the same worker group again should fail + with self.assertRaises(AssertionError): + cluster, succeeded = self.utils.delete_worker_group( + cluster, + group_name="small-cluster-workers", + ) + def test_delete_worker_group(self): + """ + Test delete_worker_group + """ + cluster = self.director.build_small_cluster(name="small-cluster") + actual = cluster["metadata"]["name"] + expected = "small-cluster" + self.assertEqual(actual, expected) + + cluster, succeeded = self.utils.delete_worker_group( + cluster, + group_name="small-cluster-workers", + ) + self.assertEqual(succeeded, True) + self.assertEqual(len(cluster["spec"]["workerGroupSpecs"]),0) + + # deleting the same worker group again should fail + with self.assertRaises(AssertionError): + cluster, succeeded = self.utils.delete_worker_group( + cluster, + group_name="small-cluster-workers", + ) + + def test_name(self): + self.assertEqual(self.utils.is_valid_name("name"), True) + self.assertEqual(self.utils.is_valid_name("name-"), False) + self.assertEqual(self.utils.is_valid_name(".name"), False) + self.assertEqual(self.utils.is_valid_name("name_something"), False) + self.assertEqual(self.utils.is_valid_name("toooooooooooooooooooooooooooooooooooooooooo-loooooooooooooooooooong"), False) + + + def test_label(self): + self.assertEqual(self.utils.is_valid_label("name"), True) + self.assertEqual(self.utils.is_valid_label("name-"), False) + self.assertEqual(self.utils.is_valid_label(".name"), False) + self.assertEqual(self.utils.is_valid_label("name_something"), True) + self.assertEqual(self.utils.is_valid_label("good.name"), True) + self.assertEqual(self.utils.is_valid_label("toooooooooooooooooooooooooooooooooooooooooo-loooooooooooooooooooong"), False) \ No newline at end of file