
Review changes & added test
Bobbins228 committed Mar 12, 2024
1 parent ca2fab1 commit d6ccf07
Showing 10 changed files with 1,043 additions and 220 deletions.
6 changes: 3 additions & 3 deletions scripts/component_integration_tests.py
@@ -62,7 +62,7 @@ def main() -> None:
     if scheduler in (
         "kubernetes",
         "kubernetes_mcad",
-        "kueue_job",
+        "kueue",
         "local_docker",
         "aws_batch",
         "lsf",
@@ -97,13 +97,13 @@ def main() -> None:
             "namespace": "torchx-dev",
         },
     },
-    "kueue_job": {
+    "kueue": {
         "providers": [
             component_provider,
             examples_app_defs_providers,
         ],
         "image": torchx_image,
-        "cfg": {"namespace": "torchx-dev", "local_queue": "default-kueue"},
+        "cfg": {"namespace": "torchx-dev", "local_queue": "torchx-local-queue"},
     },
     "local_cwd": {
         "providers": [
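
With the scheduler key renamed from kueue_job to kueue, the integration setup above can also be exercised from the TorchX CLI. A minimal sketch, assuming the built-in utils.echo component (the component and message are illustrative, not part of this commit):

    torchx run -s kueue \
        -cfg namespace=torchx-dev,local_queue=torchx-local-queue \
        utils.echo --msg "hello kueue"
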
74 changes: 74 additions & 0 deletions scripts/kueue_test.py
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import argparse

from integ_test_utils import build_images, BuildInfo, MissingEnvError, push_images
from torchx.components.dist import ddp
from torchx.runner import get_runner
from torchx.specs import AppState
from torchx.util.types import none_throws

def argparser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Kueue dist trainer integration test runner."
    )
    parser.add_argument("--container_repo", type=str)
    parser.add_argument(
        "--dryrun",
        action="store_true",
        help="Does not actually submit the app, just prints the scheduler request",
    )
    return parser

def build_and_push_image(container_repo: str) -> BuildInfo:
    build = build_images()
    push_images(build, container_repo=container_repo)
    return build

def run_kueue_test(dryrun: bool = False) -> None:
    # Gather args & build the image
    print("Building image")
    args = argparser().parse_args()
    build = build_and_push_image(args.container_repo)
    image = build.torchx_image
    # Create the app definition
    runner = get_runner("kueue")
    app = ddp(
        name="kueue-test",
        image=image,
        m="torchx.examples.apps.lightning.train",
        cpu=1,
        memMB=4000,
        j="1x1",
    )
    # Pass config variables
    cfg = {"namespace": "torchx-dev", "local_queue": "torchx-local-queue"}
    print("Submitting job")
    if dryrun:
        dryrun_info = runner.dryrun(app, "kueue", cfg)
        print(f"Dryrun info: {dryrun_info}")
    else:
        app_handle = runner.run(app, "kueue", cfg)
        print(app_handle)
        runner.wait(app_handle)
        final_status = runner.status(app_handle)
        print(f"Final status: {final_status}")
        if none_throws(final_status).state != AppState.SUCCEEDED:
            raise Exception(f"Dist app failed with status: {final_status}")

def main() -> None:
    args = argparser().parse_args()

    try:
        run_kueue_test(args.dryrun)
    except MissingEnvError:
        print("Skipping tests; only the docker build step was executed")


if __name__ == "__main__":
    main()
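
For reference, the script is driven entirely by the two flags above. A typical invocation might look like the following; the repository address is a placeholder, and the build step assumes the environment variables expected by integ_test_utils.build_images are set (otherwise the run is skipped with MissingEnvError):

    # real submission against the cluster
    python scripts/kueue_test.py --container_repo localhost:5000/torchx

    # print the scheduler request without submitting
    python scripts/kueue_test.py --container_repo localhost:5000/torchx --dryrun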


90 changes: 90 additions & 0 deletions scripts/setup_minikube_kueue.sh
@@ -0,0 +1,90 @@
#!/bin/bash

set -eux
minikube delete
minikube start --driver=docker --cpus=max --memory=max --nodes=2
minikube addons enable registry

# setup multi node volumes
# https://github.com/kubernetes/minikube/issues/12360#issuecomment-1430243861
minikube addons disable storage-provisioner
minikube addons disable default-storageclass
minikube addons enable volumesnapshots
minikube addons enable csi-hostpath-driver
kubectl patch storageclass csi-hostpath-sc -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'

# create namespace
kubectl create namespace torchx-dev

# install Kueue and Kueue related resources
VERSION=v0.6.0
kubectl apply --server-side -f https://github.com/kubernetes-sigs/kueue/releases/download/$VERSION/manifests.yaml

# Function to check if the kueue manager pod is running
check_pod_status() {
    local status=$(kubectl get pods -n kueue-system | grep "kueue-controller-manager" | awk '{print $3}')
    echo "$status"
}

# Wait until the pod is in the 'Running' state
echo "Waiting for kueue-controller-manager pod to be running in the kueue-system namespace..."
while [[ $(check_pod_status) != "Running" ]]; do
    sleep 5
done
# Function to check if the service exists
check_service_existence() {
    kubectl get svc kueue-webhook-service -n kueue-system --no-headers 2>/dev/null
}

# Wait until the service exists
echo "Waiting for kueue-webhook-service to exist in the kueue-system namespace..."
while [[ $(check_service_existence) == "" ]]; do
    sleep 5
done
echo "kueue-webhook-service exists in the kueue-system namespace."
sleep 20
# Create Cluster Queue - update the quota values below to match your cluster
cat <<EOF | kubectl apply --server-side -f -
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: "cluster-queue"
spec:
  namespaceSelector: {} # match all.
  resourceGroups:
  - coveredResources: ["cpu", "memory", "pods"]
    flavors:
    - name: "default-flavor"
      resources:
      - name: "cpu"
        nominalQuota: 16
      - name: "memory"
        nominalQuota: 64000Mi
      - name: "pods"
        nominalQuota: 5
EOF
echo "Cluster Queue: cluster-queue applied!"

echo "Applying Resource Flavor"
cat <<EOF | kubectl apply --server-side -f -
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: default-flavor
EOF
echo "Resource Flavor: default-flavor applied!"

cat <<EOF | kubectl apply --server-side -f -
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  namespace: torchx-dev
  name: torchx-local-queue
spec:
  clusterQueue: cluster-queue
EOF
echo "Local Queue: torchx-local-queue applied!"

# port forwarding
kubectl port-forward --namespace kube-system service/registry 5000:80 &
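
Putting the two new scripts together, a plausible local workflow (the registry address is assumed from the port-forward above, not verified against CI) would be:

    ./scripts/setup_minikube_kueue.sh                                    # bootstrap minikube + Kueue
    python scripts/kueue_test.py --container_repo localhost:5000/torchx  # run the integration test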

2 changes: 1 addition & 1 deletion torchx/schedulers/__init__.py
@@ -24,7 +24,7 @@
     "gcp_batch": "torchx.schedulers.gcp_batch_scheduler",
     "ray": "torchx.schedulers.ray_scheduler",
     "lsf": "torchx.schedulers.lsf_scheduler",
-    "kueue_job": "torchx.schedulers.kueue_job_scheduler"
+    "kueue": "torchx.schedulers.kueue_scheduler",
 }


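After this rename the scheduler resolves under its new key like any other backend. A quick sanity check, assuming get_scheduler_factories is the public lookup helper in this module (a sketch, not code from this commit):

    from torchx.schedulers import get_scheduler_factories

    # "kueue" should now appear alongside kubernetes, local_docker, etc.
    print("kueue" in get_scheduler_factories())
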
169 changes: 1 addition & 168 deletions torchx/schedulers/kubernetes_scheduler.py
@@ -73,11 +73,11 @@
     runopts,
     VolumeMount,
 )
+from torchx.util.role_to_pod import role_to_pod
 from torchx.util.strings import normalize_str
 from torchx.workspace.docker_workspace import DockerWorkspaceMixin
 from typing_extensions import TypedDict
 
-
 if TYPE_CHECKING:
     from docker import DockerClient
     from kubernetes.client import ApiClient, CustomObjectsApi
@@ -165,10 +165,6 @@
 LABEL_ORGANIZATION = "app.kubernetes.io/managed-by"
 LABEL_UNIQUE_NAME = "app.kubernetes.io/instance"
 
-ANNOTATION_ISTIO_SIDECAR = "sidecar.istio.io/inject"
-
-LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"
-
 
 def sanitize_for_serialization(obj: object) -> object:
     from kubernetes import client
@@ -177,169 +173,6 @@ def sanitize_for_serialization(obj: object) -> object:
     return api.sanitize_for_serialization(obj)
 
 
-def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod":
-    from kubernetes.client.models import (  # noqa: F811 redefinition of unused
-        V1Container,
-        V1ContainerPort,
-        V1EmptyDirVolumeSource,
-        V1EnvVar,
-        V1HostPathVolumeSource,
-        V1ObjectMeta,
-        V1PersistentVolumeClaimVolumeSource,
-        V1Pod,
-        V1PodSpec,
-        V1ResourceRequirements,
-        V1SecurityContext,
-        V1Volume,
-        V1VolumeMount,
-    )
-
-    # limits puts an upper cap on the resources a pod may consume.
-    # requests is how much the scheduler allocates. We assume that the jobs will
-    # be allocation the whole machine so requests is slightly lower than the
-    # requested resources to account for the Kubernetes node reserved resources.
-    limits = {}
-    requests = {}
-
-    resource = role.resource
-    if resource.cpu > 0:
-        mcpu = int(resource.cpu * 1000)
-        limits["cpu"] = f"{mcpu}m"
-        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
-        requests["cpu"] = f"{request_mcpu}m"
-    if resource.memMB > 0:
-        limits["memory"] = f"{int(resource.memMB)}M"
-        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
-        requests["memory"] = f"{request_memMB}M"
-    if resource.gpu > 0:
-        requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)
-
-    for device_name, device_limit in resource.devices.items():
-        limits[device_name] = str(device_limit)
-
-    resources = V1ResourceRequirements(
-        limits=limits,
-        requests=requests,
-    )
-
-    node_selector: Dict[str, str] = {}
-    if LABEL_INSTANCE_TYPE in resource.capabilities:
-        node_selector[LABEL_INSTANCE_TYPE] = resource.capabilities[LABEL_INSTANCE_TYPE]
-
-    # To support PyTorch dataloaders we need to set /dev/shm to larger than the
-    # 64M default so we mount an unlimited sized tmpfs directory on it.
-    SHM_VOL = "dshm"
-    volumes = [
-        V1Volume(
-            name=SHM_VOL,
-            empty_dir=V1EmptyDirVolumeSource(
-                medium="Memory",
-            ),
-        ),
-    ]
-    volume_mounts = [
-        V1VolumeMount(name=SHM_VOL, mount_path="/dev/shm"),
-    ]
-    security_context = V1SecurityContext()
-
-    for i, mount in enumerate(role.mounts):
-        mount_name = f"mount-{i}"
-        if isinstance(mount, BindMount):
-            volumes.append(
-                V1Volume(
-                    name=mount_name,
-                    host_path=V1HostPathVolumeSource(
-                        path=mount.src_path,
-                    ),
-                )
-            )
-            volume_mounts.append(
-                V1VolumeMount(
-                    name=mount_name,
-                    mount_path=mount.dst_path,
-                    read_only=mount.read_only,
-                )
-            )
-        elif isinstance(mount, VolumeMount):
-            volumes.append(
-                V1Volume(
-                    name=mount_name,
-                    persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
-                        claim_name=mount.src,
-                    ),
-                )
-            )
-            volume_mounts.append(
-                V1VolumeMount(
-                    name=mount_name,
-                    mount_path=mount.dst_path,
-                    read_only=mount.read_only,
-                )
-            )
-        elif isinstance(mount, DeviceMount):
-            volumes.append(
-                V1Volume(
-                    name=mount_name,
-                    host_path=V1HostPathVolumeSource(
-                        path=mount.src_path,
-                    ),
-                )
-            )
-            volume_mounts.append(
-                V1VolumeMount(
-                    name=mount_name,
-                    mount_path=mount.dst_path,
-                    read_only=(
-                        "w" not in mount.permissions and "m" not in mount.permissions
-                    ),
-                )
-            )
-            security_context.privileged = True
-        else:
-            raise TypeError(f"unknown mount type {mount}")
-
-    container = V1Container(
-        command=[role.entrypoint] + role.args,
-        image=role.image,
-        name=name,
-        env=[
-            V1EnvVar(
-                name=name,
-                value=value,
-            )
-            for name, value in role.env.items()
-        ],
-        resources=resources,
-        ports=[
-            V1ContainerPort(
-                name=name,
-                container_port=port,
-            )
-            for name, port in role.port_map.items()
-        ],
-        volume_mounts=volume_mounts,
-        security_context=security_context,
-    )
-
-    return V1Pod(
-        spec=V1PodSpec(
-            containers=[container],
-            restart_policy="Never",
-            service_account_name=service_account,
-            volumes=volumes,
-            node_selector=node_selector,
-        ),
-        metadata=V1ObjectMeta(
-            annotations={
-                # Disable the istio sidecar as it prevents the containers from
-                # exiting once finished.
-                ANNOTATION_ISTIO_SIDECAR: "false",
-            },
-            labels={},
-        ),
-    )
-
-
 def app_to_resource(
     app: AppDef,
     queue: str,
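
With role_to_pod relocated to torchx.util.role_to_pod (see the new import above), downstream callers would build pods roughly as follows. The Role values are illustrative placeholders, a sketch rather than code from this commit:

    from torchx.specs import Resource, Role
    from torchx.util.role_to_pod import role_to_pod

    role = Role(
        name="trainer",
        image="example.com/torchx:latest",  # placeholder image
        entrypoint="python",
        args=["-m", "torchx.examples.apps.lightning.train"],
        resource=Resource(cpu=1, gpu=0, memMB=4000),
    )
    pod = role_to_pod("trainer-0", role, service_account=None)  # -> V1Pod
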
(The remaining 5 changed files are not shown.)
