Skip to content

Commit

Permalink
Start stopped restart scheduled job (#518)
Browse files Browse the repository at this point in the history
* Added method restart scheduled job

* Added method restart scheduled batch

* Added summary property restart scheduled job

* Updated ref

* Cleanup
  • Loading branch information
satr authored Jul 12, 2023
1 parent f6f815f commit ad17d80
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 18 deletions.
4 changes: 4 additions & 0 deletions api/deployments/models/component_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ type ScheduledJobSummary struct {
// required: true
// example: 1
FailedCount int32 `json:"failedCount"`

// Timestamp of the job restart, if applied.
// +optional
Restart string
}

// Node Defines node attributes, where pod should be scheduled
Expand Down
140 changes: 140 additions & 0 deletions api/environments/environment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (c *environmentController) GetRoutes() models.Routes {
Method: "POST",
HandlerFunc: c.StopJob,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}/restart",
Method: "POST",
HandlerFunc: c.RestartJob,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}",
Method: "DELETE",
Expand All @@ -168,6 +173,11 @@ func (c *environmentController) GetRoutes() models.Routes {
Method: "POST",
HandlerFunc: c.StopBatch,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}/restart",
Method: "POST",
HandlerFunc: c.RestartBatch,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}",
Method: "DELETE",
Expand Down Expand Up @@ -1385,6 +1395,71 @@ func (c *environmentController) StopJob(accounts models.Accounts, w http.Respons
w.WriteHeader(http.StatusNoContent)
}

// RestartJob Start a running or stopped scheduled job
func (c *environmentController) RestartJob(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation POST /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}/restart job restartJob
// ---
// summary: Restart a running or stopped scheduled job
// parameters:
// - name: appName
// in: path
// description: Name of application
// type: string
// required: true
// - name: envName
// in: path
// description: Name of environment
// type: string
// required: true
// - name: jobComponentName
// in: path
// description: Name of job-component
// type: string
// required: true
// - name: jobName
// in: path
// description: Name of job
// type: string
// required: true
// - name: Impersonate-User
// in: header
// description: Works only with custom setup of cluster. Allow impersonation of test users (Required if Impersonate-Group is set)
// type: string
// required: false
// - name: Impersonate-Group
// in: header
// description: Works only with custom setup of cluster. Allow impersonation of test group (Required if Impersonate-User is set)
// type: array
// items:
// type: string
// required: false
// responses:
// "204":
// description: "Success"
// "400":
// description: "Invalid job"
// "401":
// description: "Unauthorized"
// "403":
// description: "Forbidden"
// "404":
// description: "Not found"

appName := mux.Vars(r)["appName"]
envName := mux.Vars(r)["envName"]
jobComponentName := mux.Vars(r)["jobComponentName"]
jobName := mux.Vars(r)["jobName"]

eh := c.environmentHandlerFactory(accounts)
err := eh.RestartJob(r.Context(), appName, envName, jobComponentName, jobName)
if err != nil {
radixhttp.ErrorResponse(w, r, err)
return
}

w.WriteHeader(http.StatusNoContent)
}

// DeleteJob Delete a job
func (c *environmentController) DeleteJob(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation DELETE /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName} job deleteJob
Expand Down Expand Up @@ -1633,6 +1708,71 @@ func (c *environmentController) StopBatch(accounts models.Accounts, w http.Respo
w.WriteHeader(http.StatusNoContent)
}

// RestartBatch Restart a scheduled or stopped batch
func (c *environmentController) RestartBatch(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation POST /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}/restart job restartBatch
// ---
// summary: Restart a scheduled or stopped batch
// parameters:
// - name: appName
// in: path
// description: Name of application
// type: string
// required: true
// - name: envName
// in: path
// description: Name of environment
// type: string
// required: true
// - name: jobComponentName
// in: path
// description: Name of job-component
// type: string
// required: true
// - name: batchName
// in: path
// description: Name of batch
// type: string
// required: true
// - name: Impersonate-User
// in: header
// description: Works only with custom setup of cluster. Allow impersonation of test users (Required if Impersonate-Group is set)
// type: string
// required: false
// - name: Impersonate-Group
// in: header
// description: Works only with custom setup of cluster. Allow impersonation of test group (Required if Impersonate-User is set)
// type: array
// items:
// type: string
// required: false
// responses:
// "204":
// description: "Success"
// "400":
// description: "Invalid batch"
// "401":
// description: "Unauthorized"
// "403":
// description: "Forbidden"
// "404":
// description: "Not found"

appName := mux.Vars(r)["appName"]
envName := mux.Vars(r)["envName"]
jobComponentName := mux.Vars(r)["jobComponentName"]
batchName := mux.Vars(r)["batchName"]

eh := c.environmentHandlerFactory(accounts)
err := eh.RestartBatch(r.Context(), appName, envName, jobComponentName, batchName)
if err != nil {
radixhttp.ErrorResponse(w, r, err)
return
}

w.WriteHeader(http.StatusNoContent)
}

// DeleteBatch Delete a batch
func (c *environmentController) DeleteBatch(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation DELETE /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName} job deleteBatch
Expand Down
66 changes: 54 additions & 12 deletions api/environments/job_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sort"
"strings"
"time"

deploymentModels "github.com/equinor/radix-api/api/deployments/models"
environmentModels "github.com/equinor/radix-api/api/environments/models"
Expand Down Expand Up @@ -73,31 +74,53 @@ func (eh EnvironmentHandler) GetJob(ctx context.Context, appName, envName, jobCo

// StopJob Stop job by name
func (eh EnvironmentHandler) StopJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
batchName, batchJobName, ok := parseBatchAndJobNameFromScheduledJobName(jobName)
if !ok {
return jobNotFoundError(jobName)
}

batch, err := eh.getRadixBatch(ctx, appName, envName, jobComponentName, batchName, "")
batch, jobId, batchJobName, err := eh.getBatchJob(ctx, appName, envName, jobComponentName, jobName)
if err != nil {
return err
}

idx := slice.FindIndex(batch.Spec.Jobs, func(job radixv1.RadixBatchJob) bool { return job.Name == batchJobName })
if idx == -1 {
return jobNotFoundError(jobName)
}

nonStoppableJob := slice.FindAll(batch.Status.JobStatuses, func(js radixv1.RadixBatchJobStatus) bool { return js.Name == batchJobName && !isBatchJobStoppable(js) })
if len(nonStoppableJob) > 0 {
return radixhttp.ValidationError(jobName, fmt.Sprintf("invalid job running state=%s", nonStoppableJob[0].Phase))
}

batch.Spec.Jobs[idx].Stop = radixutils.BoolPtr(true)
batch.Spec.Jobs[jobId].Stop = radixutils.BoolPtr(true)
_, err = eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace()).Update(ctx, batch, metav1.UpdateOptions{})
return err
}

// RestartJob Start running or stopped job by name
func (eh EnvironmentHandler) RestartJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
batch, jobIdx, _, err := eh.getBatchJob(ctx, appName, envName, jobComponentName, jobName)
if err != nil {
return err
}

setRestartJobTimeout(batch, jobIdx, radixutils.FormatTimestamp(time.Now()))
_, err = eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace()).Update(ctx, batch, metav1.UpdateOptions{})
return err
}

// RestartBatch Restart a scheduled or stopped batch
func (eh EnvironmentHandler) RestartBatch(ctx context.Context, appName, envName, jobComponentName, batchName string) error {
batch, err := eh.getRadixBatch(ctx, appName, envName, jobComponentName, batchName, kube.RadixBatchTypeBatch)
if err != nil {
return err
}

restartTimestamp := radixutils.FormatTimestamp(time.Now())
for jobIdx := 0; jobIdx < len(batch.Spec.Jobs); jobIdx++ {
setRestartJobTimeout(batch, jobIdx, restartTimestamp)
}
_, err = eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace()).Update(ctx, batch, metav1.UpdateOptions{})
return err
}

func setRestartJobTimeout(batch *radixv1.RadixBatch, jobIdx int, restartTimestamp string) {
batch.Spec.Jobs[jobIdx].Stop = nil
batch.Spec.Jobs[jobIdx].Restart = restartTimestamp
}

// DeleteJob Delete job by name
func (eh EnvironmentHandler) DeleteJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
batchName, _, ok := parseBatchAndJobNameFromScheduledJobName(jobName)
Expand Down Expand Up @@ -456,6 +479,7 @@ func (eh EnvironmentHandler) getScheduledJobSummary(batch *radixv1.RadixBatch, j
summary.Ended = radixutils.FormatTime(status.EndTime)
summary.Message = status.Message
summary.FailedCount = status.Failed
summary.Restart = status.Restart
}

return summary
Expand Down Expand Up @@ -550,3 +574,21 @@ func parseBatchAndJobNameFromScheduledJobName(scheduleJobName string) (batchName
ok = true
return
}

func (eh EnvironmentHandler) getBatchJob(ctx context.Context, appName string, envName string, jobComponentName string, jobName string) (*radixv1.RadixBatch, int, string, error) {
batchName, batchJobName, ok := parseBatchAndJobNameFromScheduledJobName(jobName)
if !ok {
return nil, 0, "", jobNotFoundError(jobName)
}

batch, err := eh.getRadixBatch(ctx, appName, envName, jobComponentName, batchName, "")
if err != nil {
return nil, 0, "", err
}

idx := slice.FindIndex(batch.Spec.Jobs, func(job radixv1.RadixBatchJob) bool { return job.Name == batchJobName })
if idx == -1 {
return nil, 0, "", jobNotFoundError(jobName)
}
return batch, idx, batchJobName, err
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/equinor/radix-common v1.3.0
github.com/equinor/radix-job-scheduler v1.7.10
github.com/equinor/radix-operator v1.38.3
github.com/equinor/radix-operator v1.39.0
github.com/evanphx/json-patch/v5 v5.6.0
github.com/go-openapi/errors v0.20.3
github.com/go-openapi/strfmt v0.21.7
Expand All @@ -30,7 +30,7 @@ require (
k8s.io/apimachinery v0.25.8
k8s.io/client-go v0.25.8
knative.dev/pkg v0.0.0-20230320014357-4c84b1b51ee8
sigs.k8s.io/secrets-store-csi-driver v1.3.2
sigs.k8s.io/secrets-store-csi-driver v1.3.3
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/equinor/radix-common v1.3.0 h1:ud2AofqF8ukCm9t1bUz+CCidRYG9Yh8g6myLWl
github.com/equinor/radix-common v1.3.0/go.mod h1:E0gZfGfr8MNj1zuzjgyf1oV+1pry1VNmNZP8TscTq7U=
github.com/equinor/radix-job-scheduler v1.7.10 h1:V9gcWEy5vfldsfvmfDqLsTe2nD3Yp2+XAC9dMB9Zrjg=
github.com/equinor/radix-job-scheduler v1.7.10/go.mod h1:a0aRY1vOB34fabTTueaYapdAcb0FKjZSDBNcm8wJn6A=
github.com/equinor/radix-operator v1.38.3 h1:x6WK3j0cRivZJcDZpsE4joZIV+bawmJS2yJjvYdNt2g=
github.com/equinor/radix-operator v1.38.3/go.mod h1:zGs7q/A+P5z+ZGlLhb20Ch5nFv33fY+/at647diEaqU=
github.com/equinor/radix-operator v1.39.0 h1:sWTdYc3JReT6dgz051Vvf4cnJ2bkxkpNVzklECUWXjg=
github.com/equinor/radix-operator v1.39.0/go.mod h1:Xvl+5C6BJi5RTyJ0P5ZfwzsHcUG1KDiOOb5OOZJib2Q=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand Down Expand Up @@ -831,8 +831,8 @@ sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE
sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/secrets-store-csi-driver v1.3.2 h1:8RKutIS8mWMazII4la4p8+oO93XdSAI9ho3sEvtu/Q8=
sigs.k8s.io/secrets-store-csi-driver v1.3.2/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/secrets-store-csi-driver v1.3.3 h1:8UXTMIO4kZqGLJ65UWRfJXbRnb6PU6olP+vSriGZRp0=
sigs.k8s.io/secrets-store-csi-driver v1.3.3/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
Expand Down

0 comments on commit ad17d80

Please sign in to comment.