Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add builtin providers and fix helm #192

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ spec:
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
- "--group-by-label={{ .Values.workflow.groupByLabel }}"
- "--enable-external-package-for-default-compiler={{- .Values.workflow.enableExternalPackageForDefaultCompiler | toString -}}"
- "--enable-external-package-watch-for-default-compiler={{- .Values.workflow.enableExternalPackageWatchForDefaultCompiler | toString -}}"
{{ if .Values.backup.enable }}
- "--backup-strategy={{ .Values.backup.strategy }}"
- "--backup-ignore-strategy={{ .Values.backup.ignoreStrategy }}"
Expand Down
2 changes: 2 additions & 0 deletions charts/vela-workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ workflow:
enableSuspendOnFailure: false
enablePatchStatusAtOnce: false
enableWatchEventListener: false
enableExternalPackageForDefaultCompiler: true
enableExternalPackageWatchForDefaultCompiler: false
backoff:
maxTime:
waitState: 60
Expand Down
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/kubevela/workflow/pkg/common"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/monitor/watcher"
"github.com/kubevela/workflow/pkg/providers"
"github.com/kubevela/workflow/pkg/types"
"github.com/kubevela/workflow/pkg/utils"
"github.com/kubevela/workflow/pkg/webhook"
Expand Down Expand Up @@ -122,6 +123,8 @@ func main() {
flag.BoolVar(&backupCleanOnBackup, "backup-clean-on-backup", false, "Set the auto clean for backup workflow records, default is false")
flag.StringVar(&backupConfigSecretName, "backup-config-secret-name", "backup-config", "Set the secret name for backup workflow configs, default is backup-config")
flag.StringVar(&backupConfigSecretNamespace, "backup-config-secret-namespace", "vela-system", "Set the secret namespace for backup workflow configs, default is backup-config")
flag.BoolVar(&providers.EnableExternalPackageForDefaultCompiler, "enable-external-package-for-default-compiler", true, "Enable external package for default compiler")
flag.BoolVar(&providers.EnableExternalPackageWatchForDefaultCompiler, "enable-external-package-watch-for-default-compiler", false, "Enable external package watch for default compiler")
multicluster.AddClusterGatewayClientFlags(flag.CommandLine)
feature.DefaultMutableFeatureGate.AddFlag(flag.CommandLine)
sharding.AddControllerFlags(flag.CommandLine)
Expand Down
6 changes: 3 additions & 3 deletions controllers/testdata/multi-suspend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
cue:
template: |
import (
"vela/op"
"vela/builtin"
)
suspend1: op.#Suspend & {}
suspend2: op.#Suspend & {}
suspend1: builtin.#Suspend & {}
suspend2: builtin.#Suspend & {}
8 changes: 4 additions & 4 deletions controllers/testdata/suspend-and-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ spec:
template: |
import (
"vela/kube"
"vela/op"
"vela/builtin"
)

suspend: op.#Suspend & {duration: "1s"}
suspend: builtin.#Suspend & {$params: duration: "1s"}
output: kube.#Apply & {
$params: {
cluster: parameter.cluster
Expand Down Expand Up @@ -41,8 +41,8 @@ spec:
}
}
}
wait: op.#ConditionalWait & {
continue: output.$returns.value.status.readyReplicas == parameter.replicas
wait: builtin.#ConditionalWait & {
$params: continue: output.$returns.value.status.readyReplicas == parameter.replicas
}
parameter: {
image: string
Expand Down
6 changes: 3 additions & 3 deletions controllers/testdata/test-apply.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
template: |
import (
"vela/kube"
"vela/op"
"vela/builtin"
)

output: kube.#Apply & {
Expand Down Expand Up @@ -41,9 +41,9 @@ spec:
}
}
}
wait: op.#ConditionalWait & {
wait: builtin.#ConditionalWait & {
if len(output.$returns.value.status) > 0 if output.$returns.value.status.readyReplicas == 1 {
continue: true
$params: continue: true
}
}
parameter: {
Expand Down
79 changes: 79 additions & 0 deletions pkg/providers/builtin/workspace.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// workspace.cue

#DoVar: {
#do: "var"
#provider: "builtin"

$params: {
// +usage=The method to call on the variable
method: *"Get" | "Put"
// +usage=The path to the variable
path: string
// +usage=The value of the variable
value?: _
}

$returns?: {
// +usage=The value of the variable
value: _
}
}

#ConditionalWait: {
#do: "wait"
#provider: "builtin"

$params: {
// +usage=If continue is false, the step will wait for continue to be true.
continue: *false | bool
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Suspend: {
#do: "suspend"
#provider: "builtin"

$params: {
// +usage=Specify the wait duration time to resume automaticlly such as "30s", "1min" or "2m15s"
duration?: string
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Break: {
#do: "break"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Fail: {
#do: "fail"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Message: {
#do: "message"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Steps: {
...
}

215 changes: 215 additions & 0 deletions pkg/providers/builtin/workspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
Copyright 2022 The KubeVela Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builtin

import (
"context"
_ "embed"
"encoding/json"
"fmt"
"strings"
"time"

"cuelang.org/go/cue/cuecontext"

cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime"

"github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/pkg/cue/model"
"github.com/kubevela/workflow/pkg/errors"
providertypes "github.com/kubevela/workflow/pkg/providers/types"
)

const (
// ProviderName is provider name.
ProviderName = "builtin"
// ResumeTimeStamp is resume time stamp.
ResumeTimeStamp = "resumeTimeStamp"
// SuspendTimeStamp is suspend time stamp.
SuspendTimeStamp = "suspendTimeStamp"
)

// VarVars .
type VarVars struct {
Method string `json:"method"`
Path string `json:"path"`
Value any `json:"value"`
}

// VarReturnVars .
type VarReturnVars struct {
Value any `json:"value"`
}

// VarReturns .
type VarReturns = providertypes.Returns[VarReturnVars]

// VarParams .
type VarParams = providertypes.Params[VarVars]

// DoVar get & put variable from context.
func DoVar(_ context.Context, params *VarParams) (*VarReturns, error) {
wfCtx := params.RuntimeParams.WorkflowContext
path := params.Params.Path

switch params.Params.Method {
case "Get":
value, err := wfCtx.GetVar(strings.Split(path, ".")...)
if err != nil {
return nil, err
}
b, err := value.MarshalJSON()
if err != nil {
return nil, err
}
var v any
if err := json.Unmarshal(b, &v); err != nil {
return nil, err
}
return &VarReturns{
Returns: VarReturnVars{
Value: v,
},
}, nil
case "Put":
b, err := json.Marshal(params.Params.Value)
if err != nil {
return nil, err
}
if err := wfCtx.SetVar(cuecontext.New().CompileBytes(b), strings.Split(path, ".")...); err != nil {
return nil, err
}
return nil, nil
}
return nil, nil
}

// ActionVars .
type ActionVars struct {
Message string `json:"message,omitempty"`
}

// ActionParams .
type ActionParams = providertypes.Params[ActionVars]

// WaitVars .
type WaitVars struct {
Continue bool `json:"continue"`
ActionVars
}

// WaitParams .
type WaitParams = providertypes.Params[WaitVars]

// Wait let workflow wait.
func Wait(_ context.Context, params *WaitParams) (*any, error) {
if params.Params.Continue {
return nil, nil
}
params.Action.Wait(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionWait)
}

// Break let workflow terminate.
func Break(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Terminate(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionTerminate)
}

// Fail let the step fail, its status is failed and reason is Action
func Fail(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Fail(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionTerminate)
}

// SuspendVars .
type SuspendVars struct {
Duration string `json:"duration,omitempty"`
ActionVars
}

// SuspendParams .
type SuspendParams = providertypes.Params[SuspendVars]

// Suspend let the step suspend, its status is suspending and reason is Suspend
func Suspend(_ context.Context, params *SuspendParams) (*any, error) {
pCtx := params.ProcessContext
wfCtx := params.WorkflowContext
act := params.Action
stepID := fmt.Sprint(pCtx.GetData(model.ContextStepSessionID))
timestamp := wfCtx.GetMutableValue(stepID, ResumeTimeStamp)

var msg string
if msg == "" {
msg = fmt.Sprintf("Suspended by field %s", params.FieldLabel)
}
if timestamp != "" {
t, err := time.Parse(time.RFC3339, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp %s: %w", timestamp, err)
}
if time.Now().After(t) {
act.Resume("")
return nil, nil
}
act.Suspend(msg)
return nil, errors.GenericActionError(errors.ActionSuspend)
}
if params.Params.Duration != "" {
d, err := time.ParseDuration(params.Params.Duration)
if err != nil {
return nil, fmt.Errorf("failed to parse duration %s: %w", params.Params.Duration, err)
}
wfCtx.SetMutableValue(time.Now().Add(d).Format(time.RFC3339), stepID, ResumeTimeStamp)
}
if ts := wfCtx.GetMutableValue(stepID, params.FieldLabel, SuspendTimeStamp); ts != "" {
if act.GetStatus().Phase == v1alpha1.WorkflowStepPhaseRunning {
// if it is already suspended before and has been resumed, we should not suspend it again.
return nil, nil
}
} else {
wfCtx.SetMutableValue(time.Now().Format(time.RFC3339), stepID, params.FieldLabel, SuspendTimeStamp)
}
act.Suspend(msg)
return nil, errors.GenericActionError(errors.ActionSuspend)
}

// Message writes message to step status, note that the message will be overwritten by the next message.
func Message(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Message(params.Params.Message)
return nil, nil
}

//go:embed workspace.cue
var template string

// GetTemplate returns the cue template.
func GetTemplate() string {
return template
}

// GetProviders returns the cue providers.
func GetProviders() map[string]cuexruntime.ProviderFn {
return map[string]cuexruntime.ProviderFn{
"wait": providertypes.GenericProviderFn[WaitVars, any](Wait),
"break": providertypes.GenericProviderFn[ActionVars, any](Break),
"fail": providertypes.GenericProviderFn[ActionVars, any](Fail),
"message": providertypes.GenericProviderFn[ActionVars, any](Message),
"var": providertypes.GenericProviderFn[VarVars, VarReturns](DoVar),
"suspend": providertypes.GenericProviderFn[SuspendVars, any](Suspend),
}
}
Loading
Loading