Skip to content

Commit

Permalink
updated MNIST test to use antipod-affinity for scheduling worker node…
Browse files Browse the repository at this point in the history
…s on different pods and to storage output model using PersistentVolumeClaim with RWX access mode
  • Loading branch information
abhijeet-dhumal committed Dec 19, 2024
1 parent 9ae3ffd commit c5686c8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 16 deletions.
118 changes: 107 additions & 11 deletions tests/kfto/kfto_mnist_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,42 @@ import (
. "github.com/project-codeflare/codeflare-common/support"

corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPyTorchJobMnistCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, "", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
}

func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image string, requirementsFile string) {
func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) {
test := With(t)

storageClasses, err := test.Client().Core().StorageV1().StorageClasses().List(test.Ctx(), metav1.ListOptions{})
test.Expect(err).NotTo(HaveOccurred(), "Failed to list StorageClasses")

// Verify at least one StorageClass supports RWX
foundRWX := false
var storageClassWithRWX storagev1.StorageClass
for _, sc := range storageClasses.Items {
// Check the allowed access modes in the StorageClass annotations
if checkStorageClassSupportsRWX(sc) {
foundRWX = true
storageClassWithRWX = sc
break
}
}
test.Expect(foundRWX).To(BeTrue(), "No StorageClass found with RWX access mode")

// Create a namespace
namespace := test.NewTestNamespace()

Expand All @@ -67,12 +84,11 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image st
"requirements.txt": requirementsFileName,
})

// Create PVC for trained model
outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteMany)
outputPvc := CreatePersistentVolumeClaimWithStorageClass(test, namespace.Name, storageClassWithRWX, "50Gi", corev1.ReadWriteMany)
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, outputPvc.Name, image)
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
Expand All @@ -85,7 +101,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image st

}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
var useGPU = false
var backend string

Expand Down Expand Up @@ -120,7 +136,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 1 --save-model --backend %s`, backend),
python /mnt/files/mnist.py --epochs 1 --save-model --output-path /mnt/output --backend %s`, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -131,6 +147,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Name: "tmp-volume",
MountPath: "/tmp",
},
{
Name: "output-volume",
MountPath: "/mnt/output",
},
},
},
},
Expand All @@ -151,16 +171,43 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: outputPvcName,
},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
},
"Worker": {
Replicas: Ptr(int32(1)),
Replicas: Ptr(int32(workerReplicas)),
RestartPolicy: kftov1.RestartPolicyOnFailure,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-mnist",
},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "kfto-mnist",
},
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
Containers: []corev1.Container{
{
Name: "pytorch",
Expand All @@ -170,7 +217,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 1 --save-model --backend %s`, backend),
python /mnt/files/mnist.py --epochs 1 --save-model --output-path /mnt/output --backend %s`, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -181,6 +228,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Name: "tmp-volume",
MountPath: "/tmp",
},
{
Name: "output-volume",
MountPath: "/mnt/output",
},
},
},
},
Expand All @@ -201,6 +252,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: outputPvcName,
},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
Expand Down Expand Up @@ -248,3 +307,40 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config

return tuningJob
}

func checkStorageClassSupportsRWX(sc storagev1.StorageClass) bool {
// Provisioners like nfs.csi.k8s.io or kubernetes.io/nfs usually support RWX by default.
if sc.Provisioner == "nfs.csi.k8s.io" || sc.Provisioner == "kubernetes.io/nfs" {
return true
}
return false
}

func CreatePersistentVolumeClaimWithStorageClass(t Test, namespace string, storageClass storagev1.StorageClass, storageSize string, accessMode ...corev1.PersistentVolumeAccessMode) *corev1.PersistentVolumeClaim {
t.T().Helper()

pvc := &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "PersistentVolumeClaim",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pvc-",
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: accessMode,
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(storageSize),
},
},
StorageClassName: &storageClass.Name,
},
}
pvc, err := t.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Create(t.Ctx(), pvc, metav1.CreateOptions{})
t.Expect(err).NotTo(HaveOccurred())
t.T().Logf("Created PersistentVolumeClaim %s/%s successfully", pvc.Namespace, pvc.Name)

return pvc
}
15 changes: 12 additions & 3 deletions tests/kfto/resources/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tensorboardX import SummaryWriter
from torch.utils.data import DistributedSampler
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms


Expand Down Expand Up @@ -156,6 +156,13 @@ def main():
default=dist.Backend.GLOO,
)

parser.add_argument(
"--output-path",
type=str,
default="./",
help="Path to save the trained model",
)

args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
if use_cuda:
Expand Down Expand Up @@ -216,8 +223,10 @@ def main():
train(args, model, device, train_loader, epoch, writer)
test(model, device, test_loader, writer, epoch)

if args.save_model:
torch.save(model.state_dict(), "mnist_cnn.pt")
if args.save_model and dist.get_rank() == 0:
model_path = os.path.join(args.output_path, "mnist_cnn.pt")
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion tests/kfto/resources/requirements-rocm.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
--extra-index-url https://download.pytorch.org/whl/rocm6.1
torchvision==0.19.0
tensorboardx==2.6.2
tensorboard==2.18.0
2 changes: 1 addition & 1 deletion tests/kfto/resources/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
torchvision==0.19.0
tensorboardx==2.6.2
tensorboard==2.18.0

0 comments on commit c5686c8

Please sign in to comment.