Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding direct KubeRay compatibility to the SDK #358

Merged
merged 10 commits into from
Oct 23, 2023
222 changes: 161 additions & 61 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, config: ClusterConfiguration):
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
self._client = None
self._job_submission_client = None

@property
def _client_headers(self):
Expand All @@ -86,23 +86,25 @@ def _client_verify_tls(self):
return not self.config.openshift_oauth

@property
def client(self):
if self._client:
return self._client
def job_client(self):
if self._job_submission_client:
return self._job_submission_client
if self.config.openshift_oauth:
print(
api_config_handler().configuration.get_api_key_with_prefix(
"authorization"
)
)
self._client = JobSubmissionClient(
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri(),
headers=self._client_headers,
verify=self._client_verify_tls,
)
else:
self._client = JobSubmissionClient(self.cluster_dashboard_uri())
return self._client
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri()
)
return self._job_submission_client

def evaluate_dispatch_priority(self):
priority_class = self.config.dispatch_priority
Expand Down Expand Up @@ -141,6 +143,10 @@ def create_app_wrapper(self):

# Before attempting to create the cluster AW, let's evaluate the ClusterConfig
if self.config.dispatch_priority:
if not self.config.mcad:
raise ValueError(
"Invalid Cluster Configuration, cannot have dispatch priority without MCAD"
)
priority_val = self.evaluate_dispatch_priority()
if priority_val == None:
raise ValueError(
Expand All @@ -163,6 +169,7 @@ def create_app_wrapper(self):
template = self.config.template
image = self.config.image
instascale = self.config.instascale
mcad = self.config.mcad
instance_types = self.config.machine_types
env = self.config.envs
local_interactive = self.config.local_interactive
Expand All @@ -183,6 +190,7 @@ def create_app_wrapper(self):
template=template,
image=image,
instascale=instascale,
mcad=mcad,
instance_types=instance_types,
env=env,
local_interactive=local_interactive,
Expand All @@ -207,15 +215,18 @@ def up(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
if self.config.mcad:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
else:
self._component_resources_up(namespace, api_instance)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

Expand All @@ -228,13 +239,16 @@ def down(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
)
if self.config.mcad:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
)
else:
self._component_resources_down(namespace, api_instance)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

Expand All @@ -252,42 +266,46 @@ def status(
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RUNNING,
AppWrapperStatus.COMPLETED,
AppWrapperStatus.RUNNING_HOLD_COMPLETION,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.PENDING,
AppWrapperStatus.QUEUEING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.PENDING:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
if print_to_console:
pretty_print.print_app_wrappers_status([appwrapper])
return (
status,
ready,
) # no need to check the ray status since still in queue
if self.config.mcad:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RUNNING,
AppWrapperStatus.COMPLETED,
AppWrapperStatus.RUNNING_HOLD_COMPLETION,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.PENDING,
AppWrapperStatus.QUEUEING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.PENDING:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
if print_to_console:
pretty_print.print_app_wrappers_status([appwrapper])
return (
status,
ready,
) # no need to check the ray status since still in queue
Maxusmusti marked this conversation as resolved.
Show resolved Hide resolved

# check the ray cluster status
cluster = _ray_cluster_status(self.config.name, self.config.namespace)
if cluster and not cluster.status == RayClusterStatus.UNKNOWN:
if cluster:
if cluster.status == RayClusterStatus.UNKNOWN:
ready = False
status = CodeFlareClusterStatus.STARTING
if cluster.status == RayClusterStatus.READY:
ready = True
status = CodeFlareClusterStatus.READY
Expand Down Expand Up @@ -407,19 +425,19 @@ def list_jobs(self) -> List:
"""
This method accesses the head ray node in your cluster and lists the running jobs.
"""
return self.client.list_jobs()
return self.job_client.list_jobs()

def job_status(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the job status for the provided job id.
"""
return self.client.get_job_status(job_id)
return self.job_client.get_job_status(job_id)

def job_logs(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the logs for the provided job id.
"""
return self.client.get_job_logs(job_id)
return self.job_client.get_job_logs(job_id)

def torchx_config(
self, working_dir: str = None, requirements: str = None
Expand All @@ -435,7 +453,7 @@ def torchx_config(
to_return["requirements"] = requirements
return to_return

def from_k8_cluster_object(rc):
def from_k8_cluster_object(rc, mcad=True):
machine_types = (
rc["metadata"]["labels"]["orderedinstance"].split("_")
if "orderedinstance" in rc["metadata"]["labels"]
Expand Down Expand Up @@ -474,6 +492,7 @@ def from_k8_cluster_object(rc):
0
]["image"],
local_interactive=local_interactive,
mcad=mcad,
)
return Cluster(cluster_config)

Expand All @@ -484,6 +503,66 @@ def local_client_url(self):
else:
return "None"

def _component_resources_up(
    self, namespace: str, api_instance: client.CustomObjectsApi
):
    """Create this cluster's Kubernetes resources one by one (non-MCAD path).

    Reads every YAML document from ``self.app_wrapper_yaml`` and creates
    RayCluster and Route custom objects through *api_instance*, and Secret
    objects through the core v1 API, all within *namespace*. Documents of
    any other kind are ignored.
    """
    # (group, version, plural) for each custom-resource kind handled here.
    custom_kinds = {
        "RayCluster": ("ray.io", "v1alpha1", "rayclusters"),
        "Route": ("route.openshift.io", "v1", "routes"),
    }
    with open(self.app_wrapper_yaml) as yaml_file:
        for item in yaml.load_all(yaml_file, Loader=yaml.FullLoader):
            kind = item["kind"]
            if kind in custom_kinds:
                group, version, plural = custom_kinds[kind]
                api_instance.create_namespaced_custom_object(
                    group=group,
                    version=version,
                    namespace=namespace,
                    plural=plural,
                    body=item,
                )
            elif kind == "Secret":
                # Secrets are core v1 objects, not custom resources.
                core_api = client.CoreV1Api(api_config_handler())
                core_api.create_namespaced_secret(
                    namespace=namespace,
                    body=item,
                )

def _component_resources_down(
    self, namespace: str, api_instance: client.CustomObjectsApi
):
    """Delete this cluster's Kubernetes resources one by one (non-MCAD path).

    Walks the YAML documents in ``self.app_wrapper_yaml`` and deletes the
    matching objects from *namespace*: the RayCluster is deleted under
    ``self.app_wrapper_name``, while Routes and Secrets are deleted under
    their own metadata names. Documents of any other kind are ignored.
    """
    with open(self.app_wrapper_yaml) as yaml_file:
        for item in yaml.load_all(yaml_file, Loader=yaml.FullLoader):
            kind = item["kind"]
            if kind == "RayCluster":
                api_instance.delete_namespaced_custom_object(
                    group="ray.io",
                    version="v1alpha1",
                    namespace=namespace,
                    plural="rayclusters",
                    name=self.app_wrapper_name,
                )
            elif kind == "Route":
                api_instance.delete_namespaced_custom_object(
                    group="route.openshift.io",
                    version="v1",
                    namespace=namespace,
                    plural="routes",
                    name=item["metadata"]["name"],
                )
            elif kind == "Secret":
                # Secrets are core v1 objects, not custom resources.
                core_api = client.CoreV1Api(api_config_handler())
                core_api.delete_namespaced_secret(
                    namespace=namespace,
                    name=item["metadata"]["name"],
                )


def list_all_clusters(namespace: str, print_to_console: bool = True):
"""
Expand Down Expand Up @@ -549,13 +628,33 @@ def get_cluster(cluster_name: str, namespace: str = "default"):

for rc in rcs["items"]:
if rc["metadata"]["name"] == cluster_name:
return Cluster.from_k8_cluster_object(rc)
mcad = _check_aw_exists(cluster_name, namespace)
return Cluster.from_k8_cluster_object(rc, mcad=mcad)
raise FileNotFoundError(
f"Cluster {cluster_name} is not found in {namespace} namespace"
)


# private methods
def _check_aw_exists(name: str, namespace: str) -> bool:
    """Return True when an AppWrapper named *name* exists in *namespace*.

    Lists the ``workload.codeflare.dev/v1beta1`` AppWrapper custom objects
    in the given namespace and scans their metadata names for a match. If
    the Kubernetes API call fails, the exception is routed through
    ``_kube_api_error_handling`` (quietly, with ``print_error=False``) and
    that helper's result is returned instead.
    """
    try:
        config_check()
        api_instance = client.CustomObjectsApi(api_config_handler())
        aws = api_instance.list_namespaced_custom_object(
            group="workload.codeflare.dev",
            version="v1beta1",
            namespace=namespace,
            plural="appwrappers",
        )
    except Exception as e:  # pragma: no cover
        return _kube_api_error_handling(e, print_error=False)

    # Match on metadata name; an empty item list simply yields False.
    return any(aw["metadata"]["name"] == name for aw in aws["items"])


def _get_ingress_domain():
try:
config_check()
Expand Down Expand Up @@ -660,6 +759,7 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:

config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
# UPDATE THIS
KPostOffice marked this conversation as resolved.
Show resolved Hide resolved
routes = api_instance.list_namespaced_custom_object(
group="route.openshift.io",
version="v1",
Expand Down
1 change: 1 addition & 0 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ClusterConfiguration:
num_gpus: int = 0
template: str = f"{dir}/templates/base-template.yaml"
instascale: bool = False
mcad: bool = True
envs: dict = field(default_factory=dict)
image: str = "quay.io/project-codeflare/ray:latest-py39-cu118"
local_interactive: bool = False
Expand Down
7 changes: 2 additions & 5 deletions src/codeflare_sdk/job/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
from torchx.schedulers.ray_scheduler import RayScheduler
from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo

from ray.job_submission import JobSubmissionClient

import openshift as oc

if TYPE_CHECKING:
from ..cluster.cluster import Cluster
Expand Down Expand Up @@ -96,9 +93,9 @@ def __init__(

def _dry_run(self, cluster: "Cluster"):
j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus
runner = get_runner(ray_client=cluster.client)
runner = get_runner(ray_client=cluster.job_client)
runner._scheduler_instances["ray"] = RayScheduler(
session_name=runner._name, ray_client=cluster.client
session_name=runner._name, ray_client=cluster.job_client
)
return (
runner.dryrun(
Expand Down
Loading