From bf9dc432f17eef674f382acb8bf7e5bfc0b75ead Mon Sep 17 00:00:00 2001 From: FogDong Date: Sun, 11 Aug 2024 20:19:06 +0800 Subject: [PATCH] feat: add new providers Signed-off-by: FogDong --- .../crds/cue.oam.dev_packages.yaml | 81 ++++ controllers/testdata/apply-object.yaml | 20 +- controllers/testdata/suspend-and-deploy.yaml | 51 +- controllers/testdata/test-apply.yaml | 75 +-- controllers/workflow_test.go | 4 +- pkg/executor/workflow_test.go | 2 +- pkg/generator/generator.go | 2 +- pkg/hooks/data_passing.go | 3 - pkg/providers/compiler.go | 42 +- pkg/providers/email/email.cue | 33 ++ pkg/providers/email/email.go | 124 +++++ pkg/providers/email/email_test.go | 130 +++++ pkg/providers/http/http.cue | 59 +++ pkg/providers/http/http.go | 242 ++++++++++ pkg/providers/http/http_test.go | 332 +++++++++++++ pkg/providers/http/ratelimiter/ratelimiter.go | 49 ++ .../http/ratelimiter/ratelimiter_test.go | 60 +++ pkg/providers/http/testdata/certs.go | 39 ++ pkg/providers/kube/kube.cue | 152 ++++++ pkg/providers/kube/kube.go | 360 ++++++++++++++ pkg/providers/kube/kube_test.go | 453 ++++++++++++++++++ pkg/providers/metrics/metrics.cue | 21 + pkg/providers/metrics/prom_check.go | 230 +++++++++ pkg/providers/metrics/prom_check_test.go | 100 ++++ pkg/providers/time/time.cue | 31 ++ pkg/providers/time/time.go | 113 +++++ pkg/providers/time/time_test.go | 131 +++++ pkg/providers/types/types.go | 46 ++ pkg/providers/util/util.cue | 59 +++ pkg/providers/util/util.go | 193 ++++++++ pkg/providers/util/util_test.go | 309 ++++++++++++ pkg/tasks/builtin/step_group.go | 4 +- pkg/tasks/custom/task.go | 12 +- pkg/tasks/custom/task_test.go | 8 +- 34 files changed, 3479 insertions(+), 91 deletions(-) create mode 100644 charts/vela-workflow/crds/cue.oam.dev_packages.yaml create mode 100644 pkg/providers/email/email.cue create mode 100644 pkg/providers/email/email.go create mode 100644 pkg/providers/email/email_test.go create mode 100644 pkg/providers/http/http.cue create mode 100644 pkg/providers/http/http.go create mode 100644 pkg/providers/http/http_test.go create mode 100644 pkg/providers/http/ratelimiter/ratelimiter.go create mode 100644 pkg/providers/http/ratelimiter/ratelimiter_test.go create mode 100644 pkg/providers/http/testdata/certs.go create mode 100644 pkg/providers/kube/kube.cue create mode 100644 pkg/providers/kube/kube.go create mode 100644 pkg/providers/kube/kube_test.go create mode 100644 pkg/providers/metrics/metrics.cue create mode 100644 pkg/providers/metrics/prom_check.go create mode 100644 pkg/providers/metrics/prom_check_test.go create mode 100644 pkg/providers/time/time.cue create mode 100644 pkg/providers/time/time.go create mode 100644 pkg/providers/time/time_test.go create mode 100644 pkg/providers/util/util.cue create mode 100644 pkg/providers/util/util.go create mode 100644 pkg/providers/util/util_test.go diff --git a/charts/vela-workflow/crds/cue.oam.dev_packages.yaml b/charts/vela-workflow/crds/cue.oam.dev_packages.yaml new file mode 100644 index 0000000..c6f8322 --- /dev/null +++ b/charts/vela-workflow/crds/cue.oam.dev_packages.yaml @@ -0,0 +1,81 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.3 + creationTimestamp: null + name: packages.cue.oam.dev +spec: + group: cue.oam.dev + names: + kind: Package + listKind: PackageList + plural: packages + shortNames: + - pkg + - cpkg + - cuepkg + - cuepackage + singular: package + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.path + name: PATH + type: string + - jsonPath: .spec.provider.protocol + name: PROTO + type: string + - jsonPath: .spec.provider.endpoint + name: ENDPOINT + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: Package is an extension for cuex engine + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: PackageSpec the spec for Package + properties: + path: + type: string + provider: + description: Provider the external Provider in Package for cuex to + run functions + properties: + endpoint: + type: string + protocol: + description: ProviderProtocol the protocol type for external Provider + type: string + required: + - endpoint + - protocol + type: object + templates: + additionalProperties: + type: string + type: object + required: + - path + - templates + type: object + required: + - spec + type: object + served: true + storage: true + subresources: {} diff --git a/controllers/testdata/apply-object.yaml b/controllers/testdata/apply-object.yaml index e8a095e..ce40919 100644 --- a/controllers/testdata/apply-object.yaml +++ b/controllers/testdata/apply-object.yaml @@ -8,16 +8,18 @@ spec: cue: template: | import ( - "vela/op" + "vela/kube" ) - apply: op.#Apply & { - value: parameter.value - cluster: parameter.cluster + apply: kube.#Apply & { + $params: { + value: parameter.value + cluster: parameter.cluster + } } parameter: { - // +usage=Specify the value of the object - value: {...} - // +usage=Specify the cluster of the object - cluster: *"" | string - } \ No newline at end of file + // +usage=Specify Kubernetes native resource object to be applied + value: {...} + // +usage=The cluster you want to apply the resource to, default is the current control plane cluster + cluster: *"" | string + } diff --git a/controllers/testdata/suspend-and-deploy.yaml b/controllers/testdata/suspend-and-deploy.yaml index 0c410a9..c5ef591 100644 --- a/controllers/testdata/suspend-and-deploy.yaml +++ b/controllers/testdata/suspend-and-deploy.yaml @@ -9,38 +9,40 @@ spec: cue: template: | import ( - "strconv" - "strings" + "vela/kube" "vela/op" ) + suspend: op.#Suspend & {duration: "1s"} - output: op.#Apply & { - cluster: parameter.cluster - value: { - apiVersion: "apps/v1" - kind: "Deployment" - metadata: { - name: context.stepName - namespace: context.namespace - } - spec: { - selector: matchLabels: "workflow.oam.dev/step-name": "\(context.name)-\(context.stepName)" - replicas: parameter.replicas - template: { - metadata: labels: "workflow.oam.dev/step-name": "\(context.name)-\(context.stepName)" - spec: containers: [{ - name: context.stepName - image: parameter.image - if parameter["cmd"] != _|_ { - command: parameter.cmd - } - }] + output: kube.#Apply & { + $params: { + cluster: parameter.cluster + value: { + apiVersion: "apps/v1" + kind: "Deployment" + metadata: { + name: context.stepName + namespace: context.namespace + } + spec: { + selector: matchLabels: "workflow.oam.dev/step-name": "\(context.name)-\(context.stepName)" + replicas: parameter.replicas + template: { + metadata: labels: "workflow.oam.dev/step-name": "\(context.name)-\(context.stepName)" + spec: containers: [{ + name: context.stepName + image: parameter.image + if parameter["cmd"] != _|_ { + command: parameter.cmd + } + }] + } } } } } wait: op.#ConditionalWait & { - continue: output.value.status.readyReplicas == parameter.replicas + continue: output.$returns.value.status.readyReplicas == parameter.replicas } parameter: { image: string @@ -48,4 +50,3 @@ spec: cluster: *"" | string cmd?: [...string] } - diff --git a/controllers/testdata/test-apply.yaml b/controllers/testdata/test-apply.yaml index c5274b9..d9f890d 100644 --- a/controllers/testdata/test-apply.yaml +++ b/controllers/testdata/test-apply.yaml @@ -7,45 +7,48 @@ spec: schematic: cue: template: | - import ( "vela/op" + import ( + "vela/kube" + "vela/op" ) - output: op.#Apply & { - value: { - apiVersion: "apps/v1" - kind: "Deployment" - metadata: { - name: context.stepName - namespace: context.namespace - } - spec: { - selector: matchLabels: wr: context.stepName - template: { - metadata: labels: wr: context.stepName - spec: containers: [{ - name: context.stepName - image: parameter.image - if parameter["cmd"] != _|_ { - command: parameter.cmd - } - if parameter["message"] != _|_ { - env: [{ - name: "MESSAGE" - value: parameter.message - }] - } - }] - } - } - } + output: kube.#Apply & { + $params: value: { + apiVersion: "apps/v1" + kind: "Deployment" + metadata: { + name: context.stepName + namespace: context.namespace + } + spec: { + selector: matchLabels: wr: context.stepName + template: { + metadata: labels: wr: context.stepName + spec: containers: [{ + name: context.stepName + image: parameter.image + if parameter["cmd"] != _|_ { + command: parameter.cmd + } + if parameter["message"] != _|_ { + env: [{ + name: "MESSAGE" + value: parameter.message + }] + } + }] + } + } + } } wait: op.#ConditionalWait & { - if len(output.value.status) > 0 if output.value.status.readyReplicas == 1 { - continue: true - } + if len(output.$returns.value.status) > 0 if output.$returns.value.status.readyReplicas == 1 { + continue: true + } } parameter: { - image: string - cmd?: [...string] - message?: string - } \ No newline at end of file + image: string + cmd?: [...string] + message?: string + } + diff --git a/controllers/workflow_test.go b/controllers/workflow_test.go index 81db30b..8c024d9 100644 --- a/controllers/workflow_test.go +++ b/controllers/workflow_test.go @@ -336,7 +336,7 @@ var _ = Describe("Test Workflow", func() { Outputs: v1alpha1.StepOutputs{ { Name: "message", - ValueFrom: `"message: " +output.value.status.conditions[0].message`, + ValueFrom: `"message: " +output.$returns.value.status.conditions[0].message`, }, }, }, @@ -416,7 +416,7 @@ var _ = Describe("Test Workflow", func() { Outputs: v1alpha1.StepOutputs{ { Name: "message", - ValueFrom: `"message: " +output.value.status.conditions[0].message`, + ValueFrom: `"message: " +output.$returns.value.status.conditions[0].message`, }, }, }, diff --git a/pkg/executor/workflow_test.go b/pkg/executor/workflow_test.go index d997cbd..b2acba9 100644 --- a/pkg/executor/workflow_test.go +++ b/pkg/executor/workflow_test.go @@ -2374,7 +2374,7 @@ func (tr *testTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptio resetter := tr.fillContext(logCtx, options.PCtx) defer resetter(options.PCtx) - basicVal, err := custom.MakeBasicValue(logCtx, providers.Compiler.Get(), nil, options.PCtx) + basicVal, err := custom.MakeBasicValue(logCtx, providers.DefaultCompiler.Get(), nil, options.PCtx) if err != nil { return v1alpha1.StepStatus{}, nil, err } diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index 0772f66..8c16790 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -141,7 +141,7 @@ func initStepGeneratorOptions(_ monitorContext.Context, instance *types.Workflow options.TemplateLoader = template.NewWorkflowStepTemplateLoader() } if options.Compiler == nil { - options.Compiler = providers.Compiler.Get() + options.Compiler = providers.DefaultCompiler.Get() } return options } diff --git a/pkg/hooks/data_passing.go b/pkg/hooks/data_passing.go index 5b5d846..9bc2813 100644 --- a/pkg/hooks/data_passing.go +++ b/pkg/hooks/data_passing.go @@ -43,9 +43,6 @@ func Input(ctx wfContext.Context, paramValue cue.Value, step v1alpha1.WorkflowSt } } if input.ParameterKey != "" { - fmt.Println("===filledVal", filledVal) - fmt.Println("===inputValue", inputValue) - fmt.Println("====path", strings.Join([]string{"parameter", input.ParameterKey}, ".")) filledVal, err = value.SetValueByScript(filledVal, inputValue, strings.Join([]string{"parameter", input.ParameterKey}, ".")) if err != nil || filledVal.Err() != nil { if err != nil { diff --git a/pkg/providers/compiler.go b/pkg/providers/compiler.go index b5e8bf3..42a5219 100644 --- a/pkg/providers/compiler.go +++ b/pkg/providers/compiler.go @@ -17,12 +17,22 @@ limitations under the License. package providers import ( + "context" + "github.com/kubevela/pkg/cue/cuex" cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" "github.com/kubevela/pkg/util/runtime" "github.com/kubevela/pkg/util/singleton" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + "github.com/kubevela/workflow/pkg/providers/email" + "github.com/kubevela/workflow/pkg/providers/http" + "github.com/kubevela/workflow/pkg/providers/kube" "github.com/kubevela/workflow/pkg/providers/legacy" + "github.com/kubevela/workflow/pkg/providers/metrics" + "github.com/kubevela/workflow/pkg/providers/time" + "github.com/kubevela/workflow/pkg/providers/util" ) const ( @@ -30,10 +40,38 @@ const ( LegacyProviderName = "op" ) -// Compiler is the workflow default compiler -var Compiler = singleton.NewSingletonE[*cuex.Compiler](func() (*cuex.Compiler, error) { +var ( + // EnableExternalPackageForDefaultCompiler . + EnableExternalPackageForDefaultCompiler = true + // EnableExternalPackageWatchForDefaultCompiler . + EnableExternalPackageWatchForDefaultCompiler = false +) + +var compiler = singleton.NewSingletonE[*cuex.Compiler](func() (*cuex.Compiler, error) { return cuex.NewCompilerWithInternalPackages( // legacy packages runtime.Must(cuexruntime.NewInternalPackage(LegacyProviderName, legacy.GetLegacyTemplate(), legacy.GetLegacyProviders())), + + // internal packages + runtime.Must(cuexruntime.NewInternalPackage("email", email.GetTemplate(), email.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("http", http.GetTemplate(), http.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("kube", kube.GetTemplate(), kube.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("metrics", metrics.GetTemplate(), metrics.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("time", time.GetTemplate(), time.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("util", util.GetTemplate(), util.GetProviders())), ), nil }) + +// DefaultCompiler compiler for cuex to compile +var DefaultCompiler = singleton.NewSingleton[*cuex.Compiler](func() *cuex.Compiler { + c := compiler.Get() + if EnableExternalPackageForDefaultCompiler { + if err := c.LoadExternalPackages(context.Background()); err != nil && !kerrors.IsNotFound(err) { + klog.Errorf("failed to load external packages for cuex default compiler: %s", err.Error()) + } + } + if EnableExternalPackageWatchForDefaultCompiler { + go c.ListenExternalPackages(nil) + } + return c +}) diff --git a/pkg/providers/email/email.cue b/pkg/providers/email/email.cue new file mode 100644 index 0000000..1d79c2c --- /dev/null +++ b/pkg/providers/email/email.cue @@ -0,0 +1,33 @@ +// email.cue + +#SendEmail: { + #do: "send" + #provider: "email" + + $params: { + // +usage=The info of the sender + from: { + // +usage=The address of the sender + address: string + // +usage=The alias of the sender + alias?: string + // +usage=The password of the sender + password: string + // +usage=The host of the sender server + host: string + // +usage=The port of the sender server + port: int + } + // +usgae=The email address list of the recievers + to: [...string] + // +usage=The content of the email + content: { + // +usage=The subject of the email + subject: string + // +usage=The body of the email + body: string + } + } + // this provider has no returns + ... +} diff --git a/pkg/providers/email/email.go b/pkg/providers/email/email.go new file mode 100644 index 0000000..6633dca --- /dev/null +++ b/pkg/providers/email/email.go @@ -0,0 +1,124 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package email + +import ( + "context" + _ "embed" + "fmt" + "sync" + + "gopkg.in/gomail.v2" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/errors" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +const ( + // ProviderName is provider name for install. + ProviderName = "email" +) + +// Sender is the sender of email +type Sender struct { + Address string `json:"address"` + Alias string `json:"alias,omitempty"` + Password string `json:"password"` + Host string `json:"host"` + Port int `json:"port"` +} + +// Content is the content of email +type Content struct { + Subject string `json:"subject"` + Body string `json:"body"` +} + +// MailVars . +type MailVars struct { + From Sender `json:"from"` + To []string `json:"to"` + Content Content `json:"content"` +} + +// MailParams . +type MailParams = providertypes.Params[MailVars] + +var emailRoutine sync.Map + +// Send sends email +func Send(_ context.Context, params *MailParams) (res *any, err error) { + pCtx := params.ProcessContext + act := params.Action + id := fmt.Sprint(pCtx.GetData(model.ContextStepSessionID)) + routine, ok := emailRoutine.Load(id) + if ok { + switch routine { + case "success": + emailRoutine.Delete(id) + return nil, nil + case "initializing", "sending": + act.Wait("wait for the email") + return nil, errors.GenericActionError(errors.ActionWait) + default: + emailRoutine.Delete(id) + return nil, fmt.Errorf("failed to send email: %v", routine) + } + } else { + emailRoutine.Store(id, "initializing") + } + + sender := params.Params.From + content := params.Params.Content + m := gomail.NewMessage() + m.SetAddressHeader("From", sender.Address, sender.Alias) + m.SetHeader("To", params.Params.To...) + m.SetHeader("Subject", content.Subject) + m.SetBody("text/html", content.Body) + + dial := gomail.NewDialer(sender.Host, sender.Port, sender.Address, sender.Password) + go func() { + if routine, ok := emailRoutine.Load(id); ok && routine == "initializing" { + emailRoutine.Store(id, "sending") + if err = dial.DialAndSend(m); err != nil { + emailRoutine.Store(id, err.Error()) + return + } + emailRoutine.Store(id, "success") + } + }() + act.Wait("wait for the email") + return nil, errors.GenericActionError(errors.ActionWait) +} + +//go:embed email.cue +var template string + +// GetTemplate returns the template +func GetTemplate() string { + return template +} + +// GetProviders returns the provider +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "send": providertypes.GenericProviderFn[MailVars, any](Send), + } +} diff --git a/pkg/providers/email/email_test.go b/pkg/providers/email/email_test.go new file mode 100644 index 0000000..4b2fb0e --- /dev/null +++ b/pkg/providers/email/email_test.go @@ -0,0 +1,130 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package email + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + . "github.com/agiledragon/gomonkey/v2" + "github.com/stretchr/testify/require" + "gopkg.in/gomail.v2" + + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/process" + "github.com/kubevela/workflow/pkg/errors" + "github.com/kubevela/workflow/pkg/mock" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +func TestSendEmail(t *testing.T) { + ctx := context.Background() + var dial *gomail.Dialer + pCtx := process.NewContext(process.ContextData{}) + pCtx.PushData(model.ContextStepSessionID, "test-id") + act := &mock.Action{} + + testCases := map[string]struct { + vars MailVars + from string + expectedErr error + errMsg string + }{ + "success": { + vars: MailVars{ + From: Sender{ + Address: "kubevela@gmail.com", + Alias: "kubevela-bot", + Password: "pwd", + Host: "smtp.test.com", + Port: 465, + }, + To: []string{"user1@gmail.com", "user2@gmail.com"}, + Content: Content{ + Subject: "Subject", + Body: "Test body.", + }, + }, + }, + "send-fail": { + vars: MailVars{ + From: Sender{ + Address: "kubevela@gmail.com", + Alias: "kubevela-bot", + Password: "pwd", + Host: "smtp.test.com", + Port: 465, + }, + To: []string{"user1@gmail.com", "user2@gmail.com"}, + Content: Content{ + Subject: "fail", + Body: "Test body.", + }, + }, + errMsg: "fail to send", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + r := require.New(t) + + patch := ApplyMethod(reflect.TypeOf(dial), "DialAndSend", func(_ *gomail.Dialer, _ ...*gomail.Message) error { + return nil + }) + defer patch.Reset() + + if tc.errMsg != "" { + patch.Reset() + patch = ApplyMethod(reflect.TypeOf(dial), "DialAndSend", func(_ *gomail.Dialer, _ ...*gomail.Message) error { + return fmt.Errorf(tc.errMsg) + }) + defer patch.Reset() + } + _, err := Send(ctx, &MailParams{ + Params: tc.vars, + RuntimeParams: providertypes.RuntimeParams{ + ProcessContext: pCtx, + Action: act, + }, + }) + if tc.expectedErr != nil { + r.Equal(tc.expectedErr.Error(), err.Error()) + return + } + _, ok := err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.Phase, "Wait") + + // mock reconcile + time.Sleep(time.Second) + _, err = Send(ctx, &MailParams{ + Params: tc.vars, + RuntimeParams: providertypes.RuntimeParams{ + ProcessContext: pCtx, + Action: act, + }, + }) + if tc.errMsg != "" { + r.Contains(err.Error(), tc.errMsg) + } + }) + } +} diff --git a/pkg/providers/http/http.cue b/pkg/providers/http/http.cue new file mode 100644 index 0000000..c570ac8 --- /dev/null +++ b/pkg/providers/http/http.cue @@ -0,0 +1,59 @@ +// http.cue + +#HTTPDo: { + #do: "do" + #provider: "http" + + $params: { + // +usage=The method of HTTP request + method: *"GET" | "POST" | "PUT" | "DELETE" + // +usage=The url to request + url: string + // +usage=The request config + request?: { + // +usage=The timeout of this request + timeout?: string + // +usage=The request body + body?: string + // +usage=The header of the request + header?: [string]: string + // +usage=The trailer of the request + trailer?: [string]: string + // +usage=The rate limiter of the request + ratelimiter?: { + limit: int + period: string + } + ... + } + // +usgae=The tls config of the request + tls_config?: { + secret: string + namespace: context.namespace + } + } + + $returns?: { + // +usage=The response of the request will be filled in this field after the action is executed + response: { + // +usage=The body of the response + body: string + // +usage=The header of the response + header?: [string]: [...string] + // +usage=The trailer of the response + trailer?: [string]: [...string] + // +usage=The status code of the response + statusCode: int + ... + } + } + ... +} + +#HTTPGet: #HTTPDo & {method: "GET"} + +#HTTPPost: #HTTPDo & {method: "POST"} + +#HTTPPut: #HTTPDo & {method: "PUT"} + +#HTTPDelete: #HTTPDo & {method: "DELETE"} diff --git a/pkg/providers/http/http.go b/pkg/providers/http/http.go new file mode 100644 index 0000000..f6069f0 --- /dev/null +++ b/pkg/providers/http/http.go @@ -0,0 +1,242 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + "context" + "crypto/tls" + "crypto/x509" + _ "embed" + "encoding/base64" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + + "github.com/kubevela/workflow/pkg/providers/legacy/http/ratelimiter" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +const ( + // ProviderName is provider name for install. + ProviderName = "http" +) + +var ( + rateLimiter *ratelimiter.RateLimiter +) + +func init() { + rateLimiter = ratelimiter.NewRateLimiter(128) +} + +// Request . +type Request struct { + Timeout string `json:"timeout,omitempty"` + Body string `json:"body,omitempty"` + Header map[string]string `json:"header,omitempty"` + Trailer map[string]string `json:"trailer,omitempty"` + RateLimiter *RateLimiter `json:"rateLimiter,omitempty"` +} + +// RateLimiter . +type RateLimiter struct { + Limit int `json:"limit"` + Period string `json:"period"` +} + +// TLSConfig . +type TLSConfig struct { + Secret string `json:"secret"` + Namespace string `json:"namespace"` +} + +// RequestVars is the vars for http request +type RequestVars struct { + Method string `json:"method"` + URL string `json:"url"` + Request *Request `json:"request,omitempty"` + TLSConfig *TLSConfig `json:"tls_config,omitempty"` +} + +// ResponseVars is the vars for http response +type ResponseVars struct { + Body string `json:"body"` + Header http.Header `json:"header,omitempty"` + Trailer http.Header `json:"trailer,omitempty"` + StatusCode int `json:"statusCode"` +} + +// DoParams is the params for http request +type DoParams = providertypes.Params[RequestVars] + +// DoReturns is the returns for http response +type DoReturns = providertypes.Returns[ResponseVars] + +// Do process http request. +func Do(ctx context.Context, params *DoParams) (*DoReturns, error) { + return runHTTP(ctx, params) +} + +func runHTTP(ctx context.Context, params *DoParams) (*DoReturns, error) { + var ( + err error + header, trailer http.Header + reader io.Reader + ) + defaultClient := &http.Client{ + Transport: http.DefaultTransport, + Timeout: time.Second * 3, + } + method := params.Params.Method + url := params.Params.URL + if request := params.Params.Request; request != nil { + if request.Timeout != "" { + timeout, err := time.ParseDuration(request.Timeout) + if err != nil { + return nil, fmt.Errorf("invalid timeout %s: %w", timeout, err) + } + defaultClient.Timeout = timeout + } + if request.RateLimiter != nil { + period, err := time.ParseDuration(request.RateLimiter.Period) + if err != nil { + return nil, fmt.Errorf("invalid period %s: %w", period, err) + } + if !rateLimiter.Allow(fmt.Sprintf("%s-%s", method, strings.Split(url, "?")[0]), request.RateLimiter.Limit, period) { + return nil, errors.New("request exceeds the rate limiter") + } + } + reader = strings.NewReader(request.Body) + header = parseHeaders(request.Header) + trailer = parseHeaders(request.Trailer) + } + if len(header) == 0 { + header = map[string][]string{} + header.Set("Content-Type", "application/json") + } + + req, err := http.NewRequestWithContext(context.Background(), method, url, reader) + if err != nil { + return nil, err + } + req.Header = header + req.Trailer = trailer + + if params.Params.TLSConfig != nil { + if tr, err := getTransport(ctx, params.KubeClient, params.Params.TLSConfig.Secret, params.Params.TLSConfig.Namespace); err == nil && tr != nil { + defaultClient.Transport = tr + } + } + + resp, err := defaultClient.Do(req) + if err != nil { + return nil, err + } + //nolint:errcheck + defer resp.Body.Close() + b, _ := io.ReadAll(resp.Body) + // parse response body and headers + return &DoReturns{ + Returns: ResponseVars{ + Body: string(b), + Header: resp.Header, + Trailer: resp.Trailer, + StatusCode: resp.StatusCode, + }, + }, nil +} + +func getTransport(ctx context.Context, cli client.Client, secretName, ns string) (http.RoundTripper, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + NextProtos: []string{"http/1.1"}, + }, + } + objectKey := client.ObjectKey{ + Namespace: ns, + Name: secretName, + } + index := strings.Index(secretName, "/") + if index > 0 { + objectKey.Namespace = secretName[:index-1] + objectKey.Name = secretName[index:] + } + secret := new(v1.Secret) + if err := cli.Get(ctx, objectKey, secret); err != nil { + return nil, err + } + if ca, ok := secret.Data["ca.crt"]; ok { + caData, err := base64.StdEncoding.DecodeString(string(ca)) + if err != nil { + return nil, err + } + pool := x509.NewCertPool() + pool.AppendCertsFromPEM(caData) + tr.TLSClientConfig.RootCAs = pool + } + var err error + var certData, keyData []byte + if clientCert, ok := secret.Data["client.crt"]; ok { + certData, err = base64.StdEncoding.DecodeString(string(clientCert)) + if err != nil { + return nil, err + } + } + if clientKey, ok := secret.Data["client.key"]; ok { + keyData, err = base64.StdEncoding.DecodeString(string(clientKey)) + if err != nil { + return nil, err + } + } + cliCrt, err := tls.X509KeyPair(certData, keyData) + if err != nil { + return nil, errors.WithMessage(err, "parse client keypair") + } + tr.TLSClientConfig.Certificates = []tls.Certificate{cliCrt} + return tr, nil +} + +func parseHeaders(obj map[string]string) http.Header { + h := http.Header{} + for k, v := range obj { + h.Add(k, v) + } + return h +} + +//go:embed http.cue +var template string + +// GetTemplate returns the cue template +func GetTemplate() string { + return template +} + +// GetProviders returns the providers +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "do": providertypes.GenericProviderFn[RequestVars, DoReturns](Do), + } +} diff --git a/pkg/providers/http/http_test.go b/pkg/providers/http/http_test.go new file mode 100644 index 0000000..85a9bf1 --- /dev/null +++ b/pkg/providers/http/http_test.go @@ -0,0 +1,332 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/test" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubevela/workflow/pkg/providers/legacy/http/ratelimiter" + "github.com/kubevela/workflow/pkg/providers/legacy/http/testdata" + "github.com/kubevela/workflow/pkg/providers/types" +) + +func TestHttpDo(t *testing.T) { + shutdown := make(chan struct{}) + runMockServer(shutdown) + defer func() { + close(shutdown) + }() + ctx := context.Background() + + testCases := map[string]struct { + request RequestVars + expected ResponseVars + expectedErr string + }{ + "hello": { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/hello", + Request: &Request{ + Timeout: "2s", + }, + }, + expected: ResponseVars{ + Body: "hello", + StatusCode: 200, + }, + }, + + "echo": { + request: RequestVars{ + Method: "POST", + URL: "http://127.0.0.1:1229/echo", + Request: &Request{ + Body: "I am vela", + Header: map[string]string{ + "Content-Type": "text/plain; charset=utf-8", + }, + }, + }, + expected: ResponseVars{ + Body: "I am vela", + StatusCode: 200, + }, + }, + "json": { + request: RequestVars{ + Method: "POST", + URL: "http://127.0.0.1:1229/echo", + Request: &Request{ + Body: `{"name":"foo","score":100}`, + Header: map[string]string{ + "Content-Type": "text/plain; charset=utf-8", + }, + }, + }, + expected: ResponseVars{ + Body: `{"name":"foo","score":100}`, + StatusCode: 200, + }, + }, + "timeout": { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/timeout", + Request: &Request{ + Timeout: "1s", + }, + }, + expected: ResponseVars{ + Body: `{"name":"foo","score":100}`, + StatusCode: 200, + }, + expectedErr: "context deadline exceeded", + }, + "not-timeout": { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/timeout", + Request: &Request{ + Timeout: "3s", + }, + }, + expected: ResponseVars{ + Body: "hello", + StatusCode: 200, + }, + }, + "notfound": { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/notfound", + Request: &Request{ + Timeout: "1s", + }, + }, + expected: ResponseVars{ + StatusCode: 404, + }, + }, + } + + for tName, tc := range testCases { + r := require.New(t) + res, err := Do(ctx, &DoParams{ + Params: tc.request, + }) + if tc.expectedErr != "" { + r.Error(err) + r.Contains(err.Error(), tc.expectedErr) + continue + } + r.NoError(err, tName) + r.Equal(res.Returns.Body, tc.expected.Body, tName) + r.Equal(res.Returns.StatusCode, tc.expected.StatusCode, tName) + } + + // test ratelimiter + rateLimiter = ratelimiter.NewRateLimiter(1) + limiterTestCases := []struct { + request RequestVars + expectedErr string + }{ + { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/hello", + Request: &Request{ + RateLimiter: &RateLimiter{ + Limit: 1, + Period: "1m", + }, + }, + }, + }, + { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/hello?query=1", + Request: &Request{ + RateLimiter: &RateLimiter{ + Limit: 1, + Period: "1m", + }, + }, + }, + expectedErr: "request exceeds the rate limiter", + }, + { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/echo", + Request: &Request{ + RateLimiter: &RateLimiter{ + Limit: 1, + Period: "1m", + }, + }, + }, + }, + { + request: RequestVars{ + Method: "GET", + URL: "http://127.0.0.1:1229/hello?query=2", + Request: &Request{ + RateLimiter: &RateLimiter{ + Limit: 1, + Period: "1m", + }, + }, + }, + }, + } + + for tName, tc := range limiterTestCases { + r := require.New(t) + _, err := Do(ctx, &DoParams{ + Params: tc.request, + }) + if tc.expectedErr != "" { + r.Error(err) + r.Contains(err.Error(), tc.expectedErr) + continue + } + r.NoError(err, tName) + } +} + +func runMockServer(shutdown chan struct{}) { + http.HandleFunc("/timeout", func(w http.ResponseWriter, req *http.Request) { + time.Sleep(time.Second * 2) + _, _ = w.Write([]byte("hello")) + }) + http.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) { + _, _ = w.Write([]byte("hello")) + }) + http.HandleFunc("/echo", func(w http.ResponseWriter, req *http.Request) { + bt, _ := io.ReadAll(req.Body) + _, _ = w.Write(bt) + }) + http.HandleFunc("/notfound", func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(404) + }) + srv := &http.Server{Addr: ":1229"} + go srv.ListenAndServe() //nolint + go func() { + <-shutdown + srv.Close() + }() + + client := &http.Client{} + // wait server started. + for { + time.Sleep(time.Millisecond * 300) + req, _ := http.NewRequest("GET", "http://127.0.0.1:1229/hello", nil) + _, err := client.Do(req) + if err == nil { + break + } + } +} + +func TestHTTPSDo(t *testing.T) { + ctx := context.Background() + s := newMockHttpsServer() + defer s.Close() + cli := &test.MockClient{ + MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + secret := obj.(*v1.Secret) + *secret = v1.Secret{ + Data: map[string][]byte{ + "ca.crt": []byte(testdata.MockCerts.Ca), + "client.crt": []byte(testdata.MockCerts.ClientCrt), + "client.key": []byte(testdata.MockCerts.ClientKey), + }, + } + return nil + }, + } + r := require.New(t) + _, err := Do(ctx, &DoParams{ + Params: RequestVars{ + Method: "GET", + URL: "https://127.0.0.1:8443/api/v1/token?val=test-token", + TLSConfig: &TLSConfig{ + Secret: "certs", + Namespace: "default", + }, + }, + RuntimeParams: types.RuntimeParams{ + KubeClient: cli, + }, + }) + r.NoError(err) +} + +func newMockHttpsServer() *httptest.Server { + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + fmt.Printf("Expected 'GET' request, got '%s'", r.Method) + } + if r.URL.EscapedPath() != "/api/v1/token" { + fmt.Printf("Expected request to '/person', got '%s'", r.URL.EscapedPath()) + } + _ = r.ParseForm() + token := r.Form.Get("val") + tokenBytes, _ := json.Marshal(map[string]interface{}{"token": token}) + + w.WriteHeader(http.StatusOK) + _, _ = w.Write(tokenBytes) + })) + l, _ := net.Listen("tcp", "127.0.0.1:8443") + ts.Listener.Close() + ts.Listener = l + + decode := func(in string) []byte { + out, _ := base64.StdEncoding.DecodeString(in) + return out + } + + pool := x509.NewCertPool() + pool.AppendCertsFromPEM(decode(testdata.MockCerts.Ca)) + cert, _ := tls.X509KeyPair(decode(testdata.MockCerts.ServerCrt), decode(testdata.MockCerts.ServerKey)) + ts.TLS = &tls.Config{ + ClientCAs: pool, + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"http/1.1"}, + } + ts.StartTLS() + return ts +} diff --git a/pkg/providers/http/ratelimiter/ratelimiter.go b/pkg/providers/http/ratelimiter/ratelimiter.go new file mode 100644 index 0000000..b4f7f5e --- /dev/null +++ b/pkg/providers/http/ratelimiter/ratelimiter.go @@ -0,0 +1,49 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "time" + + "github.com/golang/groupcache/lru" + "golang.org/x/time/rate" +) + +// RateLimiter is the rate limiter. +type RateLimiter struct { + store *lru.Cache +} + +// NewRateLimiter returns a new rate limiter. +func NewRateLimiter(len int) *RateLimiter { + store := lru.New(len) + store.Clear() + return &RateLimiter{store: store} +} + +// Allow returns true if the operation is allowed. +func (rl *RateLimiter) Allow(id string, limit int, duration time.Duration) bool { + if l, ok := rl.store.Get(id); ok { + limiter := l.(*rate.Limiter) + if limiter.Limit() == rate.Every(duration) && limiter.Burst() == limit { + return limiter.Allow() + } + } + limiter := rate.NewLimiter(rate.Every(duration), limit) + rl.store.Add(id, limiter) + return limiter.Allow() +} diff --git a/pkg/providers/http/ratelimiter/ratelimiter_test.go b/pkg/providers/http/ratelimiter/ratelimiter_test.go new file mode 100644 index 0000000..a86630b --- /dev/null +++ b/pkg/providers/http/ratelimiter/ratelimiter_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRateLimiter(t *testing.T) { + rl := NewRateLimiter(2) + r := require.New(t) + duration := time.Second + testCases := []struct { + id string + limit int + expected bool + }{ + { + id: "1", + limit: 2, + }, + { + id: "2", + limit: 2, + }, + { + id: "3", + limit: 2, + }, + { + id: "2", + limit: 3, + }, + } + for _, tc := range testCases { + for i := 0; i < tc.limit; i++ { + allow := rl.Allow(tc.id, tc.limit, duration) + r.Equal(true, allow) + } + allow := rl.Allow(tc.id, tc.limit, duration) + r.Equal(false, allow) + } +} diff --git a/pkg/providers/http/testdata/certs.go b/pkg/providers/http/testdata/certs.go new file mode 100644 index 0000000..20fe36b --- /dev/null +++ b/pkg/providers/http/testdata/certs.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testdata + +var ( + _ = `LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBcHRMN21JQmdaaDVEd1FYYkJXaHRSeURGK01qaytQeStNU1Vyck5pc1EyeXF4cnphCjhEeWw5a0pRWW5oMnVYVFV1RnBzbDRhM1J5dEJaVkdrMDNQT0RXOWJIblR2QUNPZHJjMnR2WmR5ZXRXU1ZtQ2cKOHhuc3Y3WHVQS0VGb0VwakVaMDdCWjY2blFIRDg2MHFMeGFGRWtMNHk2MzU5SThWVlRBYk5RejVPQ3dmM29mUQpMN0JPL2RVNUJtRTNXTDhhVHF3SXRSa0hJeE5pWCs4OWU2Z3dCY3RHdUZLR3ZacFhGaW1VeXA1Y0crVWI2RzkyCi9KUTZJWm45dGFIZ3NFYWIvWUNwZ2U1Rkp5WVR1dzVlakhRajRYNVh3ZVRKU0tsN0UwUmZhMjl5VnM5aXdhNDQKcmVNSzVXR2hVUFl6T1o0MURGZnU1MmJnMjVPODF6QWJFSFpLUndJREFRQUJBb0lCQUNUVUJ2OFB1RGhURGhvYQp0Tk5vemxjWmdSci9IcTFvL29QUzlPVmZvQWZ5Z1hFR1dEOFk1SHFOQVRuNzVobmpGT0x0ODNNd0psM3J5ckFYCmFnL1VUUFRpVkhkUTBVSnltbWk0TTFiYmpFWlp4OGlSNUhaR2p1Rnp4SGhXQSt2ekFCUHZaZ3hEa21iKzhNZG0KdngxT0YycUVwbkF3cERHOU5MUnR2bFBqM1ZEczhVODU2c2hWeDdBdFE3RGJUWkQwdEpsQ0pzTzR5TitjL1oxOApiRzJKNDB2RWFLalVGTE9HNitScE43NEZLeGtvOFJJejZxeERQMk5VMUg1ajVVVi9tZXdRdDBsRTNqbEc5MmcvCnVwTngyK0xnYUkrMWhCR3AzV2prQlRWcWloZWxrUk5XZkNLczdXOHJtYk83V3MvK2cwcVNidnAvUjBWQWpQd0MKdGt4SENFRUNnWUVBM2s3K0hOVkNZY0YxN2k2ZTJnNTJDVHV0cDJBYzkvSUNMdVJzNGFRZlB4MkxFc2VDalJnNgovaHNsOGpLbmRDS1JQdTBJbkoxckF4NzVrZXBKZWpWcTBIbkFEN2VtcVhuMDN0UjJmb3hvbkxBOEtQMzdSSnJqClhlZ0k5NiswWUU3QUY5dWZqQVhPeXpFU3RQVkNSVDlJOFRMSlEwRFhraW56bDhVUm5aZ1RjdmtDZ1lFQXdCdFYKLzNnbFR5Z0syNTFpMS9FakdrK3I3THF5NzdCY29LVzZHTm91K0FiQ3gxalhZVE1URDNTRXVyMzBueHB6VWNkdgpIbEI1NkI2Q1JmRkdXN0o1U0tkeXI5WmhQUUtITUQ1TkZhbm00S1F4NmZmVFhubExRdnhhT2c2TFRnTDRSdjFyCjVaeUdEbDhBKzRRckpNVk1OOTZOVEY1VDB0TXRUaHlIVnpLbHR6OENnWUJ3Q3BQYjZFZUtpVHhzakthVzg4N2QKbkd4Sy9RL2NqdVkyeC8xd1E0MVQvQW5KcnkvRytMMVNzRkFSbnlIeVVER3Y2enI1NUFTNUQvVnNhdzRaUDY3VAozMmpEQXlaR0tDY1gzekRSV3VhbWdkUHdQUUZVZEZPL1VtQ2lwTFZlREpLWDg2S1hxWjJ0bnMvMHo5OVVreTZxCkVaU0tCclllL25HOHZoL0FzNUtwMFFLQmdRQzFxT1BncWFkMk8rSlFuSHE4d3UwejAwVTduYXpabFlkeDdtV1YKWExUdm04MFNuME5FU2Z6ckwzN1g3QXJuYlNiQm5YckpTc2FNcGxVQWVORFVvMmVuT1pqdENDZDVmdXVCeGxnMApkUzY3SE9tS1d1ekl1S0JmM3F3Zm5HTkV5UEFvaVRvL3JZempDQm13dmVIaWFxUFJiU1Ztb3doWEk1VUMrVjFPCktybWtGd0tCZ1FEVERDWlg1WWQ5ZUdXZG1OM3pUU2Z6YkRrRkxqZkYyYTVBK2lDL281TmoyVmpHRG4xTjRvVUwKajF0dVZLb0xoVjhVZzd0Lzc4V0V0UkRnK1p3QVZhSW84bE1zU244dDVQNFFrY2pkSDI4bHpFaTQwWHpxQkF0Lwpoalppb1pNN2ZHUmJWK29yakZSQ2tZWnNaMUdua2FrbG5Mdk4vYVRuM25HV2tEZjFaZGM0YVE9PQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==` //ca.key + caCrt = `LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNyakNDQVpZQ0NRRDV0K3E3ZkswSlVEQU5CZ2txaGtpRzl3MEJBUXNGQURBWk1SY3dGUVlEVlFRRERBNHEKTG10bGRXSmxkbVZzWVM1cGJ6QWVGdzB5TVRFeE1qSXdPRFEyTlRsYUZ3MHpOVEE0TURFd09EUTJOVGxhTUJreApGekFWQmdOVkJBTU1EaW91YTJWMVltVjJaV3hoTG1sdk1JSUJJakFOQmdrcWhraUc5dzBCQVFFRkFBT0NBUThBCk1JSUJDZ0tDQVFFQXB0TDdtSUJnWmg1RHdRWGJCV2h0UnlERitNamsrUHkrTVNVcnJOaXNRMnlxeHJ6YThEeWwKOWtKUVluaDJ1WFRVdUZwc2w0YTNSeXRCWlZHazAzUE9EVzliSG5UdkFDT2RyYzJ0dlpkeWV0V1NWbUNnOHhucwp2N1h1UEtFRm9FcGpFWjA3Qlo2Nm5RSEQ4NjBxTHhhRkVrTDR5NjM1OUk4VlZUQWJOUXo1T0N3ZjNvZlFMN0JPCi9kVTVCbUUzV0w4YVRxd0l0UmtISXhOaVgrODllNmd3QmN0R3VGS0d2WnBYRmltVXlwNWNHK1ViNkc5Mi9KUTYKSVpuOXRhSGdzRWFiL1lDcGdlNUZKeVlUdXc1ZWpIUWo0WDVYd2VUSlNLbDdFMFJmYTI5eVZzOWl3YTQ0cmVNSwo1V0doVVBZek9aNDFERmZ1NTJiZzI1TzgxekFiRUhaS1J3SURBUUFCTUEwR0NTcUdTSWIzRFFFQkN3VUFBNElCCkFRQWJ1dGZjbENOZVhTaldBR0NSb2tyTVN2Z0VvMlZEdnE4Y1lEN3hIT3gzRllQRWE0Rk01VC9uSXVsNGJxSCsKY09mOCtMOTZXTGNUUnpNRnhrMmNKT2VKV3hFMDkzcDN2dHRZMFUrOGZ4T1FIY3JxK1N3U0dPTUpWTHhEcGtPNApscFVpc0JYOENGQld5VG9vN05WRy9FZGRVS1FHa2ttaGJMdXJIZStHTnFmT0VpS01GYm1PRHBzZ1Zqc0oxK2hPCjZDWG8zZW01Mlh4eVZqbGtoNzBJK29UMW5PeGFYSEhwK0NNT2JPSXkzcFhMejJROWNmRU1uTlZrVTBDMmFaeksKS1ViMGZXOTlpbjBJRmlUd0NkQlhRTlRpMzh6bVEvUUlEYlJEQTJFREtMa2pZRzdUUFR4Wm9xL04rQUQ3ZElLWQpyaE56TXB6cGhhRGR4Ymt3cmlHRGQ5TEkKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQ==` + serverKey = `LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcFFJQkFBS0NBUUVBemdMMTJpL0gwaXk5MGhLb0sySUwrbnIvRDB0RGlPdVlwYk5NUktCTTJyMGtCblhCClJUdU1peWNWMUd5OGlwQ1FLMmwwV1E3dDJpTStLT1BBTFYxQ0thNEtpTlg5M015cWlmVFVBc3YzemRtNkhiZFMKa3owcUlOYTBoZTBaclA3M3g0YWZHRmJ2SXRwU1pWQThwSUhrc281SHFhcTRndTcwVUhRSlhZcFIrRjlKY3l3eAppWTZ4S0x6QWNGcjlsdktZdlVaWitRR1FUbWh4TFFtKzI5cUJsZXIvRGpEQzlkZjM2blFkSU5wdUpIak1tZXI2CnZQVU5CekQ5OWliT3NpYVBxMlo1VHRuY3U0TlRzKzlLN09BWnhLK1d4VVp6c3pKbnhHNjgxR010MXJ1anBIVlMKSGIrYmh2ek9aRmt6aFhWVE5wL0FYMElNdy92K3hpZzl3WXkyK1FJREFRQUJBb0lCQVFESmFObUdWRnB1NERGQgpGZDUyYzZnMFhsWEpWUk1VNVFsYlR2MU14cy84dHhobWZHL1ZTUS94NStlT3hEUmM0Rk1qTGptQzdIYWNZd0pkCnBiVDRaUW5QaUFsaW1KeFdaMzUvMiszL1Fmem1zMndqcTF3KytYaWJuRzNuMWRQWmIzaytDQjY1QkIxT0hOYWIKbUtPQlRrRVNWTW81VmVDSW1pZ2dGQ0luNHBpYlVvSXpHcmdabVRsTmlQTjFPb2FNNk9IcEorZDVGNDFNdGYwbgpJYjAyTVYzdUtZb1hqUWtFYytBL3B3WDJ0TVlGUTMza3NwTlZjSURnYU4zUWRsb3IxQXhDN0xQaStKeTZNelY1CmJ4VDhlcFhZN2VmUFhqblR0VmVHQWFUNThJSFdWS29ncmt6V21xdHROMUtzc2RBWGpja1NWRksxL0U5V1c4Q28KWDBTa3VpUFJBb0dCQVA1SERCSDZkRURqbVp0TVVFeHEwNWlwcWxSbFhnYUdYT01lT0VIZ1VSVzhEdnJiNC9zLwo4cEswUWZCUUpkYTIwT1d0dVprSDNYWEVnNktjL0NyMWZIQU9uNElmOFI1NlJNRUpWUnhvTzlBZWZVem9nZjAvCndWUlpmZmRUTkhKV2E3VExJaVdNdXpmY2k0bTdldnZkNkpUS3lzTlRvREJuemM0ZFpzcGhxWHh0QW9HQkFNOW8KTnBqSmxsZDBrUENicFRtRjZMRGpBMzAvaS9DRzlVNEoxaUFrQno3SzA2TU9ENDA3TlZ5QTV0V2p5QjlSTThnbwpMcjZhK1g3MDN2YWVMZDZQcDY2UnlDbzU4RWFhcFk2SFQvTGVNSGRzUllEdk9PU2pOT0FtbkJxcFFVR1Z2ZVhTClpCN2srclpVK3g2UktEblFod3crZGsveUZkZmJ5VmxhTDRKVFZiVTlBb0dCQVBOYUNYbzNTUVZGRGFBc0EvbHUKajMxZWUwMzBDVzJUTDlpSTlteE5neXlhNDNjLzlNdGpZd0wyRXRrcnkxclhjY3N1WFI3UkFTaVJYeTNFc2kxbQo3YVhNeU9sZktvTHhuMVZqV2hvcXczdWxnbU9WYmJweVJ0TTBKck1KNVhxN3JLN0ZiYk9rSVJVUU5GY25uMGJuCkZJMDZHNTJlTGdQRmhKaUxXUEc5VDlodEFvR0JBS1JrWEluamxqZEJYRFJwbVo4clpWRDJ6bWd5dXc5dFdQZCsKMG1wdFJCVGdITGtyeHVYUlhTMHh1a1R4YVFoeGkxS0ZqdTlpMUlodFBHQks1ZDUzREpoUVVsQXQxaVdRSTlNQgpxenU4SXJ3MVpDMmE3d1JCM0FJaWVDNmxvdVNCOUo4NWtFUHdpRXVHdGZmM1krUFhSWU5ONnViWTRibFRLcGVZCjVQa3Vaa3VkQW9HQVozTHg0UGNnK3RSR0dsclR0WllXZmNxT3FDcUIya0NCa2tDVnlvWW9qSzM4a21CNVJ0QmQKZzBnMTc5TGdUTWE0Y0ZWRUhyZC9xU2RRQVZiTmxyRDh4MjJNYnJMam1aa05aOXQ1alpJSWQwRVRxYTJBNC8rNApOV05HeU12b0ttTmMxcFd2UEJiT1R1RHp2WEg3YnZzbXAzallucjJQU09WaFU1RGdJRHpEQTJBPQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==` + serverCrt = `LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMzekNDQWNlZ0F3SUJBZ0lKQU55SVoyTElxQTNqTUEwR0NTcUdTSWIzRFFFQkJRVUFNQmt4RnpBVkJnTlYKQkFNTURpb3VhMlYxWW1WMlpXeGhMbWx2TUI0WERUSXhNVEV5TWpBNU16UXpORm9YRFRNMU1EZ3dNVEE1TXpRegpORm93R0RFV01CUUdBMVVFQXd3TktpNXJkV0psZG1Wc1lTNXBiekNDQVNJd0RRWUpLb1pJaHZjTkFRRUJCUUFECmdnRVBBRENDQVFvQ2dnRUJBTTRDOWRvdng5SXN2ZElTcUN0aUMvcDYvdzlMUTRqcm1LV3pURVNnVE5xOUpBWjEKd1VVN2pJc25GZFJzdklxUWtDdHBkRmtPN2RvalBpamp3QzFkUWltdUNvalYvZHpNcW9uMDFBTEw5ODNadWgyMwpVcE05S2lEV3RJWHRHYXorOThlR254aFc3eUxhVW1WUVBLU0I1TEtPUjZtcXVJTHU5RkIwQ1YyS1VmaGZTWE1zCk1ZbU9zU2k4d0hCYS9aYnltTDFHV2ZrQmtFNW9jUzBKdnR2YWdaWHEvdzR3d3ZYWDkrcDBIU0RhYmlSNHpKbnEKK3J6MURRY3cvZlltenJJbWo2dG1lVTdaM0x1RFU3UHZTdXpnR2NTdmxzVkdjN015WjhSdXZOUmpMZGE3bzZSMQpVaDIvbTRiOHptUlpNNFYxVXphZndGOUNETVA3L3NZb1BjR010dmtDQXdFQUFhTXJNQ2t3Q1FZRFZSMFRCQUl3CkFEQUxCZ05WSFE4RUJBTUNCZUF3RHdZRFZSMFJCQWd3Qm9jRWZ3QUFBVEFOQmdrcWhraUc5dzBCQVFVRkFBT0MKQVFFQUJNbUhSZG4rS043QWZTL3JicHI2dGx6SHBoRFJud29KR0NxSkZYZjdabUN1TEF3NzVsTlhxOUxka2NJeApSZXhleXk2cnk2SmF6RGN4OVltWHVnZzFtTTlrWE5kTmc0NmVSangzRk4vL2FRUFJOMHNuTDVOaXRyM0kvdEJmCkxNdlduUisrQ2tZSnFtM1NuTnRicVR0cDhodTZKWnVRUVh2WWM0ZEg5VmJRM2d3UzFSUzdhQ0RBZHlLZEhnSFQKZmN3VnNqZmk2TzhLdlROaG43aU1LWERZQUhiWXh3ekpsdjBEWFhuZjhmRlo3U09FT3VkbGM5Y3hOQW0xVlBjZAo1YXcwR0hvbWMxNTdHU244UmpWenlJOHRPdGE4WU9uaU9SOE5qNElzMExRQXF3VjJ1OU52Zkg3bkJMUjg3d1Z2CmFxUmxaWE5uOG5qcWgwdzZkc3BUWjRwQWFBPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQ==` + clientCrt = `LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUN5VENDQWJHZ0F3SUJBZ0lKQU55SVoyTElxQTNpTUEwR0NTcUdTSWIzRFFFQkJRVUFNQmt4RnpBVkJnTlYKQkFNTURpb3VhMlYxWW1WMlpXeGhMbWx2TUI0WERUSXhNVEV5TWpBNE5UVTFNRm9YRFRNMU1EZ3dNVEE0TlRVMQpNRm93RmpFVU1CSUdBMVVFQXd3TGRHVnpkRjlzYVdwcFlXNHdnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCCkR3QXdnZ0VLQW9JQkFRQzhZNGU1ZElYd3FaL1dWOFVLMEhVc2toRlRmTytxMC96VU9CZVl0amplNFQrTmhlc2oKS0tkLzhWLzBYZTY5ck5CYXE3RVBhYUFReDVYYTF1aGh2SWRXU2F2QWhmaTgwd3lXSy9reUtKR0xsaTZleTZtLwpjZkdxRVJ5eUtnc2NoZWFrQ3dCajArZkxnVUZ5aVM0MzVubnVZZ2Y0MXFmRVJ1azREUExDVk9uRE13dXBmQkdyClNacmY2U0R3Z0V3SnV2QVlhQ0owcE91NkovRzhkMVNOb3I4UFlGQjBFbGl2d2ZESk9CRjFvMEg2elYzMHZ1bVoKRzM0anpoNjh3aThPNG1TZjZCbi9XMlEyckNvY1FlSE9nUVRGcHVtdC9qYy9lMnhlRFpOd3RpS25OMTdWa3o1QwpXVGxpUEZuQlRWTWZCeHEyOGNIemtzaTVBRDZ5bVlRMkNzcTVBZ01CQUFHakZ6QVZNQk1HQTFVZEpRUU1NQW9HCkNDc0dBUVVGQndNQ01BMEdDU3FHU0liM0RRRUJCUVVBQTRJQkFRQnpKRnFlUk9BRDZ0ZkVrNElsNGxvbDI4OGIKaFZMMnRXdThXbGtDdHFQaFNOR0hkeWJQcGdLL3dCajQzS3FGcFRMVGo0TStDT0cwR08xZDVMK1lROHdHOHJGQQpHWTd3ZndQLzRlenpzSzNocmI5NnNpdm04TUZqdXRzSEdzenFWRkZ0UXBNWkhBTm5FQXY0ZkxGSEtQM0ZubmkyCnpjYmwrVXNQWFk3QU5NelpOelIwQVdLWmxwbm5hMUpuQWtzQnBBTzlweFRKOU55MzhVNlc0SERrN2gyVk5BUHAKbGpxRmNoYXdjTkN1MDIzV2hhWWxuNGowTG9NRlh0NDJNMXgxL2R4SnQxNUlnNFB5LysrbmZRbmtvN09vSmVpVAppb0lNc3VBcmNJaG1MSU8zZzFTNVJtNzJ6NDUwSXV0blFWQUc3MVQ0alZyR3libHhnMWpGVjFXWHJ1V2MKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQ==` + clientKey = `LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBdkdPSHVYU0Y4S21mMWxmRkN0QjFMSklSVTN6dnF0UDgxRGdYbUxZNDN1RS9qWVhyCkl5aW5mL0ZmOUYzdXZhelFXcXV4RDJtZ0VNZVYydGJvWWJ5SFZrbXJ3SVg0dk5NTWxpdjVNaWlSaTVZdW5zdXAKdjNIeHFoRWNzaW9MSElYbXBBc0FZOVBueTRGQmNva3VOK1o1N21JSCtOYW54RWJwT0F6eXdsVHB3ek1McVh3UgpxMG1hMytrZzhJQk1DYnJ3R0dnaWRLVHJ1aWZ4dkhkVWphSy9EMkJRZEJKWXI4SHd5VGdSZGFOQitzMWQ5TDdwCm1SdCtJODRldk1JdkR1SmtuK2daLzF0a05xd3FIRUhoem9FRXhhYnByZjQzUDN0c1hnMlRjTFlpcHpkZTFaTSsKUWxrNVlqeFp3VTFUSHdjYXR2SEI4NUxJdVFBK3NwbUVOZ3JLdVFJREFRQUJBb0lCQUhYYjZ1aTZucVUrNmRHbQpYWTd6ZGFzcHd3OHhaWnZCUGpiaTFOaGtnRlhvSStOOWVlc29Id3FyVHZYSjRuZmw2d0FlMUFvcGNjdXRvZklrCmE0UGg5K1dpOTRIZUR3ekxHTi9HcVFPWlg5MHRXd05idFZvaGhpaDR4alFzbTREL3dKaTJqVXJuSXVndGVHMlkKcDBLdnZXN0hBK2ZKRzNKdlRxOFRZcmp6ZUwvMlYzUnRSbk1oNldjcWI0cWpRb2NZWkR3VU43MVBVYURrUTRLSApWdHNNSjloc0dBUDFDVFRQckZZR1daRUZiN0xtZGNRemJyWVZyQnhwaGRoZGpqUXBjVzAySjZHU1ZrY0NMeFkvCnVRYnpZS0RkUXU1SHJKbVpiNjlFRmJWZHp3OTh3alIvNyt0dUhmdlJzYkUxRWdKWlQxNkhSUHEwbW1mMWxHM3EKNm9ZbnNCRUNnWUVBNVd2SEE4bWliMVpYWiszaXN3SUh0SlZ5WGJWM0Y4ejZ5UHRaK2ZNa1ljY3BBb3g0SDRTbgp1azllN2NlN0lhMDJueXlwTmlIaXZMRXZ3dGlqNjlBQ3I1WWpMMzlDMUlYbjJYMUg3V2FKaHF2K21zOW5ybmV0CmRqY0RtZXJRRHBVU0IzbXFLQlhXY2dVaXA4NUVYMlo5SzhPbnRhNXV6MG8vb2R4a0VTdWM3M1VDZ1lFQTBqYlAKUVBjY1N4RmRsVDFEcjg0WWhkUDJzQUJ6YUpoWDNzUm44NnhBTWk4aW9PY05lMTh1cVFBdk4rQWpRQUFndlkyOQo3dVdRcXB1SlFueG9NcGJMVjdjSkw2aHBlekpEL2lhZDVzREpyMC9jTWVhK3JSWVFQK2xxSW9YZTI2TlZPZnNHCmZGNkpHZUdvUGpRTmVKM1FSV0NaMEo0UkRob242YjZCWHVFVTZiVUNnWUVBczlCRGpiNXQ1K0crWkNEWlBBQnQKVmFhRW10bnQyK08yOCt1OVcrQ3NOVTdKMzh1Rkl2N3dEMkRDUUkvNUphNERUOExMWlRndDVFTGo4azJtUE44dQpHNzBMR3VFZDJrQ1J0YTh4dnVwTkJCYXVXVndTSVhaL3FGWDZKcHNhTXpPM2k5QmFBMDBLWlJlTlVBU2xKampJCkJwTTFVWHJFTXdnNDAzNVBsLzJjNVRrQ2dZQnRmL054cWNicEs0Q043cjNGWkJ2T0NsMmp6SGhSY1puRUJwY0gKalNCYmc4WUwvbzg5UnBWdG54VDVqQjJRaHdDRy9NQ0ZJcnU2d3c0NnZjY2hJditGRDJrUGxEQnQ1ZjhZOGxDcQpGSjU2WGFVYnNWQjlwTktPR0M0YkVaVEc0RXZTeWZuVTZ3R0xvOG9ack0rZmxzVVlmbnRnK2hWMFBSZXhZSFRQClVYdXRTUUtCZ1FDck9WeHBqNXFKdmMyaUZrMXNoWDRXeDBYbHpzZzI3QkdUNy9zYmtrMHMxT1ViZEZueTErTUkKNUVpVU1xUHM5TU5IOWZxbHNmMEJCS1BXMW0wVFAvSHo2OHFrRm80cnJrZVlMYmYvYVN2OWFJNnRodGJTWUoyUQpKTm9qeW1Ea2ZFbmNDOTNsMUV5alF0Y1lJSGNDWFRGMlhibXVwdEtlT2lqeC84c3FTOUVkRmc9PQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==` + MockCerts = struct { + Ca string + ServerKey string + ServerCrt string + ClientCrt string + ClientKey string + }{ + Ca: caCrt, + ServerCrt: serverCrt, + ServerKey: serverKey, + ClientKey: clientKey, + ClientCrt: clientCrt, + } +) diff --git a/pkg/providers/kube/kube.cue b/pkg/providers/kube/kube.cue new file mode 100644 index 0000000..02faaac --- /dev/null +++ b/pkg/providers/kube/kube.cue @@ -0,0 +1,152 @@ +// kube.cue + +#Apply: { + #do: "apply" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resource to apply + value: {...} + // +usage=The patcher that will be applied to the resource, you can define the strategy of list merge through comments. Reference doc here: https://kubevela.io/docs/platform-engineers/traits/patch-trait#patch-in-workflow-step + patch?: {...} + } + + $returns?: { + // +usage=The resource after applied will be filled in this field after the action is executed + value?: {...} + // +usage=The error message if the action failed + err?: string + } + ... +} + +#Patch: { + #do: "patch" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resource to patch, we'll first get the resource from the cluster, then apply the patcher to it + value: {...} + // +usage=The patcher that will be applied to the resource, you can define the strategy of list merge through comments. Reference doc here: https://kubevela.io/docs/platform-engineers/traits/patch-trait#patch-in-workflow-step + patch: {...} + } + + $returns?: { + // +usage=The resource after patched will be filled in this field after the action is executed + result?: {...} + } + ... +} + +#ApplyInParallel: { + #do: "apply-in-parallel" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resources to apply in parallel + value: [...{...}] + } + + $returns?: { + // +usage=The resource after applied will be filled in this field after the action is executed + value?: [...{...}] + } + ... +} + +#Read: { + #do: "read" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resource to read, this field will be filled with the resource read from the cluster after the action is executed + value: {...} + } + + $returns?: { + // +usage=The read resource will be filled in this field after the action is executed + value?: {...} + // +usage=The error message if the action failed + err?: string + } + ... +} + +#List: { + #do: "list" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resource to list + resource: { + // +usage=The api version of the resource + apiVersion: string + // +usage=The kind of the resource + kind: string + } + // +usage=The filter to list the resources + filter?: { + // +usage=The namespace to list the resources + namespace: *"" | string + // +usage=The label selector to filter the resources + matchingLabels?: {...} + } + } + + $returns?: { + // +usage=The listed resources will be filled in this field after the action is executed + values?: {...} + // +usage=The error message if the action failed + err?: string + } + ... +} + +#Delete: { + #do: "delete" + #provider: "kube" + + $params: { + // +usage=The cluster to use + cluster: *"" | string + // +usage=The resource to delete + value: { + // +usage=The api version of the resource + apiVersion: string + // +usage=The kind of the resource + kind: string + // +usage=The metadata of the resource + metadata: { + // +usage=The name of the resource + name?: string + // +usage=The namespace of the resource + namespace: *"default" | string + } + } + // +usage=The filter to delete the resources + filter?: { + // +usage=The namespace to list the resources + namespace?: string + // +usage=The label selector to filter the resources + matchingLabels?: {...} + } + } + + $returns?: { + // +usage=The deleted resource will be filled in this field after the action is executed + value?: {...} + // +usage=The error message if the action failed + err?: string + } + ... +} diff --git a/pkg/providers/kube/kube.go b/pkg/providers/kube/kube.go new file mode 100644 index 0000000..89acea2 --- /dev/null +++ b/pkg/providers/kube/kube.go @@ -0,0 +1,360 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + _ "embed" + "encoding/json" + + "cuelang.org/go/cue" + "cuelang.org/go/cue/cuecontext" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + ktypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + "github.com/kubevela/pkg/multicluster" + "github.com/kubevela/pkg/util/k8s" + "github.com/kubevela/pkg/util/k8s/patch" + + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/model/value" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +const ( + // AnnoWorkflowLastAppliedConfig is the annotation for last applied config + AnnoWorkflowLastAppliedConfig = "workflow.oam.dev/last-applied-configuration" + // AnnoWorkflowLastAppliedTime is annotation for last applied time + AnnoWorkflowLastAppliedTime = "workflow.oam.dev/last-applied-time" +) + +const ( + // WorkflowResourceCreator is the creator name of workflow resource + WorkflowResourceCreator string = "workflow" +) + +func handleContext(ctx context.Context, cluster string) context.Context { + return multicluster.WithCluster(ctx, cluster) +} + +func apply(ctx context.Context, cli client.Client, _, _ string, workloads ...*unstructured.Unstructured) error { + for _, workload := range workloads { + existing := new(unstructured.Unstructured) + existing.GetObjectKind().SetGroupVersionKind(workload.GetObjectKind().GroupVersionKind()) + if err := cli.Get(ctx, ktypes.NamespacedName{ + Namespace: workload.GetNamespace(), + Name: workload.GetName(), + }, existing); err != nil { + if errors.IsNotFound(err) { + // TODO: make the annotation optional + b, err := workload.MarshalJSON() + if err != nil { + return err + } + if err := k8s.AddAnnotation(workload, AnnoWorkflowLastAppliedConfig, string(b)); err != nil { + return err + } + if err := cli.Create(ctx, workload); err != nil { + return err + } + } else { + return err + } + } else { + patcher, err := patch.ThreeWayMergePatch(existing, workload, &patch.PatchAction{ + UpdateAnno: true, + AnnoLastAppliedConfig: AnnoWorkflowLastAppliedConfig, + AnnoLastAppliedTime: AnnoWorkflowLastAppliedTime, + }) + if err != nil { + return err + } + if err := cli.Patch(ctx, workload, patcher); err != nil { + return err + } + } + } + return nil +} + +// nolint:revive +func delete(ctx context.Context, cli client.Client, _, _ string, manifest *unstructured.Unstructured) error { + return cli.Delete(ctx, manifest) +} + +// ListFilter filter for list resources +type ListFilter struct { + Namespace string `json:"namespace,omitempty"` + MatchingLabels map[string]string `json:"matchingLabels,omitempty"` +} + +// ResourceVars . +type ResourceVars struct { + Resource *unstructured.Unstructured `json:"value"` + Filter *ListFilter `json:"filter,omitempty"` + Cluster string `json:"cluster,omitempty"` +} + +// ResourceReturnVars . +type ResourceReturnVars struct { + Resource *unstructured.Unstructured `json:"value"` + Error string `json:"err,omitempty"` +} + +// ResourceParams . +type ResourceParams = providertypes.Params[ResourceVars] + +// ResourceReturns . +type ResourceReturns = providertypes.Returns[ResourceReturnVars] + +func getHandlers(runtimeParams providertypes.RuntimeParams) *providertypes.KubeHandlers { + if runtimeParams.KubeHandlers != nil { + return runtimeParams.KubeHandlers + } + return &providertypes.KubeHandlers{ + Apply: apply, + Delete: delete, + } +} + +// Apply create or update CR in cluster. +func Apply(ctx context.Context, params *ResourceParams) (*ResourceReturns, error) { + workload := params.Params.Resource + handlers := getHandlers(params.RuntimeParams) + if workload.GetNamespace() == "" { + workload.SetNamespace("default") + } + for k, v := range params.RuntimeParams.Labels { + if err := k8s.AddLabel(workload, k, v); err != nil { + return nil, err + } + } + deployCtx := handleContext(ctx, params.Params.Cluster) + if err := handlers.Apply(deployCtx, params.KubeClient, params.Params.Cluster, WorkflowResourceCreator, workload); err != nil { + return nil, err + } + return &ResourceReturns{ + Returns: ResourceReturnVars{ + Resource: workload, + }, + }, nil +} + +// ApplyInParallelVars . +type ApplyInParallelVars struct { + Resources []*unstructured.Unstructured `json:"value"` + Cluster string `json:"cluster,omitempty"` +} + +// ApplyInParallelReturnVars . +type ApplyInParallelReturnVars struct { + Resource []*unstructured.Unstructured `json:"value"` +} + +// ApplyInParallelParams . +type ApplyInParallelParams = providertypes.Params[ApplyInParallelVars] + +// ApplyInParallelReturns . +type ApplyInParallelReturns = providertypes.Returns[ApplyInParallelReturnVars] + +// ApplyInParallel create or update CRs in parallel. +func ApplyInParallel(ctx context.Context, params *ApplyInParallelParams) (*ApplyInParallelReturns, error) { + workloads := params.Params.Resources + handlers := getHandlers(params.RuntimeParams) + for i := range workloads { + if workloads[i].GetNamespace() == "" { + workloads[i].SetNamespace("default") + } + } + deployCtx := handleContext(ctx, params.Params.Cluster) + if err := handlers.Apply(deployCtx, params.KubeClient, params.Params.Cluster, WorkflowResourceCreator, workloads...); err != nil { + return nil, err + } + return &ApplyInParallelReturns{ + Returns: ApplyInParallelReturnVars{ + Resource: workloads, + }, + }, nil +} + +// Patch patch CR in cluster. +func Patch(ctx context.Context, params *providertypes.Params[cue.Value]) (cue.Value, error) { + handlers := getHandlers(params.RuntimeParams) + val := params.Params.LookupPath(cue.ParsePath("value")) + obj := new(unstructured.Unstructured) + b, err := val.MarshalJSON() + if err != nil { + return cue.Value{}, err + } + if err := json.Unmarshal(b, obj); err != nil { + return cue.Value{}, err + } + key := client.ObjectKeyFromObject(obj) + if key.Namespace == "" { + key.Namespace = "default" + } + cluster, err := params.Params.LookupPath(cue.ParsePath("cluster")).String() + if err != nil { + return cue.Value{}, err + } + multiCtx := handleContext(ctx, cluster) + if err := params.KubeClient.Get(multiCtx, key, obj); err != nil { + return cue.Value{}, err + } + baseVal := cuecontext.New().CompileString("").FillPath(cue.ParsePath(""), obj) + patcher := params.Params.LookupPath(cue.ParsePath("patch")) + + base, err := model.NewBase(baseVal) + if err != nil { + return cue.Value{}, err + } + if err := base.Unify(patcher); err != nil { + return cue.Value{}, err + } + workload, err := base.Unstructured() + if err != nil { + return cue.Value{}, err + } + for k, v := range params.RuntimeParams.Labels { + if err := k8s.AddLabel(workload, k, v); err != nil { + return cue.Value{}, err + } + } + if err := handlers.Apply(multiCtx, params.KubeClient, cluster, WorkflowResourceCreator, workload); err != nil { + return cue.Value{}, err + } + return params.Params.FillPath(value.FieldPath("$returns", "result"), workload), nil +} + +// Read get CR from cluster. +func Read(ctx context.Context, params *ResourceParams) (*ResourceReturns, error) { + workload := params.Params.Resource + key := client.ObjectKeyFromObject(workload) + if key.Namespace == "" { + key.Namespace = "default" + } + readCtx := handleContext(ctx, params.Params.Cluster) + if err := params.KubeClient.Get(readCtx, key, workload); err != nil { + return &ResourceReturns{ + Returns: ResourceReturnVars{ + Resource: workload, + Error: err.Error(), + }, + }, nil + } + return &ResourceReturns{ + Returns: ResourceReturnVars{ + Resource: workload, + }, + }, nil +} + +// ListReturnVars . +type ListReturnVars struct { + Resources *unstructured.UnstructuredList `json:"values"` + Error string `json:"err,omitempty"` +} + +// ListReturns . +type ListReturns = providertypes.Returns[ListReturnVars] + +// List lists CRs from cluster. +func List(ctx context.Context, params *ResourceParams) (*ListReturns, error) { + workload := params.Params.Resource + list := &unstructured.UnstructuredList{Object: map[string]interface{}{ + "kind": workload.GetKind(), + "apiVersion": workload.GetAPIVersion(), + }} + + filter := params.Params.Filter + listOpts := []client.ListOption{ + client.InNamespace(filter.Namespace), + client.MatchingLabels(filter.MatchingLabels), + } + readCtx := handleContext(ctx, params.Params.Cluster) + if err := params.KubeClient.List(readCtx, list, listOpts...); err != nil { + return &ListReturns{ + Returns: ListReturnVars{ + Resources: list, + Error: err.Error(), + }, + }, nil + } + return &ListReturns{ + Returns: ListReturnVars{ + Resources: list, + }, + }, nil +} + +// Delete deletes CR from cluster. +func Delete(ctx context.Context, params *ResourceParams) (*ResourceReturns, error) { + workload := params.Params.Resource + handlers := getHandlers(params.RuntimeParams) + deleteCtx := handleContext(ctx, params.Params.Cluster) + + if filter := params.Params.Filter; filter != nil { + labelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: filter.MatchingLabels}) + if err != nil { + return nil, err + } + if err := params.KubeClient.DeleteAllOf(deleteCtx, workload, &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: filter.Namespace, LabelSelector: labelSelector}}); err != nil { + return &ResourceReturns{ + Returns: ResourceReturnVars{ + Resource: workload, + Error: err.Error(), + }, + }, nil + } + return nil, nil + } + + if err := handlers.Delete(deleteCtx, params.KubeClient, params.Params.Cluster, WorkflowResourceCreator, workload); err != nil { + return &ResourceReturns{ + Returns: ResourceReturnVars{ + Resource: workload, + Error: err.Error(), + }, + }, nil + } + + return nil, nil +} + +//go:embed kube.cue +var template string + +// GetTemplate get kube template. +func GetTemplate() string { + return template +} + +// GetProviders get kube providers. +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "apply": providertypes.GenericProviderFn[ResourceVars, ResourceReturns](Apply), + "apply-in-parallel": providertypes.GenericProviderFn[ApplyInParallelVars, ApplyInParallelReturns](ApplyInParallel), + "read": providertypes.GenericProviderFn[ResourceVars, ResourceReturns](Read), + "list": providertypes.GenericProviderFn[ResourceVars, ListReturns](List), + "delete": providertypes.GenericProviderFn[ResourceVars, ResourceReturns](Delete), + "patch": providertypes.NativeProviderFn(Patch), + } +} diff --git a/pkg/providers/kube/kube_test.go b/pkg/providers/kube/kube_test.go new file mode 100644 index 0000000..fba9791 --- /dev/null +++ b/pkg/providers/kube/kube_test.go @@ -0,0 +1,453 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "fmt" + "testing" + "time" + + "cuelang.org/go/cue" + "cuelang.org/go/cue/cuecontext" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + crdv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment +var scheme = runtime.NewScheme() + +func TestProvider(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Test Definition Suite") +} + +var _ = BeforeSuite(func(done Done) { + By("Bootstrapping test environment") + testEnv = &envtest.Environment{ + ControlPlaneStartTimeout: time.Minute, + ControlPlaneStopTimeout: time.Minute, + UseExistingCluster: pointer.BoolPtr(false), + } + var err error + cfg, err = testEnv.Start() + Expect(err).ToNot(HaveOccurred()) + Expect(cfg).ToNot(BeNil()) + Expect(clientgoscheme.AddToScheme(scheme)).Should(BeNil()) + Expect(crdv1.AddToScheme(scheme)).Should(BeNil()) + // +kubebuilder:scaffold:scheme + By("Create the k8s client") + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme}) + Expect(err).ToNot(HaveOccurred()) + Expect(k8sClient).ToNot(BeNil()) + close(done) +}, 120) + +var _ = AfterSuite(func() { + By("Tearing down the test environment") + err := testEnv.Stop() + Expect(err).ToNot(HaveOccurred()) +}) + +var _ = Describe("Test Workflow Provider Kube", func() { + It("apply and read", func() { + ctx := context.Background() + un := testUnstructured.DeepCopy() + un.SetName("app") + un.SetLabels(map[string]string{ + "test": "test", + }) + res, err := Apply(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: un, + }, + RuntimeParams: providertypes.RuntimeParams{ + Labels: map[string]string{ + "hello": "world", + }, + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Returns.Resource.GetLabels()).Should(Equal(un.GetLabels())) + workload := &corev1.Pod{} + Eventually(func() error { + return k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: "default", + Name: "app", + }, workload) + }, time.Second*2, time.Millisecond*300).Should(BeNil()) + Expect(workload.GetLabels()).Should(Equal(map[string]string{ + "test": "test", + "hello": "world", + })) + + res, err = Read(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "app", + }, + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + Labels: map[string]string{ + "hello": "world", + }, + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Returns.Error).Should(Equal("")) + Expect(res.Returns.Resource.GetLabels()).Should(Equal(map[string]string{ + "test": "test", + "hello": "world", + })) + }) + + It("patch & apply", func() { + ctx := context.Background() + + un := testUnstructured + un.SetName("test-app-1") + un.SetLabels(map[string]string{ + "test": "test", + }) + _, err := Apply(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &un, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + v := cuecontext.New().CompileString(` +value: { + apiVersion: "v1" + kind: "Pod" + metadata: name: "test-app-1" +} +cluster: "" +patch: { + metadata: name: "test-app-1" + spec: { + containers: [{ + // +patchStrategy=retainKeys + image: "nginx:notfound" + }] + } +} +`) + _, err = Patch(ctx, &providertypes.Params[cue.Value]{ + Params: v, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + pod := &corev1.Pod{} + Expect(err).ToNot(HaveOccurred()) + Eventually(func() error { + return k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: "default", + Name: "test-app-1", + }, pod) + }, time.Second*2, time.Millisecond*300).Should(BeNil()) + Expect(pod.Name).To(Equal("test-app-1")) + Expect(pod.Spec.Containers[0].Image).To(Equal("nginx:notfound")) + }) + + It("list", func() { + ctx := context.Background() + for i := 2; i >= 0; i-- { + err := k8sClient.Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%v", i), + Namespace: "default", + Labels: map[string]string{ + "test": "test", + "index": fmt.Sprintf("test-%v", i), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: fmt.Sprintf("test-%v", i), + Image: "busybox", + }, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + } + + By("List pods with labels test=test") + res, err := List(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + }, + }, + Filter: &ListFilter{ + Namespace: "default", + MatchingLabels: map[string]string{ + "test": "test", + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(res.Returns.Resources.Items)).Should(Equal(5)) + + By("List pods with labels index=test-1") + res, err = List(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + }, + }, + Filter: &ListFilter{ + MatchingLabels: map[string]string{ + "index": "test-1", + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(res.Returns.Resources.Items)).Should(Equal(1)) + }) + + It("delete", func() { + ctx := context.Background() + err := k8sClient.Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "busybox", + }, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + err = k8sClient.Get(ctx, k8stypes.NamespacedName{ + Name: "test", + Namespace: "default", + }, &corev1.Pod{}) + Expect(err).ToNot(HaveOccurred()) + + _, err = Delete(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test", + "namespace": "default", + }, + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + err = k8sClient.Get(ctx, k8stypes.NamespacedName{ + Name: "test", + Namespace: "default", + }, &corev1.Pod{}) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).Should(Equal(true)) + }) + + It("delete with labels", func() { + ctx := context.Background() + err := k8sClient.Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + Labels: map[string]string{ + "test.oam.dev": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "busybox", + }, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + err = k8sClient.Get(ctx, k8stypes.NamespacedName{ + Name: "test", + Namespace: "default", + }, &corev1.Pod{}) + Expect(err).ToNot(HaveOccurred()) + + _, err = Delete(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": "default", + }, + }, + }, + Filter: &ListFilter{ + Namespace: "default", + MatchingLabels: map[string]string{ + "test.oam.dev": "true", + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + err = k8sClient.Get(ctx, k8stypes.NamespacedName{ + Name: "test", + Namespace: "default", + }, &corev1.Pod{}) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).Should(Equal(true)) + }) + + It("apply parallel", func() { + un1 := testUnstructured.DeepCopy() + un1.SetName("app1") + un2 := testUnstructured.DeepCopy() + un2.SetName("app2") + ctx := context.Background() + _, err := ApplyInParallel(ctx, &ApplyInParallelParams{ + Params: ApplyInParallelVars{ + Resources: []*unstructured.Unstructured{un1, un2}, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + pod := &corev1.Pod{} + Eventually(func() error { + return k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: "default", + Name: "app1", + }, pod) + }, time.Second*2, time.Millisecond*300).Should(BeNil()) + Eventually(func() error { + return k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: "default", + Name: "app2", + }, pod) + }, time.Second*2, time.Millisecond*300).Should(BeNil()) + }) + + It("test error case", func() { + ctx := context.Background() + res, err := Read(ctx, &ResourceParams{ + Params: ResourceVars{ + Resource: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "not-exist", + }, + }, + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + KubeClient: k8sClient, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Returns.Error).ShouldNot(BeNil()) + }) +}) + +var ( + testUnstructured = unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{}, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "main", + "image": "nginx:1.14.2", + "env": []interface{}{ + map[string]interface{}{ + "name": "APP", + "value": "nginx", + }, + }, + }, + }, + }, + }, + } +) diff --git a/pkg/providers/metrics/metrics.cue b/pkg/providers/metrics/metrics.cue new file mode 100644 index 0000000..ac8dc00 --- /dev/null +++ b/pkg/providers/metrics/metrics.cue @@ -0,0 +1,21 @@ +// metrics.cue + +#PromCheck: { + #do: "promCheck" + #provider: "metrics" + + $params: { + query: string + metricEndpoint: *"http://prometheus-server.o11y-system.svc:9090" | string + condition: string + failDuration: *"2m" | string + duration: *"5m" | string + } + + $returns?: { + message?: string + failed: bool + result: bool + } + ... +} diff --git a/pkg/providers/metrics/prom_check.go b/pkg/providers/metrics/prom_check.go new file mode 100644 index 0000000..1a53550 --- /dev/null +++ b/pkg/providers/metrics/prom_check.go @@ -0,0 +1,230 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + _ "embed" + "fmt" + "strconv" + "time" + + "cuelang.org/go/cue" + "cuelang.org/go/cue/cuecontext" + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + prommodel "github.com/prometheus/common/model" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + + wfContext "github.com/kubevela/workflow/pkg/context" + "github.com/kubevela/workflow/pkg/cue/model" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +const ( + // ProviderName is provider name for install. + ProviderName = "metrics" +) + +// PromVars . +type PromVars struct { + Query string `json:"query"` + MetricEndpoint string `json:"metricEndpoint"` + Condition string `json:"condition"` + Duration string `json:"duration"` + FailDuration string `json:"failDuration"` +} + +// PromReturnVars . +type PromReturnVars struct { + Message string `json:"message,omitempty"` + Failed bool `json:"failed"` + Result bool `json:"result"` +} + +// PromParams . +type PromParams = providertypes.Params[PromVars] + +// PromReturns . +type PromReturns = providertypes.Returns[PromReturnVars] + +// PromCheck do health check from metrics from prometheus +func PromCheck(ctx context.Context, params *PromParams) (*PromReturns, error) { + pCtx := params.ProcessContext + wfCtx := params.WorkflowContext + stepID := fmt.Sprint(pCtx.GetData(model.ContextStepSessionID)) + + valueStr, err := getQueryResult(ctx, params.Params) + if err != nil { + return nil, err + } + + res, err := compareValueWithCondition(ctx, valueStr, params.Params) + if err != nil { + return nil, err + } + + if res { + // meet the condition + return handleSuccessCompare(wfCtx, stepID, valueStr, params.Params) + } + return handleFailCompare(wfCtx, stepID, valueStr, params.Params) +} + +func handleSuccessCompare(wfCtx wfContext.Context, stepID, valueStr string, vars PromVars) (*PromReturns, error) { + // clean up fail timeStamp + setMetricsStatusTime(wfCtx, stepID, "fail", 0) + + st := getMetricsStatusTime(wfCtx, stepID, "success") + if st == 0 { + // first success + setMetricsStatusTime(wfCtx, stepID, "success", time.Now().Unix()) + return &PromReturns{ + Returns: PromReturnVars{ + Result: false, + Failed: false, + Message: fmt.Sprintf("The healthy condition should be %s, and the query result is %s, indicating success.", vars.Condition, valueStr), + }, + }, nil + } + successTime := time.Unix(st, 0) + duration, err := time.ParseDuration(vars.Duration) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %w", vars.Duration, err) + } + if successTime.Add(duration).Before(time.Now()) { + return &PromReturns{ + Returns: PromReturnVars{ + Result: true, + Failed: false, + Message: "The metric check has passed successfully.", + }, + }, nil + } + return &PromReturns{ + Returns: PromReturnVars{ + Result: false, + Failed: false, + Message: fmt.Sprintf("The healthy condition should be %s, and the query result is %s, indicating success. The success has persisted for %s, with success duration being %s.", vars.Condition, valueStr, time.Since(successTime).String(), vars.Duration), + }, + }, nil +} + +func handleFailCompare(wfCtx wfContext.Context, stepID, valueStr string, vars PromVars) (*PromReturns, error) { + // clean up success timeStamp + setMetricsStatusTime(wfCtx, stepID, "success", 0) + ft := getMetricsStatusTime(wfCtx, stepID, "") + + if ft == 0 { + // first failed + return &PromReturns{ + Returns: PromReturnVars{ + Result: false, + Failed: false, + Message: fmt.Sprintf("The healthy condition should be %s, but the query result is %s, indicating failure, with the failure duration being %s. This is first failed checking.", vars.Condition, valueStr, vars.FailDuration), + }, + }, nil + } + + failTime := time.Unix(ft, 0) + duration, err := time.ParseDuration(vars.FailDuration) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %w", vars.FailDuration, err) + } + if failTime.Add(duration).Before(time.Now()) { + return &PromReturns{ + Returns: PromReturnVars{ + Result: false, + Failed: true, + Message: fmt.Sprintf("The healthy condition should be %s, but the query result is %s, indicating failure. The failure has persisted for %s, with the failure duration being %s. The check has terminated.", vars.Condition, valueStr, time.Since(failTime).String(), vars.FailDuration), + }, + }, nil + } + return &PromReturns{ + Returns: PromReturnVars{ + Result: false, + Failed: false, + Message: fmt.Sprintf("The healthy condition should be %s, but the query result is %s, indicating failure. The failure has persisted for %s, with the failure duration being %s.", vars.Condition, valueStr, time.Since(failTime).String(), vars.FailDuration), + }, + }, nil +} + +func getQueryResult(ctx context.Context, vars PromVars) (string, error) { + c, err := api.NewClient(api.Config{ + Address: vars.MetricEndpoint, + }) + if err != nil { + return "", err + } + promCli := v1.NewAPI(c) + resp, _, err := promCli.Query(ctx, vars.Query, time.Now()) + if err != nil { + return "", err + } + + var valueStr string + switch v := resp.(type) { + case *prommodel.Scalar: + valueStr = v.Value.String() + case prommodel.Vector: + if len(v) != 1 { + return "", fmt.Errorf(fmt.Sprintf("ehe query is returning %d results when it should only return one. Please review the query to identify and fix the issue", len(v))) + } + valueStr = v[0].Value.String() + default: + return "", fmt.Errorf("cannot handle the not query value") + } + return valueStr, nil +} + +func compareValueWithCondition(_ context.Context, valueStr string, vars PromVars) (bool, error) { + template := fmt.Sprintf("if: %s %s", valueStr, vars.Condition) + res, err := cuecontext.New().CompileString(template).LookupPath(cue.ParsePath("if")).Bool() + if err != nil { + return false, err + } + return res, nil +} + +func setMetricsStatusTime(wfCtx wfContext.Context, stepID string, status string, time int64) { + wfCtx.SetMutableValue(strconv.FormatInt(time, 10), stepID, "metrics", status, "time") +} + +func getMetricsStatusTime(wfCtx wfContext.Context, stepID string, status string) int64 { + str := wfCtx.GetMutableValue(stepID, "metrics", status, "time") + if len(str) == 0 { + return 0 + } + t, _ := strconv.ParseInt(str, 10, 64) + return t +} + +//go:embed metrics.cue +var template string + +// GetTemplate returns the metrics template +func GetTemplate() string { + return template +} + +// GetProviders returns the metrics provider +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "promCheck": providertypes.GenericProviderFn[PromVars, PromReturns](PromCheck), + } +} diff --git a/pkg/providers/metrics/prom_check_test.go b/pkg/providers/metrics/prom_check_test.go new file mode 100644 index 0000000..7f5e00d --- /dev/null +++ b/pkg/providers/metrics/prom_check_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package metrics + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/test" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubevela/pkg/util/singleton" + + context2 "github.com/kubevela/workflow/pkg/context" + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/process" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +func TestMetricCheck(t *testing.T) { + srv := runMockPrometheusServer() // no lint + r := require.New(t) + ctx := context.Background() + cli := &test.MockClient{ + MockCreate: func(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return nil + }, + MockPatch: func(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return nil + }, + MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + return nil + }, + } + singleton.KubeClient.Set(cli) + wfCtx, err := context2.NewContext(context.Background(), "default", "v1", nil) + r.NoError(err) + pCtx := process.NewContext(process.ContextData{}) + pCtx.PushData(model.ContextStepSessionID, "test-id") + res, err := PromCheck(ctx, &PromParams{ + Params: PromVars{ + MetricEndpoint: "http://127.0.0.1:18089", + Query: "sum(nginx_ingress_controller_requests{host=\"canary-demo.com\",status=\"200\"})", + Duration: "4s", + FailDuration: "2s", + Condition: ">=3", + }, + RuntimeParams: providertypes.RuntimeParams{ + WorkflowContext: wfCtx, + ProcessContext: pCtx, + }, + }) + r.NoError(err) + r.Equal(res.Returns.Result, false) + r.Equal(res.Returns.Message, "The healthy condition should be >=3, and the query result is 10, indicating success.") + if err := srv.Close(); err != nil { + fmt.Printf("Server shutdown error: %v\n", err) + } +} + +func runMockPrometheusServer() *http.Server { + srv := http.Server{Addr: ":18089", Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {}, + "value": [ + 1678701380.73, + "10" + ] + } + ] + } +}`)) + })} + go srv.ListenAndServe() // no lint + time.Sleep(3 * time.Second) + return &srv +} diff --git a/pkg/providers/time/time.cue b/pkg/providers/time/time.cue new file mode 100644 index 0000000..d68f394 --- /dev/null +++ b/pkg/providers/time/time.cue @@ -0,0 +1,31 @@ +// time.cue + +#DateToTimestamp: { + #do: "timestamp" + #provider: "time" + + $params: { + date: string + layout: *"" | string + } + + $returns?: { + timestamp: int64 + } + ... +} + +#TimestampToDate: { + #do: "date" + #provider: "time" + + $params: { + timestamp: int64 + layout: *"" | string + } + + $returns?: { + date: string + } + ... +} diff --git a/pkg/providers/time/time.go b/pkg/providers/time/time.go new file mode 100644 index 0000000..c3af4c9 --- /dev/null +++ b/pkg/providers/time/time.go @@ -0,0 +1,113 @@ +/* + Copyright 2021. The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package time + +import ( + "context" + _ "embed" + "fmt" + "time" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +// TimestampVars . +type TimestampVars struct { + Date string `json:"date"` + Layout string `json:"layout,omitempty"` +} + +// TimestampReturnVars . +type TimestampReturnVars struct { + Timestamp int64 `json:"timestamp"` +} + +// TimestampParams . +type TimestampParams = providertypes.Params[TimestampVars] + +// TimestampReturns . +type TimestampReturns = providertypes.Returns[TimestampReturnVars] + +// Timestamp convert date to timestamp +func Timestamp(_ context.Context, params *TimestampParams) (*TimestampReturns, error) { + date := params.Params.Date + layout := params.Params.Layout + if date == "" { + return nil, fmt.Errorf("empty date to convert") + } + if layout == "" { + layout = time.RFC3339 + } + t, err := time.Parse(layout, date) + if err != nil { + return nil, err + } + return &TimestampReturns{ + Returns: TimestampReturnVars{ + Timestamp: t.Unix(), + }, + }, nil +} + +// DateVars . +type DateVars struct { + Timestamp int64 `json:"timestamp"` + Layout string `json:"layout,omitempty"` +} + +// DateReturnVars . +type DateReturnVars struct { + Date string `json:"date"` +} + +// DateParams . +type DateParams = providertypes.Params[DateVars] + +// DateReturns . +type DateReturns = providertypes.Returns[DateReturnVars] + +// Date convert timestamp to date +func Date(_ context.Context, params *DateParams) (*DateReturns, error) { + timestamp := params.Params.Timestamp + layout := params.Params.Layout + if layout == "" { + layout = time.RFC3339 + } + t := time.Unix(timestamp, 0) + return &DateReturns{ + Returns: DateReturnVars{ + Date: t.UTC().Format(layout), + }, + }, nil +} + +//go:embed time.cue +var template string + +// GetTemplate return the template +func GetTemplate() string { + return template +} + +// GetProviders return the provider +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "timestamp": providertypes.GenericProviderFn[TimestampVars, TimestampReturns](Timestamp), + "date": providertypes.GenericProviderFn[DateVars, DateReturns](Date), + } +} diff --git a/pkg/providers/time/time_test.go b/pkg/providers/time/time_test.go new file mode 100644 index 0000000..4bd1b7a --- /dev/null +++ b/pkg/providers/time/time_test.go @@ -0,0 +1,131 @@ +/* + Copyright 2021. The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package time + +import ( + "context" + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestTimestamp(t *testing.T) { + ctx := context.Background() + testcases := map[string]struct { + from TimestampVars + expected int64 + expectedErr error + }{ + "test convert date with default time layout": { + from: TimestampVars{ + Date: "2021-11-07T01:47:51Z", + }, + expected: 1636249671, + expectedErr: nil, + }, + "test convert date with RFC3339 layout": { + from: TimestampVars{ + Date: "2021-11-07T01:47:51Z", + Layout: "2006-01-02T15:04:05Z07:00", + }, + expected: 1636249671, + expectedErr: nil, + }, + "test convert date with RFC1123 layout": { + from: TimestampVars{ + Date: "Fri, 01 Mar 2019 15:00:00 GMT", + Layout: "Mon, 02 Jan 2006 15:04:05 MST", + }, + expected: 1551452400, + expectedErr: nil, + }, + "test convert without date": { + from: TimestampVars{}, + expected: 0, + expectedErr: fmt.Errorf("empty date to convert"), + }, + "test convert date with wrong time layout": { + from: TimestampVars{ + Date: "2021-11-07T01:47:51Z", + Layout: "Mon, 02 Jan 2006 15:04:05 MST", + }, + expected: 0, + expectedErr: errors.New(`parsing time "2021-11-07T01:47:51Z" as "Mon, 02 Jan 2006 15:04:05 MST": cannot parse "2021-11-07T01:47:51Z" as "Mon"`), + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + r := require.New(t) + res, err := Timestamp(ctx, &TimestampParams{ + Params: tc.from, + }) + if tc.expectedErr != nil { + r.Equal(tc.expectedErr.Error(), err.Error()) + return + } + r.NoError(err) + r.Equal(tc.expected, res.Returns.Timestamp) + }) + } +} + +func TestDate(t *testing.T) { + ctx := context.Background() + testcases := map[string]struct { + from DateVars + expected string + }{ + "test convert timestamp to default time layout": { + from: DateVars{ + Timestamp: 1636249671, + }, + expected: "2021-11-07T01:47:51Z", + }, + "test convert date to RFC3339 layout": { + from: DateVars{ + Timestamp: 1636249671, + Layout: "2006-01-02T15:04:05Z07:00", + }, + expected: "2021-11-07T01:47:51Z", + }, + "test convert date to RFC1123 layout": { + from: DateVars{ + Timestamp: 1551452400, + Layout: "Mon, 02 Jan 2006 15:04:05 MST", + }, + expected: "Fri, 01 Mar 2019 15:00:00 UTC", + }, + "test convert without timestamp": { + from: DateVars{}, + expected: "1970-01-01T00:00:00Z", + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + r := require.New(t) + res, err := Date(ctx, &DateParams{ + Params: tc.from, + }) + r.NoError(err) + r.Equal(tc.expected, res.Returns.Date) + }) + } +} diff --git a/pkg/providers/types/types.go b/pkg/providers/types/types.go index 4a78c54..471c151 100644 --- a/pkg/providers/types/types.go +++ b/pkg/providers/types/types.go @@ -71,6 +71,42 @@ type RuntimeParams struct { KubeClient client.Client } +type Params[T any] struct { + Params T `json:"$params"` + RuntimeParams +} + +type Returns[T any] struct { + Returns T `json:"$returns"` +} + +// GenericProviderFn is the provider function +type GenericProviderFn[T any, U any] func(context.Context, *Params[T]) (*U, error) + +// Call marshal value into json and decode into underlying function input +// parameters, then fill back the returned output value +func (fn GenericProviderFn[T, U]) Call(ctx context.Context, value cue.Value) (cue.Value, error) { + type p struct { + Params T `json:"$params"` + } + params := new(p) + bs, err := value.MarshalJSON() + if err != nil { + return value, err + } + if err = json.Unmarshal(bs, params); err != nil { + return value, err + } + runtimeParams := RuntimeParamsFrom(ctx) + label, _ := value.Label() + runtimeParams.FieldLabel = label + ret, err := fn(ctx, &Params[T]{Params: params.Params, RuntimeParams: runtimeParams}) + if err != nil { + return value, err + } + return value.FillPath(cue.ParsePath(""), ret), nil +} + // LegacyParams is the legacy input parameters of a provider. type LegacyParams[T any] struct { Params T @@ -101,6 +137,16 @@ func (fn LegacyGenericProviderFn[T, U]) Call(ctx context.Context, value cue.Valu return value.FillPath(cue.ParsePath(""), ret), nil } +// NativeProviderFn is the legacy native provider function +type NativeProviderFn func(context.Context, *Params[cue.Value]) (cue.Value, error) + +// Call marshal value into json and decode into underlying function input +// parameters, then fill back the returned output value +func (fn NativeProviderFn) Call(ctx context.Context, value cue.Value) (cue.Value, error) { + runtimeParams := RuntimeParamsFrom(ctx) + return fn(ctx, &Params[cue.Value]{Params: value, RuntimeParams: runtimeParams}) +} + // LegacyNativeProviderFn is the legacy native provider function type LegacyNativeProviderFn func(context.Context, *LegacyParams[cue.Value]) (cue.Value, error) diff --git a/pkg/providers/util/util.cue b/pkg/providers/util/util.cue new file mode 100644 index 0000000..2c537a4 --- /dev/null +++ b/pkg/providers/util/util.cue @@ -0,0 +1,59 @@ +// util.cue + +#PatchK8sObject: { + #do: "patch-k8s-object" + #provider: "util" + + $params: { + value: {...} + patch: {...} + } + + $returns?: { + result: {...} + } + ... +} + +#ConvertString: { + #do: "string" + #provider: "util" + + $params: { + bt: bytes + } + + $returns?: { + str: string + } + ... +} + +#Log: { + #do: "log" + #provider: "util" + + $params: { + // +usage=The data to print in the controller logs + data?: {...} | string + // +usage=The log level of the data + level: *3 | int + // +usage=The log source of this step. You can specify it from a url or resources. Note that if you set source in multiple util.#Log, only the latest one will work + source?: close({ + // +usage=Specify the log source url of this step + url: string + }) | close({ + // +usage=Specify the log resources of this step + resources?: [...{ + // +usage=Specify the name of the resource + name?: string + // +usage=Specify the cluster of the resource + cluster?: string + // +usage=Specify the namespace of the resource + namespace?: string + // +usage=Specify the label selector of the resource + labelSelector?: {...} + }] + }) + } +} diff --git a/pkg/providers/util/util.go b/pkg/providers/util/util.go new file mode 100644 index 0000000..0f05900 --- /dev/null +++ b/pkg/providers/util/util.go @@ -0,0 +1,193 @@ +/* + Copyright 2022. The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package util + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + + "cuelang.org/go/cue" + "k8s.io/klog/v2" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + utilruntime "github.com/kubevela/pkg/util/runtime" + + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/model/value" + "github.com/kubevela/workflow/pkg/cue/process" + providertypes "github.com/kubevela/workflow/pkg/providers/types" + "github.com/kubevela/workflow/pkg/types" +) + +// PatchVars is the vars for patch +type PatchVars struct { + Resource cue.Value `json:"value"` + Patch cue.Value `json:"patch"` +} + +// PatchK8sObject patch k8s object +func PatchK8sObject(_ context.Context, params *providertypes.Params[cue.Value]) (cue.Value, error) { + base, err := model.NewBase(params.Params.LookupPath(cue.ParsePath("value"))) + if err != nil { + return cue.Value{}, err + } + if err = base.Unify(params.Params.LookupPath(cue.ParsePath("patch"))); err != nil { + return params.Params.FillPath(cue.ParsePath("err"), err.Error()), nil + } + + workload, err := base.Compile() + if err != nil { + return params.Params.FillPath(cue.ParsePath("err"), err.Error()), nil + } + return params.Params.FillPath(value.FieldPath("$returns", "result"), params.Params.Context().CompileBytes(workload)), nil +} + +// StringVars . +type StringVars struct { + Byte []byte `json:"bt"` +} + +// StringReturnVars . +type StringReturnVars struct { + String string `json:"str"` +} + +// StringParams . +type StringParams = providertypes.Params[StringVars] + +// StringReturns . +type StringReturns = providertypes.Returns[StringReturnVars] + +// String convert byte to string +func String(_ context.Context, params *StringParams) (*StringReturns, error) { + return &StringReturns{ + Returns: StringReturnVars{ + String: string(params.Params.Byte), + }, + }, nil +} + +// Resource is the log resources +type Resource struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Cluster string `json:"cluster,omitempty"` + LabelSelector map[string]string `json:"labelSelector,omitempty"` +} + +// LogSource is the source of the log +type LogSource struct { + URL string `json:"url,omitempty"` + Resources []Resource `json:"resources,omitempty"` +} + +// LogConfig is the config of the log +type LogConfig struct { + Data bool `json:"data,omitempty"` + Source *LogSource `json:"source,omitempty"` +} + +// LogVars is the vars for log +type LogVars struct { + Data any `json:"data,omitempty"` + Level int `json:"level"` + Source *LogSource `json:"source,omitempty"` +} + +// LogParams . +type LogParams = providertypes.Params[LogVars] + +// Log print cue value in log +func Log(ctx context.Context, params *LogParams) (*any, error) { + pCtx := params.ProcessContext + stepName := fmt.Sprint(pCtx.GetData(model.ContextStepName)) + wfCtx := params.WorkflowContext + config := make(map[string]LogConfig) + c := wfCtx.GetMutableValue(types.ContextKeyLogConfig) + if c != "" { + if err := json.Unmarshal([]byte(c), &config); err != nil { + return nil, err + } + } + + stepConfig := config[stepName] + data := params.Params.Data + if !utilruntime.IsNil(data) { + stepConfig.Data = true + if err := printDataInLog(ctx, data, params.Params.Level, pCtx); err != nil { + return nil, err + } + } + if source := params.Params.Source; source != nil { + if stepConfig.Source == nil { + stepConfig.Source = &LogSource{} + } + if source.URL != "" { + stepConfig.Source.URL = source.URL + } + if len(source.Resources) != 0 { + stepConfig.Source.Resources = source.Resources + } + } + config[stepName] = stepConfig + b, err := json.Marshal(config) + if err != nil { + return nil, err + } + wfCtx.SetMutableValue(string(b), types.ContextKeyLogConfig) + return nil, nil +} + +func printDataInLog(_ context.Context, data any, level int, pCtx process.Context) error { + var message string + switch v := data.(type) { + case string: + message = v + default: + b, err := json.Marshal(data) + if err != nil { + return err + } + message = string(b) + } + klog.V(klog.Level(level)).InfoS(message, + model.ContextName, fmt.Sprint(pCtx.GetData(model.ContextName)), + model.ContextNamespace, fmt.Sprint(pCtx.GetData(model.ContextNamespace)), + model.ContextStepName, fmt.Sprint(pCtx.GetData(model.ContextStepName)), + model.ContextStepSessionID, fmt.Sprint(pCtx.GetData(model.ContextStepSessionID)), + ) + return nil +} + +//go:embed util.cue +var template string + +// GetTemplate return the template +func GetTemplate() string { + return template +} + +// GetProviders return the provider +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "patch-k8s-object": providertypes.NativeProviderFn(PatchK8sObject), + "string": providertypes.GenericProviderFn[StringVars, StringReturns](String), + "log": providertypes.GenericProviderFn[LogVars, any](Log), + } +} diff --git a/pkg/providers/util/util_test.go b/pkg/providers/util/util_test.go new file mode 100644 index 0000000..fcac06c --- /dev/null +++ b/pkg/providers/util/util_test.go @@ -0,0 +1,309 @@ +/* + Copyright 2022. The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package util + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "cuelang.org/go/cue" + "cuelang.org/go/cue/cuecontext" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/yaml" + + "github.com/kubevela/pkg/cue/util" + wfContext "github.com/kubevela/workflow/pkg/context" + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/model/value" + "github.com/kubevela/workflow/pkg/cue/process" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +func TestPatchK8sObject(t *testing.T) { + ctx := context.Background() + cuectx := cuecontext.New() + testcases := map[string]struct { + value string + expectedErr error + patchResult string + }{ + "test patch k8s object": { + value: ` +value: { + apiVersion: "apps/v1" + kind: "Deployment" + spec: template: metadata: { + labels: { + "oam.dev/name": "test" + } + } +} +patch: { + spec: template: metadata: { + labels: { + "test-label": "true" + } + } +}`, + expectedErr: nil, + patchResult: `apiVersion: "apps/v1" +kind: "Deployment" +spec: template: metadata: labels: { + "oam.dev/name": "test" + "test-label": "true" +}`, + }, + "test patch k8s object with patchKey": { + value: ` +value: { + apiVersion: "apps/v1" + kind: "Deployment" + spec: template: spec: { + containers: [{ + name: "test" + }] + } +} +patch: { + spec: template: spec: { + // +patchKey=name + containers: [{ + name: "test" + env: [{ + name: "test-env" + value: "test-value" + }] + }] + } +}`, + expectedErr: nil, + patchResult: `apiVersion: "apps/v1" +kind: "Deployment" +spec: template: spec: containers: [{ + name: "test" + env: [{ + name: "test-env" + value: "test-value" + }] +}]`, + }, + "test patch k8s object with patchStrategy": { + value: ` +value: { + apiVersion: "apps/v1" + kind: "Deployment" + spec: template: metadata: { + name: "test-name" + } +} +patch: { + // +patchStrategy=retainKeys + spec: template: metadata: { + name: "test-patchStrategy" + } +} +`, + expectedErr: nil, + patchResult: `apiVersion: "apps/v1" +kind: "Deployment" +spec: template: metadata: name: "test-patchStrategy"`, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + r := require.New(t) + res, err := PatchK8sObject(ctx, &providertypes.Params[cue.Value]{ + Params: cuectx.CompileString(tc.value), + }) + if tc.expectedErr != nil { + r.Equal(tc.expectedErr.Error(), err.Error()) + return + } + r.NoError(err) + s, err := util.ToString(res.LookupPath(value.FieldPath("$returns", "result"))) + r.NoError(err) + r.Equal(tc.patchResult, s) + }) + } +} + +func TestConvertString(t *testing.T) { + ctx := context.Background() + testCases := map[string]struct { + from []byte + expected string + }{ + "success": { + from: []byte("test"), + expected: "test", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + r := require.New(t) + res, err := String(ctx, &StringParams{ + Params: StringVars{ + Byte: []byte(tc.from), + }, + }) + r.NoError(err) + r.Equal(tc.expected, res.Returns.String) + }) + } +} + +func TestLog(t *testing.T) { + ctx := context.Background() + wfCtx := newWorkflowContextForTest(t) + pCtx := process.NewContext(process.ContextData{}) + pCtx.PushData(model.ContextStepName, "test-step") + + testCases := []struct { + value LogVars + expected string + expectedErr string + }{ + { + value: LogVars{ + Data: "test", + }, + expected: `{"test-step":{"data":true}}`, + }, + { + value: LogVars{ + Data: map[string]string{ + "message": "test", + }, + Level: 3, + }, + expected: `{"test-step":{"data":true}}`, + }, + { + value: LogVars{ + Data: map[string]string{ + "test": "", + }, + }, + expected: `{"test-step":{"data":true}}`, + }, + { + value: LogVars{ + Source: &LogSource{ + URL: "https://kubevela.io", + }, + }, + expected: `{"test-step":{"data":true,"source":{"url":"https://kubevela.io"}}}`, + }, + { + value: LogVars{ + Source: &LogSource{ + Resources: []Resource{ + { + LabelSelector: map[string]string{ + "test": "test", + }, + }, + }, + }, + }, + expected: `{"test-step":{"data":true,"source":{"url":"https://kubevela.io","resources":[{"labelSelector":{"test":"test"}}]}}}`, + }, + { + value: LogVars{ + Source: &LogSource{ + Resources: []Resource{ + { + Name: "test", + Namespace: "test", + Cluster: "test", + }, + }, + }, + }, + expected: `{"test-step":{"data":true,"source":{"url":"https://kubevela.io","resources":[{"name":"test","namespace":"test","cluster":"test"}]}}}`, + }, + { + value: LogVars{ + Source: &LogSource{ + URL: "https://kubevela.com", + }, + }, + expected: `{"test-step":{"data":true,"source":{"url":"https://kubevela.com","resources":[{"name":"test","namespace":"test","cluster":"test"}]}}}`, + }, + } + for i, tc := range testCases { + t.Run(fmt.Sprint(i), func(t *testing.T) { + r := require.New(t) + _, err := Log(ctx, &LogParams{ + Params: tc.value, + RuntimeParams: providertypes.RuntimeParams{ + ProcessContext: pCtx, + WorkflowContext: wfCtx, + }, + }) + if tc.expectedErr != "" { + r.Contains(err.Error(), tc.expectedErr) + return + } + r.NoError(err) + if tc.expected != "" { + config := wfCtx.GetMutableValue("logConfig") + r.Equal(tc.expected, config) + } + }) + } +} + +// func TestInstall(t *testing.T) { +// p := providers.NewProviders() +// pCtx := process.NewContext(process.ContextData{}) +// pCtx.PushData(model.ContextStepName, "test-step") +// Install(p, pCtx) +// h, ok := p.GetHandler("util", "string") +// r := require.New(t) +// r.Equal(ok, true) +// r.Equal(h != nil, true) +// } + +func newWorkflowContextForTest(t *testing.T) wfContext.Context { + cm := corev1.ConfigMap{} + r := require.New(t) + testCaseJson, err := yaml.YAMLToJSON([]byte(testCaseYaml)) + r.NoError(err) + err = json.Unmarshal(testCaseJson, &cm) + r.NoError(err) + + wfCtx := new(wfContext.WorkflowContext) + err = wfCtx.LoadFromConfigMap(context.Background(), cm) + r.NoError(err) + return wfCtx +} + +var ( + testCaseYaml = `apiVersion: v1 +data: + logConfig: "" +kind: ConfigMap +metadata: + name: app-v1 +` +) diff --git a/pkg/tasks/builtin/step_group.go b/pkg/tasks/builtin/step_group.go index 6fbe8f4..262a9d9 100644 --- a/pkg/tasks/builtin/step_group.go +++ b/pkg/tasks/builtin/step_group.go @@ -62,7 +62,7 @@ func (tr *stepGroupTaskRunner) Name() string { func (tr *stepGroupTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { resetter := tr.FillContextData(ctx, tr.pCtx) defer resetter(tr.pCtx) - basicVal, _ := custom.MakeBasicValue(ctx, providers.Compiler.Get(), nil, tr.pCtx) + basicVal, _ := custom.MakeBasicValue(ctx, providers.DefaultCompiler.Get(), nil, tr.pCtx) return custom.CheckPending(wfCtx, tr.step, tr.id, stepStatus, basicVal) } @@ -84,7 +84,7 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun tracer := options.GetTracer(tr.id, tr.step).AddTag("step_name", tr.name, "step_type", types.WorkflowStepTypeStepGroup) resetter := tr.FillContextData(tracer, tr.pCtx) defer resetter(tr.pCtx) - basicVal, err := custom.MakeBasicValue(tracer, providers.Compiler.Get(), nil, tr.pCtx) + basicVal, err := custom.MakeBasicValue(tracer, providers.DefaultCompiler.Get(), nil, tr.pCtx) if err != nil { return status, nil, err } diff --git a/pkg/tasks/custom/task.go b/pkg/tasks/custom/task.go index 09df92c..3e607ba 100644 --- a/pkg/tasks/custom/task.go +++ b/pkg/tasks/custom/task.go @@ -176,12 +176,12 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error var taskv cue.Value defer func() { - if r := recover(); r != nil { - exec.err(wfCtx, false, fmt.Errorf("invalid cue task for evaluation: %v", r), types.StatusReasonRendering) - stepStatus = exec.status() - operations = exec.operation() - return - } + // if r := recover(); r != nil { + // exec.err(wfCtx, false, fmt.Errorf("invalid cue task for evaluation: %v", r), types.StatusReasonRendering) + // stepStatus = exec.status() + // operations = exec.operation() + // return + // } if taskv == (cue.Value{}) { taskv = basicVal.FillPath(cue.ParsePath(""), templ) } diff --git a/pkg/tasks/custom/task_test.go b/pkg/tasks/custom/task_test.go index ff43e67..55b0d4a 100644 --- a/pkg/tasks/custom/task_test.go +++ b/pkg/tasks/custom/task_test.go @@ -332,7 +332,7 @@ func TestPendingInputCheck(t *testing.T) { Name: "app", Namespace: "default", }) - tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.Compiler.Get()) + tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.DefaultCompiler.Get()) gen, err := tasksLoader.GetTaskGenerator(context.Background(), step.Type) r.NoError(err) run, err := gen(step, &types.TaskGeneratorOptions{}) @@ -362,7 +362,7 @@ func TestPendingDependsOnCheck(t *testing.T) { Name: "app", Namespace: "default", }) - tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.Compiler.Get()) + tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.DefaultCompiler.Get()) gen, err := tasksLoader.GetTaskGenerator(context.Background(), step.Type) r.NoError(err) run, err := gen(step, &types.TaskGeneratorOptions{}) @@ -391,7 +391,7 @@ func TestSkip(t *testing.T) { Name: "app", Namespace: "default", }) - tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.Compiler.Get()) + tasksLoader := NewTaskLoader(mockLoadTemplate, 0, pCtx, providers.DefaultCompiler.Get()) gen, err := tasksLoader.GetTaskGenerator(context.Background(), step.Type) r.NoError(err) runner, err := gen(step, &types.TaskGeneratorOptions{}) @@ -458,7 +458,7 @@ func TestValidateIfValue(t *testing.T) { r := require.New(t) logCtx := monitorContext.NewTraceContext(context.Background(), "test-app") - basicVal, err := MakeBasicValue(logCtx, providers.Compiler.Get(), &runtime.RawExtension{Raw: []byte(`{"key": "value"}`)}, pCtx) + basicVal, err := MakeBasicValue(logCtx, providers.DefaultCompiler.Get(), &runtime.RawExtension{Raw: []byte(`{"key": "value"}`)}, pCtx) r.NoError(err) testCases := []struct {