diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go index f645a9db..0ce43e26 100644 --- a/tests/kfto/kfto_mnist_training_test.go +++ b/tests/kfto/kfto_mnist_training_test.go @@ -27,25 +27,42 @@ import ( . "github.com/project-codeflare/codeflare-common/support" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestPyTorchJobMnistCpu(t *testing.T) { - runKFTOPyTorchMnistJob(t, 0, "", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt") } func TestPyTorchJobMnistWithCuda(t *testing.T) { - runKFTOPyTorchMnistJob(t, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt") } func TestPyTorchJobMnistWithROCm(t *testing.T) { - runKFTOPyTorchMnistJob(t, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt") + runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt") } -func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image string, requirementsFile string) { +func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) { test := With(t) + storageClasses, err := test.Client().Core().StorageV1().StorageClasses().List(test.Ctx(), metav1.ListOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to list StorageClasses") + + // Verify at least one StorageClass supports RWX + foundRWX := false + var storageClassWithRWX storagev1.StorageClass + for _, sc := range storageClasses.Items { + // Check the allowed access modes in the StorageClass annotations + if checkStorageClassSupportsRWX(sc) { + foundRWX = true + storageClassWithRWX = sc + break + } + } + test.Expect(foundRWX).To(BeTrue(), "No StorageClass found with RWX access mode") + // Create a namespace namespace := test.NewTestNamespace() @@ -67,12 +84,11 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image st "requirements.txt": requirementsFileName, }) - // Create PVC for trained model - outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteMany) + outputPvc := CreatePersistentVolumeClaimWithStorageClass(test, namespace.Name, storageClassWithRWX, "50Gi", corev1.ReadWriteMany) defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{}) // Create training PyTorch job - tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, outputPvc.Name, image) + tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image) defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0)) // Make sure the PyTorch job is running @@ -85,7 +101,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, gpuLabel string, image st } -func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { +func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { var useGPU = false var backend string @@ -120,7 +136,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config "/bin/bash", "-c", fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - python /mnt/files/mnist.py --epochs 1 --save-model --backend %s`, backend), + python /mnt/files/mnist.py --epochs 1 --save-model --output-path /mnt/output --backend %s`, backend), }, VolumeMounts: []corev1.VolumeMount{ { @@ -131,6 +147,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Name: "tmp-volume", MountPath: "/tmp", }, + { + Name: "output-volume", + MountPath: "/mnt/output", + }, }, }, }, @@ -151,16 +171,43 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, + { + Name: "output-volume", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: outputPvcName, + }, + }, + }, }, RestartPolicy: corev1.RestartPolicyOnFailure, }, }, }, "Worker": { - Replicas: Ptr(int32(1)), + Replicas: Ptr(int32(workerReplicas)), RestartPolicy: kftov1.RestartPolicyOnFailure, Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-mnist", + }, + }, Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-mnist", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, Containers: []corev1.Container{ { Name: "pytorch", @@ -170,7 +217,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config "/bin/bash", "-c", fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - python /mnt/files/mnist.py --epochs 1 --save-model --backend %s`, backend), + python /mnt/files/mnist.py --epochs 1 --save-model --output-path /mnt/output --backend %s`, backend), }, VolumeMounts: []corev1.VolumeMount{ { @@ -181,6 +228,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Name: "tmp-volume", MountPath: "/tmp", }, + { + Name: "output-volume", + MountPath: "/mnt/output", + }, }, }, }, @@ -201,6 +252,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, + { + Name: "output-volume", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: outputPvcName, + }, + }, + }, }, RestartPolicy: corev1.RestartPolicyOnFailure, }, @@ -248,3 +307,40 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config return tuningJob } + +func checkStorageClassSupportsRWX(sc storagev1.StorageClass) bool { + // Provisioners like nfs.csi.k8s.io or kubernetes.io/nfs usually support RWX by default. + if sc.Provisioner == "nfs.csi.k8s.io" || sc.Provisioner == "kubernetes.io/nfs" { + return true + } + return false +} + +func CreatePersistentVolumeClaimWithStorageClass(t Test, namespace string, storageClass storagev1.StorageClass, storageSize string, accessMode ...corev1.PersistentVolumeAccessMode) *corev1.PersistentVolumeClaim { + t.T().Helper() + + pvc := &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "PersistentVolumeClaim", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pvc-", + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: accessMode, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(storageSize), + }, + }, + StorageClassName: &storageClass.Name, + }, + } + pvc, err := t.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Create(t.Ctx(), pvc, metav1.CreateOptions{}) + t.Expect(err).NotTo(HaveOccurred()) + t.T().Logf("Created PersistentVolumeClaim %s/%s successfully", pvc.Namespace, pvc.Name) + + return pvc +} diff --git a/tests/kfto/resources/mnist.py b/tests/kfto/resources/mnist.py index 4ccd0519..b9389822 100644 --- a/tests/kfto/resources/mnist.py +++ b/tests/kfto/resources/mnist.py @@ -8,8 +8,8 @@ import torch.nn as nn import torch.nn.functional as F import torch.optim as optim -from tensorboardX import SummaryWriter from torch.utils.data import DistributedSampler +from torch.utils.tensorboard import SummaryWriter from torchvision import datasets, transforms @@ -156,6 +156,13 @@ def main(): default=dist.Backend.GLOO, ) + parser.add_argument( + "--output-path", + type=str, + default="./", + help="Path to save the trained model", + ) + args = parser.parse_args() use_cuda = not args.no_cuda and torch.cuda.is_available() if use_cuda: @@ -216,8 +223,10 @@ def main(): train(args, model, device, train_loader, epoch, writer) test(model, device, test_loader, writer, epoch) - if args.save_model: - torch.save(model.state_dict(), "mnist_cnn.pt") + if args.save_model and dist.get_rank() == 0: + model_path = os.path.join(args.output_path, "mnist_cnn.pt") + torch.save(model.state_dict(), model_path) + print(f"Model saved to {model_path}") if __name__ == "__main__": diff --git a/tests/kfto/resources/requirements-rocm.txt b/tests/kfto/resources/requirements-rocm.txt index 1c69980a..1880dc8f 100644 --- a/tests/kfto/resources/requirements-rocm.txt +++ b/tests/kfto/resources/requirements-rocm.txt @@ -1,3 +1,3 @@ --extra-index-url https://download.pytorch.org/whl/rocm6.1 torchvision==0.19.0 -tensorboardx==2.6.2 +tensorboard==2.18.0 \ No newline at end of file diff --git a/tests/kfto/resources/requirements.txt b/tests/kfto/resources/requirements.txt index cd0923b3..e3ae7b3e 100644 --- a/tests/kfto/resources/requirements.txt +++ b/tests/kfto/resources/requirements.txt @@ -1,2 +1,2 @@ torchvision==0.19.0 -tensorboardx==2.6.2 +tensorboard==2.18.0 \ No newline at end of file