Job nodepool (#922)
* Added toleration and label for job nodes

* Removed outdated method

* Start all pipeline jobs in job pool

* Changed job pool

* fix vscode launch var duplication

---------

Co-authored-by: Nils Gustav Stråbø <[email protected]>
satr and nilsgstrabo authored Sep 8, 2023
1 parent 3ba1b75 commit d547981
Showing 13 changed files with 308 additions and 160 deletions.
4 changes: 3 additions & 1 deletion .vscode/launch.json
@@ -99,7 +99,9 @@
"RADIXOPERATOR_PODSECURITYSTANDARD_WARN_VERSION": "v1.23",
"RADIX_ZONE": "dev",
"RADIX_DEPLOYMENTS_PER_ENVIRONMENT_HISTORY_LIMIT": "10",
"RADIX_PIPELINE_JOBS_HISTORY_LIMIT": "5"
"RADIX_PIPELINE_JOBS_HISTORY_LIMIT": "5",
"SECCOMP_PROFILE_FILENAME": "allow-buildah.json",
"RADIX_BUILDAH_IMAGE_BUILDER": "quay.io/buildah/stable:v1.31"
},
"args": ["--useOutClusterClient=false"]
},
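The two new variables point a locally debugged operator at the seccomp profile written by the DaemonSet below and at the buildah image used for pipeline builds. A minimal sketch of reading them, assuming plain environment-variable lookup with fallbacks (the operator's actual configuration loading may differ):

```go
package main

import (
	"fmt"
	"os"
)

// getenvOrDefault returns the value of an environment variable, or the
// fallback when the variable is unset.
func getenvOrDefault(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}

func main() {
	// Fallback values are taken from the launch configuration above.
	profile := getenvOrDefault("SECCOMP_PROFILE_FILENAME", "allow-buildah.json")
	builder := getenvOrDefault("RADIX_BUILDAH_IMAGE_BUILDER", "quay.io/buildah/stable:v1.31")
	fmt.Printf("seccomp profile: %s, buildah image: %s\n", profile, builder)
}
```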
4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
-version: 1.21.9
-appVersion: 1.41.9
+version: 1.22.0
+appVersion: 1.42.0
kubeVersion: ">=1.24.0"
description: Radix Operator
keywords:
10 changes: 10 additions & 0 deletions charts/radix-operator/templates/seccomp-profile-daemonset.yaml
@@ -12,6 +12,14 @@ spec:
      labels:
        app: {{ .Values.seccompProfile.daemonSetName }}
    spec:
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              - matchExpressions:
+                  - key: "nodepool-type"
+                    operator: NotIn
+                    values: ["system"]
      containers:
        - name: file-copy-container
          image: alpine
@@ -30,3 +38,5 @@ spec:
        - name: configmap-volume
          configMap:
            name: {{ .Values.seccompProfile.configMapName }}
+      tolerations:
+        - operator: Exists
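Two scheduling rules cooperate here: the required node affinity keeps the profile-copying DaemonSet off nodes labeled nodepool-type: system, while a toleration with only operator: Exists (and no key) matches every taint, so the pods can still land on tainted pools such as the new job pool. A sketch of the same rules expressed with the k8s.io/api/core/v1 types, using the label key and value from the template above:

```go
package scheduling

import corev1 "k8s.io/api/core/v1"

// seccompDaemonSetScheduling mirrors the DaemonSet template above:
// avoid "system" node pools, tolerate every taint.
func seccompDaemonSetScheduling() (*corev1.Affinity, []corev1.Toleration) {
	affinity := &corev1.Affinity{
		NodeAffinity: &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
				NodeSelectorTerms: []corev1.NodeSelectorTerm{{
					MatchExpressions: []corev1.NodeSelectorRequirement{{
						Key:      "nodepool-type",
						Operator: corev1.NodeSelectorOpNotIn,
						Values:   []string{"system"},
					}},
				}},
			},
		},
	}
	// An empty-key toleration with Operator=Exists tolerates all taints.
	tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}}
	return affinity, tolerations
}
```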
2 changes: 2 additions & 0 deletions pipeline-runner/steps/build_acr.go
@@ -78,6 +78,8 @@ func createACRBuildJob(rr *v1.RadixRegistration, pipelineInfo *model.PipelineInf
Containers: buildContainers,
SecurityContext: buildPodSecurityContext,
Volumes: getACRBuildJobVolumes(&defaultMode, buildSecrets),
+Affinity: utils.GetPodSpecAffinity(nil, appName, "", false, true),
+Tolerations: utils.GetPodSpecTolerations(nil, false, true),
},
},
},
4 changes: 3 additions & 1 deletion pipeline-runner/utils/tekton.go
@@ -20,7 +20,7 @@ const (
podLabelsFileName = "labels"
)

-//CreateActionPipelineJob Create action pipeline job
+// CreateActionPipelineJob Create action pipeline job
func CreateActionPipelineJob(containerName string, action string, pipelineInfo *model.PipelineInfo, appName string, initContainers []corev1.Container, envVars *[]corev1.EnvVar) *batchv1.Job {
imageTag := pipelineInfo.PipelineArguments.ImageTag
jobName := pipelineInfo.PipelineArguments.JobName
@@ -56,6 +56,8 @@ func CreateActionPipelineJob(containerName string, action string, pipelineInfo *
},
Volumes: getJobVolumes(),
RestartPolicy: "Never",
+Affinity: utils.GetPodSpecAffinity(nil, appName, "", false, true),
+Tolerations: utils.GetPodSpecTolerations(nil, false, true),
},
},
},
4 changes: 2 additions & 2 deletions pkg/apis/batch/kubejob.go
@@ -177,8 +177,8 @@ func (s *syncer) buildJob(batchJob *radixv1.RadixBatchJob, jobComponent *radixv1
SecurityContext: securitycontext.Pod(securitycontext.WithPodSeccompProfile(corev1.SeccompProfileTypeRuntimeDefault)),
RestartPolicy: corev1.RestartPolicyNever,
ImagePullSecrets: rd.Spec.ImagePullSecrets,
-Affinity: operatorUtils.GetPodSpecAffinity(node, rd.Spec.AppName, jobComponent.GetName()),
-Tolerations: operatorUtils.GetPodSpecTolerations(node),
+Affinity: operatorUtils.GetPodSpecAffinity(node, rd.Spec.AppName, jobComponent.GetName(), true, false),
+Tolerations: operatorUtils.GetPodSpecTolerations(jobComponent.GetNode(), true, false),
ActiveDeadlineSeconds: timeLimitSeconds,
ServiceAccountName: serviceAccountSpec.ServiceAccountName(),
AutomountServiceAccountToken: serviceAccountSpec.AutomountServiceAccountToken(),
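Both helpers gain two boolean arguments: pipeline jobs call GetPodSpecAffinity(nil, appName, "", false, true) and GetPodSpecTolerations(nil, false, true), while scheduled batch jobs call them with (node, appName, componentName, true, false) and (jobComponent.GetNode(), true, false). Only the argument values are visible in this diff, so the flag names below are hypothetical; a simplified stand-in showing how such flags could select the job-node scheduling rules (GPU and node-spec handling omitted):

```go
package scheduling

import corev1 "k8s.io/api/core/v1"

// Assumed key values; the operator keeps the real ones behind
// kube.RadixJobNodeLabel and kube.NodeTaintJobsKey.
const (
	radixJobNodeLabel = "radix-job-node" // assumption
	nodeTaintJobsKey  = "radix-job-node" // assumption
)

// podSpecAffinity is a simplified stand-in for operatorUtils.GetPodSpecAffinity:
// scheduled batch jobs and pipeline jobs both require a node carrying the
// job-node label, whatever its value.
func podSpecAffinity(isScheduledJob, isPipelineJob bool) *corev1.Affinity {
	if !isScheduledJob && !isPipelineJob {
		return nil
	}
	return &corev1.Affinity{
		NodeAffinity: &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
				NodeSelectorTerms: []corev1.NodeSelectorTerm{{
					MatchExpressions: []corev1.NodeSelectorRequirement{{
						Key:      radixJobNodeLabel,
						Operator: corev1.NodeSelectorOpExists,
					}},
				}},
			},
		},
	}
}

// podSpecTolerations is a simplified stand-in for GetPodSpecTolerations:
// both kinds of jobs tolerate the NoSchedule taint on the job pool.
func podSpecTolerations(isScheduledJob, isPipelineJob bool) []corev1.Toleration {
	if !isScheduledJob && !isPipelineJob {
		return nil
	}
	return []corev1.Toleration{{
		Key:      nodeTaintJobsKey,
		Operator: corev1.TolerationOpExists,
		Effect:   corev1.TaintEffectNoSchedule,
	}}
}
```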
83 changes: 47 additions & 36 deletions pkg/apis/batch/syncer_test.go
@@ -489,8 +489,17 @@ func (s *syncerTestSuite) Test_BatchStaticConfiguration() {
s.Equal(corev1.PullAlways, kubejob.Spec.Template.Spec.Containers[0].ImagePullPolicy)
s.Equal("default", kubejob.Spec.Template.Spec.ServiceAccountName)
s.Equal(utils.BoolPtr(false), kubejob.Spec.Template.Spec.AutomountServiceAccountToken)
-s.Nil(kubejob.Spec.Template.Spec.Affinity.NodeAffinity)
-s.Len(kubejob.Spec.Template.Spec.Tolerations, 0)
+s.Len(kubejob.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, 1)
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixJobNodeLabel,
+Operator: corev1.NodeSelectorOpExists,
+}, kubejob.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0])
+s.Len(kubejob.Spec.Template.Spec.Tolerations, 1)
+s.Equal(corev1.Toleration{
+Key: kube.NodeTaintJobsKey,
+Operator: corev1.TolerationOpExists,
+Effect: corev1.TaintEffectNoSchedule,
+}, kubejob.Spec.Template.Spec.Tolerations[0])
s.Len(kubejob.Spec.Template.Spec.Volumes, 0)
s.Len(kubejob.Spec.Template.Spec.Containers[0].VolumeMounts, 0)
services, err := s.kubeClient.CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{})
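For these assertions to be satisfiable at runtime, the nodes in the job pool must carry the job-node label (so the Exists requirement matches) and may carry the corresponding NoSchedule taint (which the added toleration permits). A sketch of such a node, with the label and taint keys as assumptions standing in for kube.RadixJobNodeLabel and kube.NodeTaintJobsKey:

```go
package scheduling

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleJobNode sketches a node in a dedicated job pool: labeled so the
// Exists node-affinity requirement matches, and tainted so that only pods
// with the matching toleration are scheduled onto it.
func exampleJobNode() corev1.Node {
	return corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "job-node-1",
			Labels: map[string]string{"radix-job-node": "true"}, // assumed key and value
		},
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{{
				Key:    "radix-job-node", // assumed key
				Effect: corev1.TaintEffectNoSchedule,
			}},
		},
	}
}
```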
@@ -1023,34 +1032,45 @@ func (s *syncerTestSuite) Test_JobWithGpuNode() {
s.Require().Len(jobs.Items, 2)

job1 := slice.FindAll(jobs.Items, func(job batchv1.Job) bool { return job.GetName() == getKubeJobName(batchName, job1Name) })[0]
-s.Len(job1.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, 1)
-s.Len(job1.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, 2)
-gpu := s.getNodeSelectorRequirementByKeyForTest(job1.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, kube.RadixGpuLabel)
-s.Equal(corev1.NodeSelectorOpIn, gpu.Operator)
-s.ElementsMatch([]string{"gpu1", "gpu2"}, gpu.Values)
-gpuCount := s.getNodeSelectorRequirementByKeyForTest(job1.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, kube.RadixGpuCountLabel)
-s.Equal(corev1.NodeSelectorOpGt, gpuCount.Operator)
-s.Equal([]string{"3"}, gpuCount.Values)
+job1NodeSelectorTerms := job1.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+s.Len(job1NodeSelectorTerms, 2)
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixGpuLabel,
+Operator: corev1.NodeSelectorOpIn,
+Values: []string{"gpu1", "gpu2"},
+}, job1NodeSelectorTerms[0].MatchExpressions[0])
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixGpuCountLabel,
+Operator: corev1.NodeSelectorOpGt,
+Values: []string{"3"},
+}, job1NodeSelectorTerms[0].MatchExpressions[1])
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixJobNodeLabel,
+Operator: corev1.NodeSelectorOpExists,
+}, job1NodeSelectorTerms[1].MatchExpressions[0])

tolerations := job1.Spec.Template.Spec.Tolerations
-s.Len(tolerations, 1)
-s.Equal(kube.NodeTaintGpuCountKey, tolerations[0].Key)
-s.Equal(corev1.TolerationOpExists, tolerations[0].Operator)
-s.Equal(corev1.TaintEffectNoSchedule, tolerations[0].Effect)
+s.Len(tolerations, 2)
+s.Equal(corev1.Toleration{Key: kube.RadixGpuCountLabel, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule}, tolerations[0])
+s.Equal(corev1.Toleration{Key: kube.NodeTaintJobsKey, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule}, tolerations[1])

job2 := slice.FindAll(jobs.Items, func(job batchv1.Job) bool { return job.GetName() == getKubeJobName(batchName, job2Name) })[0]
-s.Len(job2.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, 1)
-s.Len(job2.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, 2)
-gpu = s.getNodeSelectorRequirementByKeyForTest(job2.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, kube.RadixGpuLabel)
-s.Equal(corev1.NodeSelectorOpIn, gpu.Operator)
-s.ElementsMatch([]string{"gpu3", "gpu4"}, gpu.Values)
-gpuCount = s.getNodeSelectorRequirementByKeyForTest(job2.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, kube.RadixGpuCountLabel)
-s.Equal(corev1.NodeSelectorOpGt, gpuCount.Operator)
-s.Equal([]string{"7"}, gpuCount.Values)
-tolerations = job2.Spec.Template.Spec.Tolerations
-s.Len(tolerations, 1)
-s.Equal(kube.NodeTaintGpuCountKey, tolerations[0].Key)
-s.Equal(corev1.TolerationOpExists, tolerations[0].Operator)
-s.Equal(corev1.TaintEffectNoSchedule, tolerations[0].Effect)
+job2NodeSelectorTerms := job2.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+s.Len(job2NodeSelectorTerms, 2)
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixGpuLabel,
+Operator: corev1.NodeSelectorOpIn,
+Values: []string{"gpu3", "gpu4"},
+}, job2NodeSelectorTerms[0].MatchExpressions[0])
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixGpuCountLabel,
+Operator: corev1.NodeSelectorOpGt,
+Values: []string{"7"},
+}, job2NodeSelectorTerms[0].MatchExpressions[1])
+s.Equal(corev1.NodeSelectorRequirement{
+Key: kube.RadixJobNodeLabel,
+Operator: corev1.NodeSelectorOpExists,
+}, job2NodeSelectorTerms[1].MatchExpressions[0])
}
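Note the Kubernetes semantics the GPU test relies on: the expressions inside one node selector term are ANDed, but separate terms are ORed, so a GPU job may be scheduled onto a node that satisfies either the GPU requirements or the job-node label. A short sketch of the two-term selector asserted above, with assumed label keys standing in for kube.RadixGpuLabel, kube.RadixGpuCountLabel and kube.RadixJobNodeLabel:

```go
package scheduling

import corev1 "k8s.io/api/core/v1"

// gpuOrJobNodeSelector mirrors the selector asserted for job1 above:
// term 1 matches GPU nodes, term 2 matches dedicated job nodes; a node
// matching either term satisfies the required node affinity.
func gpuOrJobNodeSelector() *corev1.NodeSelector {
	return &corev1.NodeSelector{
		NodeSelectorTerms: []corev1.NodeSelectorTerm{
			{
				MatchExpressions: []corev1.NodeSelectorRequirement{
					{Key: "radix-node-gpu", Operator: corev1.NodeSelectorOpIn, Values: []string{"gpu1", "gpu2"}}, // assumed key
					{Key: "radix-node-gpu-count", Operator: corev1.NodeSelectorOpGt, Values: []string{"3"}},      // assumed key
				},
			},
			{
				MatchExpressions: []corev1.NodeSelectorRequirement{
					{Key: "radix-job-node", Operator: corev1.NodeSelectorOpExists}, // assumed key
				},
			},
		},
	}
}
```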

func (s *syncerTestSuite) Test_StopJob() {
@@ -1503,15 +1523,6 @@ func (s *syncerTestSuite) Test_BatchJobStatusWaitingToStopped() {
s.NotNil(batch.Status.JobStatuses[0].EndTime)
}

-func (s *syncerTestSuite) getNodeSelectorRequirementByKeyForTest(requirements []corev1.NodeSelectorRequirement, key string) *corev1.NodeSelectorRequirement {
-for _, requirement := range requirements {
-if requirement.Key == key {
-return &requirement
-}
-}
-return nil
-}

func (s *syncerTestSuite) updateKubeJobStatus(jobName, namespace string) func(updater func(status *batchv1.JobStatus)) {
job, err := s.kubeClient.BatchV1().Jobs(namespace).Get(context.Background(), jobName, metav1.GetOptions{})
if err != nil {