Skip to content

Commit

Permalink
refactor logic for attaching and cleaning up hook finalizers to make …
Browse files Browse the repository at this point in the history
…it generic

Signed-off-by: Dejan Zele Pejchev <[email protected]>
  • Loading branch information
dejanzele committed Dec 9, 2024
1 parent 9e573ae commit 376d9d0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 97 deletions.
134 changes: 39 additions & 95 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -229,10 +228,6 @@ func NewSyncContext(
if err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
}
ctx := &syncContext{
revision: revision,
resources: groupResources(reconciliationResult),
Expand All @@ -241,7 +236,6 @@ func NewSyncContext(
rawConfig: rawConfig,
dynamicIf: dynamicIf,
disco: disco,
clientset: clientset,
extensionsclientset: extensionsclientset,
kubectl: kubectl,
resourceOps: resourceOps,
Expand Down Expand Up @@ -299,6 +293,8 @@ func groupDiffResults(diffResultList *diff.DiffResultList) map[kubeutil.Resource
}

const (
// hookFinalizer is the finalizer added to hooks to ensure they are deleted only after the sync phase is completed.
hookFinalizer = "argoproj.io/hook-finalizer"
crdReadinessTimeout = time.Duration(3) * time.Second
)

Expand Down Expand Up @@ -337,7 +333,6 @@ type syncContext struct {
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
clientset *kubernetes.Clientset
kubectl kube.Kubectl
resourceOps kube.ResourceOperations
namespace string
Expand Down Expand Up @@ -487,10 +482,9 @@ func (sc *syncContext) Sync() {
return task.isHook() && task.completed()
})
for _, task := range hooksCompleted {
if task.cleanup != nil {
if err := task.cleanup(); err != nil {
sc.log.V(1).Error(err, "failed to run hook task cleanup")
}
if err := sc.removeHookFinalizer(task); err != nil {
sc.setResourceResult(task, task.syncStatus, common.OperationError, err.Error())
sc.log.Error(err, "failed to remove hook finalizer", "task", task)
}
}

Expand Down Expand Up @@ -595,6 +589,29 @@ func (sc *syncContext) filterOutOfSyncTasks(tasks syncTasks) syncTasks {
})
}

func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
if task.liveObj == nil {
return nil
}
finalizers := task.targetObj.GetFinalizers()
var mutated bool
for i, finalizer := range finalizers {
if finalizer == hookFinalizer {
finalizers = append(finalizers[:i], finalizers[i+1:]...)
mutated = true
break
}
}
if mutated {
task.targetObj.SetFinalizers(finalizers)
task.liveObj.SetFinalizers(finalizers)
if err := sc.updateResource(task); err != nil {
return err
}
}
return nil
}

func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
for _, task := range hooksPendingDeletion {
err := sc.deleteResource(task)
Expand Down Expand Up @@ -698,6 +715,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
generateName := obj.GetGenerateName()
targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
}
targetObj.SetFinalizers(append(targetObj.GetFinalizers(), hookFinalizer))

hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
}
Expand All @@ -706,8 +724,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {

sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")

sc.processHookTasks(hookTasks)

tasks := resourceTasks
tasks = append(tasks, hookTasks...)

Expand Down Expand Up @@ -850,83 +866,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
return tasks, successful
}

// processHookTasks applies additional logic to hook tasks.
func (sc *syncContext) processHookTasks(tasks syncTasks) {
for _, task := range tasks {
// This is a safety check to ensure that we process only currently running hook tasks.
if !task.isHook() || !task.pending() {
continue
}
// Safety check to ensure that the target object is not nil.
if task.targetObj == nil {
continue
}
// Currently, we only process hook tasks where the target object is a Job.
if task.targetObj.GetKind() == "Job" {
sc.processJobHookTask(task)
}
}
}

// processJobHookTask processes a hook task where the target object is a Job and has defined ttlSecondsAfterFinished.
// This addresses the issue where a Job with a ttlSecondsAfterFinished set to a low value gets deleted fast and the hook phase gets stuck.
// For more info, see issue https://github.com/argoproj/argo-cd/issues/6880
func (sc *syncContext) processJobHookTask(task *syncTask) {
hookFinalizer := "argoproj.io/hook-finalizer"

task.postprocess = func() error {
sc.log.V(1).Info("Processing hook task with a Job resource - attaching hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Skip postprocessing if the Job does not have a ttlSecondsAfterFinished set.
if job.Spec.TTLSecondsAfterFinished == nil {
return nil
}
// Attach the hook finalizer to the Job resource so it does not get deleted before the sync phase is marked as completed.
job.Finalizers = append(job.Finalizers, hookFinalizer)

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

task.cleanup = func() error {
sc.log.V(1).Info("Cleaning up hook task with a Job resource - removing hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Remove the hook finalizer from the Job resource.
var filtered []string
for _, s := range job.Finalizers {
if s != hookFinalizer {
filtered = append(filtered, s)
}
}
job.Finalizers = filtered

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
}

func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
isNamespaceCreationNeeded := true

Expand Down Expand Up @@ -1104,11 +1043,6 @@ func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.R
if err != nil {
return common.ResultCodeSyncFailed, err.Error()
}
if t.postprocess != nil && !dryRun {
if err := t.postprocess(); err != nil {
sc.log.Error(err, "failed to call postprocess function for task")
}
}
if kube.IsCRD(t.targetObj) && !dryRun {
crdName := t.targetObj.GetName()
if err = sc.ensureCRDReady(crdName); err != nil {
Expand Down Expand Up @@ -1228,6 +1162,16 @@ func (sc *syncContext) deleteResource(task *syncTask) error {
return resIf.Delete(context.TODO(), task.name(), sc.getDeleteOptions())
}

func (sc *syncContext) updateResource(task *syncTask) error {
sc.log.WithValues("task", task).V(1).Info("Updating resource")
resIf, err := sc.getResourceIf(task, "update")
if err != nil {
return err
}
_, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{})
return err
}

func (sc *syncContext) getResourceIf(task *syncTask, verb string) (dynamic.ResourceInterface, error) {
apiResource, err := kube.ServerResourceForGroupVersionKind(sc.disco, task.groupVersionKind(), verb)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,12 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) {
))
fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme())
syncCtx.dynamicIf = fakeDynamicClient
// Each completed hook needs to have its hook finalizer removed in an Update call to the dynamic client.
updatedCount := 0
fakeDynamicClient.PrependReactor("update", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
updatedCount += 1
return true, nil, nil
})
deletedCount := 0
fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
deletedCount += 1
Expand All @@ -1381,6 +1387,7 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) {

assert.Equal(t, synccommon.OperationSucceeded, syncCtx.phase)
assert.Equal(t, 2, deletedCount)
assert.Equal(t, 2, updatedCount)
}

func TestRunSync_HooksDeletedAfterPhaseCompletedFailed(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ type syncTask struct {
operationState common.OperationPhase
message string
waveOverride *int
postprocess func() error
cleanup func() error
}

func ternary(val bool, a, b string) string {
Expand Down

0 comments on commit 376d9d0

Please sign in to comment.