Skip to content

Commit

Permalink
Feat: support resume a specific suspend step in workflow (#133)
Browse files Browse the repository at this point in the history
* Feat: support resume a specific suspend step in workflow

Signed-off-by: FogDong <[email protected]>

* resolve the comment

Signed-off-by: FogDong <[email protected]>

---------

Signed-off-by: FogDong <[email protected]>
  • Loading branch information
FogDong authored Feb 14, 2023
1 parent 2daa3cb commit 92d7b6a
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 28 deletions.
9 changes: 7 additions & 2 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ spec:
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
args:
- "-test.coverprofile=/workspace/data/e2e-profile.out"
- "__DEVEL__E2E"
- "-test.run=E2EMain"
- "-test.coverpkg=$(go list ./pkg/...| tr '
' ','| sed 's/,$//g')"
{{ if .Values.admissionWebhooks.enabled }}
- "--use-webhook=true"
- "--webhook-port={{ .Values.webhookService.port }}"
Expand All @@ -132,8 +137,8 @@ spec:
- "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}"
- "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}"
- "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}"
- "--feature-gates=EnableWatchEventListener={{- .Values.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableWatchEventListener={{- .Values.workflow.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
{{ if .Values.backup.enable }}
Expand Down
3 changes: 2 additions & 1 deletion makefiles/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ e2e-setup-controller:
--set image.tag=latest \
--set image.pullPolicy=IfNotPresent \
--wait vela-workflow \
./charts/vela-workflow
./charts/vela-workflow \
--debug

.PHONY: end-e2e
end-e2e:
Expand Down
5 changes: 5 additions & 0 deletions pkg/cue/model/sets/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ func strategyUnify(base cue.Value, patch cue.Value, params *UnifyParams, patchOp
baseInst := cuecontext.New().BuildFile(openBase)
patchInst := cuecontext.New().BuildFile(patchFile)

// s, _ := ToString(patchInst)
// fmt.Println("======patch", s)
// s, _ = ToString(baseInst)
// fmt.Println("======base", s)

ret := baseInst.Unify(patchInst)

_, err = toString(ret, removeTmpVar)
Expand Down
19 changes: 19 additions & 0 deletions pkg/providers/kube/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ patch: {
sub, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub.Error()).To(BeNil())
v, err = v.MakeValue(`
cluster: ""
patch: {
metadata: name: "test-app-1"
spec: {
containers: [{
// +patchStrategy=retainKeys
image: "nginx:latest"
}]
}
}`)
Expect(err).ToNot(HaveOccurred())
err = v.FillObject(sub, "value")
Expect(err).ToNot(HaveOccurred())
err = p.Apply(mCtx, ctx, v, nil)
Expect(err).ToNot(HaveOccurred())
sub2, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub2.Error()).To(BeNil())

pod := &corev1.Pod{}
Expect(err).ToNot(HaveOccurred())
Expand Down
96 changes: 82 additions & 14 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,29 @@ type WorkflowOperator interface {
Suspend(ctx context.Context) error
Resume(ctx context.Context) error
Rollback(ctx context.Context) error
Restart(ctx context.Context, step string) error
Restart(ctx context.Context) error
Terminate(ctx context.Context) error
}

// WorkflowStepOperator is operation handler for workflow steps' operations
type WorkflowStepOperator interface {
Resume(ctx context.Context, step string) error
Restart(ctx context.Context, step string) error
}

type workflowRunOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application
type workflowRunStepOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowOperator {
return workflowRunOperator{
cli: cli,
Expand All @@ -62,6 +74,15 @@ func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.Workfl
}
}

// NewWorkflowRunStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunStepOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowStepOperator {
return workflowRunStepOperator{
cli: cli,
outputWriter: w,
run: run,
}
}

// Suspend suspend workflow
func (wo workflowRunOperator) Suspend(ctx context.Context) error {
run := wo.run
Expand All @@ -77,7 +98,7 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error {
return err
}

return wo.writeOutputF("Successfully suspend workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow
Expand All @@ -88,27 +109,62 @@ func (wo workflowRunOperator) Resume(ctx context.Context) error {
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run); err != nil {
if err := ResumeWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow from a specific step
func (wo workflowRunStepOperator) Resume(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if run.Status.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
}
return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s\n", run.Name, step)
}

// ResumeWorkflow resume workflow
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun) error {
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error {
run.Status.Suspend = false
steps := run.Status.Steps
found := stepName == ""

for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == step.Name {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == sub.Name {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return cli.Status().Patch(ctx, run, client.Merge)
}); err != nil {
Expand All @@ -123,12 +179,24 @@ func (wo workflowRunOperator) Rollback(ctx context.Context) error {
}

// Restart restart workflow
func (wo workflowRunOperator) Restart(ctx context.Context, step string) error {
func (wo workflowRunOperator) Restart(ctx context.Context) error {
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", run.Name)
}

// Restart restart workflow from a specific step
func (wo workflowRunStepOperator) Restart(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
return wo.writeOutputF("Successfully restart workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", run.Name, step)
}

// RestartWorkflow restart workflow
Expand Down Expand Up @@ -162,7 +230,7 @@ func (wo workflowRunOperator) Terminate(ctx context.Context) error {
if err := TerminateWorkflow(ctx, wo.cli, run); err != nil {
return err
}
return wo.writeOutputF("Successfully terminate workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", run.Name)
}

// TerminateWorkflow terminate workflow
Expand Down Expand Up @@ -448,10 +516,10 @@ func findDependency(stepName string, dependsOn map[string][]string) []string {
return dependency
}

func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error {
if wo.outputWriter == nil {
func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error {
if outputWriter == nil {
return nil
}
_, err := fmt.Fprintf(wo.outputWriter, format, a...)
_, err := fmt.Fprintf(outputWriter, format, a...)
return err
}
Loading

0 comments on commit 92d7b6a

Please sign in to comment.