Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

simplify function calls and add option for custom resources #531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 42 additions & 54 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..utils import pretty_print
from ..utils.generate_yaml import (
generate_appwrapper,
head_worker_gpu_count_from_cluster,
)
from ..utils.kube_api_helpers import _kube_api_error_handling
from ..utils.generate_yaml import is_openshift_cluster
Expand Down Expand Up @@ -118,48 +119,7 @@ def create_app_wrapper(self):
f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication."
)

# Before attempting to create the cluster AW, let's evaluate the ClusterConfig

name = self.config.name
namespace = self.config.namespace
head_cpus = self.config.head_cpus
head_memory = self.config.head_memory
num_head_gpus = self.config.num_head_gpus
worker_cpu_requests = self.config.worker_cpu_requests
worker_cpu_limits = self.config.worker_cpu_limits
worker_memory_requests = self.config.worker_memory_requests
worker_memory_limits = self.config.worker_memory_limits
num_worker_gpus = self.config.num_worker_gpus
workers = self.config.num_workers
template = self.config.template
image = self.config.image
appwrapper = self.config.appwrapper
env = self.config.envs
image_pull_secrets = self.config.image_pull_secrets
write_to_file = self.config.write_to_file
local_queue = self.config.local_queue
labels = self.config.labels
return generate_appwrapper(
name=name,
namespace=namespace,
head_cpus=head_cpus,
head_memory=head_memory,
num_head_gpus=num_head_gpus,
worker_cpu_requests=worker_cpu_requests,
worker_cpu_limits=worker_cpu_limits,
worker_memory_requests=worker_memory_requests,
worker_memory_limits=worker_memory_limits,
num_worker_gpus=num_worker_gpus,
workers=workers,
template=template,
image=image,
appwrapper=appwrapper,
env=env,
image_pull_secrets=image_pull_secrets,
write_to_file=write_to_file,
local_queue=local_queue,
labels=labels,
)
return generate_appwrapper(self)

# creates a new cluster with the provided or default spec
def up(self):
Expand Down Expand Up @@ -305,7 +265,7 @@ def status(

if print_to_console:
# overriding the number of gpus with requested
cluster.worker_gpu = self.config.num_worker_gpus
_, cluster.worker_gpu = head_worker_gpu_count_from_cluster(self)
pretty_print.print_cluster_status(cluster)
elif print_to_console:
if status == CodeFlareClusterStatus.UNKNOWN:
Expand Down Expand Up @@ -443,6 +403,29 @@ def job_logs(self, job_id: str) -> str:
"""
return self.job_client.get_job_logs(job_id)

@staticmethod
def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]:
    """
    Extract the extended (non-cpu/memory) resource limits from a RayCluster
    resource dict for both the head group and the first worker group.

    Args:
        rc: A RayCluster custom resource parsed into a nested dict
            (as returned by the Kubernetes API).

    Returns:
        A tuple ``(head_extended_resources, worker_extended_resources)``,
        each mapping a resource name (e.g. ``"nvidia.com/gpu"``) to its
        limit value taken from the container's ``resources.limits``.
    """

    def _extended(limits: Dict) -> dict:
        # Anything other than the first-class cpu/memory entries counts
        # as an extended resource (GPUs, TPUs, custom devices, ...).
        return {k: v for k, v in limits.items() if k not in ("memory", "cpu")}

    # Index each container's limits exactly once instead of re-walking the
    # nested rc dict on every loop iteration.
    head_limits = rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
        "resources"
    ]["limits"]
    worker_limits = rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
        "containers"
    ][0]["resources"]["limits"]
    return _extended(head_limits), _extended(worker_limits)

def from_k8_cluster_object(
rc,
appwrapper=True,
Expand All @@ -456,6 +439,11 @@ def from_k8_cluster_object(
else []
)

(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)

cluster_config = ClusterConfiguration(
name=rc["metadata"]["name"],
namespace=rc["metadata"]["namespace"],
Expand All @@ -473,11 +461,8 @@ def from_k8_cluster_object(
worker_memory_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["memory"],
num_worker_gpus=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["nvidia.com/gpu"]
),
worker_extended_resource_requests=worker_extended_resources,
head_extended_resource_requests=head_extended_resources,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
Expand Down Expand Up @@ -858,6 +843,11 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
protocol = "https"
dashboard_url = f"{protocol}://{ingress.spec.rules[0].host}"

(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)

return RayCluster(
name=rc["metadata"]["name"],
status=status,
Expand All @@ -872,17 +862,15 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
worker_cpu=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["resources"]["limits"]["cpu"],
worker_gpu=0, # hard to detect currently how many gpus, can override it with what the user asked for
worker_extended_resources=worker_extended_resources,
namespace=rc["metadata"]["namespace"],
head_cpus=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["cpu"],
head_mem=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["memory"],
head_gpu=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["nvidia.com/gpu"],
head_extended_resources=head_extended_resources,
dashboard=dashboard_url,
)

Expand All @@ -907,12 +895,12 @@ def _copy_to_ray(cluster: Cluster) -> RayCluster:
worker_mem_min=cluster.config.worker_memory_requests,
worker_mem_max=cluster.config.worker_memory_limits,
worker_cpu=cluster.config.worker_cpu_requests,
worker_gpu=cluster.config.num_worker_gpus,
worker_extended_resources=cluster.config.worker_extended_resource_requests,
namespace=cluster.config.namespace,
dashboard=cluster.cluster_dashboard_uri(),
head_cpus=cluster.config.head_cpus,
head_mem=cluster.config.head_memory,
head_gpu=cluster.config.num_head_gpus,
head_extended_resources=cluster.config.head_extended_resource_requests,
)
if ray.status == CodeFlareClusterStatus.READY:
ray.status = RayClusterStatus.READY
Expand Down
109 changes: 98 additions & 11 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,51 @@

dir = pathlib.Path(__file__).parent.parent.resolve()

# https://docs.ray.io/en/latest/ray-core/scheduling/accelerators.html
DEFAULT_RESOURCE_MAPPING = {
"nvidia.com/gpu": "GPU",
"intel.com/gpu": "GPU",
"amd.com/gpu": "GPU",
"aws.amazon.com/neuroncore": "neuron_cores",
"google.com/tpu": "TPU",
"habana.ai/gaudi": "HPU",
"huawei.com/Ascend910": "NPU",
"huawei.com/Ascend310": "NPU",
}


@dataclass
class ClusterConfiguration:
"""
This dataclass is used to specify resource requirements and other details, and
is passed in as an argument when creating a Cluster object.

Attributes:
- name: The name of the cluster.
- namespace: The namespace in which the cluster should be created.
- head_info: A list of strings containing information about the head node.
- head_cpus: The number of CPUs to allocate to the head node.
- head_memory: The amount of memory to allocate to the head node.
- head_gpus: The number of GPUs to allocate to the head node. (Deprecated, use head_extended_resource_requests)
- head_extended_resource_requests: A dictionary of extended resource requests for the head node. ex: {"nvidia.com/gpu": 1}
- machine_types: A list of machine types to use for the cluster.
- min_cpus: The minimum number of CPUs to allocate to each worker.
- max_cpus: The maximum number of CPUs to allocate to each worker.
- num_workers: The number of workers to create.
- min_memory: The minimum amount of memory to allocate to each worker.
- max_memory: The maximum amount of memory to allocate to each worker.
- num_gpus: The number of GPUs to allocate to each worker. (Deprecated, use worker_extended_resource_requests)
- template: The path to the template file to use for the cluster.
- appwrapper: A boolean indicating whether to use an AppWrapper.
- envs: A dictionary of environment variables to set for the cluster.
- image: The image to use for the cluster.
- image_pull_secrets: A list of image pull secrets to use for the cluster.
- write_to_file: A boolean indicating whether to write the cluster configuration to a file.
- verify_tls: A boolean indicating whether to verify TLS when connecting to the cluster.
- labels: A dictionary of labels to apply to the cluster.
- worker_extended_resource_requests: A dictionary of extended resource requests for each worker. ex: {"nvidia.com/gpu": 1}
- extended_resource_mapping: A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names
- overwrite_default_resource_mapping: A boolean indicating whether to overwrite the default resource mapping.
"""

name: str
Expand All @@ -39,7 +78,7 @@ class ClusterConfiguration:
head_cpus: typing.Union[int, str] = 2
head_memory: typing.Union[int, str] = 8
head_gpus: int = None # Deprecating
num_head_gpus: int = 0
head_extended_resource_requests: typing.Dict[str, int] = field(default_factory=dict)
machine_types: list = field(default_factory=list) # ["m4.xlarge", "g4dn.xlarge"]
worker_cpu_requests: typing.Union[int, str] = 1
worker_cpu_limits: typing.Union[int, str] = 1
Expand All @@ -50,7 +89,6 @@ class ClusterConfiguration:
worker_memory_limits: typing.Union[int, str] = 2
min_memory: typing.Union[int, str] = None # Deprecating
max_memory: typing.Union[int, str] = None # Deprecating
num_worker_gpus: int = 0
num_gpus: int = None # Deprecating
template: str = f"{dir}/templates/base-template.yaml"
appwrapper: bool = False
Expand All @@ -60,6 +98,11 @@ class ClusterConfiguration:
write_to_file: bool = False
verify_tls: bool = True
labels: dict = field(default_factory=dict)
worker_extended_resource_requests: typing.Dict[str, int] = field(
default_factory=dict
)
extended_resource_mapping: typing.Dict[str, str] = field(default_factory=dict)
overwrite_default_resource_mapping: bool = False

def __post_init__(self):
if not self.verify_tls:
Expand All @@ -70,8 +113,60 @@ def __post_init__(self):
self._memory_to_string()
self._str_mem_no_unit_add_GB()
self._memory_to_resource()
self._gpu_to_resource()
self._cpu_to_resource()
self._gpu_to_resource()
self._combine_extended_resource_mapping()
self._validate_extended_resource_requests(self.head_extended_resource_requests)
self._validate_extended_resource_requests(
self.worker_extended_resource_requests
)

def _combine_extended_resource_mapping(self):
    """Merge the user-supplied extended_resource_mapping over the defaults.

    Raises ValueError when a user entry shadows a default mapping without
    overwrite_default_resource_mapping set; emits a UserWarning when the
    shadowing is explicitly allowed.
    """
    custom_keys = set(self.extended_resource_mapping)
    overwritten = custom_keys & set(DEFAULT_RESOURCE_MAPPING)
    if overwritten:
        if not self.overwrite_default_resource_mapping:
            raise ValueError(
                f"Resource mapping already exists for {overwritten}, set overwrite_default_resource_mapping to True to overwrite"
            )
        warnings.warn(
            f"Overwriting default resource mapping for {overwritten}",
            UserWarning,
        )
    # User-provided entries win over the defaults for any shared keys.
    merged = dict(DEFAULT_RESOURCE_MAPPING)
    merged.update(self.extended_resource_mapping)
    self.extended_resource_mapping = merged

def _validate_extended_resource_requests(
    self, extended_resources: typing.Dict[str, int]
):
    """Ensure every requested extended resource has a known Ray mapping."""
    known = self.extended_resource_mapping
    for k in extended_resources:
        if k in known:
            continue
        raise ValueError(
            f"extended resource '{k}' not found in extended_resource_mapping, available resources are {list(self.extended_resource_mapping.keys())}, to add more supported resources use extended_resource_mapping. i.e. extended_resource_mapping = {{'{k}': 'FOO_BAR'}}"
        )

def _gpu_to_resource(self):
    """Migrate the deprecated head_gpus/num_gpus fields into the
    extended-resource request dicts under the "nvidia.com/gpu" key.

    Raises:
        ValueError: if a deprecated field is set while an explicit
            "nvidia.com/gpu" entry already exists, since that would be
            ambiguous about which value to honor.
    """
    if self.head_gpus:
        warnings.warn(
            f"head_gpus is being deprecated, replacing with head_extended_resource_requests['nvidia.com/gpu'] = {self.head_gpus}"
        )
        if "nvidia.com/gpu" in self.head_extended_resource_requests:
            raise ValueError(
                "nvidia.com/gpu already exists in head_extended_resource_requests"
            )
        self.head_extended_resource_requests["nvidia.com/gpu"] = self.head_gpus
    if self.num_gpus:
        warnings.warn(
            f"num_gpus is being deprecated, replacing with worker_extended_resource_requests['nvidia.com/gpu'] = {self.num_gpus}"
        )
        if "nvidia.com/gpu" in self.worker_extended_resource_requests:
            raise ValueError(
                "nvidia.com/gpu already exists in worker_extended_resource_requests"
            )
        self.worker_extended_resource_requests["nvidia.com/gpu"] = self.num_gpus

def _str_mem_no_unit_add_GB(self):
if isinstance(self.head_memory, str) and self.head_memory.isdecimal():
Expand All @@ -95,14 +190,6 @@ def _memory_to_string(self):
if isinstance(self.worker_memory_limits, int):
self.worker_memory_limits = f"{self.worker_memory_limits}G"

def _gpu_to_resource(self):
    # NOTE(review): this is the pre-#531 form of the migration shim — it maps
    # the deprecated head_gpus/num_gpus fields onto num_head_gpus/num_worker_gpus
    # (themselves replaced by *_extended_resource_requests in the new code).
    if self.head_gpus:
        warnings.warn("head_gpus is being deprecated, use num_head_gpus")
        self.num_head_gpus = self.head_gpus
    if self.num_gpus:
        warnings.warn("num_gpus is being deprecated, use num_worker_gpus")
        self.num_worker_gpus = self.num_gpus

def _cpu_to_resource(self):
if self.min_cpus:
warnings.warn("min_cpus is being deprecated, use worker_cpu_requests")
Expand Down
7 changes: 4 additions & 3 deletions src/codeflare_sdk/cluster/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
dataclasses to store information for Ray clusters and AppWrappers.
"""

from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
import typing


class RayClusterStatus(Enum):
Expand Down Expand Up @@ -74,14 +75,14 @@ class RayCluster:
status: RayClusterStatus
head_cpus: int
head_mem: str
head_gpu: int
workers: int
worker_mem_min: str
worker_mem_max: str
worker_cpu: int
worker_gpu: int
namespace: str
dashboard: str
worker_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
head_extended_resources: typing.Dict[str, int] = field(default_factory=dict)


@dataclass
Expand Down
4 changes: 0 additions & 4 deletions src/codeflare_sdk/templates/base-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ spec:
limits:
cpu: 2
memory: "8G"
nvidia.com/gpu: 0
requests:
cpu: 2
memory: "8G"
nvidia.com/gpu: 0
volumeMounts:
- mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt
name: odh-trusted-ca-cert
Expand Down Expand Up @@ -163,11 +161,9 @@ spec:
limits:
cpu: "2"
memory: "12G"
nvidia.com/gpu: "1"
requests:
cpu: "2"
memory: "12G"
nvidia.com/gpu: "1"
volumeMounts:
- mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt
name: odh-trusted-ca-cert
Expand Down
Loading