From 647f58ea238dfb403fc22b5c09ed78808e395abf Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Mon, 25 Sep 2023 17:31:38 -0400 Subject: [PATCH 01/10] Added component generation --- src/codeflare_sdk/cluster/cluster.py | 6 ++++++ src/codeflare_sdk/cluster/config.py | 1 + src/codeflare_sdk/utils/generate_yaml.py | 18 +++++++++++++++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index 29c026bdc..2c27a7bb9 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -141,6 +141,10 @@ def create_app_wrapper(self): # Before attempting to create the cluster AW, let's evaluate the ClusterConfig if self.config.dispatch_priority: + if not self.config.mcad: + raise ValueError( + "Invalid Cluster Configuration, cannot have dispatch priority without MCAD" + ) priority_val = self.evaluate_dispatch_priority() if priority_val == None: raise ValueError( @@ -163,6 +167,7 @@ def create_app_wrapper(self): template = self.config.template image = self.config.image instascale = self.config.instascale + mcad = self.config.mcad instance_types = self.config.machine_types env = self.config.envs local_interactive = self.config.local_interactive @@ -183,6 +188,7 @@ def create_app_wrapper(self): template=template, image=image, instascale=instascale, + mcad=mcad, instance_types=instance_types, env=env, local_interactive=local_interactive, diff --git a/src/codeflare_sdk/cluster/config.py b/src/codeflare_sdk/cluster/config.py index fe83e9e55..a21318abc 100644 --- a/src/codeflare_sdk/cluster/config.py +++ b/src/codeflare_sdk/cluster/config.py @@ -46,6 +46,7 @@ class ClusterConfiguration: num_gpus: int = 0 template: str = f"{dir}/templates/base-template.yaml" instascale: bool = False + mcad: bool = True envs: dict = field(default_factory=dict) image: str = "quay.io/project-codeflare/ray:latest-py39-cu118" local_interactive: bool = False diff --git a/src/codeflare_sdk/utils/generate_yaml.py b/src/codeflare_sdk/utils/generate_yaml.py index 4757f5370..2436f7f8d 100755 --- a/src/codeflare_sdk/utils/generate_yaml.py +++ b/src/codeflare_sdk/utils/generate_yaml.py @@ -457,6 +457,18 @@ def _create_oauth_sidecar_object( ) +def write_components(user_yaml, output_file_name): + components = user_yaml.get("spec", "resources")["resources"].get("GenericItems") + open(output_file_name, "w").close() + with open(output_file_name, "a") as outfile: + for component in components: + if "generictemplate" in component: + yaml.dump( + component["generictemplate"], outfile, default_flow_style=False + ) + print(f"Written to: {output_file_name}") + + def generate_appwrapper( name: str, namespace: str, @@ -472,6 +484,7 @@ def generate_appwrapper( template: str, image: str, instascale: bool, + mcad: bool, instance_types: list, env, local_interactive: bool, @@ -527,5 +540,8 @@ def generate_appwrapper( enable_openshift_oauth(user_yaml, cluster_name, namespace) outfile = appwrapper_name + ".yaml" - write_user_appwrapper(user_yaml, outfile) + if not mcad: + write_components(user_yaml, outfile) + else: + write_user_appwrapper(user_yaml, outfile) return outfile From 46c427d349ea353006cacc0a23a7f2e6f5c2e79b Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Tue, 26 Sep 2023 17:41:55 -0400 Subject: [PATCH 02/10] Added multi-resource YAML support --- src/codeflare_sdk/cluster/cluster.py | 33 +++++++++++++++++------- src/codeflare_sdk/utils/generate_yaml.py | 1 + 2 files changed, 24 insertions(+), 10 deletions(-) diff --git 
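Note on PATCH 01 for readers following along: `ClusterConfiguration` gains an `mcad: bool = True` field, `Cluster.create_app_wrapper` forwards it into `generate_appwrapper`, and when the flag is `False` the SDK writes the raw component manifests (via `write_components`) instead of a single AppWrapper. A rough usage sketch follows; only the `mcad` flag comes from this patch series, and the name/namespace values are placeholders chosen for illustration.

```python
# Hedged sketch: opting out of MCAD wrapping with the new flag. Assumes the
# codeflare_sdk package from this patch series; "demo-cluster" and "default"
# are placeholder values, not anything mandated by the patches.
from codeflare_sdk.cluster.cluster import Cluster
from codeflare_sdk.cluster.config import ClusterConfiguration

config = ClusterConfiguration(
    name="demo-cluster",
    namespace="default",
    mcad=False,  # skip the AppWrapper; emit plain component manifests instead
)

cluster = Cluster(config)        # generates demo-cluster.yaml on construction
print(cluster.app_wrapper_yaml)  # path of the multi-document YAML that was written
```

With `mcad=True` (the default) the behaviour is unchanged and a single AppWrapper YAML is still produced.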
a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index 2c27a7bb9..c1335451b 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -42,7 +42,7 @@ RayCluster, RayClusterStatus, ) -from kubernetes import client, config +from kubernetes import client, config, utils import yaml import os import requests @@ -213,15 +213,28 @@ def up(self): try: config_check() api_instance = client.CustomObjectsApi(api_config_handler()) - with open(self.app_wrapper_yaml) as f: - aw = yaml.load(f, Loader=yaml.FullLoader) - api_instance.create_namespaced_custom_object( - group="workload.codeflare.dev", - version="v1beta1", - namespace=namespace, - plural="appwrappers", - body=aw, - ) + if self.config.mcad: + with open(self.app_wrapper_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + api_instance.create_namespaced_custom_object( + group="workload.codeflare.dev", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + body=aw, + ) + else: + with open(self.app_wrapper_yaml) as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource in yamls: + print(resource["kind"]) + # api_instance.create_namespaced_custom_object( + # group="ray.io", + # version="v1alpha1", + # namespace=namespace, + # plural="rayclusters", + # body=aw, + # ) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) diff --git a/src/codeflare_sdk/utils/generate_yaml.py b/src/codeflare_sdk/utils/generate_yaml.py index 2436f7f8d..e44788d97 100755 --- a/src/codeflare_sdk/utils/generate_yaml.py +++ b/src/codeflare_sdk/utils/generate_yaml.py @@ -463,6 +463,7 @@ def write_components(user_yaml, output_file_name): with open(output_file_name, "a") as outfile: for component in components: if "generictemplate" in component: + outfile.write("---\n") yaml.dump( component["generictemplate"], outfile, default_flow_style=False ) From f7cc5c5cd50fef95c10030fef2ce311019663e13 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Tue, 26 Sep 2023 18:03:09 -0400 Subject: [PATCH 03/10] Cluster.up on ray cluster object --- src/codeflare_sdk/cluster/cluster.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index c1335451b..e5afc3159 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -227,14 +227,16 @@ def up(self): with open(self.app_wrapper_yaml) as f: yamls = yaml.load_all(f, Loader=yaml.FullLoader) for resource in yamls: - print(resource["kind"]) - # api_instance.create_namespaced_custom_object( - # group="ray.io", - # version="v1alpha1", - # namespace=namespace, - # plural="rayclusters", - # body=aw, - # ) + if resource["kind"] == "RayCluster": + api_instance.create_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + body=resource, + ) + else: + print(resource["kind"]) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) From 019e8dcc7a23ef2ec1d9b4b2a7ac28b1fe8d59c6 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Thu, 28 Sep 2023 14:50:25 -0400 Subject: [PATCH 04/10] Basic status and down for RayCluster --- src/codeflare_sdk/cluster/cluster.py | 94 ++++++++++++++++------------ 1 file changed, 55 insertions(+), 39 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index e5afc3159..ce37a0cb0 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ 
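Aside on the direct-creation path added in PATCH 02/03: when MCAD is disabled, `Cluster.up` simply iterates the generated multi-document YAML and applies each `RayCluster` through `CustomObjectsApi.create_namespaced_custom_object`. A stripped-down, stand-alone sketch of that call pattern is below; it is not SDK code, it assumes a valid kubeconfig, and the file name and namespace are placeholders. (`safe_load_all` is used here for brevity where the patch uses `yaml.load_all` with `FullLoader`.)

```python
# Sketch: apply each RayCluster document of a multi-document manifest directly
# via the Kubernetes custom-objects API.
import yaml
from kubernetes import client, config

config.load_kube_config()
api = client.CustomObjectsApi()

with open("demo-cluster.yaml") as f:          # placeholder file name
    for resource in yaml.safe_load_all(f):
        if resource and resource.get("kind") == "RayCluster":
            api.create_namespaced_custom_object(
                group="ray.io",
                version="v1alpha1",
                namespace="default",          # placeholder namespace
                plural="rayclusters",
                body=resource,
            )
```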
b/src/codeflare_sdk/cluster/cluster.py @@ -249,13 +249,28 @@ def down(self): try: config_check() api_instance = client.CustomObjectsApi(api_config_handler()) - api_instance.delete_namespaced_custom_object( - group="workload.codeflare.dev", - version="v1beta1", - namespace=namespace, - plural="appwrappers", - name=self.app_wrapper_name, - ) + if self.config.mcad: + api_instance.delete_namespaced_custom_object( + group="workload.codeflare.dev", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + name=self.app_wrapper_name, + ) + else: + with open(self.app_wrapper_yaml) as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource in yamls: + if resource["kind"] == "RayCluster": + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + name=self.app_wrapper_name, + ) + else: + print(resource["kind"]) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -273,38 +288,39 @@ def status( """ ready = False status = CodeFlareClusterStatus.UNKNOWN - # check the app wrapper status - appwrapper = _app_wrapper_status(self.config.name, self.config.namespace) - if appwrapper: - if appwrapper.status in [ - AppWrapperStatus.RUNNING, - AppWrapperStatus.COMPLETED, - AppWrapperStatus.RUNNING_HOLD_COMPLETION, - ]: - ready = False - status = CodeFlareClusterStatus.STARTING - elif appwrapper.status in [ - AppWrapperStatus.FAILED, - AppWrapperStatus.DELETED, - ]: - ready = False - status = CodeFlareClusterStatus.FAILED # should deleted be separate - return status, ready # exit early, no need to check ray status - elif appwrapper.status in [ - AppWrapperStatus.PENDING, - AppWrapperStatus.QUEUEING, - ]: - ready = False - if appwrapper.status == AppWrapperStatus.PENDING: - status = CodeFlareClusterStatus.QUEUED - else: - status = CodeFlareClusterStatus.QUEUEING - if print_to_console: - pretty_print.print_app_wrappers_status([appwrapper]) - return ( - status, - ready, - ) # no need to check the ray status since still in queue + if self.config.mcad: + # check the app wrapper status + appwrapper = _app_wrapper_status(self.config.name, self.config.namespace) + if appwrapper: + if appwrapper.status in [ + AppWrapperStatus.RUNNING, + AppWrapperStatus.COMPLETED, + AppWrapperStatus.RUNNING_HOLD_COMPLETION, + ]: + ready = False + status = CodeFlareClusterStatus.STARTING + elif appwrapper.status in [ + AppWrapperStatus.FAILED, + AppWrapperStatus.DELETED, + ]: + ready = False + status = CodeFlareClusterStatus.FAILED # should deleted be separate + return status, ready # exit early, no need to check ray status + elif appwrapper.status in [ + AppWrapperStatus.PENDING, + AppWrapperStatus.QUEUEING, + ]: + ready = False + if appwrapper.status == AppWrapperStatus.PENDING: + status = CodeFlareClusterStatus.QUEUED + else: + status = CodeFlareClusterStatus.QUEUEING + if print_to_console: + pretty_print.print_app_wrappers_status([appwrapper]) + return ( + status, + ready, + ) # no need to check the ray status since still in queue # check the ray cluster status cluster = _ray_cluster_status(self.config.name, self.config.namespace) From 3e20dfd4126e0802687262b9ef9c029d2ff245fd Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Tue, 3 Oct 2023 14:05:26 -0400 Subject: [PATCH 05/10] Finished up/down and added unit tests --- src/codeflare_sdk/cluster/cluster.py | 44 ++++++-- tests/test-case-no-mcad.yamls | 162 +++++++++++++++++++++++++++ tests/unit_test.py | 105 +++++++++++++++-- 3 files changed, 293 
insertions(+), 18 deletions(-) create mode 100644 tests/test-case-no-mcad.yamls diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index ce37a0cb0..5c3ac7b78 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -235,8 +235,20 @@ def up(self): plural="rayclusters", body=resource, ) - else: - print(resource["kind"]) + elif resource["kind"] == "Route": + api_instance.create_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes", + body=resource, + ) + elif resource["kind"] == "Secret": + secret_instance = client.CoreV1Api(api_config_handler()) + secret_instance.create_namespaced_secret( + namespace=namespace, + body=resource, + ) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -269,8 +281,22 @@ def down(self): plural="rayclusters", name=self.app_wrapper_name, ) - else: - print(resource["kind"]) + elif resource["kind"] == "Route": + name = resource["metadata"]["name"] + api_instance.delete_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes", + name=name, + ) + elif resource["kind"] == "Secret": + name = resource["metadata"]["name"] + secret_instance = client.CoreV1Api(api_config_handler()) + secret_instance.delete_namespaced_secret( + namespace=namespace, + name=name, + ) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -375,7 +401,7 @@ def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True time = 0 while not ready: status, ready = self.status(print_to_console=False) - if status == CodeFlareClusterStatus.UNKNOWN: + if self.config.mcad and status == CodeFlareClusterStatus.UNKNOWN: print( "WARNING: Current cluster status is unknown, have you run cluster.up yet?" 
) @@ -472,7 +498,7 @@ def torchx_config( to_return["requirements"] = requirements return to_return - def from_k8_cluster_object(rc): + def from_k8_cluster_object(rc, mcad=True): machine_types = ( rc["metadata"]["labels"]["orderedinstance"].split("_") if "orderedinstance" in rc["metadata"]["labels"] @@ -511,6 +537,7 @@ def from_k8_cluster_object(rc): 0 ]["image"], local_interactive=local_interactive, + mcad=mcad, ) return Cluster(cluster_config) @@ -571,7 +598,7 @@ def get_current_namespace(): # pragma: no cover return None -def get_cluster(cluster_name: str, namespace: str = "default"): +def get_cluster(cluster_name: str, namespace: str = "default", mcad=True): try: config_check() api_instance = client.CustomObjectsApi(api_config_handler()) @@ -586,7 +613,7 @@ def get_cluster(cluster_name: str, namespace: str = "default"): for rc in rcs["items"]: if rc["metadata"]["name"] == cluster_name: - return Cluster.from_k8_cluster_object(rc) + return Cluster.from_k8_cluster_object(rc, mcad=mcad) raise FileNotFoundError( f"Cluster {cluster_name} is not found in {namespace} namespace" ) @@ -697,6 +724,7 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]: config_check() api_instance = client.CustomObjectsApi(api_config_handler()) + # UPDATE THIS routes = api_instance.list_namespaced_custom_object( group="route.openshift.io", version="v1", diff --git a/tests/test-case-no-mcad.yamls b/tests/test-case-no-mcad.yamls new file mode 100644 index 000000000..87c1c5c4a --- /dev/null +++ b/tests/test-case-no-mcad.yamls @@ -0,0 +1,162 @@ +--- +apiVersion: ray.io/v1alpha1 +kind: RayCluster +metadata: + labels: + appwrapper.mcad.ibm.com: unit-test-cluster-ray + controller-tools.k8s.io: '1.0' + name: unit-test-cluster-ray + namespace: ns +spec: + autoscalerOptions: + idleTimeoutSeconds: 60 + imagePullPolicy: Always + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 500m + memory: 512Mi + upscalingMode: Default + enableInTreeAutoscaling: false + headGroupSpec: + rayStartParams: + block: 'true' + dashboard-host: 0.0.0.0 + num-gpus: '0' + serviceType: ClusterIP + template: + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: unit-test-cluster-ray + operator: In + values: + - unit-test-cluster-ray + containers: + - env: + - name: MY_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: RAY_USE_TLS + value: '0' + - name: RAY_TLS_SERVER_CERT + value: /home/ray/workspace/tls/server.crt + - name: RAY_TLS_SERVER_KEY + value: /home/ray/workspace/tls/server.key + - name: RAY_TLS_CA_CERT + value: /home/ray/workspace/tls/ca.crt + image: quay.io/project-codeflare/ray:2.5.0-py38-cu116 + imagePullPolicy: Always + lifecycle: + preStop: + exec: + command: + - /bin/sh + - -c + - ray stop + name: ray-head + ports: + - containerPort: 6379 + name: gcs + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: 2 + memory: 8G + nvidia.com/gpu: 0 + requests: + cpu: 2 + memory: 8G + nvidia.com/gpu: 0 + imagePullSecrets: + - name: unit-test-pull-secret + rayVersion: 2.5.0 + workerGroupSpecs: + - groupName: small-group-unit-test-cluster-ray + maxReplicas: 2 + minReplicas: 2 + rayStartParams: + block: 'true' + num-gpus: '7' + replicas: 2 + template: + metadata: + annotations: + key: value + labels: + key: value + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: unit-test-cluster-ray + 
operator: In + values: + - unit-test-cluster-ray + containers: + - env: + - name: MY_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: RAY_USE_TLS + value: '0' + - name: RAY_TLS_SERVER_CERT + value: /home/ray/workspace/tls/server.crt + - name: RAY_TLS_SERVER_KEY + value: /home/ray/workspace/tls/server.key + - name: RAY_TLS_CA_CERT + value: /home/ray/workspace/tls/ca.crt + image: quay.io/project-codeflare/ray:2.5.0-py38-cu116 + lifecycle: + preStop: + exec: + command: + - /bin/sh + - -c + - ray stop + name: machine-learning + resources: + limits: + cpu: 4 + memory: 6G + nvidia.com/gpu: 7 + requests: + cpu: 3 + memory: 5G + nvidia.com/gpu: 7 + imagePullSecrets: + - name: unit-test-pull-secret + initContainers: + - command: + - sh + - -c + - until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; + do echo waiting for myservice; sleep 2; done + image: busybox:1.28 + name: init-myservice +--- +apiVersion: route.openshift.io/v1 +kind: Route +metadata: + labels: + odh-ray-cluster-service: unit-test-cluster-ray-head-svc + name: ray-dashboard-unit-test-cluster-ray + namespace: ns +spec: + port: + targetPort: dashboard + to: + kind: Service + name: unit-test-cluster-ray-head-svc diff --git a/tests/unit_test.py b/tests/unit_test.py index f2c86f1f9..669c58b85 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -34,6 +34,7 @@ get_cluster, _app_wrapper_status, _ray_cluster_status, + _get_ingress_domain, ) from codeflare_sdk.cluster.auth import ( TokenAuthentication, @@ -242,6 +243,8 @@ def test_config_creation(): assert config.machine_types == ["cpu.small", "gpu.large"] assert config.image_pull_secrets == ["unit-test-pull-secret"] assert config.dispatch_priority == None + assert config.mcad == True + assert config.local_interactive == False def test_cluster_creation(): @@ -253,6 +256,20 @@ def test_cluster_creation(): ) +def test_cluster_creation_no_mcad(): + config = createClusterConfig() + config.name = "unit-test-cluster-ray" + config.mcad = False + cluster = Cluster(config) + assert cluster.app_wrapper_yaml == "unit-test-cluster-ray.yaml" + assert cluster.app_wrapper_name == "unit-test-cluster-ray" + assert filecmp.cmp( + "unit-test-cluster-ray.yaml", + f"{parent}/tests/test-case-no-mcad.yamls", + shallow=True, + ) + + def test_cluster_creation_priority(mocker): mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( @@ -286,23 +303,49 @@ def test_default_cluster_creation(mocker): def arg_check_apply_effect(group, version, namespace, plural, body, *args): - assert group == "workload.codeflare.dev" - assert version == "v1beta1" assert namespace == "ns" - assert plural == "appwrappers" - with open("unit-test-cluster.yaml") as f: - aw = yaml.load(f, Loader=yaml.FullLoader) - assert body == aw assert args == tuple() + if plural == "appwrappers": + assert group == "workload.codeflare.dev" + assert version == "v1beta1" + with open("unit-test-cluster.yaml") as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + assert body == aw + elif plural == "rayclusters": + assert group == "ray.io" + assert version == "v1alpha1" + with open("unit-test-cluster-ray.yaml") as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource in yamls: + if resource["kind"] == "RayCluster": + assert body == resource + elif plural == "routes": + assert group == "route.openshift.io" + assert version == "v1" + with open("unit-test-cluster-ray.yaml") as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource 
in yamls: + if resource["kind"] == "Route": + assert body == resource + else: + assert 1 == 0 def arg_check_del_effect(group, version, namespace, plural, name, *args): - assert group == "workload.codeflare.dev" - assert version == "v1beta1" assert namespace == "ns" - assert plural == "appwrappers" - assert name == "unit-test-cluster" assert args == tuple() + if plural == "appwrappers": + assert group == "workload.codeflare.dev" + assert version == "v1beta1" + assert name == "unit-test-cluster" + elif plural == "rayclusters": + assert group == "ray.io" + assert version == "v1alpha1" + assert name == "unit-test-cluster-ray" + elif plural == "routes": + assert group == "route.openshift.io" + assert version == "v1" + assert name == "ray-dashboard-unit-test-cluster-ray" def test_cluster_up_down(mocker): @@ -324,6 +367,47 @@ def test_cluster_up_down(mocker): cluster.down() +def test_cluster_up_down_no_mcad(mocker): + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.create_namespaced_custom_object", + side_effect=arg_check_apply_effect, + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object", + side_effect=arg_check_del_effect, + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_cluster_custom_object", + return_value={"items": []}, + ) + config = createClusterConfig() + config.name = "unit-test-cluster-ray" + config.mcad = False + cluster = Cluster(config) + cluster.up() + cluster.down() + + +def arg_check_list_effect(group, version, plural, name, *args): + assert group == "config.openshift.io" + assert version == "v1" + assert plural == "ingresses" + assert name == "cluster" + assert args == tuple() + return {"spec": {"domain": "test"}} + + +def test_get_ingress_domain(mocker): + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + side_effect=arg_check_list_effect, + ) + domain = _get_ingress_domain() + assert domain == "test" + + def aw_status_fields(group, version, namespace, plural, *args): assert group == "workload.codeflare.dev" assert version == "v1beta1" @@ -2432,6 +2516,7 @@ def test_cleanup(): os.remove("unit-test-cluster.yaml") os.remove("prio-test-cluster.yaml") os.remove("unit-test-default-cluster.yaml") + os.remove("unit-test-cluster-ray.yaml") os.remove("test.yaml") os.remove("raytest2.yaml") os.remove("quicktest.yaml") From 2241b6f3a96d2c6b9cf64baca911470bd78e7640 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Tue, 3 Oct 2023 14:11:05 -0400 Subject: [PATCH 06/10] Remove unused utils import --- src/codeflare_sdk/cluster/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index 5c3ac7b78..4e58de09c 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -42,7 +42,7 @@ RayCluster, RayClusterStatus, ) -from kubernetes import client, config, utils +from kubernetes import client, config import yaml import os import requests From 4595f3686a050ca1528d471b2e57d235bac142c4 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Fri, 6 Oct 2023 14:00:54 -0400 Subject: [PATCH 07/10] Applied review feedback --- src/codeflare_sdk/cluster/cluster.py | 121 +++++++++++++---------- src/codeflare_sdk/utils/generate_yaml.py | 2 +- 2 files changed, 68 insertions(+), 55 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py 
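For readers unfamiliar with the testing pattern used in PATCH 05 above: the unit tests mock the Kubernetes client methods with pytest-mock and use `side_effect` hooks to assert on the exact arguments each call receives. A self-contained sketch of that pattern follows; `apply_raycluster` is a hypothetical stand-in for `Cluster.up`, not part of the SDK, and the asserted values are placeholders.

```python
# Sketch of side_effect-based argument checking with pytest-mock.
from kubernetes import client, config


def apply_raycluster(resource: dict, namespace: str):
    # Hypothetical helper standing in for Cluster.up(): applies one RayCluster.
    config.load_kube_config()
    api = client.CustomObjectsApi()
    api.create_namespaced_custom_object(
        group="ray.io",
        version="v1alpha1",
        namespace=namespace,
        plural="rayclusters",
        body=resource,
    )


def arg_check(group, version, namespace, plural, body, *args):
    # side_effect hook: pytest-mock routes the mocked call's arguments here
    assert group == "ray.io"
    assert version == "v1alpha1"
    assert plural == "rayclusters"
    assert namespace == "ns"
    assert body["kind"] == "RayCluster"


def test_apply_raycluster(mocker):
    mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
    mocker.patch(
        "kubernetes.client.CustomObjectsApi.create_namespaced_custom_object",
        side_effect=arg_check,
    )
    apply_raycluster({"kind": "RayCluster", "metadata": {"name": "demo"}}, "ns")
```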
b/src/codeflare_sdk/cluster/cluster.py index 4e58de09c..ac8d71a0a 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -224,31 +224,7 @@ def up(self): body=aw, ) else: - with open(self.app_wrapper_yaml) as f: - yamls = yaml.load_all(f, Loader=yaml.FullLoader) - for resource in yamls: - if resource["kind"] == "RayCluster": - api_instance.create_namespaced_custom_object( - group="ray.io", - version="v1alpha1", - namespace=namespace, - plural="rayclusters", - body=resource, - ) - elif resource["kind"] == "Route": - api_instance.create_namespaced_custom_object( - group="route.openshift.io", - version="v1", - namespace=namespace, - plural="routes", - body=resource, - ) - elif resource["kind"] == "Secret": - secret_instance = client.CoreV1Api(api_config_handler()) - secret_instance.create_namespaced_secret( - namespace=namespace, - body=resource, - ) + self._component_level_up(namespace, api_instance) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -270,33 +246,7 @@ def down(self): name=self.app_wrapper_name, ) else: - with open(self.app_wrapper_yaml) as f: - yamls = yaml.load_all(f, Loader=yaml.FullLoader) - for resource in yamls: - if resource["kind"] == "RayCluster": - api_instance.delete_namespaced_custom_object( - group="ray.io", - version="v1alpha1", - namespace=namespace, - plural="rayclusters", - name=self.app_wrapper_name, - ) - elif resource["kind"] == "Route": - name = resource["metadata"]["name"] - api_instance.delete_namespaced_custom_object( - group="route.openshift.io", - version="v1", - namespace=namespace, - plural="routes", - name=name, - ) - elif resource["kind"] == "Secret": - name = resource["metadata"]["name"] - secret_instance = client.CoreV1Api(api_config_handler()) - secret_instance.delete_namespaced_secret( - namespace=namespace, - name=name, - ) + self._component_level_down(namespace, api_instance) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -350,7 +300,10 @@ def status( # check the ray cluster status cluster = _ray_cluster_status(self.config.name, self.config.namespace) - if cluster and not cluster.status == RayClusterStatus.UNKNOWN: + if cluster: + if cluster.status == RayClusterStatus.UNKNOWN: + ready = False + status = CodeFlareClusterStatus.STARTING if cluster.status == RayClusterStatus.READY: ready = True status = CodeFlareClusterStatus.READY @@ -401,7 +354,7 @@ def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True time = 0 while not ready: status, ready = self.status(print_to_console=False) - if self.config.mcad and status == CodeFlareClusterStatus.UNKNOWN: + if status == CodeFlareClusterStatus.UNKNOWN: print( "WARNING: Current cluster status is unknown, have you run cluster.up yet?" 
) @@ -548,6 +501,66 @@ def local_client_url(self): else: return "None" + def _component_level_up( + self, namespace: str, api_instance: client.CustomObjectsApi + ): + with open(self.app_wrapper_yaml) as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource in yamls: + if resource["kind"] == "RayCluster": + api_instance.create_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + body=resource, + ) + elif resource["kind"] == "Route": + api_instance.create_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes", + body=resource, + ) + elif resource["kind"] == "Secret": + secret_instance = client.CoreV1Api(api_config_handler()) + secret_instance.create_namespaced_secret( + namespace=namespace, + body=resource, + ) + + def _component_level_down( + self, namespace: str, api_instance: client.CustomObjectsApi + ): + with open(self.app_wrapper_yaml) as f: + yamls = yaml.load_all(f, Loader=yaml.FullLoader) + for resource in yamls: + if resource["kind"] == "RayCluster": + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + name=self.app_wrapper_name, + ) + elif resource["kind"] == "Route": + name = resource["metadata"]["name"] + api_instance.delete_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes", + name=name, + ) + elif resource["kind"] == "Secret": + name = resource["metadata"]["name"] + secret_instance = client.CoreV1Api(api_config_handler()) + secret_instance.delete_namespaced_secret( + namespace=namespace, + name=name, + ) + def list_all_clusters(namespace: str, print_to_console: bool = True): """ diff --git a/src/codeflare_sdk/utils/generate_yaml.py b/src/codeflare_sdk/utils/generate_yaml.py index e44788d97..a833892ad 100755 --- a/src/codeflare_sdk/utils/generate_yaml.py +++ b/src/codeflare_sdk/utils/generate_yaml.py @@ -457,7 +457,7 @@ def _create_oauth_sidecar_object( ) -def write_components(user_yaml, output_file_name): +def write_components(user_yaml: dict, output_file_name: str): components = user_yaml.get("spec", "resources")["resources"].get("GenericItems") open(output_file_name, "w").close() with open(output_file_name, "a") as outfile: From aea1321ab9312cc9caf47ee616f838f5cf444d32 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Mon, 9 Oct 2023 16:44:10 -0400 Subject: [PATCH 08/10] Changed naming of internal funcs --- src/codeflare_sdk/cluster/cluster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index ac8d71a0a..8646d0b65 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -224,7 +224,7 @@ def up(self): body=aw, ) else: - self._component_level_up(namespace, api_instance) + self._component_resources_up(namespace, api_instance) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -246,7 +246,7 @@ def down(self): name=self.app_wrapper_name, ) else: - self._component_level_down(namespace, api_instance) + self._component_resources_down(namespace, api_instance) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -501,7 +501,7 @@ def local_client_url(self): else: return "None" - def _component_level_up( + def _component_resources_up( self, namespace: str, api_instance: client.CustomObjectsApi ): with 
open(self.app_wrapper_yaml) as f: @@ -530,7 +530,7 @@ def _component_level_up( body=resource, ) - def _component_level_down( + def _component_resources_down( self, namespace: str, api_instance: client.CustomObjectsApi ): with open(self.app_wrapper_yaml) as f: From 9ddc536f9252118d4d0cbbec24284dc1a983871d Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Fri, 13 Oct 2023 15:51:48 -0400 Subject: [PATCH 09/10] Review feedback applied, auto-select --- src/codeflare_sdk/cluster/cluster.py | 22 ++++++++++++++++++++- src/codeflare_sdk/utils/kube_api_helpers.py | 10 +++++++--- tests/test-case-no-mcad.yamls | 6 +++--- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index 8646d0b65..e13c635fd 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -611,7 +611,7 @@ def get_current_namespace(): # pragma: no cover return None -def get_cluster(cluster_name: str, namespace: str = "default", mcad=True): +def get_cluster(cluster_name: str, namespace: str = "default"): try: config_check() api_instance = client.CustomObjectsApi(api_config_handler()) @@ -626,6 +626,7 @@ def get_cluster(cluster_name: str, namespace: str = "default", mcad=True): for rc in rcs["items"]: if rc["metadata"]["name"] == cluster_name: + mcad = _check_aw_exists(cluster_name, namespace) return Cluster.from_k8_cluster_object(rc, mcad=mcad) raise FileNotFoundError( f"Cluster {cluster_name} is not found in {namespace} namespace" @@ -633,6 +634,25 @@ def get_cluster(cluster_name: str, namespace: str = "default", mcad=True): # private methods +def _check_aw_exists(name: str, namespace: str) -> bool: + try: + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + aws = api_instance.list_namespaced_custom_object( + group="workload.codeflare.dev", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e, print_error=False) + + for aw in aws["items"]: + if aw["metadata"]["name"] == name: + return True + return False + + def _get_ingress_domain(): try: config_check() diff --git a/src/codeflare_sdk/utils/kube_api_helpers.py b/src/codeflare_sdk/utils/kube_api_helpers.py index 8f8180b97..01a93ef5c 100644 --- a/src/codeflare_sdk/utils/kube_api_helpers.py +++ b/src/codeflare_sdk/utils/kube_api_helpers.py @@ -23,7 +23,9 @@ # private methods -def _kube_api_error_handling(e: Exception): # pragma: no cover +def _kube_api_error_handling( + e: Exception, print_error: bool = True +): # pragma: no cover perm_msg = ( "Action not permitted, have you put in correct/up-to-date auth credentials?" 
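Background on the auto-select behaviour introduced in PATCH 09: instead of requiring the caller to pass `mcad` to `get_cluster`, the SDK now probes for an AppWrapper with the cluster's name (`_check_aw_exists`) and quiets the "not found" message via the new `print_error` flag on `_kube_api_error_handling`. The sketch below shows the probe pattern in isolation; the function name and the `"demo-cluster"`/`"ns"` values are illustrative, not the SDK's private helper.

```python
# Sketch of the existence probe used to decide whether a cluster is MCAD-managed.
from kubernetes import client, config


def appwrapper_exists(name: str, namespace: str) -> bool:
    config.load_kube_config()
    api = client.CustomObjectsApi()
    aws = api.list_namespaced_custom_object(
        group="workload.codeflare.dev",
        version="v1beta1",
        namespace=namespace,
        plural="appwrappers",
    )
    return any(aw["metadata"]["name"] == name for aw in aws["items"])


print(appwrapper_exists("demo-cluster", "ns"))  # True -> treat as MCAD-managed
```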
) @@ -32,11 +34,13 @@ def _kube_api_error_handling(e: Exception): # pragma: no cover if type(e) == config.ConfigException: raise PermissionError(perm_msg) if type(e) == executing.executing.NotOneValueFound: - print(nf_msg) + if print_error: + print(nf_msg) return if type(e) == client.ApiException: if e.reason == "Not Found": - print(nf_msg) + if print_error: + print(nf_msg) return elif e.reason == "Unauthorized" or e.reason == "Forbidden": raise PermissionError(perm_msg) diff --git a/tests/test-case-no-mcad.yamls b/tests/test-case-no-mcad.yamls index 87c1c5c4a..6d905566d 100644 --- a/tests/test-case-no-mcad.yamls +++ b/tests/test-case-no-mcad.yamls @@ -51,7 +51,7 @@ spec: value: /home/ray/workspace/tls/server.key - name: RAY_TLS_CA_CERT value: /home/ray/workspace/tls/ca.crt - image: quay.io/project-codeflare/ray:2.5.0-py38-cu116 + image: quay.io/project-codeflare/ray:latest-py39-cu118 imagePullPolicy: Always lifecycle: preStop: @@ -79,7 +79,7 @@ spec: nvidia.com/gpu: 0 imagePullSecrets: - name: unit-test-pull-secret - rayVersion: 2.5.0 + rayVersion: 2.7.0 workerGroupSpecs: - groupName: small-group-unit-test-cluster-ray maxReplicas: 2 @@ -118,7 +118,7 @@ spec: value: /home/ray/workspace/tls/server.key - name: RAY_TLS_CA_CERT value: /home/ray/workspace/tls/ca.crt - image: quay.io/project-codeflare/ray:2.5.0-py38-cu116 + image: quay.io/project-codeflare/ray:latest-py39-cu118 lifecycle: preStop: exec: From 4e08c08aaa248e440bb5b0fdb43056a18619b873 Mon Sep 17 00:00:00 2001 From: Mustafa Eyceoz Date: Mon, 23 Oct 2023 11:25:15 -0400 Subject: [PATCH 10/10] OAuth conflict resolution --- src/codeflare_sdk/cluster/cluster.py | 22 ++++++++++++---------- src/codeflare_sdk/job/jobs.py | 7 ++----- tests/unit_test.py | 8 ++++---- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index e13c635fd..664752783 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -70,7 +70,7 @@ def __init__(self, config: ClusterConfiguration): self.config = config self.app_wrapper_yaml = self.create_app_wrapper() self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0] - self._client = None + self._job_submission_client = None @property def _client_headers(self): @@ -86,23 +86,25 @@ def _client_verify_tls(self): return not self.config.openshift_oauth @property - def client(self): - if self._client: - return self._client + def job_client(self): + if self._job_submission_client: + return self._job_submission_client if self.config.openshift_oauth: print( api_config_handler().configuration.get_api_key_with_prefix( "authorization" ) ) - self._client = JobSubmissionClient( + self._job_submission_client = JobSubmissionClient( self.cluster_dashboard_uri(), headers=self._client_headers, verify=self._client_verify_tls, ) else: - self._client = JobSubmissionClient(self.cluster_dashboard_uri()) - return self._client + self._job_submission_client = JobSubmissionClient( + self.cluster_dashboard_uri() + ) + return self._job_submission_client def evaluate_dispatch_priority(self): priority_class = self.config.dispatch_priority @@ -423,19 +425,19 @@ def list_jobs(self) -> List: """ This method accesses the head ray node in your cluster and lists the running jobs. """ - return self.client.list_jobs() + return self.job_client.list_jobs() def job_status(self, job_id: str) -> str: """ This method accesses the head ray node in your cluster and returns the job status for the provided job id. 
""" - return self.client.get_job_status(job_id) + return self.job_client.get_job_status(job_id) def job_logs(self, job_id: str) -> str: """ This method accesses the head ray node in your cluster and returns the logs for the provided job id. """ - return self.client.get_job_logs(job_id) + return self.job_client.get_job_logs(job_id) def torchx_config( self, working_dir: str = None, requirements: str = None diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py index 27f15283d..c3814971a 100644 --- a/src/codeflare_sdk/job/jobs.py +++ b/src/codeflare_sdk/job/jobs.py @@ -22,9 +22,6 @@ from torchx.schedulers.ray_scheduler import RayScheduler from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo -from ray.job_submission import JobSubmissionClient - -import openshift as oc if TYPE_CHECKING: from ..cluster.cluster import Cluster @@ -96,9 +93,9 @@ def __init__( def _dry_run(self, cluster: "Cluster"): j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus - runner = get_runner(ray_client=cluster.client) + runner = get_runner(ray_client=cluster.job_client) runner._scheduler_instances["ray"] = RayScheduler( - session_name=runner._name, ray_client=cluster.client + session_name=runner._name, ray_client=cluster.job_client ) return ( runner.dryrun( diff --git a/tests/unit_test.py b/tests/unit_test.py index 669c58b85..9ac13159c 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -1935,7 +1935,7 @@ def test_DDPJobDefinition_dry_run(mocker: MockerFixture): "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri", return_value="", ) - mocker.patch.object(Cluster, "client") + mocker.patch.object(Cluster, "job_client") ddp = createTestDDP() cluster = createClusterWithConfig() ddp_job, _ = ddp._dry_run(cluster) @@ -2005,7 +2005,7 @@ def test_DDPJobDefinition_dry_run_no_resource_args(mocker): Test that the dry run correctly gets resources from the cluster object when the job definition does not specify resources. """ - mocker.patch.object(Cluster, "client") + mocker.patch.object(Cluster, "job_client") mocker.patch( "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri", return_value="", @@ -2097,7 +2097,7 @@ def test_DDPJobDefinition_submit(mocker: MockerFixture): mock_schedule = MagicMock() mocker.patch.object(Runner, "schedule", mock_schedule) mock_schedule.return_value = "fake-dashboard-url" - mocker.patch.object(Cluster, "client") + mocker.patch.object(Cluster, "job_client") ddp_def = createTestDDP() cluster = createClusterWithConfig() mocker.patch( @@ -2124,7 +2124,7 @@ def test_DDPJobDefinition_submit(mocker: MockerFixture): def test_DDPJob_creation(mocker: MockerFixture): - mocker.patch.object(Cluster, "client") + mocker.patch.object(Cluster, "job_client") mock_schedule = MagicMock() mocker.patch.object(Runner, "schedule", mock_schedule) mocker.patch.object(