diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go new file mode 100644 index 00000000..e11575ac --- /dev/null +++ b/tests/kfto/kfto_mnist_training_test.go @@ -0,0 +1,306 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kfto + +import ( + "bytes" + "fmt" + "testing" + + kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + . "github.com/onsi/gomega" + . "github.com/project-codeflare/codeflare-common/support" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestPyTorchJobMnistCpu(t *testing.T) { + runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt") +} +func TestPyTorchJobMnistWithCuda(t *testing.T) { + runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt") +} + +func TestPyTorchJobMnistWithROCm(t *testing.T) { + runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt") +} + +func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) { + test := With(t) + + // Create a namespace + namespace := test.NewTestNamespace() + + mnist := ReadFile(test, "resources/mnist.py") + requirementsFileName := ReadFile(test, requirementsFile) + + if numGpus > 0 { + mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1) + } else { + mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1) + } + config := CreateConfigMap(test, namespace.Name, map[string][]byte{ + // MNIST Ray Notebook + "mnist.py": mnist, + "requirements.txt": requirementsFileName, + }) + + outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteOnce) + defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{}) + + // Create training PyTorch job + tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image) + defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0)) + + // Make sure the PyTorch job is running + test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble). + Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue))) + + // Make sure the PyTorch job succeeded + test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue))) + test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name) + +} + +func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { + var useGPU = false + var backend string + + if numGpus > 0 { + useGPU = true + backend = "nccl" + } else { + backend = "gloo" + } + + tuningJob := &kftov1.PyTorchJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "PyTorchJob", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "kfto-mnist-", + }, + Spec: kftov1.PyTorchJobSpec{ + PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{ + "Master": { + Replicas: Ptr(int32(1)), + RestartPolicy: kftov1.RestartPolicyOnFailure, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-mnist", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-mnist", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: baseImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/bash", "-c", + fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ + pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ + python /mnt/files/mnist.py --epochs 3 --save-model --output-path /mnt/output --backend %s`, backend), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: config.Name, + MountPath: "/mnt/files", + }, + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + { + Name: "output-volume", + MountPath: "/mnt/output", + }, + }, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: config.Name, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: config.Name, + }, + }, + }, + }, + { + Name: "tmp-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "output-volume", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: outputPvcName, + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyOnFailure, + }, + }, + }, + "Worker": { + Replicas: Ptr(int32(workerReplicas)), + RestartPolicy: kftov1.RestartPolicyOnFailure, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-mnist", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-mnist", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: baseImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/bash", "-c", + fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ + pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ + python /mnt/files/mnist.py --epochs 3 --save-model --backend %s`, backend), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: config.Name, + MountPath: "/mnt/files", + }, + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: config.Name, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: config.Name, + }, + }, + }, + }, + { + Name: "tmp-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyOnFailure, + }, + }, + }, + }, + }, + } + + if useGPU { + // Update resource lists for GPU (NVIDIA/ROCm) usecase + tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus)) + tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus)) + + tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, + } + tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, + } + + // Update tolerations + tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{ + { + Key: gpuLabel, + Operator: corev1.TolerationOpExists, + }, + } + tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{ + { + Key: gpuLabel, + Operator: corev1.TolerationOpExists, + }, + } + } + + tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name) + + return tuningJob +} diff --git a/tests/kfto/resources/mnist.py b/tests/kfto/resources/mnist.py new file mode 100644 index 00000000..91b1cbd3 --- /dev/null +++ b/tests/kfto/resources/mnist.py @@ -0,0 +1,245 @@ +# Copyright 2023. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os + +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from torch.utils.data import DistributedSampler +from torch.utils.tensorboard import SummaryWriter +from torchvision import datasets, transforms + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 20, 5, 1) + self.conv2 = nn.Conv2d(20, 50, 5, 1) + self.fc1 = nn.Linear(4 * 4 * 50, 500) + self.fc2 = nn.Linear(500, 10) + + def forward(self, x): + x = F.relu(self.conv1(x)) + x = F.max_pool2d(x, 2, 2) + x = F.relu(self.conv2(x)) + x = F.max_pool2d(x, 2, 2) + x = x.view(-1, 4 * 4 * 50) + x = F.relu(self.fc1(x)) + x = self.fc2(x) + return F.log_softmax(x, dim=1) + + +def train(args, model, device, train_loader, epoch, writer): + model.train() + optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) + + for batch_idx, (data, target) in enumerate(train_loader): + # Attach tensors to the device. + data, target = data.to(device), target.to(device) + + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + optimizer.step() + if batch_idx % args.log_interval == 0: + print( + "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format( + epoch, + batch_idx * len(data), + len(train_loader.dataset), + 100.0 * batch_idx / len(train_loader), + loss.item(), + ) + ) + niter = epoch * len(train_loader) + batch_idx + writer.add_scalar("loss", loss.item(), niter) + + +def test(model, device, test_loader, writer, epoch): + model.eval() + + correct = 0 + with torch.no_grad(): + for data, target in test_loader: + # Attach tensors to the device. + data, target = data.to(device), target.to(device) + + output = model(data) + # Get the index of the max log-probability. + pred = output.max(1, keepdim=True)[1] + correct += pred.eq(target.view_as(pred)).sum().item() + + print("\naccuracy={:.4f}\n".format(float(correct) / len(test_loader.dataset))) + writer.add_scalar("accuracy", float(correct) / len(test_loader.dataset), epoch) + + +def main(): + # Training settings + parser = argparse.ArgumentParser(description="PyTorch FashionMNIST Example") + parser.add_argument( + "--batch-size", + type=int, + default=64, + metavar="N", + help="input batch size for training (default: 64)", + ) + parser.add_argument( + "--test-batch-size", + type=int, + default=1000, + metavar="N", + help="input batch size for testing (default: 1000)", + ) + parser.add_argument( + "--epochs", + type=int, + default=1, + metavar="N", + help="number of epochs to train (default: 10)", + ) + parser.add_argument( + "--lr", + type=float, + default=0.01, + metavar="LR", + help="learning rate (default: 0.01)", + ) + parser.add_argument( + "--momentum", + type=float, + default=0.5, + metavar="M", + help="SGD momentum (default: 0.5)", + ) + parser.add_argument( + "--no-cuda", + action="store_true", + default=False, + help="disables CUDA training", + ) + parser.add_argument( + "--seed", + type=int, + default=1, + metavar="S", + help="random seed (default: 1)", + ) + parser.add_argument( + "--log-interval", + type=int, + default=10, + metavar="N", + help="how many batches to wait before logging training status", + ) + parser.add_argument( + "--save-model", + action="store_true", + default=False, + help="For Saving the current Model", + ) + parser.add_argument( + "--dir", + default="logs", + metavar="L", + help="directory where summary logs are stored", + ) + + parser.add_argument( + "--backend", + type=str, + help="Distributed backend", + choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI], + default=dist.Backend.GLOO, + ) + + parser.add_argument( + "--output-path", + type=str, + default="./", + help="Path to save the trained model", + ) + + args = parser.parse_args() + use_cuda = not args.no_cuda and torch.cuda.is_available() + if use_cuda: + print("Using CUDA") + if args.backend != dist.Backend.NCCL: + print( + "Warning. Please use `nccl` distributed backend for the best performance using GPUs" + ) + + writer = SummaryWriter(args.dir) + + torch.manual_seed(args.seed) + + device = torch.device("cuda" if use_cuda else "cpu") + + # Attach model to the device. + model = Net().to(device) + + print("Using distributed PyTorch with {} backend".format(args.backend)) + # Set distributed training environment variables to run this training script locally. + if "WORLD_SIZE" not in os.environ: + os.environ["RANK"] = "0" + os.environ["WORLD_SIZE"] = "1" + os.environ["MASTER_ADDR"] = "localhost" + os.environ["MASTER_PORT"] = "1234" + + print(f"World Size: {os.environ['WORLD_SIZE']}. Rank: {os.environ['RANK']}") + + dist.init_process_group(backend=args.backend) + model = nn.parallel.DistributedDataParallel(model) + + # Get FashionMNIST train and test dataset. + train_ds = datasets.FashionMNIST( + "../data", + train=True, + download=True, + transform=transforms.Compose([transforms.ToTensor()]), + ) + test_ds = datasets.FashionMNIST( + "../data", + train=False, + download=True, + transform=transforms.Compose([transforms.ToTensor()]), + ) + # Add train and test loaders. + train_loader = torch.utils.data.DataLoader( + train_ds, + batch_size=args.batch_size, + sampler=DistributedSampler(train_ds), + ) + test_loader = torch.utils.data.DataLoader( + test_ds, + batch_size=args.test_batch_size, + sampler=DistributedSampler(test_ds), + ) + + for epoch in range(1, args.epochs + 1): + train(args, model, device, train_loader, epoch, writer) + test(model, device, test_loader, writer, epoch) + + if args.save_model and dist.get_rank() == 0: + model_path = os.path.join(args.output_path, "mnist_cnn.pt") + torch.save(model.state_dict(), model_path) + print(f"Model saved to {model_path}") + + +if __name__ == "__main__": + main() diff --git a/tests/kfto/resources/requirements-rocm.txt b/tests/kfto/resources/requirements-rocm.txt new file mode 100644 index 00000000..6e2f7b93 --- /dev/null +++ b/tests/kfto/resources/requirements-rocm.txt @@ -0,0 +1,5 @@ +--extra-index-url https://download.pytorch.org/whl/rocm6.1 +torchvision==0.19.0 +tensorboard==2.18.0 +fsspec[http]==2024.6.1 +numpy==2.0.2 \ No newline at end of file diff --git a/tests/kfto/resources/requirements.txt b/tests/kfto/resources/requirements.txt new file mode 100644 index 00000000..9352f8b6 --- /dev/null +++ b/tests/kfto/resources/requirements.txt @@ -0,0 +1,4 @@ +torchvision==0.19.0 +tensorboard==2.18.0 +fsspec[http]==2024.6.1 +numpy==2.0.2 \ No newline at end of file