Skip to content

Commit

Permalink
Removing extra calls to k8s api in the job sync
Browse files Browse the repository at this point in the history
  • Loading branch information
satr committed Aug 13, 2024
1 parent 6c660b0 commit ff03468
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 46 deletions.
38 changes: 18 additions & 20 deletions pkg/apis/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (job *Job) getStoppedSteps(ctx context.Context, isRunning bool) (*[]v1.Radi
return nil, err
}

err = deleteJobPodIfExistsAndNotCompleted(ctx, job.kubeclient, job.radixJob.Namespace, job.radixJob.Name)
err = job.deleteJobPodIfExistsAndNotCompleted(ctx, job.radixJob.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func (job *Job) deleteStepJobs(ctx context.Context) error {
if err = job.kubeclient.BatchV1().Jobs(job.radixJob.Namespace).Delete(ctx, kubernetesJob.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return err
}
if err = deleteJobPodIfExistsAndNotCompleted(ctx, job.kubeclient, job.radixJob.Namespace, kubernetesJob.Name); err != nil {
if err = job.deleteJobPodIfExistsAndNotCompleted(ctx, kubernetesJob.Name); err != nil {
return err
}
}
Expand Down Expand Up @@ -431,7 +431,7 @@ func (job *Job) getJobSteps(ctx context.Context, pipelineJobs []batchv1.Job) ([]

func (job *Job) getPipelineJobsPods(ctx context.Context) ([]corev1.Pod, error) {
podList, err := job.kubeclient.CoreV1().Pods(job.radixJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set{kube.RadixJobNameLabel: job.radixJob.Name}.String(),
LabelSelector: job.getRadixJobNameLabelSelector(),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -553,12 +553,10 @@ func getComponentImagesForContainer(name string, componentImages pipeline.BuildC
}

func (job *Job) getPipelinePod(pods []corev1.Pod) *corev1.Pod {
jobPods := slice.FindAll(pods, func(pod corev1.Pod) bool { return pod.GetLabels()[jobNameLabel] == job.radixJob.Name })
if len(jobPods) == 0 {
// pipeline pod not found
return nil
if jobPod, ok := slice.FindFirst(pods, func(pod corev1.Pod) bool { return pod.GetLabels()[jobNameLabel] == job.radixJob.Name }); ok {
return &jobPod
}
return &jobPods[0]
return nil
}

func getPipelineJobStep(pipelinePod *corev1.Pod) v1.RadixJobStep {
Expand All @@ -573,18 +571,16 @@ func (job *Job) getCloneConfigApplyConfigAndPreparePipelineStep(pipelineJobs []b
return nil, nil
}

jobPods := slice.FindAll(pipelinePods, func(pod corev1.Pod) bool { return pod.GetLabels()[jobNameLabel] == preparePipelineStepJobs[0].Name })

if len(jobPods) != 1 {
jobPod, jobPodExists := slice.FindFirst(pipelinePods, func(pod corev1.Pod) bool { return pod.GetLabels()[jobNameLabel] == preparePipelineStepJobs[0].Name })
if !jobPodExists {
return nil, nil
}
jobPod := jobPods[0]

cloneContainerStatus := getContainerStatusByName(git.CloneConfigContainerName, jobPod.Status.InitContainerStatuses)
if cloneContainerStatus == nil {
return nil, nil
}
cloneConfigContainerStep := getJobStepWithNoComponents(jobPods[0].GetName(), cloneContainerStatus)
cloneConfigContainerStep := getJobStepWithNoComponents(jobPod.GetName(), cloneContainerStatus)
applyConfigAndPreparePipelineContainerJobStep := getJobStepWithNoComponents(jobPod.GetName(), &jobPod.Status.ContainerStatuses[0])

return &cloneConfigContainerStep, &applyConfigAndPreparePipelineContainerJobStep
Expand Down Expand Up @@ -646,9 +642,7 @@ func getJobStepWithContainerName(podName, containerName string, containerStatus
func (job *Job) getJobEnvironments(ctx context.Context) ([]string, error) {
deploymentsLinkedToJob, err := job.radixclient.RadixV1().RadixDeployments(corev1.NamespaceAll).List(
ctx,
metav1.ListOptions{
LabelSelector: labels.Set{"radix-job-name": job.radixJob.Name}.String(),
})
metav1.ListOptions{LabelSelector: job.getRadixJobNameLabelSelector()})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -694,14 +688,18 @@ func (job *Job) updateRadixJobStatus(ctx context.Context, rj *v1.RadixJob, chang

func (job *Job) getPipelineJobs(ctx context.Context) ([]batchv1.Job, error) {
jobList, err := job.kubeclient.BatchV1().Jobs(job.radixJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set{kube.RadixJobNameLabel: job.radixJob.Name}.String(),
LabelSelector: job.getRadixJobNameLabelSelector(),
})
if err != nil {
return nil, err
}
return jobList.Items, nil
}

func (job *Job) getRadixJobNameLabelSelector() string {
return labels.Set{kube.RadixJobNameLabel: job.radixJob.Name}.String()
}

func (job *Job) getRadixJobs(ctx context.Context) ([]v1.RadixJob, error) {
radixJobList, err := job.radixclient.RadixV1().RadixJobs(job.radixJob.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
Expand All @@ -710,8 +708,8 @@ func (job *Job) getRadixJobs(ctx context.Context) ([]v1.RadixJob, error) {
return radixJobList.Items, err
}

func deleteJobPodIfExistsAndNotCompleted(ctx context.Context, client kubernetes.Interface, namespace, jobName string) error {
pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"job-name": jobName}.String()})
func (job *Job) deleteJobPodIfExistsAndNotCompleted(ctx context.Context, jobName string) error {
pods, err := job.kubeclient.CoreV1().Pods(job.radixJob.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{jobNameLabel: jobName}.String()})
if err != nil {
return err
}
Expand All @@ -721,7 +719,7 @@ func deleteJobPodIfExistsAndNotCompleted(ctx context.Context, client kubernetes.

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err = client.CoreV1().Pods(namespace).Delete(ctx, pods.Items[0].Name, metav1.DeleteOptions{})
err = job.kubeclient.CoreV1().Pods(job.radixJob.Namespace).Delete(ctx, pods.Items[0].Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
Expand Down
52 changes: 26 additions & 26 deletions pkg/apis/job/job_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type setStatusOfJobTestScenarioExpected struct {

type setStatusOfJobTestScenario struct {
name string
radixjob *v1.RadixJob
radixJob *v1.RadixJob
jobs []*batchv1.Job
pods []*corev1.Pod
configMaps []*corev1.ConfigMap
Expand Down Expand Up @@ -82,7 +82,7 @@ func (s *RadixJobStepTestSuite) TestIt() {
expected: v1.RadixJobStep{Condition: v1.JobFailed, Started: &startedAt, Ended: &finishedAt},
},
{
name: "test podName, containerName and componenets set",
name: "test podName, containerName and components set",
podName: "a_pod",
containerName: "a_container",
components: []string{"comp1", "comp2"},
Expand All @@ -100,7 +100,7 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_NoPipelineJob() {
scenario := setStatusOfJobTestScenario{

name: "missing pipeline job",
radixjob: s.getBuildDeployJob("job-1", "app-1").BuildRJ(),
radixJob: s.getBuildDeployJob("job-1", "app-1").BuildRJ(),
}

s.testSetStatusOfJobTestScenario(&scenario)
Expand All @@ -109,7 +109,7 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_NoPipelineJob() {
func (s *RadixJobStepTestSuite) Test_StatusSteps_NoPipelinePod() {
scenario := setStatusOfJobTestScenario{
name: "missing pipeline pod",
radixjob: s.getBuildDeployJob("job-2", "app-2").BuildRJ(),
radixJob: s.getBuildDeployJob("job-2", "app-2").BuildRJ(),
jobs: []*batchv1.Job{
s.getPipelineJob("job-2", "app-2", ""),
},
Expand All @@ -121,13 +121,13 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_NoPipelinePod() {
func (s *RadixJobStepTestSuite) Test_StatusSteps_NoEmptyCloneSteps() {
scenario := setStatusOfJobTestScenario{
name: "no empty steps when clone config has not been created",
radixjob: s.getBuildDeployJob("job-3", "app-3").BuildRJ(),
radixJob: s.getBuildDeployJob("job-3", "app-3").BuildRJ(),
jobs: []*batchv1.Job{
s.getPipelineJob("job-3", "app-3", ""),
},
pods: []*corev1.Pod{
s.appendJobPodContainerStatus(
s.getJobPod("pipeline-pod-3", "job-3", utils.GetAppNamespace("app-3")),
s.getJobPod("pipeline-pod-3", "job-3", "job-3", utils.GetAppNamespace("app-3")),
s.getWaitingContainerStatus("radix-pipeline")),
},
expected: setStatusOfJobTestScenarioExpected{
Expand All @@ -141,23 +141,23 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_NoEmptyCloneSteps() {
func (s *RadixJobStepTestSuite) Test_StatusSteps_CorrectCloneStepsSequence() {
scenario := setStatusOfJobTestScenario{
name: "clone and apply config steps added before pipeline step",
radixjob: s.getBuildDeployJob("job-4", "app-4").BuildRJ(),
radixJob: s.getBuildDeployJob("job-4", "app-4").BuildRJ(),
jobs: []*batchv1.Job{
s.getPipelineJob("job-4", "app-4", "a_tag"),
s.getPreparePipelineJob("prepare-pipeline-4", "job-4", "app-4", "a_tag"),
s.getRunPipelineJob("run-pipeline-job-4", "job-4", "app-4", "a_tag"),
},
pods: []*corev1.Pod{
s.appendJobPodContainerStatus(
s.getJobPod("pipeline-pod-4", "job-4", utils.GetAppNamespace("app-4")),
s.getJobPod("pipeline-pod-4", "job-4", "job-4", utils.GetAppNamespace("app-4")),
s.getWaitingContainerStatus("radix-pipeline")),
s.appendJobPodInitContainerStatus(
s.appendJobPodContainerStatus(
s.getJobPod("prepare-pipeline-pod-4", "prepare-pipeline-4", utils.GetAppNamespace("app-4")),
s.getJobPod("prepare-pipeline-pod-4", "job-4", "prepare-pipeline-4", utils.GetAppNamespace("app-4")),
s.getWaitingContainerStatus("prepare-pipeline")),
s.getWaitingContainerStatus("clone-config")),
s.appendJobPodContainerStatus(
s.getJobPod("run-pipeline-pod-4", "run-pipeline-job-4", utils.GetAppNamespace("app-4")),
s.getJobPod("run-pipeline-pod-4", "job-4", "run-pipeline-job-4", utils.GetAppNamespace("app-4")),
s.getWaitingContainerStatus("run-pipeline")),
},
expected: setStatusOfJobTestScenarioExpected{
Expand All @@ -176,7 +176,7 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_CorrectCloneStepsSequence() {
func (s *RadixJobStepTestSuite) Test_StatusSteps_BuildSteps() {
scenario := setStatusOfJobTestScenario{
name: "pipeline with build steps",
radixjob: s.getBuildDeployJob("job-5", "app-5").BuildRJ(),
radixJob: s.getBuildDeployJob("job-5", "app-5").BuildRJ(),
jobs: []*batchv1.Job{
s.getPipelineJob("job-5", "app-5", "a_tag"),
s.getPreparePipelineJob("prepare-pipeline-5", "job-5", "app-5", "a_tag"),
Expand All @@ -190,20 +190,20 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_BuildSteps() {
},
pods: []*corev1.Pod{
s.appendJobPodContainerStatus(
s.getJobPod("pipeline-pod-5", "job-5", utils.GetAppNamespace("app-5")),
s.getJobPod("pipeline-pod-5", "job-5", "job-5", utils.GetAppNamespace("app-5")),
s.getWaitingContainerStatus("radix-pipeline")),
s.appendJobPodInitContainerStatus(
s.appendJobPodContainerStatus(
s.getJobPod("prepare-pipeline-pod-5", "prepare-pipeline-5", utils.GetAppNamespace("app-5")),
s.getJobPod("prepare-pipeline-pod-5", "job-5", "prepare-pipeline-5", utils.GetAppNamespace("app-5")),
s.getWaitingContainerStatus("prepare-pipeline")),
s.getWaitingContainerStatus("clone-config")),
s.appendJobPodContainerStatus(
s.getJobPod("build-pod-5", "build-job-5", utils.GetAppNamespace("app-5")),
s.getJobPod("build-pod-5", "job-5", "build-job-5", utils.GetAppNamespace("app-5")),
s.getWaitingContainerStatus("build-app"),
s.getWaitingContainerStatus("build-multi"),
),
s.appendJobPodContainerStatus(
s.getJobPod("run-pipeline-pod-5", "run-pipeline-5", utils.GetAppNamespace("app-5")),
s.getJobPod("run-pipeline-pod-5", "job-5", "run-pipeline-5", utils.GetAppNamespace("app-5")),
s.getWaitingContainerStatus("run-pipeline")),
},
expected: setStatusOfJobTestScenarioExpected{
Expand All @@ -224,7 +224,7 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_BuildSteps() {
func (s *RadixJobStepTestSuite) Test_StatusSteps_InitContainers() {
scenario := setStatusOfJobTestScenario{
name: "steps with init containers",
radixjob: s.getBuildDeployJob("job-1", "app-1").BuildRJ(),
radixJob: s.getBuildDeployJob("job-1", "app-1").BuildRJ(),
jobs: []*batchv1.Job{
s.getPipelineJob("job-1", "app-1", "a_tag"),
s.getPreparePipelineJob("prepare-pipeline-1", "job-1", "app-1", "a_tag"),
Expand All @@ -233,21 +233,21 @@ func (s *RadixJobStepTestSuite) Test_StatusSteps_InitContainers() {
},
pods: []*corev1.Pod{
s.appendJobPodContainerStatus(
s.getJobPod("pipeline-pod-1", "job-1", utils.GetAppNamespace("app-1")),
s.getJobPod("pipeline-pod-1", "job-1", "job-1", utils.GetAppNamespace("app-1")),
s.getWaitingContainerStatus("radix-pipeline")),
s.appendJobPodInitContainerStatus(
s.appendJobPodContainerStatus(
s.getJobPod("prepare-pipeline-pod-1", "prepare-pipeline-1", utils.GetAppNamespace("app-1")),
s.getJobPod("prepare-pipeline-pod-1", "job-1", "prepare-pipeline-1", utils.GetAppNamespace("app-1")),
s.getWaitingContainerStatus("prepare-pipeline")),
s.getWaitingContainerStatus("clone-config")),
s.appendJobPodInitContainerStatus(
s.getJobPod("build-pod-1", "build-job-1", utils.GetAppNamespace("app-1")),
s.getJobPod("build-pod-1", "job-1", "build-job-1", utils.GetAppNamespace("app-1")),
s.getWaitingContainerStatus("build-init1"),
s.getWaitingContainerStatus("build-init2"),
s.getWaitingContainerStatus("internal-build-init"),
),
s.appendJobPodContainerStatus(
s.getJobPod("run-pipeline-pod-1", "run-pipeline-1", utils.GetAppNamespace("app-1")),
s.getJobPod("run-pipeline-pod-1", "job-1", "run-pipeline-1", utils.GetAppNamespace("app-1")),
s.getWaitingContainerStatus("run-pipeline")),
},
expected: setStatusOfJobTestScenarioExpected{
Expand All @@ -269,20 +269,20 @@ func (s *RadixJobStepTestSuite) testSetStatusOfJobTestScenario(scenario *setStat
err := s.initScenario(scenario)
require.NoError(s.T(), err, "scenario %s", scenario.name)

job := NewJob(s.kubeClient, s.kubeUtils, s.radixClient, scenario.radixjob, nil)
job := NewJob(s.kubeClient, s.kubeUtils, s.radixClient, scenario.radixJob, nil)
err = job.setStatusOfJob(context.Background())
require.NoError(s.T(), err, "scenario %s", scenario.name)

actualRj, err := s.radixClient.RadixV1().RadixJobs(scenario.radixjob.Namespace).Get(context.Background(), scenario.radixjob.Name, metav1.GetOptions{})
actualRj, err := s.radixClient.RadixV1().RadixJobs(scenario.radixJob.Namespace).Get(context.Background(), scenario.radixJob.Name, metav1.GetOptions{})
require.NoError(s.T(), err, "scenario %s", scenario.name)

assert.Equal(s.T(), scenario.expected.returnsError, err != nil, scenario.name)
assert.ElementsMatch(s.T(), scenario.expected.steps, actualRj.Status.Steps, scenario.name)
}

func (s *RadixJobStepTestSuite) initScenario(scenario *setStatusOfJobTestScenario) error {
if scenario.radixjob != nil {
if _, err := s.radixClient.RadixV1().RadixJobs(scenario.radixjob.Namespace).Create(context.Background(), scenario.radixjob, metav1.CreateOptions{}); err != nil {
if scenario.radixJob != nil {
if _, err := s.radixClient.RadixV1().RadixJobs(scenario.radixJob.Namespace).Create(context.Background(), scenario.radixJob, metav1.CreateOptions{}); err != nil {
return err
}
}
Expand Down Expand Up @@ -374,12 +374,12 @@ func (s *RadixJobStepTestSuite) getBuildJob(name, radixJobName, appName, imageTa
}
}

func (s *RadixJobStepTestSuite) getJobPod(podName, jobName, namespace string) *corev1.Pod {
func (s *RadixJobStepTestSuite) getJobPod(podName, radixJobName, jobName, namespace string) *corev1.Pod {
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{"job-name": jobName},
Labels: map[string]string{kube.RadixJobNameLabel: radixJobName, jobNameLabel: jobName},
},
}

Expand Down

0 comments on commit ff03468

Please sign in to comment.