Skip to content

Commit

Permalink
Replaced patch with update and retry in case of conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Helber Belmiro <[email protected]>
  • Loading branch information
hbelmiro committed Dec 23, 2024
1 parent 8a082e4 commit 76d08ae
Showing 1 changed file with 31 additions and 38 deletions.
69 changes: 31 additions & 38 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"reflect"
Expand Down Expand Up @@ -582,30 +583,39 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

nextJob:
for i := range jobs {
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}

newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
if err != nil {
return failedToReconcileSwfCrsError(err)
}
retryJob:
for {
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}

currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
return failedToReconcileSwfCrsError(err)
}
newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
if err != nil {
return failedToReconcileSwfCrsError(err)
}

if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
err = r.patchSwfCrSpec(ctx, jobs[i].Namespace, jobs[i].K8SName, newScheduledWorkflow.Spec)
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
if util.IsNotFound(errors.Cause(err)) {
continue
}
return failedToReconcileSwfCrsError(err)
}

if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
newScheduledWorkflow.Name = currentScheduledWorkflow.Name
newScheduledWorkflow.ResourceVersion = currentScheduledWorkflow.ResourceVersion
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, newScheduledWorkflow)
if err != nil {
if apierrors.IsConflict(errors.Unwrap(err)) {
continue retryJob
} else if util.IsNotFound(errors.Cause(err)) {
continue nextJob
}
return failedToReconcileSwfCrsError(err)
}
}
continue nextJob
}
}

Expand All @@ -616,28 +626,11 @@ func failedToReconcileSwfCrsError(err error) error {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

func (r *ResourceManager) patchSwfCrSpec(ctx context.Context, k8sNamespace string, crdName string, newSpec interface{}) error {
patchPayload := map[string]interface{}{
"spec": newSpec,
}

patchBytes, err := json.Marshal(patchPayload)
if err != nil {
return util.NewInternalServerError(err,
"Failed to marshal patch spec")
}

_, err = r.getScheduledWorkflowClient(k8sNamespace).Patch(
ctx,
crdName,
types.MergePatchType,
patchBytes,
)
func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
if err != nil {
return util.NewInternalServerError(err,
"Failed to patch ScheduledWorkflow")
return util.Wrap(err, "Failed to update ScheduledWorkflow")
}

return nil
}

Expand Down

0 comments on commit 76d08ae

Please sign in to comment.