From 92d7b6a260131fb7593d3ab6a93979d9ca62b14f Mon Sep 17 00:00:00 2001 From: Tianxin Dong Date: Tue, 14 Feb 2023 17:24:19 +0800 Subject: [PATCH] Feat: support resume a specific suspend step in workflow (#133) * Feat: support resume a specific suspend step in workflow Signed-off-by: FogDong * resolve the comment Signed-off-by: FogDong --------- Signed-off-by: FogDong --- .../templates/workflow-controller.yaml | 9 +- makefiles/e2e.mk | 3 +- pkg/cue/model/sets/operation.go | 5 + pkg/providers/kube/handle_test.go | 19 +++ pkg/utils/operation.go | 96 +++++++++-- pkg/utils/operation_test.go | 155 ++++++++++++++++-- 6 files changed, 259 insertions(+), 28 deletions(-) diff --git a/charts/vela-workflow/templates/workflow-controller.yaml b/charts/vela-workflow/templates/workflow-controller.yaml index f493c1b..eb8eab1 100644 --- a/charts/vela-workflow/templates/workflow-controller.yaml +++ b/charts/vela-workflow/templates/workflow-controller.yaml @@ -109,6 +109,11 @@ spec: securityContext: {{- toYaml .Values.securityContext | nindent 12 }} args: + - "-test.coverprofile=/workspace/data/e2e-profile.out" + - "__DEVEL__E2E" + - "-test.run=E2EMain" + - "-test.coverpkg=$(go list ./pkg/...| tr ' +' ','| sed 's/,$//g')" {{ if .Values.admissionWebhooks.enabled }} - "--use-webhook=true" - "--webhook-port={{ .Values.webhookService.port }}" @@ -132,8 +137,8 @@ spec: - "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}" - "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}" - "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}" - - "--feature-gates=EnableWatchEventListener={{- .Values.enableWatchEventListener | toString -}}" - - "--feature-gates=EnablePatchStatusAtOnce={{- .Values.enablePatchStatusAtOnce | toString -}}" + - "--feature-gates=EnableWatchEventListener={{- .Values.workflow.enableWatchEventListener | toString -}}" + - "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}" - "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}" - "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}" {{ if .Values.backup.enable }} diff --git a/makefiles/e2e.mk b/makefiles/e2e.mk index f37ad43..6d2dea2 100644 --- a/makefiles/e2e.mk +++ b/makefiles/e2e.mk @@ -12,7 +12,8 @@ e2e-setup-controller: --set image.tag=latest \ --set image.pullPolicy=IfNotPresent \ --wait vela-workflow \ - ./charts/vela-workflow + ./charts/vela-workflow \ + --debug .PHONY: end-e2e end-e2e: diff --git a/pkg/cue/model/sets/operation.go b/pkg/cue/model/sets/operation.go index 8fc0e78..5966336 100644 --- a/pkg/cue/model/sets/operation.go +++ b/pkg/cue/model/sets/operation.go @@ -298,6 +298,11 @@ func strategyUnify(base cue.Value, patch cue.Value, params *UnifyParams, patchOp baseInst := cuecontext.New().BuildFile(openBase) patchInst := cuecontext.New().BuildFile(patchFile) + // s, _ := ToString(patchInst) + // fmt.Println("======patch", s) + // s, _ = ToString(baseInst) + // fmt.Println("======base", s) + ret := baseInst.Unify(patchInst) _, err = toString(ret, removeTmpVar) diff --git a/pkg/providers/kube/handle_test.go b/pkg/providers/kube/handle_test.go index 6c78fec..04cf461 100644 --- a/pkg/providers/kube/handle_test.go +++ b/pkg/providers/kube/handle_test.go @@ -192,6 +192,25 @@ patch: { sub, err := v.LookupValue("value") Expect(err).ToNot(HaveOccurred()) Expect(sub.Error()).To(BeNil()) + v, err = v.MakeValue(` + cluster: "" + patch: { + metadata: name: "test-app-1" + spec: { + containers: [{ + // +patchStrategy=retainKeys + image: "nginx:latest" + }] + } + }`) + Expect(err).ToNot(HaveOccurred()) + err = v.FillObject(sub, "value") + Expect(err).ToNot(HaveOccurred()) + err = p.Apply(mCtx, ctx, v, nil) + Expect(err).ToNot(HaveOccurred()) + sub2, err := v.LookupValue("value") + Expect(err).ToNot(HaveOccurred()) + Expect(sub2.Error()).To(BeNil()) pod := &corev1.Pod{} Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/utils/operation.go b/pkg/utils/operation.go index 32d8081..3bff32b 100644 --- a/pkg/utils/operation.go +++ b/pkg/utils/operation.go @@ -43,17 +43,29 @@ type WorkflowOperator interface { Suspend(ctx context.Context) error Resume(ctx context.Context) error Rollback(ctx context.Context) error - Restart(ctx context.Context, step string) error + Restart(ctx context.Context) error Terminate(ctx context.Context) error } +// WorkflowStepOperator is operation handler for workflow steps' operations +type WorkflowStepOperator interface { + Resume(ctx context.Context, step string) error + Restart(ctx context.Context, step string) error +} + type workflowRunOperator struct { cli client.Client outputWriter io.Writer run *v1alpha1.WorkflowRun } -// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application +type workflowRunStepOperator struct { + cli client.Client + outputWriter io.Writer + run *v1alpha1.WorkflowRun +} + +// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and workflow run func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowOperator { return workflowRunOperator{ cli: cli, @@ -62,6 +74,15 @@ func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.Workfl } } +// NewWorkflowRunStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and workflow run +func NewWorkflowRunStepOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowStepOperator { + return workflowRunStepOperator{ + cli: cli, + outputWriter: w, + run: run, + } +} + // Suspend suspend workflow func (wo workflowRunOperator) Suspend(ctx context.Context) error { run := wo.run @@ -77,7 +98,7 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error { return err } - return wo.writeOutputF("Successfully suspend workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", run.Name) } // Resume resume a suspended workflow @@ -88,27 +109,62 @@ func (wo workflowRunOperator) Resume(ctx context.Context) error { } if run.Status.Suspend { - if err := ResumeWorkflow(ctx, wo.cli, run); err != nil { + if err := ResumeWorkflow(ctx, wo.cli, run, ""); err != nil { + return err + } + } + return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", run.Name) +} + +// Resume resume a suspended workflow from a specific step +func (wo workflowRunStepOperator) Resume(ctx context.Context, step string) error { + if step == "" { + return fmt.Errorf("step can not be empty") + } + run := wo.run + if run.Status.Terminated { + return fmt.Errorf("can not resume a terminated workflow") + } + + if run.Status.Suspend { + if err := ResumeWorkflow(ctx, wo.cli, run, step); err != nil { return err } } - return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s\n", run.Name, step) } // ResumeWorkflow resume workflow -func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun) error { +func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error { run.Status.Suspend = false steps := run.Status.Steps + found := stepName == "" + for i, step := range steps { if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning { - steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded + if stepName == "" { + steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded + } else if stepName == step.Name { + steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded + found = true + break + } } for j, sub := range step.SubStepsStatus { if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning { - steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded + if stepName == "" { + steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded + } else if stepName == sub.Name { + steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded + found = true + break + } } } } + if !found { + return fmt.Errorf("can not find step %s", stepName) + } if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { return cli.Status().Patch(ctx, run, client.Merge) }); err != nil { @@ -123,12 +179,24 @@ func (wo workflowRunOperator) Rollback(ctx context.Context) error { } // Restart restart workflow -func (wo workflowRunOperator) Restart(ctx context.Context, step string) error { +func (wo workflowRunOperator) Restart(ctx context.Context) error { + run := wo.run + if err := RestartWorkflow(ctx, wo.cli, run, ""); err != nil { + return err + } + return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", run.Name) +} + +// Restart restart workflow from a specific step +func (wo workflowRunStepOperator) Restart(ctx context.Context, step string) error { + if step == "" { + return fmt.Errorf("step can not be empty") + } run := wo.run if err := RestartWorkflow(ctx, wo.cli, run, step); err != nil { return err } - return wo.writeOutputF("Successfully restart workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", run.Name, step) } // RestartWorkflow restart workflow @@ -162,7 +230,7 @@ func (wo workflowRunOperator) Terminate(ctx context.Context) error { if err := TerminateWorkflow(ctx, wo.cli, run); err != nil { return err } - return wo.writeOutputF("Successfully terminate workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", run.Name) } // TerminateWorkflow terminate workflow @@ -448,10 +516,10 @@ func findDependency(stepName string, dependsOn map[string][]string) []string { return dependency } -func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error { - if wo.outputWriter == nil { +func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error { + if outputWriter == nil { return nil } - _, err := fmt.Fprintf(wo.outputWriter, format, a...) + _, err := fmt.Fprintf(outputWriter, format, a...) return err } diff --git a/pkg/utils/operation_test.go b/pkg/utils/operation_test.go index 8619c69..d533a91 100644 --- a/pkg/utils/operation_test.go +++ b/pkg/utils/operation_test.go @@ -227,8 +227,10 @@ func TestResumeWorkflowRun(t *testing.T) { ctx := context.Background() testCases := map[string]struct { - run *v1alpha1.WorkflowRun - expected *v1alpha1.WorkflowRun + run *v1alpha1.WorkflowRun + step string + expected *v1alpha1.WorkflowRun + expectedErr string }{ "not suspend": { run: &v1alpha1.WorkflowRun{ @@ -258,6 +260,18 @@ func TestResumeWorkflowRun(t *testing.T) { }, }, }, + "step not found": { + run: &v1alpha1.WorkflowRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "suspend", + }, + Status: v1alpha1.WorkflowRunStatus{ + Suspend: true, + }, + }, + step: "not-found", + expectedErr: "can not find step not-found", + }, "suspend step": { run: &v1alpha1.WorkflowRun{ ObjectMeta: metav1.ObjectMeta{ @@ -300,6 +314,100 @@ func TestResumeWorkflowRun(t *testing.T) { }, }, }, + "resume the specific step": { + step: "step1", + run: &v1alpha1.WorkflowRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "resume-specific-step", + }, + Status: v1alpha1.WorkflowRunStatus{ + Suspend: true, + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + }, + { + StepStatus: v1alpha1.StepStatus{ + Name: "step2", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + }, + }, + }, + }, + expected: &v1alpha1.WorkflowRun{ + Status: v1alpha1.WorkflowRunStatus{ + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, + }, + { + StepStatus: v1alpha1.StepStatus{ + Name: "step2", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + }, + }, + }, + }, + }, + "resume the specific sub step": { + step: "sub-step1", + run: &v1alpha1.WorkflowRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "resume-specific-sub-step", + }, + Status: v1alpha1.WorkflowRunStatus{ + Suspend: true, + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + SubStepsStatus: []v1alpha1.StepStatus{ + { + Name: "sub-step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + }, + }, + }, + }, + }, + expected: &v1alpha1.WorkflowRun{ + Status: v1alpha1.WorkflowRunStatus{ + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + }, + SubStepsStatus: []v1alpha1.StepStatus{ + { + Name: "sub-step1", + Type: wfTypes.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, + }, + }, + }, + }, + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { @@ -310,14 +418,29 @@ func TestResumeWorkflowRun(t *testing.T) { err = cli.Delete(ctx, tc.run) r.NoError(err) }() - operator := NewWorkflowRunOperator(cli, nil, tc.run) - err = operator.Resume(ctx) + if tc.step == "" { + operator := NewWorkflowRunOperator(cli, nil, tc.run) + err = operator.Resume(ctx) + if tc.expectedErr != "" { + r.Error(err) + r.Equal(tc.expectedErr, err.Error()) + return + } + } else { + operator := NewWorkflowRunStepOperator(cli, nil, tc.run) + err = operator.Resume(ctx, tc.step) + if tc.expectedErr != "" { + r.Error(err) + r.Equal(tc.expectedErr, err.Error()) + return + } + } r.NoError(err) run := &v1alpha1.WorkflowRun{} err = cli.Get(ctx, client.ObjectKey{Name: tc.run.Name}, run) r.NoError(err) r.Equal(false, run.Status.Suspend) - r.Equal(tc.expected.Status, run.Status) + r.Equal(tc.expected.Status, run.Status, name) }) } } @@ -924,13 +1047,23 @@ func TestRestartRunStep(t *testing.T) { r.NoError(err) }() } - operator := NewWorkflowRunOperator(cli, nil, tc.run) - err = operator.Restart(ctx, tc.stepName) - if tc.expectedErr != "" { - r.Contains(err.Error(), tc.expectedErr) - return + if tc.stepName == "" { + operator := NewWorkflowRunOperator(cli, nil, tc.run) + err = operator.Restart(ctx) + if tc.expectedErr != "" { + r.Contains(err.Error(), tc.expectedErr) + return + } + r.NoError(err) + } else { + operator := NewWorkflowRunStepOperator(cli, nil, tc.run) + err = operator.Restart(ctx, tc.stepName) + if tc.expectedErr != "" { + r.Contains(err.Error(), tc.expectedErr) + return + } + r.NoError(err) } - r.NoError(err) run := &v1alpha1.WorkflowRun{} err = cli.Get(ctx, client.ObjectKey{Name: tc.run.Name}, run) r.NoError(err)