release restart scheduled job (#890)
* Bump sigs.k8s.io/secrets-store-csi-driver from 1.3.2 to 1.3.3

Bumps [sigs.k8s.io/secrets-store-csi-driver](https://github.com/kubernetes-sigs/secrets-store-csi-driver) from 1.3.2 to 1.3.3.
- [Release notes](https://github.com/kubernetes-sigs/secrets-store-csi-driver/releases)
- [Changelog](https://github.com/kubernetes-sigs/secrets-store-csi-driver/blob/main/docs/RELEASE.md)
- [Commits](https://github.com/kubernetes-sigs/secrets-store-csi-driver/compare/v1.3.2...v1.3.3)

---
updated-dependencies:
- dependency-name: sigs.k8s.io/secrets-store-csi-driver
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <[email protected]>

* Start stopped restart scheduled job (#889)

* Added restart field to radixbatch

* Saved status of restart field to radixbatch

* Removed status of restart field from radixbatch, left only in job

* Incremented chart id

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
satr and dependabot[bot] authored Jul 13, 2023
1 parent 0038f49 commit b3e80cb
Showing 10 changed files with 111 additions and 39 deletions.
4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
version: 1.18.5
appVersion: 1.38.5
version: 1.19.0
appVersion: 1.39.0
kubeVersion: ">=1.22.0"
description: Radix Operator
keywords:
11 changes: 10 additions & 1 deletion charts/radix-operator/templates/radixbatch.yaml
@@ -50,7 +50,7 @@ spec:
jobs:
description: List of batch jobs to run.
items:
description: Spec for a batch job
description: RadixBatchJob Spec for a batch job
properties:
backoffLimit:
description: Specifies the number of retries before marking
@@ -112,6 +112,12 @@ spec:
otherwise to an implementation-defined value.
type: object
type: object
restart:
description: Controls if a job should be restarted. If Restart is set
to a new timestamp, a stopped job is started again, and a running job
is stopped and started again. This timestamp is set to the job's status.restart.
type: string
stop:
description: Controls if a job should be stopped. If Stop is
set to true, the underlying Kubernetes Job is deleted. A job
@@ -229,6 +235,9 @@ spec:
description: A brief CamelCase message indicating details about
why the job is in this phase
type: string
restart:
description: Timestamp of the job restart, if applied.
type: string
startTime:
description: The time at which the Kubernetes job was started.
format: date-time
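
For illustration, a minimal sketch of what a restart request looks like on the spec side. The operator only compares the string against the restart timestamp recorded in the job's status, so any new, distinct value works; an RFC3339 timestamp is the natural choice. The import path and field names follow this repository's types, but the snippet is an illustration, not part of the commit.

```go
package main

import (
	"fmt"
	"time"

	radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
	"sigs.k8s.io/yaml"
)

func main() {
	// A batch spec with one job that requests a restart. The operator restarts
	// the job whenever this value is non-empty and differs from the restart
	// timestamp it has already recorded in the job's status.
	spec := radixv1.RadixBatchSpec{
		Jobs: []radixv1.RadixBatchJob{
			{
				Name:    "compute",
				Restart: time.Now().UTC().Format(time.RFC3339),
			},
		},
	}

	out, err := yaml.Marshal(spec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```
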
4 changes: 2 additions & 2 deletions go.mod
@@ -3,7 +3,7 @@ module github.com/equinor/radix-operator
go 1.18

require (
github.com/equinor/radix-common v1.2.10
github.com/equinor/radix-common v1.3.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.13
@@ -21,7 +21,7 @@ require (
k8s.io/apiextensions-apiserver v0.25.8
k8s.io/apimachinery v0.25.8
k8s.io/client-go v0.25.8
sigs.k8s.io/secrets-store-csi-driver v1.3.2
sigs.k8s.io/secrets-store-csi-driver v1.3.3
sigs.k8s.io/yaml v1.3.0
)

8 changes: 4 additions & 4 deletions go.sum
@@ -97,8 +97,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/equinor/radix-common v1.2.10 h1:tqZgvGD1xEJPkFXq+MFo+3cg8cEnDBDti6cahYuOKLo=
github.com/equinor/radix-common v1.2.10/go.mod h1:E0gZfGfr8MNj1zuzjgyf1oV+1pry1VNmNZP8TscTq7U=
github.com/equinor/radix-common v1.3.0 h1:ud2AofqF8ukCm9t1bUz+CCidRYG9Yh8g6myLWlAB3UE=
github.com/equinor/radix-common v1.3.0/go.mod h1:E0gZfGfr8MNj1zuzjgyf1oV+1pry1VNmNZP8TscTq7U=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
@@ -749,8 +749,8 @@ sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE
sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/secrets-store-csi-driver v1.3.2 h1:8RKutIS8mWMazII4la4p8+oO93XdSAI9ho3sEvtu/Q8=
sigs.k8s.io/secrets-store-csi-driver v1.3.2/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/secrets-store-csi-driver v1.3.3 h1:8UXTMIO4kZqGLJ65UWRfJXbRnb6PU6olP+vSriGZRp0=
sigs.k8s.io/secrets-store-csi-driver v1.3.3/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
87 changes: 65 additions & 22 deletions pkg/apis/batch/kubejob.go
@@ -16,42 +16,85 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)

const (
jobPayloadVolumeName = "job-payload"
)

func (s *syncer) reconcileKubeJob(batchJob radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingJobs []*batchv1.Job) error {
// Delete existing k8s job if stop is requested for batch job
func (s *syncer) reconcileKubeJob(batchJob *radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingJobs []*batchv1.Job) error {
if isBatchJobStopRequested(batchJob) {
for _, jobToDelete := range slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) }) {
err := s.kubeclient.BatchV1().Jobs(jobToDelete.GetNamespace()).Delete(context.Background(), jobToDelete.GetName(), metav1.DeleteOptions{PropagationPolicy: pointers.Ptr(metav1.DeletePropagationBackground)})
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
// Delete existing k8s job if stop is requested for batch job
batchJobKubeJobs := slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
return s.deleteJobs(batchJobKubeJobs)
}

if isBatchJobDone(s.batch, batchJob.Name) {
return nil
jobNeedToBeRestarted, err := s.handleJobToRestart(batchJob, existingJobs)
if err != nil {
return err
}

if slice.Any(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) }) {
if !jobNeedToBeRestarted && (isBatchJobDone(s.batch, batchJob.Name) ||
slice.Any(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })) {
return nil
}

job, err := s.buildJob(batchJob, jobComponent, rd)
if err != nil {
return err
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err = s.kubeclient.BatchV1().Jobs(s.batch.GetNamespace()).Create(context.TODO(), job, metav1.CreateOptions{})
return err
})
}

func (s *syncer) handleJobToRestart(batchJob *radixv1.RadixBatchJob, existingJobs []*batchv1.Job) (bool, error) {
jobStatusIdx := slice.FindIndex(s.batch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
return jobStatus.Name == batchJob.Name
})

jobRestartTimestamp, jobStatusRestartTimestamp := s.getJobRestartTimestamps(batchJob, jobStatusIdx)
needRestartJob := len(jobRestartTimestamp) > 0 && jobRestartTimestamp != jobStatusRestartTimestamp
if !needRestartJob {
return false, nil
}

_, err = s.kubeclient.BatchV1().Jobs(s.batch.GetNamespace()).Create(context.TODO(), job, metav1.CreateOptions{})
return err
jobsToDelete := slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
err := s.deleteJobs(jobsToDelete)
if err != nil {
return true, err
}

jobStatus := radixv1.RadixBatchJobStatus{
Name: batchJob.Name,
Restart: jobRestartTimestamp,
}
if jobStatusIdx >= 0 {
s.batch.Status.JobStatuses[jobStatusIdx] = jobStatus
return true, nil
}
s.batch.Status.JobStatuses = append(s.batch.Status.JobStatuses, jobStatus)
return true, nil
}

func (s *syncer) getJobRestartTimestamps(batchJob *radixv1.RadixBatchJob, jobStatusIdx int) (string, string) {
if jobStatusIdx >= 0 {
return batchJob.Restart, s.batch.Status.JobStatuses[jobStatusIdx].Restart
}
return batchJob.Restart, ""
}

func (s *syncer) deleteJobs(jobsToDelete []*batchv1.Job) error {
for _, jobToDelete := range jobsToDelete {
err := s.kubeclient.BatchV1().Jobs(jobToDelete.GetNamespace()).Delete(context.Background(), jobToDelete.GetName(), metav1.DeleteOptions{PropagationPolicy: pointers.Ptr(metav1.DeletePropagationBackground)})
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}

func (s *syncer) buildJob(batchJob radixv1.RadixBatchJob, jobComponent *radixv1.RadixDeployJobComponent, rd *radixv1.RadixDeployment) (*batchv1.Job, error) {
func (s *syncer) buildJob(batchJob *radixv1.RadixBatchJob, jobComponent *radixv1.RadixDeployJobComponent, rd *radixv1.RadixDeployment) (*batchv1.Job, error) {
jobLabels := s.batchJobIdentifierLabel(batchJob.Name, rd.Spec.AppName)
podLabels := radixlabels.Merge(
jobLabels,
@@ -120,7 +163,7 @@ func (s *syncer) buildJob(batchJob radixv1.RadixBatchJob, jobComponent *radixv1.
return job, nil
}

func (s *syncer) getVolumes(namespace, environment string, batchJob radixv1.RadixBatchJob, radixJobComponent *radixv1.RadixDeployJobComponent, radixDeploymentName string) ([]corev1.Volume, error) {
func (s *syncer) getVolumes(namespace, environment string, batchJob *radixv1.RadixBatchJob, radixJobComponent *radixv1.RadixDeployJobComponent, radixDeploymentName string) ([]corev1.Volume, error) {
volumes, err := deployment.GetVolumes(s.kubeclient, s.kubeutil, namespace, environment, radixJobComponent, radixDeploymentName)
if err != nil {
return nil, err
@@ -143,7 +186,7 @@ func (s *syncer) getVolumes(namespace, environment string, batchJob radixv1.Radi
return volumes, nil
}

func (s *syncer) getContainers(rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, batchJob radixv1.RadixBatchJob, kubeJobName string) ([]corev1.Container, error) {
func (s *syncer) getContainers(rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, batchJob *radixv1.RadixBatchJob, kubeJobName string) ([]corev1.Container, error) {
volumeMounts, err := s.getContainerVolumeMounts(batchJob, jobComponent, rd.GetName())
if err != nil {
return nil, err
@@ -153,7 +196,7 @@ func (s *syncer) getContainers(rd *radixv1.RadixDeployment, jobComponent *radixv
return nil, err
}
ports := getContainerPorts(jobComponent)
resources := s.getContainerResources(jobComponent, batchJob)
resources := s.getContainerResources(batchJob, jobComponent)

container := corev1.Container{
Name: jobComponent.Name,
@@ -178,7 +221,7 @@ func (s *syncer) getContainerEnvironmentVariables(rd *radixv1.RadixDeployment, j
return environmentVariables, nil
}

func (s *syncer) getContainerResources(jobComponent *radixv1.RadixDeployJobComponent, batchJob radixv1.RadixBatchJob) corev1.ResourceRequirements {
func (s *syncer) getContainerResources(batchJob *radixv1.RadixBatchJob, jobComponent *radixv1.RadixDeployJobComponent) corev1.ResourceRequirements {
if batchJob.Resources != nil {
return operatorUtils.BuildResourceRequirement(batchJob.Resources)
}
@@ -198,7 +241,7 @@ func getContainerPorts(radixJobComponent *radixv1.RadixDeployJobComponent) []cor
return ports
}

func (s *syncer) getContainerVolumeMounts(batchJob radixv1.RadixBatchJob, radixJobComponent *radixv1.RadixDeployJobComponent, radixDeploymentName string) ([]corev1.VolumeMount, error) {
func (s *syncer) getContainerVolumeMounts(batchJob *radixv1.RadixBatchJob, radixJobComponent *radixv1.RadixDeployJobComponent, radixDeploymentName string) ([]corev1.VolumeMount, error) {
volumeMounts, err := deployment.GetRadixDeployComponentVolumeMounts(radixJobComponent, radixDeploymentName)
if err != nil {
return nil, err
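
The heart of the new handleJobToRestart is the timestamp comparison: a restart is due only when the spec carries a non-empty restart value that differs from the one recorded in the job's status; the existing Kubernetes jobs are then deleted and the new timestamp written back, so repeated reconciles with the same value are a no-op. A dependency-free sketch of that decision (a simplified stand-in, not the actual syncer code):

```go
package main

import "fmt"

// needsRestart mirrors the check in handleJobToRestart: restart only when the
// spec's restart timestamp is non-empty and differs from the timestamp the
// operator has already recorded in the job's status.
func needsRestart(specRestart, statusRestart string) bool {
	return len(specRestart) > 0 && specRestart != statusRestart
}

func main() {
	fmt.Println(needsRestart("", ""))                                         // false: no restart requested
	fmt.Println(needsRestart("2023-07-13T10:00:00Z", ""))                     // true: first restart request
	fmt.Println(needsRestart("2023-07-13T10:00:00Z", "2023-07-13T10:00:00Z")) // false: already handled
	fmt.Println(needsRestart("2023-07-13T11:00:00Z", "2023-07-13T10:00:00Z")) // true: newer request
}
```
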
2 changes: 1 addition & 1 deletion pkg/apis/batch/service.go
@@ -9,7 +9,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (s *syncer) reconcileService(batchJob radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingServices []*corev1.Service) error {
func (s *syncer) reconcileService(batchJob *radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingServices []*corev1.Service) error {
if len(jobComponent.GetPorts()) == 0 {
return nil
}
7 changes: 5 additions & 2 deletions pkg/apis/batch/status.go
@@ -107,13 +107,13 @@ func (s *syncer) buildJobStatuses() ([]radixv1.RadixBatchJobStatus, error) {
}

for _, batchJob := range s.batch.Spec.Jobs {
jobStatuses = append(jobStatuses, s.buildBatchJobStatus(batchJob, jobs))
jobStatuses = append(jobStatuses, s.buildBatchJobStatus(&batchJob, jobs))
}

return jobStatuses, nil
}

func (s *syncer) buildBatchJobStatus(batchJob radixv1.RadixBatchJob, allJobs []*batchv1.Job) radixv1.RadixBatchJobStatus {
func (s *syncer) buildBatchJobStatus(batchJob *radixv1.RadixBatchJob, allJobs []*batchv1.Job) radixv1.RadixBatchJobStatus {
currentStatus := slice.FindAll(s.batch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
return jobStatus.Name == batchJob.Name
})
@@ -125,6 +125,9 @@ func (s *syncer) buildBatchJobStatus(batchJob radixv1.RadixBatchJob, allJobs []*
Name: batchJob.Name,
Phase: radixv1.BatchJobPhaseWaiting,
}
if len(currentStatus) > 0 {
status.Restart = currentStatus[0].Restart
}

if isBatchJobStopRequested(batchJob) {
now := metav1.Now()
4 changes: 2 additions & 2 deletions pkg/apis/batch/syncer.go
@@ -64,11 +64,11 @@ func (s *syncer) reconcile() error {
}

for i, batchJob := range s.batch.Spec.Jobs {
if err := s.reconcileService(batchJob, rd, jobComponent, existingServices); err != nil {
if err := s.reconcileService(&batchJob, rd, jobComponent, existingServices); err != nil {
return err
}

if err := s.reconcileKubeJob(batchJob, rd, jobComponent, existingJobs); err != nil {
if err := s.reconcileKubeJob(&batchJob, rd, jobComponent, existingJobs); err != nil {
return err
}

9 changes: 7 additions & 2 deletions pkg/apis/batch/utils.go
@@ -15,7 +15,7 @@ func isResourceLabeledWithBatchJobName(batchJobName string, resource metav1.Obje
return kubelabels.SelectorFromSet(radixlabels.ForBatchJobName(batchJobName)).Matches(kubelabels.Set(resource.GetLabels()))
}

func isBatchJobStopRequested(batchJob radixv1.RadixBatchJob) bool {
func isBatchJobStopRequested(batchJob *radixv1.RadixBatchJob) bool {
return batchJob.Stop != nil && *batchJob.Stop
}

@@ -34,7 +34,12 @@ func isBatchJobPhaseDone(phase radixv1.RadixBatchJobPhase) bool {
}

func isBatchDone(batch *radixv1.RadixBatch) bool {
return batch.Status.Condition.Type == radixv1.BatchConditionTypeCompleted
if batch.Status.Condition.Type != radixv1.BatchConditionTypeCompleted {
return false
}
return !slice.Any(batch.Spec.Jobs, func(batchJob radixv1.RadixBatchJob) bool {
return len(batchJob.Restart) > 0
})
}

func isBatchJobDone(batch *radixv1.RadixBatch, batchJobName string) bool {
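
The isBatchDone change is what lets a finished scheduled job run again: a batch whose condition is already Completed is treated as active as soon as any of its jobs carries a restart request. A small sketch of that rule with simplified stand-in types (not the real RadixBatch structs):

```go
package main

import "fmt"

// Simplified stand-ins for the RadixBatch types, only to illustrate the rule.
type job struct {
	Restart string
}

type batch struct {
	completed bool // stands in for Status.Condition.Type == BatchConditionTypeCompleted
	jobs      []job
}

// isBatchDone reflects the updated rule: a batch is done only when it is
// completed and none of its jobs still requests a restart.
func isBatchDone(b batch) bool {
	if !b.completed {
		return false
	}
	for _, j := range b.jobs {
		if len(j.Restart) > 0 {
			return false // a pending restart keeps the batch eligible for syncing
		}
	}
	return true
}

func main() {
	b := batch{completed: true, jobs: []job{{}}}
	fmt.Println(isBatchDone(b)) // true: completed and no restart requested
	b.jobs[0].Restart = "2023-07-13T10:00:00Z"
	fmt.Println(isBatchDone(b)) // false: the restart request re-activates the batch
}
```
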
14 changes: 13 additions & 1 deletion pkg/apis/radix/v1/radixbatchtypes.go
@@ -44,7 +44,7 @@ type RadixBatchSpec struct {
Jobs []RadixBatchJob `json:"jobs"`
}

// Spec for a batch job
// RadixBatchJob Spec for a batch job
type RadixBatchJob struct {
// Defines the unique name of the job in a RadixBatch.
// +kubebuilder:validation:MaxLength:=63
@@ -84,6 +84,14 @@ type RadixBatchJob struct {
// A job that is stopped cannot be started again by setting Stop to false.
// +optional
Stop *bool `json:"stop,omitempty"`

// Controls if a job should be restarted.
// If Restart is set to a new timestamp:
// - a stopped job is started again.
// - a running job is stopped and started again.
// This timestamp is set to the job's status.restart.
// +optional
Restart string `json:"restart,omitempty"`
}

// PayloadSecretKeySelector selects a key of a Secret.
@@ -210,6 +218,10 @@ type RadixBatchJobStatus struct {
// The number of times the container for the job has failed.
// +optional
Failed int32 `json:"failed,omitempty"`

// Timestamp of the job restart, if applied.
// +optional
Restart string `json:"restart,omitempty"`
}

// LocalObjectReference contains enough information to let you locate the
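
As a usage note, a client outside the operator triggers a restart by writing a fresh timestamp into spec.jobs[i].restart on the RadixBatch resource. The sketch below does that with a JSON patch through the generic dynamic client; the group/version/resource (radix.equinor.com/v1, radixbatches), namespace, resource name, and job index are assumptions for illustration and are not confirmed by this commit.

```go
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Assumed GVR for RadixBatch; verify against the installed CRD.
	gvr := schema.GroupVersionResource{Group: "radix.equinor.com", Version: "v1", Resource: "radixbatches"}

	// JSON patch that sets spec.jobs[0].restart to a new timestamp. Any new,
	// distinct string triggers a restart; RFC3339 keeps it readable.
	patch := []byte(fmt.Sprintf(
		`[{"op":"add","path":"/spec/jobs/0/restart","value":%q}]`,
		time.Now().UTC().Format(time.RFC3339),
	))

	// Namespace and batch name are placeholders.
	_, err = client.Resource(gvr).
		Namespace("myapp-qa").
		Patch(context.Background(), "batch-compute-20230713", types.JSONPatchType, patch, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
}
```
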
