From f02987e8fff61431342ecc8bd6dfa34e040b4405 Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Mon, 9 Dec 2024 16:27:42 +0100 Subject: [PATCH] refactor logic for attaching and cleaning up hook finalizers to make it generic Signed-off-by: Dejan Zele Pejchev --- pkg/health/health.go | 14 +--- pkg/sync/hook/hook.go | 15 ++++ pkg/sync/sync_context.go | 153 +++++++++++++--------------------- pkg/sync/sync_context_test.go | 7 ++ pkg/sync/sync_task.go | 2 - 5 files changed, 82 insertions(+), 109 deletions(-) diff --git a/pkg/health/health.go b/pkg/health/health.go index 6cb37e879..f9b5022ff 100644 --- a/pkg/health/health.go +++ b/pkg/health/health.go @@ -1,6 +1,7 @@ package health import ( + "github.com/argoproj/gitops-engine/pkg/sync/hook" "github.com/argoproj/gitops-engine/pkg/utils/kube" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -64,7 +65,7 @@ func IsWorse(current, new HealthStatusCode) bool { // GetResourceHealth returns the health of a k8s resource func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) { - if obj.GetDeletionTimestamp() != nil && !hasHookFinalizer(obj) { + if obj.GetDeletionTimestamp() != nil && !hook.HasHookFinalizer(obj) { return &HealthStatus{ Status: HealthStatusProgressing, Message: "Pending deletion", @@ -97,17 +98,6 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver } -func hasHookFinalizer(obj *unstructured.Unstructured) bool { - hookFinalizer := "argoproj.io/hook-finalizer" - finalizers := obj.GetFinalizers() - for _, finalizer := range finalizers { - if finalizer == hookFinalizer { - return true - } - } - return false -} - // GetHealthCheckFunc returns built-in health check function or nil if health check is not supported func GetHealthCheckFunc(gvk schema.GroupVersionKind) func(obj *unstructured.Unstructured) (*HealthStatus, error) { switch gvk.Group { diff --git a/pkg/sync/hook/hook.go b/pkg/sync/hook/hook.go index 7e0c38752..66dfc26e5 100644 --- a/pkg/sync/hook/hook.go +++ b/pkg/sync/hook/hook.go @@ -8,6 +8,21 @@ import ( resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource" ) +const ( + // HookFinalizer is the finalizer added to hooks to ensure they are deleted only after the sync phase is completed. + HookFinalizer = "argocd.argoproj.io/hook-finalizer" +) + +func HasHookFinalizer(obj *unstructured.Unstructured) bool { + finalizers := obj.GetFinalizers() + for _, finalizer := range finalizers { + if finalizer == HookFinalizer { + return true + } + } + return false +} + func IsHook(obj *unstructured.Unstructured) bool { _, ok := obj.GetAnnotations()[common.AnnotationKeyHook] if ok { diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index f01b34795..b9a3a433d 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "k8s.io/client-go/kubernetes" "sort" "strings" "sync" @@ -229,10 +228,6 @@ func NewSyncContext( if err != nil { return nil, nil, err } - clientset, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, nil, err - } ctx := &syncContext{ revision: revision, resources: groupResources(reconciliationResult), @@ -241,7 +236,6 @@ func NewSyncContext( rawConfig: rawConfig, dynamicIf: dynamicIf, disco: disco, - clientset: clientset, extensionsclientset: extensionsclientset, kubectl: kubectl, resourceOps: resourceOps, @@ -337,7 +331,6 @@ type syncContext struct { dynamicIf dynamic.Interface disco discovery.DiscoveryInterface extensionsclientset *clientset.Clientset - clientset *kubernetes.Clientset kubectl kube.Kubectl resourceOps kube.ResourceOperations namespace string @@ -487,10 +480,8 @@ func (sc *syncContext) Sync() { return task.isHook() && task.completed() }) for _, task := range hooksCompleted { - if task.cleanup != nil { - if err := task.cleanup(); err != nil { - sc.log.V(1).Error(err, "failed to run hook task cleanup") - } + if err := sc.removeHookFinalizer(task); err != nil { + sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err)) } } @@ -595,6 +586,56 @@ func (sc *syncContext) filterOutOfSyncTasks(tasks syncTasks) syncTasks { }) } +func (sc *syncContext) removeHookFinalizer(task *syncTask) error { + if task.liveObj == nil { + return nil + } + finalizers := task.targetObj.GetFinalizers() + var mutated bool + for i, finalizer := range finalizers { + if finalizer == hook.HookFinalizer { + finalizers = append(finalizers[:i], finalizers[i+1:]...) + mutated = true + break + } + } + if mutated { + task.targetObj.SetFinalizers(finalizers) + task.liveObj.SetFinalizers(finalizers) + // The cached live object may be stale in the controller cache, and the actual object may have been updated in the meantime, + // and Kubernetes API will return a conflict error on the Update call. + // In that case, we need to get the latest version of the object and retry the update. + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + updateErr := sc.updateResource(task) + if apierr.IsConflict(updateErr) { + sc.log.WithValues("task", task).V(1).Info("Retrying hook finalizer removal due to conflict on update") + resIf, err := sc.getResourceIf(task, "get") + if err != nil { + return err + } + liveObj, err := resIf.Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + task.liveObj = liveObj + } + return updateErr + }) + + } + return nil +} + +func (sc *syncContext) updateResource(task *syncTask) error { + sc.log.WithValues("task", task).V(1).Info("Updating resource") + resIf, err := sc.getResourceIf(task, "update") + if err != nil { + return err + } + _, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{}) + return err +} + func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) { for _, task := range hooksPendingDeletion { err := sc.deleteResource(task) @@ -698,6 +739,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { generateName := obj.GetGenerateName() targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix)) } + targetObj.SetFinalizers(append(targetObj.GetFinalizers(), hook.HookFinalizer)) hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj}) } @@ -706,8 +748,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks") - sc.processHookTasks(hookTasks) - tasks := resourceTasks tasks = append(tasks, hookTasks...) @@ -850,83 +890,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { return tasks, successful } -// processHookTasks applies additional logic to hook tasks. -func (sc *syncContext) processHookTasks(tasks syncTasks) { - for _, task := range tasks { - // This is a safety check to ensure that we process only currently running hook tasks. - if !task.isHook() || !task.pending() { - continue - } - // Safety check to ensure that the target object is not nil. - if task.targetObj == nil { - continue - } - // Currently, we only process hook tasks where the target object is a Job. - if task.targetObj.GetKind() == "Job" { - sc.processJobHookTask(task) - } - } -} - -// processJobHookTask processes a hook task where the target object is a Job and has defined ttlSecondsAfterFinished. -// This addresses the issue where a Job with a ttlSecondsAfterFinished set to a low value gets deleted fast and the hook phase gets stuck. -// For more info, see issue https://github.com/argoproj/argo-cd/issues/6880 -func (sc *syncContext) processJobHookTask(task *syncTask) { - hookFinalizer := "argoproj.io/hook-finalizer" - - task.postprocess = func() error { - sc.log.V(1).Info("Processing hook task with a Job resource - attaching hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace()) - - job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{}) - if err != nil { - return err - } - - // Skip postprocessing if the Job does not have a ttlSecondsAfterFinished set. - if job.Spec.TTLSecondsAfterFinished == nil { - return nil - } - // Attach the hook finalizer to the Job resource so it does not get deleted before the sync phase is marked as completed. - job.Finalizers = append(job.Finalizers, hookFinalizer) - - _, err = sc.clientset. - BatchV1(). - Jobs(job.Namespace). - Update(context.TODO(), job, metav1.UpdateOptions{}) - if err != nil { - return err - } - return nil - } - - task.cleanup = func() error { - sc.log.V(1).Info("Cleaning up hook task with a Job resource - removing hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace()) - - job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{}) - if err != nil { - return err - } - - // Remove the hook finalizer from the Job resource. - var filtered []string - for _, s := range job.Finalizers { - if s != hookFinalizer { - filtered = append(filtered, s) - } - } - job.Finalizers = filtered - - _, err = sc.clientset. - BatchV1(). - Jobs(job.Namespace). - Update(context.TODO(), job, metav1.UpdateOptions{}) - if err != nil { - return err - } - return nil - } -} - func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks { isNamespaceCreationNeeded := true @@ -1104,11 +1067,6 @@ func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.R if err != nil { return common.ResultCodeSyncFailed, err.Error() } - if t.postprocess != nil && !dryRun { - if err := t.postprocess(); err != nil { - sc.log.Error(err, "failed to call postprocess function for task") - } - } if kube.IsCRD(t.targetObj) && !dryRun { crdName := t.targetObj.GetName() if err = sc.ensureCRDReady(crdName); err != nil { @@ -1195,6 +1153,11 @@ func (sc *syncContext) Terminate() { if !task.isHook() || task.liveObj == nil { continue } + if err := sc.removeHookFinalizer(task); err != nil { + sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err)) + terminateSuccessful = false + continue + } phase, msg, err := sc.getOperationPhase(task.liveObj) if err != nil { sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err)) diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index 7e416d20b..f9dd38e66 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -1362,6 +1362,12 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) { )) fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) syncCtx.dynamicIf = fakeDynamicClient + // Each completed hook needs to have its hook finalizer removed in an Update call to the dynamic client. + updatedCount := 0 + fakeDynamicClient.PrependReactor("update", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + updatedCount += 1 + return true, nil, nil + }) deletedCount := 0 fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { deletedCount += 1 @@ -1381,6 +1387,7 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) { assert.Equal(t, synccommon.OperationSucceeded, syncCtx.phase) assert.Equal(t, 2, deletedCount) + assert.Equal(t, 2, updatedCount) } func TestRunSync_HooksDeletedAfterPhaseCompletedFailed(t *testing.T) { diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go index 289993ba7..01c67a98b 100644 --- a/pkg/sync/sync_task.go +++ b/pkg/sync/sync_task.go @@ -24,8 +24,6 @@ type syncTask struct { operationState common.OperationPhase message string waveOverride *int - postprocess func() error - cleanup func() error } func ternary(val bool, a, b string) string {