From 329190d0494b95f1c1ae11c6d2ed0e834b99d5c3 Mon Sep 17 00:00:00 2001
From: Vanio Begic
Date: Sun, 11 Aug 2024 12:10:37 +0200
Subject: [PATCH 1/2] chore: extract out blocks of code into intuitively named functions and methods to increase code readability

Signed-off-by: Vanio Begic
---
 pkg/sync/sync_context.go | 242 ++++++++++++++++++++++-----------------
 1 file changed, 140 insertions(+), 102 deletions(-)

diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index d56de6a1b..9a0606cdb 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -629,87 +629,20 @@ func (sc *syncContext) containsResource(resource reconciledResource) bool {
 
 // generates the list of sync tasks we will be performing during this sync.
 func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
-	resourceTasks := syncTasks{}
-	successful = true
-
-	for k, resource := range sc.resources {
-		if !sc.containsResource(resource) {
-			sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping")
-			continue
-		}
-
-		obj := obj(resource.Target, resource.Live)
-
-		// this creates garbage tasks
-		if hook.IsHook(obj) {
-			sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook")
-			continue
-		}
-
-		for _, phase := range syncPhases(obj) {
-			resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live})
-		}
-	}
-
-	sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources")
-
-	hookTasks := syncTasks{}
-	if !sc.skipHooks {
-		for _, obj := range sc.hooks {
-			for _, phase := range syncPhases(obj) {
-				// Hook resources names are deterministic, whether they are defined by the user (metadata.name),
-				// or formulated at the time of the operation (metadata.generateName). If user specifies
-				// metadata.generateName, then we will generate a formulated metadata.name before submission.
-				targetObj := obj.DeepCopy()
-				if targetObj.GetName() == "" {
-					var syncRevision string
-					if len(sc.revision) >= 8 {
-						syncRevision = sc.revision[0:7]
-					} else {
-						syncRevision = sc.revision
-					}
-					postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix()))
-					generateName := obj.GetGenerateName()
-					targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
-				}
-
-				hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
-			}
-		}
-	}
-	sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
+	successful = true
+	resourceTasks := sc.createResourceTasks()
+	hookTasks := sc.createHookTasks()
 
 	tasks := resourceTasks
 	tasks = append(tasks, hookTasks...)
 
 	// enrich target objects with the namespace
-	for _, task := range tasks {
-		if task.targetObj == nil {
-			continue
-		}
-
-		if task.targetObj.GetNamespace() == "" {
-			// If target object's namespace is empty, we set namespace in the object. We do
-			// this even though it might be a cluster-scoped resource. This prevents any
-			// possibility of the resource from unintentionally becoming created in the
-			// namespace during the `kubectl apply`
-			task.targetObj = task.targetObj.DeepCopy()
-			task.targetObj.SetNamespace(sc.namespace)
-		}
-	}
-
-	if sc.syncNamespace != nil && sc.namespace != "" {
-		tasks = sc.autoCreateNamespace(tasks)
-	}
+	sc.enrichTargetObjectsWithNamespace(tasks)
+	tasks = sc.addAutoCreateNamespaceTask(tasks)
 
 	// enrich task with live obj
-	for _, task := range tasks {
-		if task.targetObj == nil || task.liveObj != nil {
-			continue
-		}
-		task.liveObj = sc.liveObj(task.targetObj)
-	}
+	sc.enrichTasksWithLiveObjects(tasks)
 
 	isRetryable := func(err error) bool {
 		return apierr.IsUnauthorized(err)
@@ -717,11 +650,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 
 	serverResCache := make(map[schema.GroupVersionKind]*metav1.APIResource)
 
-	// check permissions
-	for _, task := range tasks {
-
-		var serverRes *metav1.APIResource
-		var err error
+	cachedServerResourceForGroupVersionKind := func(sc *syncContext, task *syncTask) (serverRes *metav1.APIResource, err error) {
 
 		if val, ok := serverResCache[task.groupVersionKind()]; ok {
 			serverRes = val
@@ -735,6 +664,13 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 				serverResCache[task.groupVersionKind()] = serverRes
 			}
 		}
+		return serverRes, err
+	}
+
+	// check permissions
+	for _, task := range tasks {
+
+		serverRes, err := cachedServerResourceForGroupVersionKind(sc, task)
 
 		if err != nil {
 			// Special case for custom resources: if CRD is not yet known by the K8s API server,
@@ -759,23 +695,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 	}
 
 	// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
-	pruneTasks := make(map[int][]*syncTask)
+	pruneTasksByWave := sc.groupPruneTasksIntoWaves(tasks)
+
+	// reorder waves for pruning tasks using symmetric swap on prune waves
+	// waves to swap
+	sc.reorderPruneTaskWaves(pruneTasksByWave)
+
+	// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
+	// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
+	sc.reorderPruneLastTaskWaves(tasks)
+	tasks.Sort()
+	sc.enrichTasksWithResult(tasks)
+
+	return tasks, successful
+}
+
+func (sc *syncContext) enrichTasksWithResult(tasks syncTasks) {
 	for _, task := range tasks {
-		if task.isPrune() {
-			pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
+		result, ok := sc.syncRes[task.resultKey()]
+		if ok {
+			task.syncStatus = result.Status
+			task.operationState = result.HookPhase
+			task.message = result.Message
+		}
+	}
+}
+
+func (sc *syncContext) reorderPruneLastTaskWaves(tasks syncTasks) {
+	syncPhaseLastWave := 0
+	for _, task := range tasks {
+		if task.phase == common.SyncPhaseSync {
+			if task.wave() > syncPhaseLastWave {
+				syncPhaseLastWave = task.wave()
+			}
+		}
+	}
+	syncPhaseLastWave = syncPhaseLastWave + 1
+
+	for _, task := range tasks {
+		if task.isPrune() &&
+			(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
+			task.waveOverride = &syncPhaseLastWave
 		}
 	}
+}
 
+func (*syncContext) reorderPruneTaskWaves(pruneTasks map[int][]*syncTask) {
 	var uniquePruneWaves []int
 	for k := range pruneTasks {
 		uniquePruneWaves = append(uniquePruneWaves, k)
 	}
 	sort.Ints(uniquePruneWaves)
 
-	// reorder waves for pruning tasks using symmetric swap on prune waves
 	n := len(uniquePruneWaves)
 	for i := 0; i < n/2; i++ {
-		// waves to swap
+
 		startWave := uniquePruneWaves[i]
 		endWave := uniquePruneWaves[n-1-i]
 
@@ -787,39 +761,103 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 			task.waveOverride = &startWave
 		}
 	}
+}
 
-	// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
-	// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
-	syncPhaseLastWave := 0
+func (*syncContext) groupPruneTasksIntoWaves(tasks syncTasks) map[int][]*syncTask {
+	pruneTasks := make(map[int][]*syncTask)
 	for _, task := range tasks {
-		if task.phase == common.SyncPhaseSync {
-			if task.wave() > syncPhaseLastWave {
-				syncPhaseLastWave = task.wave()
-			}
+		if task.isPrune() {
+			pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
 		}
 	}
-	syncPhaseLastWave = syncPhaseLastWave + 1
+	return pruneTasks
+}
 
+func (sc *syncContext) enrichTasksWithLiveObjects(tasks syncTasks) {
 	for _, task := range tasks {
-		if task.isPrune() &&
-			(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
-			task.waveOverride = &syncPhaseLastWave
+		if task.targetObj == nil || task.liveObj != nil {
+			continue
 		}
+		task.liveObj = sc.liveObj(task.targetObj)
 	}
+}
 
-	tasks.Sort()
+func (sc *syncContext) addAutoCreateNamespaceTask(tasks syncTasks) syncTasks {
+	if sc.syncNamespace != nil && sc.namespace != "" {
+		tasks = sc.autoCreateNamespace(tasks)
+	}
+	return tasks
+}
 
-	// finally enrich tasks with the result
+func (sc *syncContext) enrichTargetObjectsWithNamespace(tasks syncTasks) {
 	for _, task := range tasks {
-		result, ok := sc.syncRes[task.resultKey()]
-		if ok {
-			task.syncStatus = result.Status
-			task.operationState = result.HookPhase
-			task.message = result.Message
+		if task.targetObj == nil {
+			continue
+		}
+
+		if task.targetObj.GetNamespace() == "" {
+			// If target object's namespace is empty, we set namespace in the object. We do
+			// this even though it might be a cluster-scoped resource. This prevents any
+			// possibility of the resource from unintentionally becoming created in the
+			// namespace during the `kubectl apply`
+			task.targetObj = task.targetObj.DeepCopy()
+			task.targetObj.SetNamespace(sc.namespace)
 		}
 	}
+}
 
-	return tasks, successful
+func (sc *syncContext) createHookTasks() syncTasks {
+	hookTasks := syncTasks{}
+	if !sc.skipHooks {
+		for _, obj := range sc.hooks {
+			for _, phase := range syncPhases(obj) {
+				// Hook resources names are deterministic, whether they are defined by the user (metadata.name),
+				// or formulated at the time of the operation (metadata.generateName). If user specifies
+				// metadata.generateName, then we will generate a formulated metadata.name before submission.
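+				// Illustrative example (assumed values, not part of this change): with generateName
+				// "db-migrate-", revision "0123456789abcdef", phase PreSync and a start time of
+				// 1700000000, the submitted name would be "db-migrate-0123456-presync-1700000000".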
+				targetObj := obj.DeepCopy()
+				if targetObj.GetName() == "" {
+					var syncRevision string
+					if len(sc.revision) >= 8 {
+						syncRevision = sc.revision[0:7]
+					} else {
+						syncRevision = sc.revision
+					}
+					postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix()))
+					generateName := obj.GetGenerateName()
+					targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
+				}
+
+				hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
+			}
+		}
+	}
+
+	sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
+	return hookTasks
+}
+
+func (sc *syncContext) createResourceTasks() syncTasks {
+	resourceTasks := syncTasks{}
+	for k, resource := range sc.resources {
+		if !sc.containsResource(resource) {
+			sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping")
+			continue
+		}
+
+		obj := obj(resource.Target, resource.Live)
+
+		// this creates garbage tasks
+		if hook.IsHook(obj) {
+			sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook")
+			continue
+		}
+
+		for _, phase := range syncPhases(obj) {
+			resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live})
+		}
+	}
+	sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources")
+	return resourceTasks
 }
 
 func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {

From a8ad2eeb792bc384a03e7b2ea127b1cbdf516a7b Mon Sep 17 00:00:00 2001
From: Vanio Begic
Date: Sun, 11 Aug 2024 12:42:28 +0200
Subject: [PATCH 2/2] chore: undo refactoring of permission checking

Signed-off-by: Vanio Begic
---
 pkg/sync/sync_context.go | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 9a0606cdb..c78600196 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -650,7 +650,11 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 
 	serverResCache := make(map[schema.GroupVersionKind]*metav1.APIResource)
 
-	cachedServerResourceForGroupVersionKind := func(sc *syncContext, task *syncTask) (serverRes *metav1.APIResource, err error) {
+	// check permissions
+	for _, task := range tasks {
+
+		var serverRes *metav1.APIResource
+		var err error
 
 		if val, ok := serverResCache[task.groupVersionKind()]; ok {
 			serverRes = val
@@ -664,13 +668,6 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 				serverResCache[task.groupVersionKind()] = serverRes
 			}
 		}
-		return serverRes, err
-	}
-
-	// check permissions
-	for _, task := range tasks {
-
-		serverRes, err := cachedServerResourceForGroupVersionKind(sc, task)
 
 		if err != nil {
 			// Special case for custom resources: if CRD is not yet known by the K8s API server,