Skip to content

Commit

Permalink
refactor logic for attaching and cleaning up hook finalizers to make …
Browse files Browse the repository at this point in the history
…it generic

Signed-off-by: Dejan Zele Pejchev <[email protected]>
  • Loading branch information
dejanzele committed Dec 11, 2024
1 parent 9e573ae commit dd67dd2
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 109 deletions.
14 changes: 2 additions & 12 deletions pkg/health/health.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package health

import (
"github.com/argoproj/gitops-engine/pkg/sync/hook"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -64,7 +65,7 @@ func IsWorse(current, new HealthStatusCode) bool {

// GetResourceHealth returns the health of a k8s resource
func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) {
if obj.GetDeletionTimestamp() != nil && !hasHookFinalizer(obj) {
if obj.GetDeletionTimestamp() != nil && !hook.HasHookFinalizer(obj) {
return &HealthStatus{
Status: HealthStatusProgressing,
Message: "Pending deletion",
Expand Down Expand Up @@ -97,17 +98,6 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver

}

func hasHookFinalizer(obj *unstructured.Unstructured) bool {
hookFinalizer := "argoproj.io/hook-finalizer"
finalizers := obj.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == hookFinalizer {
return true
}
}
return false
}

// GetHealthCheckFunc returns built-in health check function or nil if health check is not supported
func GetHealthCheckFunc(gvk schema.GroupVersionKind) func(obj *unstructured.Unstructured) (*HealthStatus, error) {
switch gvk.Group {
Expand Down
15 changes: 15 additions & 0 deletions pkg/sync/hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ import (
resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource"
)

const (
// HookFinalizer is the finalizer added to hooks to ensure they are deleted only after the sync phase is completed.
HookFinalizer = "argocd.argoproj.io/hook-finalizer"
)

func HasHookFinalizer(obj *unstructured.Unstructured) bool {
finalizers := obj.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == HookFinalizer {
return true
}
}
return false
}

func IsHook(obj *unstructured.Unstructured) bool {
_, ok := obj.GetAnnotations()[common.AnnotationKeyHook]
if ok {
Expand Down
136 changes: 41 additions & 95 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -229,10 +228,6 @@ func NewSyncContext(
if err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
}
ctx := &syncContext{
revision: revision,
resources: groupResources(reconciliationResult),
Expand All @@ -241,7 +236,6 @@ func NewSyncContext(
rawConfig: rawConfig,
dynamicIf: dynamicIf,
disco: disco,
clientset: clientset,
extensionsclientset: extensionsclientset,
kubectl: kubectl,
resourceOps: resourceOps,
Expand Down Expand Up @@ -337,7 +331,6 @@ type syncContext struct {
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
clientset *kubernetes.Clientset
kubectl kube.Kubectl
resourceOps kube.ResourceOperations
namespace string
Expand Down Expand Up @@ -487,10 +480,8 @@ func (sc *syncContext) Sync() {
return task.isHook() && task.completed()
})
for _, task := range hooksCompleted {
if task.cleanup != nil {
if err := task.cleanup(); err != nil {
sc.log.V(1).Error(err, "failed to run hook task cleanup")
}
if err := sc.removeHookFinalizer(task); err != nil {
sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
}
}

Expand Down Expand Up @@ -595,6 +586,29 @@ func (sc *syncContext) filterOutOfSyncTasks(tasks syncTasks) syncTasks {
})
}

func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
if task.liveObj == nil {
return nil
}
finalizers := task.targetObj.GetFinalizers()
var mutated bool
for i, finalizer := range finalizers {
if finalizer == hook.HookFinalizer {
finalizers = append(finalizers[:i], finalizers[i+1:]...)
mutated = true
break
}
}
if mutated {
task.targetObj.SetFinalizers(finalizers)
task.liveObj.SetFinalizers(finalizers)
if err := sc.updateResource(task); err != nil {
return err
}
}
return nil
}

func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
for _, task := range hooksPendingDeletion {
err := sc.deleteResource(task)
Expand Down Expand Up @@ -698,6 +712,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
generateName := obj.GetGenerateName()
targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
}
targetObj.SetFinalizers(append(targetObj.GetFinalizers(), hook.HookFinalizer))

hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
}
Expand All @@ -706,8 +721,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {

sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")

sc.processHookTasks(hookTasks)

tasks := resourceTasks
tasks = append(tasks, hookTasks...)

Expand Down Expand Up @@ -850,83 +863,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
return tasks, successful
}

// processHookTasks applies additional logic to hook tasks.
func (sc *syncContext) processHookTasks(tasks syncTasks) {
for _, task := range tasks {
// This is a safety check to ensure that we process only currently running hook tasks.
if !task.isHook() || !task.pending() {
continue
}
// Safety check to ensure that the target object is not nil.
if task.targetObj == nil {
continue
}
// Currently, we only process hook tasks where the target object is a Job.
if task.targetObj.GetKind() == "Job" {
sc.processJobHookTask(task)
}
}
}

// processJobHookTask processes a hook task where the target object is a Job and has defined ttlSecondsAfterFinished.
// This addresses the issue where a Job with a ttlSecondsAfterFinished set to a low value gets deleted fast and the hook phase gets stuck.
// For more info, see issue https://github.com/argoproj/argo-cd/issues/6880
func (sc *syncContext) processJobHookTask(task *syncTask) {
hookFinalizer := "argoproj.io/hook-finalizer"

task.postprocess = func() error {
sc.log.V(1).Info("Processing hook task with a Job resource - attaching hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Skip postprocessing if the Job does not have a ttlSecondsAfterFinished set.
if job.Spec.TTLSecondsAfterFinished == nil {
return nil
}
// Attach the hook finalizer to the Job resource so it does not get deleted before the sync phase is marked as completed.
job.Finalizers = append(job.Finalizers, hookFinalizer)

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

task.cleanup = func() error {
sc.log.V(1).Info("Cleaning up hook task with a Job resource - removing hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Remove the hook finalizer from the Job resource.
var filtered []string
for _, s := range job.Finalizers {
if s != hookFinalizer {
filtered = append(filtered, s)
}
}
job.Finalizers = filtered

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
}

func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
isNamespaceCreationNeeded := true

Expand Down Expand Up @@ -1104,11 +1040,6 @@ func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.R
if err != nil {
return common.ResultCodeSyncFailed, err.Error()
}
if t.postprocess != nil && !dryRun {
if err := t.postprocess(); err != nil {
sc.log.Error(err, "failed to call postprocess function for task")
}
}
if kube.IsCRD(t.targetObj) && !dryRun {
crdName := t.targetObj.GetName()
if err = sc.ensureCRDReady(crdName); err != nil {
Expand Down Expand Up @@ -1195,6 +1126,11 @@ func (sc *syncContext) Terminate() {
if !task.isHook() || task.liveObj == nil {
continue
}
if err := sc.removeHookFinalizer(task); err != nil {
sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
terminateSuccessful = false
continue
}
phase, msg, err := sc.getOperationPhase(task.liveObj)
if err != nil {
sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err))
Expand Down Expand Up @@ -1228,6 +1164,16 @@ func (sc *syncContext) deleteResource(task *syncTask) error {
return resIf.Delete(context.TODO(), task.name(), sc.getDeleteOptions())
}

func (sc *syncContext) updateResource(task *syncTask) error {
sc.log.WithValues("task", task).V(1).Info("Updating resource")
resIf, err := sc.getResourceIf(task, "update")
if err != nil {
return err
}
_, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{})
return err
}

func (sc *syncContext) getResourceIf(task *syncTask, verb string) (dynamic.ResourceInterface, error) {
apiResource, err := kube.ServerResourceForGroupVersionKind(sc.disco, task.groupVersionKind(), verb)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,12 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) {
))
fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme())
syncCtx.dynamicIf = fakeDynamicClient
// Each completed hook needs to have its hook finalizer removed in an Update call to the dynamic client.
updatedCount := 0
fakeDynamicClient.PrependReactor("update", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
updatedCount += 1
return true, nil, nil
})
deletedCount := 0
fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
deletedCount += 1
Expand All @@ -1381,6 +1387,7 @@ func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) {

assert.Equal(t, synccommon.OperationSucceeded, syncCtx.phase)
assert.Equal(t, 2, deletedCount)
assert.Equal(t, 2, updatedCount)
}

func TestRunSync_HooksDeletedAfterPhaseCompletedFailed(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ type syncTask struct {
operationState common.OperationPhase
message string
waveOverride *int
postprocess func() error
cleanup func() error
}

func ternary(val bool, a, b string) string {
Expand Down

0 comments on commit dd67dd2

Please sign in to comment.