Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor the method creating sync tasks #618

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 132 additions & 97 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,87 +629,20 @@ func (sc *syncContext) containsResource(resource reconciledResource) bool {

// generates the list of sync tasks we will be performing during this sync.
func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
resourceTasks := syncTasks{}
successful = true

for k, resource := range sc.resources {
if !sc.containsResource(resource) {
sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping")
continue
}

obj := obj(resource.Target, resource.Live)

// this creates garbage tasks
if hook.IsHook(obj) {
sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook")
continue
}

for _, phase := range syncPhases(obj) {
resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live})
}
}

sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources")

hookTasks := syncTasks{}
if !sc.skipHooks {
for _, obj := range sc.hooks {
for _, phase := range syncPhases(obj) {
// Hook resources names are deterministic, whether they are defined by the user (metadata.name),
// or formulated at the time of the operation (metadata.generateName). If user specifies
// metadata.generateName, then we will generate a formulated metadata.name before submission.
targetObj := obj.DeepCopy()
if targetObj.GetName() == "" {
var syncRevision string
if len(sc.revision) >= 8 {
syncRevision = sc.revision[0:7]
} else {
syncRevision = sc.revision
}
postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix()))
generateName := obj.GetGenerateName()
targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
}

hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
}
}
}

sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
successful = true
resourceTasks := sc.createResourceTasks()
hookTasks := sc.createHookTasks()

tasks := resourceTasks
tasks = append(tasks, hookTasks...)

// enrich target objects with the namespace
for _, task := range tasks {
if task.targetObj == nil {
continue
}

if task.targetObj.GetNamespace() == "" {
// If target object's namespace is empty, we set namespace in the object. We do
// this even though it might be a cluster-scoped resource. This prevents any
// possibility of the resource from unintentionally becoming created in the
// namespace during the `kubectl apply`
task.targetObj = task.targetObj.DeepCopy()
task.targetObj.SetNamespace(sc.namespace)
}
}

if sc.syncNamespace != nil && sc.namespace != "" {
tasks = sc.autoCreateNamespace(tasks)
}
sc.enrichTargetObjectsWithNamespace(tasks)
tasks = sc.addAutoCreateNamespaceTask(tasks)

// enrich task with live obj
for _, task := range tasks {
if task.targetObj == nil || task.liveObj != nil {
continue
}
task.liveObj = sc.liveObj(task.targetObj)
}
sc.enrichTasksWithLiveObjects(tasks)

isRetryable := func(err error) bool {
return apierr.IsUnauthorized(err)
Expand Down Expand Up @@ -759,23 +692,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}

// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
pruneTasks := make(map[int][]*syncTask)
pruneTasksByWave := sc.groupPruneTasksIntoWaves(tasks)

// reorder waves for pruning tasks using symmetric swap on prune waves
// waves to swap
sc.reorderPruneTaskWaves(pruneTasksByWave)

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
sc.reorderPruneLastTaskWaves(tasks)
tasks.Sort()
sc.enrichTasksWithResult(tasks)

return tasks, successful
}

func (sc *syncContext) enrichTasksWithResult(tasks syncTasks) {
for _, task := range tasks {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
result, ok := sc.syncRes[task.resultKey()]
if ok {
task.syncStatus = result.Status
task.operationState = result.HookPhase
task.message = result.Message
}
}
}

func (sc *syncContext) reorderPruneLastTaskWaves(tasks syncTasks) {
syncPhaseLastWave := 0
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
}
}
}
syncPhaseLastWave = syncPhaseLastWave + 1

for _, task := range tasks {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
task.waveOverride = &syncPhaseLastWave
}
}
}

func (*syncContext) reorderPruneTaskWaves(pruneTasks map[int][]*syncTask) {
var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
}
sort.Ints(uniquePruneWaves)

// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for i := 0; i < n/2; i++ {
// waves to swap

startWave := uniquePruneWaves[i]
endWave := uniquePruneWaves[n-1-i]

Expand All @@ -787,39 +758,103 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
task.waveOverride = &startWave
}
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
syncPhaseLastWave := 0
func (*syncContext) groupPruneTasksIntoWaves(tasks syncTasks) map[int][]*syncTask {
pruneTasks := make(map[int][]*syncTask)
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
}
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
}
}
syncPhaseLastWave = syncPhaseLastWave + 1
return pruneTasks
}

func (sc *syncContext) enrichTasksWithLiveObjects(tasks syncTasks) {
for _, task := range tasks {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
task.waveOverride = &syncPhaseLastWave
if task.targetObj == nil || task.liveObj != nil {
continue
}
task.liveObj = sc.liveObj(task.targetObj)
}
}

tasks.Sort()
func (sc *syncContext) addAutoCreateNamespaceTask(tasks syncTasks) syncTasks {
if sc.syncNamespace != nil && sc.namespace != "" {
tasks = sc.autoCreateNamespace(tasks)
}
return tasks
}

// finally enrich tasks with the result
func (sc *syncContext) enrichTargetObjectsWithNamespace(tasks syncTasks) {
for _, task := range tasks {
result, ok := sc.syncRes[task.resultKey()]
if ok {
task.syncStatus = result.Status
task.operationState = result.HookPhase
task.message = result.Message
if task.targetObj == nil {
continue
}

if task.targetObj.GetNamespace() == "" {
// If target object's namespace is empty, we set namespace in the object. We do
// this even though it might be a cluster-scoped resource. This prevents any
// possibility of the resource from unintentionally becoming created in the
// namespace during the `kubectl apply`
task.targetObj = task.targetObj.DeepCopy()
task.targetObj.SetNamespace(sc.namespace)
}
}
}

return tasks, successful
func (sc *syncContext) createHookTasks() syncTasks {
hookTasks := syncTasks{}
if !sc.skipHooks {
for _, obj := range sc.hooks {
for _, phase := range syncPhases(obj) {
// Hook resources names are deterministic, whether they are defined by the user (metadata.name),
// or formulated at the time of the operation (metadata.generateName). If user specifies
// metadata.generateName, then we will generate a formulated metadata.name before submission.
targetObj := obj.DeepCopy()
if targetObj.GetName() == "" {
var syncRevision string
if len(sc.revision) >= 8 {
syncRevision = sc.revision[0:7]
} else {
syncRevision = sc.revision
}
postfix := strings.ToLower(fmt.Sprintf("%s-%s-%d", syncRevision, phase, sc.startedAt.UTC().Unix()))
generateName := obj.GetGenerateName()
targetObj.SetName(fmt.Sprintf("%s%s", generateName, postfix))
}

hookTasks = append(hookTasks, &syncTask{phase: phase, targetObj: targetObj})
}
}
}

sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
return hookTasks
}

func (sc *syncContext) createResourceTasks() syncTasks {
resourceTasks := syncTasks{}
for k, resource := range sc.resources {
if !sc.containsResource(resource) {
sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping")
continue
}

obj := obj(resource.Target, resource.Live)

// this creates garbage tasks
if hook.IsHook(obj) {
sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook")
continue
}

for _, phase := range syncPhases(obj) {
resourceTasks = append(resourceTasks, &syncTask{phase: phase, targetObj: resource.Target, liveObj: resource.Live})
}
}
sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources")
return resourceTasks
}

func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
Expand Down
Loading