Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start stopped restart scheduled job #518

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/deployments/models/component_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ type ScheduledJobSummary struct {
// required: true
// example: 1
FailedCount int32 `json:"failedCount"`

// Timestamp of the job restart, if applied.
// +optional
Restart string
}

// Node Defines node attributes, where pod should be scheduled
Expand Down
140 changes: 140 additions & 0 deletions api/environments/environment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (c *environmentController) GetRoutes() models.Routes {
Method: "POST",
HandlerFunc: c.StopJob,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}/restart",
Method: "POST",
HandlerFunc: c.RestartJob,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}",
Method: "DELETE",
Expand All @@ -168,6 +173,11 @@ func (c *environmentController) GetRoutes() models.Routes {
Method: "POST",
HandlerFunc: c.StopBatch,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}/restart",
Method: "POST",
HandlerFunc: c.RestartBatch,
},
models.Route{
Path: rootPath + "/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}",
Method: "DELETE",
Expand Down Expand Up @@ -1385,6 +1395,71 @@ func (c *environmentController) StopJob(accounts models.Accounts, w http.Respons
w.WriteHeader(http.StatusNoContent)
}

// RestartJob Restart a running or stopped scheduled job
//
// The path variables appName, envName, jobComponentName and jobName are passed
// to the environment handler's RestartJob. A handler error is written through
// radixhttp.ErrorResponse; otherwise the response is 204 No Content.
func (c *environmentController) RestartJob(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
	// swagger:operation POST /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName}/restart job restartJob
	// ---
	// summary: Restart a running or stopped scheduled job
	// parameters:
	// - name: appName
	//   in: path
	//   description: Name of application
	//   type: string
	//   required: true
	// - name: envName
	//   in: path
	//   description: Name of environment
	//   type: string
	//   required: true
	// - name: jobComponentName
	//   in: path
	//   description: Name of job-component
	//   type: string
	//   required: true
	// - name: jobName
	//   in: path
	//   description: Name of job
	//   type: string
	//   required: true
	// - name: Impersonate-User
	//   in: header
	//   description: Works only with custom setup of cluster. Allow impersonation of test users (Required if Impersonate-Group is set)
	//   type: string
	//   required: false
	// - name: Impersonate-Group
	//   in: header
	//   description: Works only with custom setup of cluster. Allow impersonation of test group (Required if Impersonate-User is set)
	//   type: array
	//   items:
	//     type: string
	//   required: false
	// responses:
	//   "204":
	//     description: "Success"
	//   "400":
	//     description: "Invalid job"
	//   "401":
	//     description: "Unauthorized"
	//   "403":
	//     description: "Forbidden"
	//   "404":
	//     description: "Not found"

	// Resolve the route variables once rather than once per parameter.
	vars := mux.Vars(r)
	handler := c.environmentHandlerFactory(accounts)

	if err := handler.RestartJob(r.Context(), vars["appName"], vars["envName"], vars["jobComponentName"], vars["jobName"]); err != nil {
		radixhttp.ErrorResponse(w, r, err)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}

// DeleteJob Delete a job
func (c *environmentController) DeleteJob(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation DELETE /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/jobs/{jobName} job deleteJob
Expand Down Expand Up @@ -1633,6 +1708,71 @@ func (c *environmentController) StopBatch(accounts models.Accounts, w http.Respo
w.WriteHeader(http.StatusNoContent)
}

// RestartBatch Restart a scheduled or stopped batch
//
// The path variables appName, envName, jobComponentName and batchName are
// passed to the environment handler's RestartBatch. A handler error is written
// through radixhttp.ErrorResponse; otherwise the response is 204 No Content.
func (c *environmentController) RestartBatch(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
	// swagger:operation POST /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName}/restart job restartBatch
	// ---
	// summary: Restart a scheduled or stopped batch
	// parameters:
	// - name: appName
	//   in: path
	//   description: Name of application
	//   type: string
	//   required: true
	// - name: envName
	//   in: path
	//   description: Name of environment
	//   type: string
	//   required: true
	// - name: jobComponentName
	//   in: path
	//   description: Name of job-component
	//   type: string
	//   required: true
	// - name: batchName
	//   in: path
	//   description: Name of batch
	//   type: string
	//   required: true
	// - name: Impersonate-User
	//   in: header
	//   description: Works only with custom setup of cluster. Allow impersonation of test users (Required if Impersonate-Group is set)
	//   type: string
	//   required: false
	// - name: Impersonate-Group
	//   in: header
	//   description: Works only with custom setup of cluster. Allow impersonation of test group (Required if Impersonate-User is set)
	//   type: array
	//   items:
	//     type: string
	//   required: false
	// responses:
	//   "204":
	//     description: "Success"
	//   "400":
	//     description: "Invalid batch"
	//   "401":
	//     description: "Unauthorized"
	//   "403":
	//     description: "Forbidden"
	//   "404":
	//     description: "Not found"

	// Resolve the route variables once rather than once per parameter.
	vars := mux.Vars(r)
	handler := c.environmentHandlerFactory(accounts)

	if err := handler.RestartBatch(r.Context(), vars["appName"], vars["envName"], vars["jobComponentName"], vars["batchName"]); err != nil {
		radixhttp.ErrorResponse(w, r, err)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}

// DeleteBatch Delete a batch
func (c *environmentController) DeleteBatch(accounts models.Accounts, w http.ResponseWriter, r *http.Request) {
// swagger:operation DELETE /applications/{appName}/environments/{envName}/jobcomponents/{jobComponentName}/batches/{batchName} job deleteBatch
Expand Down
66 changes: 54 additions & 12 deletions api/environments/job_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sort"
"strings"
"time"

deploymentModels "github.com/equinor/radix-api/api/deployments/models"
environmentModels "github.com/equinor/radix-api/api/environments/models"
Expand Down Expand Up @@ -73,31 +74,53 @@ func (eh EnvironmentHandler) GetJob(ctx context.Context, appName, envName, jobCo

// StopJob Stop job by name
//
// The batch and the index of the job inside batch.Spec.Jobs are resolved by
// getBatchJob, which also reports a job-not-found error when the name cannot
// be parsed or the job is absent from the batch. If the batch status holds an
// entry for this job for which isBatchJobStoppable returns false, a validation
// error naming that entry's phase is returned and nothing is updated.
// Otherwise the job's Stop flag is set to true and the RadixBatch is updated
// with the user account's Radix client; the update error, if any, is returned.
func (eh EnvironmentHandler) StopJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
	// Single lookup through the shared helper; the inline parse / getRadixBatch /
	// FindIndex sequence duplicated it and fetched the batch a second time.
	batch, jobIdx, batchJobName, err := eh.getBatchJob(ctx, appName, envName, jobComponentName, jobName)
	if err != nil {
		return err
	}

	nonStoppableJob := slice.FindAll(batch.Status.JobStatuses, func(js radixv1.RadixBatchJobStatus) bool {
		return js.Name == batchJobName && !isBatchJobStoppable(js)
	})
	if len(nonStoppableJob) > 0 {
		return radixhttp.ValidationError(jobName, fmt.Sprintf("invalid job running state=%s", nonStoppableJob[0].Phase))
	}

	batch.Spec.Jobs[jobIdx].Stop = radixutils.BoolPtr(true)
	_, err = eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace()).Update(ctx, batch, metav1.UpdateOptions{})
	return err
}

// RestartJob Restart a running or stopped job by name
//
// The job is located with getBatchJob; its Stop flag is cleared and its Restart
// field is set to the formatted current time, after which the RadixBatch is
// updated with the user account's Radix client. Lookup and update errors are
// returned unchanged.
func (eh EnvironmentHandler) RestartJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
	batch, jobIdx, _, lookupErr := eh.getBatchJob(ctx, appName, envName, jobComponentName, jobName)
	if lookupErr != nil {
		return lookupErr
	}

	restartTimestamp := radixutils.FormatTimestamp(time.Now())
	setRestartJobTimeout(batch, jobIdx, restartTimestamp)

	batchClient := eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace())
	_, updateErr := batchClient.Update(ctx, batch, metav1.UpdateOptions{})
	return updateErr
}

// RestartBatch Restart a scheduled or stopped batch
//
// The batch is fetched with getRadixBatch using kube.RadixBatchTypeBatch. Every
// job in batch.Spec.Jobs gets its Stop flag cleared and its Restart field set
// to one shared timestamp taken before the loop, and the RadixBatch is then
// updated with the user account's Radix client. Fetch and update errors are
// returned unchanged.
func (eh EnvironmentHandler) RestartBatch(ctx context.Context, appName, envName, jobComponentName, batchName string) error {
	batch, fetchErr := eh.getRadixBatch(ctx, appName, envName, jobComponentName, batchName, kube.RadixBatchTypeBatch)
	if fetchErr != nil {
		return fetchErr
	}

	// One timestamp for the whole batch so all jobs carry the same restart mark.
	restartTimestamp := radixutils.FormatTimestamp(time.Now())
	for jobIdx := range batch.Spec.Jobs {
		setRestartJobTimeout(batch, jobIdx, restartTimestamp)
	}

	batchClient := eh.accounts.UserAccount.RadixClient.RadixV1().RadixBatches(batch.GetNamespace())
	_, updateErr := batchClient.Update(ctx, batch, metav1.UpdateOptions{})
	return updateErr
}

// setRestartJobTimeout marks the job at index jobIdx of batch.Spec.Jobs for
// restart: it clears the job's Stop flag and stores restartTimestamp in its
// Restart field. The batch is modified in place; jobIdx must be a valid index.
func setRestartJobTimeout(batch *radixv1.RadixBatch, jobIdx int, restartTimestamp string) {
	job := &batch.Spec.Jobs[jobIdx]
	job.Stop = nil
	job.Restart = restartTimestamp
}

// DeleteJob Delete job by name
func (eh EnvironmentHandler) DeleteJob(ctx context.Context, appName, envName, jobComponentName, jobName string) error {
batchName, _, ok := parseBatchAndJobNameFromScheduledJobName(jobName)
Expand Down Expand Up @@ -456,6 +479,7 @@ func (eh EnvironmentHandler) getScheduledJobSummary(batch *radixv1.RadixBatch, j
summary.Ended = radixutils.FormatTime(status.EndTime)
summary.Message = status.Message
summary.FailedCount = status.Failed
summary.Restart = status.Restart
}

return summary
Expand Down Expand Up @@ -550,3 +574,21 @@ func parseBatchAndJobNameFromScheduledJobName(scheduleJobName string) (batchName
ok = true
return
}

// getBatchJob resolves a scheduled job name to its RadixBatch and its position
// in that batch.
//
// jobName is split into a batch name and a batch job name with
// parseBatchAndJobNameFromScheduledJobName, and the batch is fetched with
// getRadixBatch using an empty batch type. On success it returns the batch, the
// index of the first job in batch.Spec.Jobs whose Name equals the batch job
// name, that batch job name, and a nil error. It returns jobNotFoundError when
// the name cannot be parsed or no job matches, and passes through any error
// from getRadixBatch; in every error case the other results are zero values.
func (eh EnvironmentHandler) getBatchJob(ctx context.Context, appName string, envName string, jobComponentName string, jobName string) (*radixv1.RadixBatch, int, string, error) {
	batchName, batchJobName, parsed := parseBatchAndJobNameFromScheduledJobName(jobName)
	if !parsed {
		return nil, 0, "", jobNotFoundError(jobName)
	}

	batch, err := eh.getRadixBatch(ctx, appName, envName, jobComponentName, batchName, "")
	if err != nil {
		return nil, 0, "", err
	}

	for jobIdx := range batch.Spec.Jobs {
		if batch.Spec.Jobs[jobIdx].Name == batchJobName {
			return batch, jobIdx, batchJobName, nil
		}
	}
	return nil, 0, "", jobNotFoundError(jobName)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/equinor/radix-common v1.3.0
github.com/equinor/radix-job-scheduler v1.7.10
github.com/equinor/radix-operator v1.38.3
github.com/equinor/radix-operator v1.39.0
github.com/evanphx/json-patch/v5 v5.6.0
github.com/go-openapi/errors v0.20.3
github.com/go-openapi/strfmt v0.21.7
Expand All @@ -30,7 +30,7 @@ require (
k8s.io/apimachinery v0.25.8
k8s.io/client-go v0.25.8
knative.dev/pkg v0.0.0-20230320014357-4c84b1b51ee8
sigs.k8s.io/secrets-store-csi-driver v1.3.2
sigs.k8s.io/secrets-store-csi-driver v1.3.3
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/equinor/radix-common v1.3.0 h1:ud2AofqF8ukCm9t1bUz+CCidRYG9Yh8g6myLWl
github.com/equinor/radix-common v1.3.0/go.mod h1:E0gZfGfr8MNj1zuzjgyf1oV+1pry1VNmNZP8TscTq7U=
github.com/equinor/radix-job-scheduler v1.7.10 h1:V9gcWEy5vfldsfvmfDqLsTe2nD3Yp2+XAC9dMB9Zrjg=
github.com/equinor/radix-job-scheduler v1.7.10/go.mod h1:a0aRY1vOB34fabTTueaYapdAcb0FKjZSDBNcm8wJn6A=
github.com/equinor/radix-operator v1.38.3 h1:x6WK3j0cRivZJcDZpsE4joZIV+bawmJS2yJjvYdNt2g=
github.com/equinor/radix-operator v1.38.3/go.mod h1:zGs7q/A+P5z+ZGlLhb20Ch5nFv33fY+/at647diEaqU=
github.com/equinor/radix-operator v1.39.0 h1:sWTdYc3JReT6dgz051Vvf4cnJ2bkxkpNVzklECUWXjg=
github.com/equinor/radix-operator v1.39.0/go.mod h1:Xvl+5C6BJi5RTyJ0P5ZfwzsHcUG1KDiOOb5OOZJib2Q=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand Down Expand Up @@ -831,8 +831,8 @@ sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE
sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/secrets-store-csi-driver v1.3.2 h1:8RKutIS8mWMazII4la4p8+oO93XdSAI9ho3sEvtu/Q8=
sigs.k8s.io/secrets-store-csi-driver v1.3.2/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/secrets-store-csi-driver v1.3.3 h1:8UXTMIO4kZqGLJ65UWRfJXbRnb6PU6olP+vSriGZRp0=
sigs.k8s.io/secrets-store-csi-driver v1.3.3/go.mod h1:jh6wML45aTbxT2YZtU4khzSm8JYxwVrQbhsum+WR6j8=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
Expand Down