From 4c1a4cba154fc0805ee94bd7d5218e281d127c3d Mon Sep 17 00:00:00 2001 From: FogDong Date: Mon, 18 Jul 2022 15:00:31 +0800 Subject: [PATCH] add metrics and pending phase Signed-off-by: FogDong --- Makefile | 1 + api/v1alpha1/types.go | 2 + charts/vela-workflow/README.md | 4 +- .../templates/workflow-controller.yaml | 10 +- charts/vela-workflow/values.yaml | 4 +- cmd/main.go | 140 ++++-- .../crd/bases/core.oam.dev_workflowruns.yaml | 400 ------------------ config/crd/bases/core.oam.dev_workflows.yaml | 164 ------- .../core.oam.dev_workflowstepdefinitions.yaml | 275 ------------ controllers/suite_test.go | 2 +- controllers/workflow_test.go | 29 -- controllers/workflowrun_controller.go | 29 +- go.mod | 1 + go.sum | 2 + pkg/client/controller_client.go | 81 ++++ pkg/client/delegating_client.go | 99 +++++ pkg/client/delegating_handler_client.go | 46 ++ pkg/client/monitor_client.go | 137 ++++++ pkg/executor/workflow.go | 52 ++- pkg/executor/workflow_test.go | 84 ++-- pkg/hooks/data_passing.go | 54 ++- pkg/hooks/data_passing_test.go | 4 +- pkg/monitor/metrics/workflow.go | 35 +- pkg/monitor/watcher/workflow.go | 2 +- pkg/providers/util/util.go | 16 +- pkg/providers/util/util_test.go | 9 + pkg/stdlib/pkgs/util.cue | 2 +- pkg/steps/generator.go | 8 +- pkg/tasks/builtin/step_group.go | 13 +- pkg/tasks/builtin/step_group_test.go | 6 +- pkg/tasks/builtin/suspend.go | 17 +- pkg/tasks/builtin/suspend_test.go | 6 +- pkg/tasks/custom/task.go | 51 ++- pkg/tasks/custom/task_test.go | 19 +- pkg/types/types.go | 4 +- version/version.go | 46 ++ 36 files changed, 815 insertions(+), 1039 deletions(-) delete mode 100644 config/crd/bases/core.oam.dev_workflowruns.yaml delete mode 100644 config/crd/bases/core.oam.dev_workflows.yaml delete mode 100644 config/crd/bases/core.oam.dev_workflowstepdefinitions.yaml create mode 100644 pkg/client/controller_client.go create mode 100644 pkg/client/delegating_client.go create mode 100644 pkg/client/delegating_handler_client.go create mode 100644 pkg/client/monitor_client.go create mode 100644 version/version.go diff --git a/Makefile b/Makefile index d4d01b6..5af836e 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,7 @@ check-diff: reviewable ## Execute auto-gen code commands and ensure branch is cl .PHONY: manifests manifests: controller-gen ## Generate CustomResourceDefinition objects. $(CONTROLLER_GEN) crd paths="./..." output:crd:artifacts:config=config/crd/bases + mv config/crd/bases/* charts/vela-workflow/crds/ .PHONY: generate generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. diff --git a/api/v1alpha1/types.go b/api/v1alpha1/types.go index 6da29cc..5909a30 100644 --- a/api/v1alpha1/types.go +++ b/api/v1alpha1/types.go @@ -222,6 +222,8 @@ const ( WorkflowStepPhaseStopped WorkflowStepPhase = "stopped" // WorkflowStepPhaseRunning will make the controller continue the workflow. WorkflowStepPhaseRunning WorkflowStepPhase = "running" + // WorkflowStepPhasePending will make the controller wait for the step to run. + WorkflowStepPhasePending WorkflowStepPhase = "pending" ) // StepOutputs defines output variable of WorkflowStep diff --git a/charts/vela-workflow/README.md b/charts/vela-workflow/README.md index 8b3484d..f2ee9d4 100644 --- a/charts/vela-workflow/README.md +++ b/charts/vela-workflow/README.md @@ -80,8 +80,8 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow - | `logDebug` | Enable debug logs for development purpose | `false` | | `logFilePath` | If non-empty, write log files in this path | `""` | | `logFileMaxSize` | Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. | `1024` | -| `kubeClient.qps` | The qps for reconcile clients, default is 50 | `50` | -| `kubeClient.burst` | The burst for reconcile clients, default is 100 | `100` | +| `kubeClient.qps` | The qps for reconcile clients, default is 50 | `500` | +| `kubeClient.burst` | The burst for reconcile clients, default is 100 | `1000` | ## Uninstallation diff --git a/charts/vela-workflow/templates/workflow-controller.yaml b/charts/vela-workflow/templates/workflow-controller.yaml index e3368ef..b7d2751 100644 --- a/charts/vela-workflow/templates/workflow-controller.yaml +++ b/charts/vela-workflow/templates/workflow-controller.yaml @@ -114,10 +114,12 @@ spec: - "--webhook-port={{ .Values.webhookService.port }}" - "--webhook-cert-dir={{ .Values.admissionWebhooks.certificate.mountPath }}" {{ end }} - # - "--health-addr=:{{ .Values.healthCheck.port }}" - # - "--concurrent-reconciles={{ .Values.concurrentReconciles }}" - # - "--kube-api-qps={{ .Values.kubeClient.qps }}" - # - "--kube-api-burst={{ .Values.kubeClient.burst }}" + - "--metrics-bind-address=:8080" + - "--leader-elect" + - "--health-probe-bind-address=:{{ .Values.healthCheck.port }}" + - "--concurrent-reconciles={{ .Values.concurrentReconciles }}" + - "--kube-api-qps={{ .Values.kubeClient.qps }}" + - "--kube-api-burst={{ .Values.kubeClient.burst }}" - "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}" - "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}" - "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}" diff --git a/charts/vela-workflow/values.yaml b/charts/vela-workflow/values.yaml index fb6f290..7c45dec 100644 --- a/charts/vela-workflow/values.yaml +++ b/charts/vela-workflow/values.yaml @@ -144,5 +144,5 @@ admissionWebhooks: ## @param kubeClient.qps The qps for reconcile clients, default is 50 ## @param kubeClient.burst The burst for reconcile clients, default is 100 kubeClient: - qps: 50 - burst: 100 + qps: 500 + burst: 1000 diff --git a/cmd/main.go b/cmd/main.go index 9410217..2041c70 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,7 +17,14 @@ limitations under the License. package main import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/pprof" "os" + "strings" + "time" "github.com/crossplane/crossplane-runtime/pkg/event" flag "github.com/spf13/pflag" @@ -25,7 +32,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/feature" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/klog" + "k8s.io/klog/v2" + "k8s.io/klog/v2/klogr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -33,14 +41,16 @@ import ( "github.com/kubevela/workflow/api/v1alpha1" "github.com/kubevela/workflow/controllers" + ctrlClient "github.com/kubevela/workflow/pkg/client" "github.com/kubevela/workflow/pkg/cue/packages" + "github.com/kubevela/workflow/pkg/monitor/watcher" "github.com/kubevela/workflow/pkg/types" + "github.com/kubevela/workflow/version" //+kubebuilder:scaffold:imports ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() ) func init() { @@ -55,46 +65,108 @@ func main() { var metricsAddr string var enableLeaderElection bool var probeAddr string + var qps float64 + var burst int + var webhookPort int + var leaderElectionResourceLock string + var leaseDuration time.Duration + var renewDeadline time.Duration + var retryPeriod time.Duration + var pprofAddr string + var controllerArgs controllers.Args + flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.StringVar(&leaderElectionResourceLock, "leader-election-resource-lock", "configmapsleases", "The resource lock to use for leader election") + flag.DurationVar(&leaseDuration, "leader-election-lease-duration", 15*time.Second, + "The duration that non-leader candidates will wait to force acquire leadership") + flag.DurationVar(&renewDeadline, "leader-election-renew-deadline", 10*time.Second, + "The duration that the acting controlplane will retry refreshing leadership before giving up") + flag.DurationVar(&retryPeriod, "leader-election-retry-period", 2*time.Second, + "The duration the LeaderElector clients should wait between tries of actions") + flag.IntVar(&webhookPort, "webhook-port", 9443, "admission webhook listen address") + flag.IntVar(&controllerArgs.ConcurrentReconciles, "concurrent-reconciles", 4, "concurrent-reconciles is the concurrent reconcile number of the controller. The default value is 4") + flag.Float64Var(&qps, "kube-api-qps", 50, "the qps for reconcile clients. Low qps may lead to low throughput. High qps may give stress to api-server. Raise this value if concurrent-reconciles is set to be high.") + flag.IntVar(&burst, "kube-api-burst", 100, "the burst for reconcile clients. Recommend setting it qps*2.") + flag.StringVar(&pprofAddr, "pprof-addr", "", "The address for pprof to use while exporting profiling results. The default value is empty which means do not expose it. Set it to address like :6666 to expose it.") flag.IntVar(&types.MaxWorkflowWaitBackoffTime, "max-workflow-wait-backoff-time", 60, "Set the max workflow wait backoff time, default is 60") flag.IntVar(&types.MaxWorkflowFailedBackoffTime, "max-workflow-failed-backoff-time", 300, "Set the max workflow wait backoff time, default is 300") flag.IntVar(&types.MaxWorkflowStepErrorRetryTimes, "max-workflow-step-error-retry-times", 10, "Set the max workflow step error retry times, default is 10") feature.DefaultMutableFeatureGate.AddFlag(flag.CommandLine) flag.Parse() - klog.InitFlags(nil) - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - MetricsBindAddress: metricsAddr, - Port: 9443, - HealthProbeBindAddress: probeAddr, - LeaderElection: enableLeaderElection, - LeaderElectionID: "0ef1568c.core.oam.dev", - // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily - // when the Manager ends. This requires the binary to immediately end when the - // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly - // speeds up voluntary leader transitions as the new leader don't have to wait - // LeaseDuration time first. - // - // In the default scaffold provided, the program ends immediately after - // the manager stops, so would be fine to enable this option. However, - // if you are doing or is intended to do any operation such as perform cleanups - // after the manager stops then its usage might be unsafe. - // LeaderElectionReleaseOnCancel: true, + + if pprofAddr != "" { + // Start pprof server if enabled + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + pprofServer := http.Server{ + Addr: pprofAddr, + Handler: mux, + } + klog.InfoS("Starting debug HTTP server", "addr", pprofServer.Addr) + + go func() { + go func() { + ctx := context.Background() + <-ctx.Done() + + ctx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Minute) + defer cancelFunc() + + if err := pprofServer.Shutdown(ctx); err != nil { + klog.Error(err, "Failed to shutdown debug HTTP server") + } + }() + + if err := pprofServer.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) { + klog.Error(err, "Failed to start debug HTTP server") + panic(err) + } + }() + } + + ctrl.SetLogger(klogr.New()) + + klog.InfoS("KubeVela Workflow information", "version", version.VelaVersion, "revision", version.GitRevision) + + restConfig := ctrl.GetConfigOrDie() + restConfig.QPS = float32(qps) + restConfig.Burst = burst + klog.InfoS("Kubernetes Config Loaded", + "QPS", restConfig.QPS, + "Burst", restConfig.Burst, + ) + + leaderElectionID := fmt.Sprintf("workflow-%s", strings.ToLower(strings.ReplaceAll(version.VelaVersion, ".", "-"))) + mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ + Scheme: scheme, + MetricsBindAddress: metricsAddr, + Port: webhookPort, + HealthProbeBindAddress: probeAddr, + LeaderElection: enableLeaderElection, + LeaderElectionID: leaderElectionID, + LeaderElectionResourceLock: leaderElectionResourceLock, + LeaseDuration: &leaseDuration, + RenewDeadline: &renewDeadline, + RetryPeriod: &retryPeriod, + NewClient: ctrlClient.DefaultNewControllerClient, }) if err != nil { - setupLog.Error(err, "unable to start manager") + klog.Error(err, "unable to start manager") os.Exit(1) } pd, err := packages.NewPackageDiscover(mgr.GetConfig()) if err != nil { - setupLog.Error(err, "Failed to create CRD discovery for CUE package client") + klog.Error(err, "Failed to create CRD discovery for CUE package client") if !packages.IsCUEParseErr(err) { os.Exit(1) } @@ -105,24 +177,32 @@ func main() { Scheme: mgr.GetScheme(), PackageDiscover: pd, Recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("WorkflowRun")), + Args: controllerArgs, }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "WorkflowRun") + klog.Error(err, "unable to create controller", "controller", "WorkflowRun") os.Exit(1) } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") + klog.Error(err, "unable to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") + klog.Error(err, "unable to set up ready check") os.Exit(1) } - setupLog.Info("starting manager") + klog.Info("Start the vela workflow monitor") + informer, err := mgr.GetCache().GetInformer(context.Background(), &v1alpha1.WorkflowRun{}) + if err != nil { + klog.ErrorS(err, "Unable to get informer for application") + } + watcher.StartWorkflowRunMetricsWatcher(informer) + + klog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - setupLog.Error(err, "problem running manager") + klog.Error(err, "problem running manager") os.Exit(1) } } diff --git a/config/crd/bases/core.oam.dev_workflowruns.yaml b/config/crd/bases/core.oam.dev_workflowruns.yaml deleted file mode 100644 index afd98ef..0000000 --- a/config/crd/bases/core.oam.dev_workflowruns.yaml +++ /dev/null @@ -1,400 +0,0 @@ ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.9.0 - creationTimestamp: null - name: workflowruns.core.oam.dev -spec: - group: core.oam.dev - names: - categories: - - oam - kind: WorkflowRun - listKind: WorkflowRunList - plural: workflowruns - shortNames: - - wr - singular: workflowrun - scope: Namespaced - versions: - - additionalPrinterColumns: - - jsonPath: .status.status - name: PHASE - type: string - - jsonPath: .metadata.creationTimestamp - name: AGE - type: date - name: v1alpha1 - schema: - openAPIV3Schema: - description: WorkflowRun is the Schema for the workflowRun API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - properties: - mode: - description: WorkflowExecuteMode defines the mode of workflow execution - properties: - steps: - description: WorkflowMode describes the mode of workflow - type: string - subSteps: - description: WorkflowMode describes the mode of workflow - type: string - type: object - workflowRef: - type: string - workflowSpec: - description: Workflow defines workflow steps and other attributes - properties: - steps: - items: - description: WorkflowStep defines how to execute a workflow - step. - properties: - dependsOn: - description: DependsOn is the dependency of the step - items: - type: string - type: array - if: - description: If is the if condition of the step - type: string - inputs: - description: Inputs is the inputs of the step - items: - properties: - from: - type: string - parameterKey: - type: string - required: - - from - - parameterKey - type: object - type: array - meta: - description: Meta is the meta data of the workflow step. - properties: - alias: - type: string - type: object - name: - description: Name is the unique name of the workflow step. - type: string - outputs: - description: Outputs is the outputs of the step - items: - properties: - name: - type: string - valueFrom: - type: string - required: - - name - - valueFrom - type: object - type: array - properties: - description: Properties is the properties of the step - type: object - x-kubernetes-preserve-unknown-fields: true - subSteps: - items: - description: WorkflowSubStep defines how to execute a - workflow subStep. - properties: - dependsOn: - description: DependsOn is the dependency of the step - items: - type: string - type: array - if: - description: If is the if condition of the step - type: string - inputs: - description: Inputs is the inputs of the step - items: - properties: - from: - type: string - parameterKey: - type: string - required: - - from - - parameterKey - type: object - type: array - meta: - description: Meta is the meta data of the workflow - step. - properties: - alias: - type: string - type: object - name: - description: Name is the unique name of the workflow - step. - type: string - outputs: - description: Outputs is the outputs of the step - items: - properties: - name: - type: string - valueFrom: - type: string - required: - - name - - valueFrom - type: object - type: array - properties: - description: Properties is the properties of the step - type: object - x-kubernetes-preserve-unknown-fields: true - timeout: - description: Timeout is the timeout of the step - type: string - type: - description: Type is the type of the workflow step. - type: string - required: - - name - - type - type: object - type: array - timeout: - description: Timeout is the timeout of the step - type: string - type: - description: Type is the type of the workflow step. - type: string - required: - - name - - type - type: object - type: array - type: object - type: object - status: - description: WorkflowRunStatus record the status of workflow run - properties: - conditions: - description: Conditions of the resource. - items: - description: A Condition that may apply to a resource. - properties: - lastTransitionTime: - description: LastTransitionTime is the last time this condition - transitioned from one status to another. - format: date-time - type: string - message: - description: A Message containing details about this condition's - last transition from one status to another, if any. - type: string - reason: - description: A Reason for this condition's last transition from - one status to another. - type: string - status: - description: Status of this condition; is it currently True, - False, or Unknown? - type: string - type: - description: Type of this condition. At most one of each condition - type may apply to a resource at any point in time. - type: string - required: - - lastTransitionTime - - reason - - status - - type - type: object - type: array - contextBackend: - description: 'ObjectReference contains enough information to let you - inspect or modify the referred object. --- New uses of this type - are discouraged because of difficulty describing its usage when - embedded in APIs. 1. Ignored fields. It includes many fields which - are not generally honored. For instance, ResourceVersion and FieldPath - are both very rarely valid in actual usage. 2. Invalid usage help. It - is impossible to add specific help for individual usage. In most - embedded usages, there are particular restrictions like, "must refer - only to types A and B" or "UID not honored" or "name must be restricted". - Those cannot be well described when embedded. 3. Inconsistent validation. Because - the usages are different, the validation rules are different by - usage, which makes it hard for users to predict what will happen. - 4. The fields are both imprecise and overly precise. Kind is not - a precise mapping to a URL. This can produce ambiguity during interpretation - and require a REST mapping. In most cases, the dependency is on - the group,resource tuple and the version of the actual struct is - irrelevant. 5. We cannot easily change it. Because this type is - embedded in many locations, updates to this type will affect numerous - schemas. Don''t make new APIs embed an underspecified API type - they do not control. Instead of using this type, create a locally - provided and used type that is well-focused on your reference. For - example, ServiceReferences for admission registration: https://github.com/kubernetes/api/blob/release-1.17/admissionregistration/v1/types.go#L533 - .' - properties: - apiVersion: - description: API version of the referent. - type: string - fieldPath: - description: 'If referring to a piece of an object instead of - an entire object, this string should contain a valid JSON/Go - field access statement, such as desiredState.manifest.containers[2]. - For example, if the object reference is to a container within - a pod, this would take on a value like: "spec.containers{name}" - (where "name" refers to the name of the container that triggered - the event) or if no container name is specified "spec.containers[2]" - (container with index 2 in this pod). This syntax is chosen - only to have some well-defined way of referencing a part of - an object. TODO: this design is not final and this field is - subject to change in the future.' - type: string - kind: - description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - name: - description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' - type: string - namespace: - description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/' - type: string - resourceVersion: - description: 'Specific resourceVersion to which this reference - is made, if any. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency' - type: string - uid: - description: 'UID of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids' - type: string - type: object - endTime: - format: date-time - type: string - finished: - type: boolean - message: - type: string - mode: - description: WorkflowExecuteMode defines the mode of workflow execution - properties: - steps: - description: WorkflowMode describes the mode of workflow - type: string - subSteps: - description: WorkflowMode describes the mode of workflow - type: string - type: object - startTime: - format: date-time - type: string - status: - description: WorkflowRunPhase is a label for the condition of a WorkflowRun - at the current time - type: string - steps: - items: - description: WorkflowStepStatus record the status of a workflow - step, include step status and subStep status - properties: - firstExecuteTime: - description: FirstExecuteTime is the first time this step execution. - format: date-time - type: string - id: - type: string - lastExecuteTime: - description: LastExecuteTime is the last time this step execution. - format: date-time - type: string - message: - description: A human readable message indicating details about - why the workflowStep is in this state. - type: string - name: - type: string - phase: - description: WorkflowStepPhase describes the phase of a workflow - step. - type: string - reason: - description: A brief CamelCase message indicating details about - why the workflowStep is in this state. - type: string - subSteps: - items: - description: StepStatus record the base status of workflow - step, which could be workflow step or subStep - properties: - firstExecuteTime: - description: FirstExecuteTime is the first time this step - execution. - format: date-time - type: string - id: - type: string - lastExecuteTime: - description: LastExecuteTime is the last time this step - execution. - format: date-time - type: string - message: - description: A human readable message indicating details - about why the workflowStep is in this state. - type: string - name: - type: string - phase: - description: WorkflowStepPhase describes the phase of - a workflow step. - type: string - reason: - description: A brief CamelCase message indicating details - about why the workflowStep is in this state. - type: string - type: - type: string - required: - - id - type: object - type: array - type: - type: string - required: - - id - type: object - type: array - suspend: - type: boolean - suspendState: - type: string - terminated: - type: boolean - required: - - finished - - mode - - status - - suspend - - terminated - type: object - type: object - served: true - storage: true - subresources: - status: {} diff --git a/config/crd/bases/core.oam.dev_workflows.yaml b/config/crd/bases/core.oam.dev_workflows.yaml deleted file mode 100644 index da503cc..0000000 --- a/config/crd/bases/core.oam.dev_workflows.yaml +++ /dev/null @@ -1,164 +0,0 @@ ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.9.0 - creationTimestamp: null - name: workflows.core.oam.dev -spec: - group: core.oam.dev - names: - categories: - - oam - kind: Workflow - listKind: WorkflowList - plural: workflows - singular: workflow - scope: Namespaced - versions: - - name: v1alpha1 - schema: - openAPIV3Schema: - description: Workflow is the Schema for the workflow API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - steps: - items: - description: WorkflowStep defines how to execute a workflow step. - properties: - dependsOn: - description: DependsOn is the dependency of the step - items: - type: string - type: array - if: - description: If is the if condition of the step - type: string - inputs: - description: Inputs is the inputs of the step - items: - properties: - from: - type: string - parameterKey: - type: string - required: - - from - - parameterKey - type: object - type: array - meta: - description: Meta is the meta data of the workflow step. - properties: - alias: - type: string - type: object - name: - description: Name is the unique name of the workflow step. - type: string - outputs: - description: Outputs is the outputs of the step - items: - properties: - name: - type: string - valueFrom: - type: string - required: - - name - - valueFrom - type: object - type: array - properties: - description: Properties is the properties of the step - type: object - x-kubernetes-preserve-unknown-fields: true - subSteps: - items: - description: WorkflowSubStep defines how to execute a workflow - subStep. - properties: - dependsOn: - description: DependsOn is the dependency of the step - items: - type: string - type: array - if: - description: If is the if condition of the step - type: string - inputs: - description: Inputs is the inputs of the step - items: - properties: - from: - type: string - parameterKey: - type: string - required: - - from - - parameterKey - type: object - type: array - meta: - description: Meta is the meta data of the workflow step. - properties: - alias: - type: string - type: object - name: - description: Name is the unique name of the workflow step. - type: string - outputs: - description: Outputs is the outputs of the step - items: - properties: - name: - type: string - valueFrom: - type: string - required: - - name - - valueFrom - type: object - type: array - properties: - description: Properties is the properties of the step - type: object - x-kubernetes-preserve-unknown-fields: true - timeout: - description: Timeout is the timeout of the step - type: string - type: - description: Type is the type of the workflow step. - type: string - required: - - name - - type - type: object - type: array - timeout: - description: Timeout is the timeout of the step - type: string - type: - description: Type is the type of the workflow step. - type: string - required: - - name - - type - type: object - type: array - type: object - served: true - storage: true diff --git a/config/crd/bases/core.oam.dev_workflowstepdefinitions.yaml b/config/crd/bases/core.oam.dev_workflowstepdefinitions.yaml deleted file mode 100644 index 85957b3..0000000 --- a/config/crd/bases/core.oam.dev_workflowstepdefinitions.yaml +++ /dev/null @@ -1,275 +0,0 @@ - ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.6.2 - name: workflowstepdefinitions.core.oam.dev -spec: - group: core.oam.dev - names: - categories: - - oam - kind: WorkflowStepDefinition - listKind: WorkflowStepDefinitionList - plural: workflowstepdefinitions - shortNames: - - workflowstep - singular: workflowstepdefinition - scope: Namespaced - versions: - - name: v1beta1 - schema: - openAPIV3Schema: - description: WorkflowStepDefinition is the Schema for the workflowstepdefinitions - API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: WorkflowStepDefinitionSpec defines the desired state of WorkflowStepDefinition - properties: - definitionRef: - description: Reference to the CustomResourceDefinition that defines - this trait kind. - properties: - name: - description: Name of the referenced CustomResourceDefinition. - type: string - version: - description: Version indicate which version should be used if - CRD has multiple versions by default it will use the first one - if not specified - type: string - required: - - name - type: object - schematic: - description: Schematic defines the data format and template of the - encapsulation of the workflow step definition - properties: - cue: - description: CUE defines the encapsulation in CUE format - properties: - template: - description: Template defines the abstraction template data - of the capability, it will replace the old CUE template - in extension field. Template is a required field if CUE - is defined in Capability Definition. - type: string - required: - - template - type: object - helm: - description: A Helm represents resources used by a Helm module - properties: - release: - description: Release records a Helm release used by a Helm - module workload. - type: object - x-kubernetes-preserve-unknown-fields: true - repository: - description: HelmRelease records a Helm repository used by - a Helm module workload. - type: object - x-kubernetes-preserve-unknown-fields: true - required: - - release - - repository - type: object - kube: - description: Kube defines the encapsulation in raw Kubernetes - resource format - properties: - parameters: - description: Parameters defines configurable parameters - items: - description: A KubeParameter defines a configurable parameter - of a component. - properties: - description: - description: Description of this parameter. - type: string - fieldPaths: - description: "FieldPaths specifies an array of fields - within this workload that will be overwritten by the - value of this parameter. \tAll fields must be of the - same type. Fields are specified as JSON field paths - without a leading dot, for example 'spec.replicas'." - items: - type: string - type: array - name: - description: Name of this parameter - type: string - required: - default: false - description: Required specifies whether or not a value - for this parameter must be supplied when authoring - an Application. - type: boolean - type: - description: 'ValueType indicates the type of the parameter - value, and only supports basic data types: string, - number, boolean.' - enum: - - string - - number - - boolean - type: string - required: - - fieldPaths - - name - - type - type: object - type: array - template: - description: Template defines the raw Kubernetes resource - type: object - x-kubernetes-preserve-unknown-fields: true - required: - - template - type: object - terraform: - description: Terraform is the struct to describe cloud resources - managed by Hashicorp Terraform - properties: - configuration: - description: Configuration is Terraform Configuration - type: string - customRegion: - description: Region is cloud provider's region. It will override - the region in the region field of ProviderReference - type: string - deleteResource: - default: true - description: DeleteResource will determine whether provisioned - cloud resources will be deleted when CR is deleted - type: boolean - path: - description: Path is the sub-directory of remote git repository. - It's valid when remote is set - type: string - providerRef: - description: ProviderReference specifies the reference to - Provider - properties: - name: - description: Name of the referenced object. - type: string - namespace: - default: default - description: Namespace of the referenced object. - type: string - required: - - name - type: object - type: - default: hcl - description: Type specifies which Terraform configuration - it is, HCL or JSON syntax - enum: - - hcl - - json - - remote - type: string - writeConnectionSecretToRef: - description: WriteConnectionSecretToReference specifies the - namespace and name of a Secret to which any connection details - for this managed resource should be written. Connection - details frequently include the endpoint, username, and password - required to connect to the managed resource. - properties: - name: - description: Name of the secret. - type: string - namespace: - description: Namespace of the secret. - type: string - required: - - name - type: object - required: - - configuration - type: object - type: object - type: object - status: - description: WorkflowStepDefinitionStatus is the status of WorkflowStepDefinition - properties: - conditions: - description: Conditions of the resource. - items: - description: A Condition that may apply to a resource. - properties: - lastTransitionTime: - description: LastTransitionTime is the last time this condition - transitioned from one status to another. - format: date-time - type: string - message: - description: A Message containing details about this condition's - last transition from one status to another, if any. - type: string - reason: - description: A Reason for this condition's last transition from - one status to another. - type: string - status: - description: Status of this condition; is it currently True, - False, or Unknown? - type: string - type: - description: Type of this condition. At most one of each condition - type may apply to a resource at any point in time. - type: string - required: - - lastTransitionTime - - reason - - status - - type - type: object - type: array - configMapRef: - description: ConfigMapRef refer to a ConfigMap which contains OpenAPI - V3 JSON schema of Component parameters. - type: string - latestRevision: - description: LatestRevision of the component definition - properties: - name: - type: string - revision: - format: int64 - type: integer - revisionHash: - description: RevisionHash record the hash value of the spec of - ApplicationRevision object. - type: string - required: - - name - - revision - type: object - type: object - type: object - served: true - storage: true - subresources: - status: {} -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] diff --git a/controllers/suite_test.go b/controllers/suite_test.go index c4800d7..ed8644a 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -64,7 +64,7 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")}, + CRDDirectoryPaths: []string{filepath.Join("..", "charts", "vela-workflow", "crds")}, ErrorIfCRDPathMissing: true, } diff --git a/controllers/workflow_test.go b/controllers/workflow_test.go index 3376a52..f7538c2 100644 --- a/controllers/workflow_test.go +++ b/controllers/workflow_test.go @@ -86,35 +86,6 @@ var _ = Describe("Test Workflow", func() { Expect(k8sClient.DeleteAllOf(ctx, &corev1.ConfigMap{}, client.InNamespace(namespace))).Should(Succeed()) }) - It("should record event", func() { - wr := wrTemplate.DeepCopy() - wr.Spec.WorkflowSpec.Steps = []v1alpha1.WorkflowStep{ - { - WorkflowStepBase: v1alpha1.WorkflowStepBase{ - Name: "step2", - Type: "test-apply", - Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, - Inputs: v1alpha1.StepInputs{ - { - From: "invalid", - ParameterKey: "message", - }, - }, - }, - }} - Expect(k8sClient.Create(ctx, wr)).Should(BeNil()) - - err := reconcileWithReturn(reconciler, wr.Name, wr.Namespace) - Expect(err).ShouldNot(BeNil()) - - events, err := recorder.GetEventsWithName(wr.Name) - Expect(err).Should(BeNil()) - Expect(len(events)).Should(Equal(1)) - Expect(events[0].EventType).Should(Equal(corev1.EventTypeWarning)) - Expect(events[0].Reason).Should(Equal(v1alpha1.ReasonExecute)) - Expect(events[0].Message).Should(ContainSubstring(v1alpha1.MessageFailedExecute)) - }) - It("get steps from workflow ref", func() { workflow := &v1alpha1.Workflow{ TypeMeta: metav1.TypeMeta{ diff --git a/controllers/workflowrun_controller.go b/controllers/workflowrun_controller.go index 6ceca3e..e7e54d8 100644 --- a/controllers/workflowrun_controller.go +++ b/controllers/workflowrun_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" ctrlEvent "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -43,12 +44,19 @@ import ( "github.com/kubevela/workflow/pkg/types" ) +// Args args used by controller +type Args struct { + // ConcurrentReconciles is the concurrent reconcile number of the controller + ConcurrentReconciles int +} + // WorkflowRunReconciler reconciles a WorkflowRun object type WorkflowRunReconciler struct { client.Client Scheme *runtime.Scheme PackageDiscover *packages.PackageDiscover Recorder event.Recorder + Args } var ( @@ -81,6 +89,11 @@ func (r *WorkflowRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) timeReporter := timeReconcile(run) defer timeReporter() + if run.Status.Finished { + logCtx.Info("WorkflowRun is finished, skip reconcile") + return ctrl.Result{}, nil + } + switch { case run.Spec.WorkflowSpec != nil && len(run.Spec.WorkflowSpec.Steps) > 0: case run.Spec.WorkflowRef != "": @@ -150,6 +163,9 @@ func (r *WorkflowRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) // SetupWithManager sets up the controller with the Manager. func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + MaxConcurrentReconciles: r.ConcurrentReconciles, + }). WithEventFilter(predicate.Funcs{ // filter the changes in workflow status // let workflow handle its reconcile @@ -160,6 +176,11 @@ func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error { return false } + // if the workflow is finished, skip the reconcile + if new.Status.Finished { + return false + } + // filter managedFields changes old.ManagedFields = nil new.ManagedFields = nil @@ -180,9 +201,6 @@ func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error { CreateFunc: func(e ctrlEvent.CreateEvent) bool { return true }, - DeleteFunc: func(e ctrlEvent.DeleteEvent) bool { - return true - }, }). For(&v1alpha1.WorkflowRun{}). Complete(r) @@ -191,7 +209,7 @@ func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *WorkflowRunReconciler) endWithNegativeCondition(ctx context.Context, wr *v1alpha1.WorkflowRun, condition condition.Condition, phase v1alpha1.WorkflowRunPhase) (ctrl.Result, error) { wr.SetConditions(condition) if err := r.patchStatus(ctx, wr, phase); err != nil { - return ctrl.Result{}, errors.WithMessage(err, "cannot update workflowrun status") + return ctrl.Result{}, errors.WithMessage(err, "failed to patch workflowrun status") } return ctrl.Result{}, fmt.Errorf("reconcile WorkflowRun error, msg: %s", condition.Message) } @@ -199,7 +217,8 @@ func (r *WorkflowRunReconciler) endWithNegativeCondition(ctx context.Context, wr func (r *WorkflowRunReconciler) patchStatus(ctx context.Context, wr *v1alpha1.WorkflowRun, phase v1alpha1.WorkflowRunPhase) error { wr.Status.Phase = phase if err := r.Status().Patch(ctx, wr, client.Merge); err != nil { - return err + executor.StepStatusCache.Store(fmt.Sprintf("%s-%s", wr.Name, wr.Namespace), -1) + return errors.WithMessage(err, "failed to patch workflowrun status") } return nil } diff --git a/go.mod b/go.mod index 1e68d50..0936f4f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/crossplane/crossplane-runtime v0.14.1-0.20210722005935-0b469fcc77cd github.com/evanphx/json-patch v4.12.0+incompatible github.com/google/go-cmp v0.5.8 + github.com/hashicorp/go-version v1.3.0 github.com/oam-dev/kubevela v1.5.0-alpha.2.0.20220706095416-09acc8a98942 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.19.0 diff --git a/go.sum b/go.sum index d3411a5..19dcd41 100644 --- a/go.sum +++ b/go.sum @@ -438,6 +438,8 @@ github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdv github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.3.0 h1:McDWVJIU/y+u1BRV06dPaLfLCaT7fUTJLp5r04x7iNw= +github.com/hashicorp/go-version v1.3.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= diff --git a/pkg/client/controller_client.go b/pkg/client/controller_client.go new file mode 100644 index 0000000..4015a9f --- /dev/null +++ b/pkg/client/controller_client.go @@ -0,0 +1,81 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "strings" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +var ( + // CachedGVKs identifies the GVKs of resources to be cached during dispatching + CachedGVKs = "" +) + +// DefaultNewControllerClient function for creating controller client +func DefaultNewControllerClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (c client.Client, err error) { + rawClient, err := client.New(config, options) + if err != nil { + return nil, errors.Wrapf(err, "failed to get raw client") + } + + mClient := &monitorClient{rawClient} + mCache := &monitorCache{cache} + + uncachedStructuredGVKs := map[schema.GroupVersionKind]struct{}{} + for _, obj := range uncachedObjects { + gvk, err := apiutil.GVKForObject(obj, mClient.Scheme()) + if err != nil { + return nil, err + } + uncachedStructuredGVKs[gvk] = struct{}{} + } + + cachedUnstructuredGVKs := map[schema.GroupVersionKind]struct{}{} + for _, s := range strings.Split(CachedGVKs, ",") { + s = strings.Trim(s, " ") + if len(s) > 0 { + gvk, _ := schema.ParseKindArg(s) + if gvk == nil { + return nil, errors.Errorf("invalid cached gvk: %s", s) + } + cachedUnstructuredGVKs[*gvk] = struct{}{} + } + } + + dClient := &delegatingClient{ + scheme: mClient.Scheme(), + mapper: mClient.RESTMapper(), + Reader: &delegatingReader{ + CacheReader: mCache, + ClientReader: mClient, + scheme: mClient.Scheme(), + uncachedStructuredGVKs: uncachedStructuredGVKs, + cachedUnstructuredGVKs: cachedUnstructuredGVKs, + }, + Writer: mClient, + StatusClient: mClient, + } + + return dClient, nil +} diff --git a/pkg/client/delegating_client.go b/pkg/client/delegating_client.go new file mode 100644 index 0000000..9a9f70c --- /dev/null +++ b/pkg/client/delegating_client.go @@ -0,0 +1,99 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "strings" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +type delegatingClient struct { + client.Reader + client.Writer + client.StatusClient + + scheme *runtime.Scheme + mapper meta.RESTMapper +} + +// Scheme returns the scheme this client is using. +func (d *delegatingClient) Scheme() *runtime.Scheme { + return d.scheme +} + +// RESTMapper returns the rest mapper this client is using. +func (d *delegatingClient) RESTMapper() meta.RESTMapper { + return d.mapper +} + +// delegatingReader extend the delegatingReader from controller-runtime/pkg/client +// 1. for requests not in local cluster, disable cache +// 2. for structured types, inherit the cache blacklist +// 3. for unstructured types, use cache whitelist +type delegatingReader struct { + CacheReader client.Reader + ClientReader client.Reader + + uncachedStructuredGVKs map[schema.GroupVersionKind]struct{} + cachedUnstructuredGVKs map[schema.GroupVersionKind]struct{} + scheme *runtime.Scheme +} + +func (d *delegatingReader) shouldBypassCache(ctx context.Context, obj runtime.Object) (bool, error) { + gvk, err := apiutil.GVKForObject(obj, d.scheme) + if err != nil { + return false, err + } + if meta.IsListType(obj) { + gvk.Kind = strings.TrimSuffix(gvk.Kind, "List") + } + _, isUnstructured := obj.(*unstructured.Unstructured) + _, isUnstructuredList := obj.(*unstructured.UnstructuredList) + if isUnstructured || isUnstructuredList { + _, shouldCache := d.cachedUnstructuredGVKs[gvk] + return !shouldCache, nil + } + _, shouldNotCache := d.uncachedStructuredGVKs[gvk] + return shouldNotCache, nil +} + +// Get retrieves an obj for a given object key from the Kubernetes Cluster. +func (d *delegatingReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { + if isUncached, err := d.shouldBypassCache(ctx, obj); err != nil { + return err + } else if isUncached { + return d.ClientReader.Get(ctx, key, obj) + } + return d.CacheReader.Get(ctx, key, obj) +} + +// List retrieves list of objects for a given namespace and list options. +func (d *delegatingReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if isUncached, err := d.shouldBypassCache(ctx, list); err != nil { + return err + } else if isUncached { + return d.ClientReader.List(ctx, list, opts...) + } + return d.CacheReader.List(ctx, list, opts...) +} diff --git a/pkg/client/delegating_handler_client.go b/pkg/client/delegating_handler_client.go new file mode 100644 index 0000000..71a63ec --- /dev/null +++ b/pkg/client/delegating_handler_client.go @@ -0,0 +1,46 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// DelegatingHandlerClient override the original client's function +type DelegatingHandlerClient struct { + client.Client + Getter func(ctx context.Context, key client.ObjectKey, obj client.Object) error + Lister func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error +} + +// Get resource by overridden getter +func (c DelegatingHandlerClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { + if c.Getter != nil { + return c.Getter(ctx, key, obj) + } + return c.Client.Get(ctx, key, obj) +} + +// List resource by overridden lister +func (c DelegatingHandlerClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if c.Lister != nil { + return c.Lister(ctx, list, opts...) + } + return c.Client.List(ctx, list, opts...) +} diff --git a/pkg/client/monitor_client.go b/pkg/client/monitor_client.go new file mode 100644 index 0000000..181b7fd --- /dev/null +++ b/pkg/client/monitor_client.go @@ -0,0 +1,137 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "reflect" + "strings" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubevela/workflow/pkg/monitor/metrics" +) + +func monitor(ctx context.Context, verb string, obj runtime.Object) func() { + o := obj.GetObjectKind().GroupVersionKind() + _, isUnstructured := obj.(*unstructured.Unstructured) + _, isUnstructuredList := obj.(*unstructured.UnstructuredList) + un := "structured" + if isUnstructured || isUnstructuredList { + un = "unstructured" + } + kind := o.Kind + if kind == "" { + if t := reflect.TypeOf(obj); t.Kind() == reflect.Ptr { + kind = t.Elem().Name() + } else { + kind = t.Name() + } + } + kind = strings.TrimSuffix(kind, "List") + begin := time.Now() + return func() { + v := time.Since(begin).Seconds() + metrics.ClientRequestHistogram.WithLabelValues(verb, kind, o.GroupVersion().String(), un).Observe(v) + } +} + +type monitorCache struct { + cache.Cache +} + +func (c *monitorCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { + cb := monitor(ctx, "GetCache", obj) + defer cb() + return c.Cache.Get(ctx, key, obj) +} + +func (c *monitorCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + cb := monitor(ctx, "ListCache", list) + defer cb() + return c.Cache.List(ctx, list, opts...) +} + +type monitorClient struct { + client.Client +} + +func (c *monitorClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { + cb := monitor(ctx, "Get", obj) + defer cb() + return c.Client.Get(ctx, key, obj) +} + +func (c *monitorClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + cb := monitor(ctx, "List", list) + defer cb() + return c.Client.List(ctx, list, opts...) +} + +func (c *monitorClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + cb := monitor(ctx, "Create", obj) + defer cb() + return c.Client.Create(ctx, obj, opts...) +} + +func (c *monitorClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + cb := monitor(ctx, "Delete", obj) + defer cb() + return c.Client.Delete(ctx, obj, opts...) +} + +func (c *monitorClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + cb := monitor(ctx, "Update", obj) + defer cb() + return c.Client.Update(ctx, obj, opts...) +} + +func (c *monitorClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + cb := monitor(ctx, "Patch", obj) + defer cb() + return c.Client.Patch(ctx, obj, patch, opts...) +} + +func (c *monitorClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + cb := monitor(ctx, "DeleteAllOf", obj) + defer cb() + return c.Client.DeleteAllOf(ctx, obj, opts...) +} + +func (c *monitorClient) Status() client.StatusWriter { + return &monitorStatusWriter{c.Client.Status()} +} + +type monitorStatusWriter struct { + client.StatusWriter +} + +func (w *monitorStatusWriter) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + cb := monitor(ctx, "StatusUpdate", obj) + defer cb() + return w.StatusWriter.Update(ctx, obj, opts...) +} + +func (w *monitorStatusWriter) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + cb := monitor(ctx, "StatusPatch", obj) + defer cb() + return w.StatusWriter.Patch(ctx, obj, patch, opts...) +} diff --git a/pkg/executor/workflow.go b/pkg/executor/workflow.go index 00a3c02..9aeb586 100644 --- a/pkg/executor/workflow.go +++ b/pkg/executor/workflow.go @@ -181,17 +181,14 @@ func checkWorkflowSuspended(status *v1alpha1.WorkflowRunStatus) bool { func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflowExecutor, wfStatus *v1alpha1.WorkflowRunStatus) *engine { stepStatus := make(map[string]v1alpha1.StepStatus) - for _, ss := range wfStatus.Steps { - setStepStatus(stepStatus, ss.StepStatus) - for _, sss := range ss.SubStepsStatus { - setStepStatus(stepStatus, sss) - } - } + setStepStatus(stepStatus, wfStatus.Steps) stepDependsOn := make(map[string][]string) if w.wr.Spec.WorkflowSpec != nil { for _, step := range w.wr.Spec.WorkflowSpec.Steps { + hooks.SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, stepStatus[step.Name]) stepDependsOn[step.Name] = append(stepDependsOn[step.Name], step.DependsOn...) for _, sub := range step.SubSteps { + hooks.SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, stepStatus[step.Name]) stepDependsOn[sub.Name] = append(stepDependsOn[sub.Name], sub.DependsOn...) } } @@ -213,12 +210,12 @@ func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflowE } } -func setStepStatus(statusMap map[string]v1alpha1.StepStatus, status v1alpha1.StepStatus) { - statusMap[status.Name] = v1alpha1.StepStatus{ - Phase: status.Phase, - ID: status.ID, - Reason: status.Reason, - FirstExecuteTime: status.FirstExecuteTime, +func setStepStatus(statusMap map[string]v1alpha1.StepStatus, status []v1alpha1.WorkflowStepStatus) { + for _, ss := range status { + statusMap[ss.Name] = ss.StepStatus + for _, sss := range ss.SubStepsStatus { + statusMap[sss.Name] = sss + } } } @@ -227,12 +224,7 @@ func (w *workflowExecutor) GetSuspendBackoffWaitTime() time.Duration { return 0 } stepStatus := make(map[string]v1alpha1.StepStatus) - for _, ss := range w.wr.Status.Steps { - setStepStatus(stepStatus, ss.StepStatus) - for _, sss := range ss.SubStepsStatus { - setStepStatus(stepStatus, sss) - } - } + setStepStatus(stepStatus, w.wr.Status.Steps) max := time.Duration(1<<63 - 1) min := max for _, step := range w.wr.Spec.WorkflowSpec.Steps { @@ -464,7 +456,7 @@ func (e *engine) setNextExecuteTime() { e.wfCtx.SetValueInMemory(next, types.ContextKeyNextExecuteTime) } -func (e *engine) runAsDAG(taskRunners []types.TaskRunner) error { +func (e *engine) runAsDAG(taskRunners []types.TaskRunner, pendingRunners bool) error { var ( todoTasks []types.TaskRunner pendingTasks []types.TaskRunner @@ -480,9 +472,15 @@ func (e *engine) runAsDAG(taskRunners []types.TaskRunner) error { } if !finish { done = false - if tRunner.Pending(wfCtx, e.stepStatus) { + if pending, status := tRunner.Pending(wfCtx, e.stepStatus); pending { + if !pendingRunners { + wfCtx.IncreaseCountValueInMemory(types.ContextPrefixBackoffTimes, status.ID) + e.updateStepStatus(status) + } pendingTasks = append(pendingTasks, tRunner) continue + } else if status.Phase == v1alpha1.WorkflowStepPhasePending { + wfCtx.DeleteValueInMemory(types.ContextPrefixBackoffTimes, stepID) } todoTasks = append(todoTasks, tRunner) } else { @@ -504,7 +502,7 @@ func (e *engine) runAsDAG(taskRunners []types.TaskRunner) error { } if len(pendingTasks) > 0 { - return e.runAsDAG(pendingTasks) + return e.runAsDAG(pendingTasks, true) } } return nil @@ -514,7 +512,7 @@ func (e *engine) runAsDAG(taskRunners []types.TaskRunner) error { func (e *engine) Run(taskRunners []types.TaskRunner, dag bool) error { var err error if dag { - err = e.runAsDAG(taskRunners) + err = e.runAsDAG(taskRunners, false) } else { err = e.steps(taskRunners, dag) } @@ -542,6 +540,14 @@ func (e *engine) steps(taskRunners []types.TaskRunner, dag bool) error { continue } } + if pending, status := runner.Pending(wfCtx, e.stepStatus); pending { + wfCtx.IncreaseCountValueInMemory(types.ContextPrefixBackoffTimes, status.ID) + e.updateStepStatus(status) + if dag { + continue + } + return nil + } options := e.generateRunOptions(e.findDependPhase(taskRunners, index, dag)) status, operation, err := runner.Run(wfCtx, options) @@ -583,7 +589,7 @@ func (e *engine) generateRunOptions(dependsOnPhase v1alpha1.WorkflowStepPhase) * options := &types.TaskRunOptions{ GetTracer: func(id string, stepStatus v1alpha1.WorkflowStep) monitorContext.Context { return e.monitorCtx.Fork(id, monitorContext.DurationMetric(func(v float64) { - metrics.WorkflowStepDurationHistogram.WithLabelValues("workflowrun", stepStatus.Type).Observe(v) + metrics.WorkflowRunStepDurationHistogram.WithLabelValues("workflowrun", stepStatus.Type).Observe(v) })) }, StepStatus: e.stepStatus, diff --git a/pkg/executor/workflow_test.go b/pkg/executor/workflow_test.go index f15931a..dc90067 100644 --- a/pkg/executor/workflow_test.go +++ b/pkg/executor/workflow_test.go @@ -2077,19 +2077,29 @@ var _ = Describe("Test Workflow", func() { cleanStepTimeStamp(&wr.Status) Expect(cmp.Diff(wr.Status, v1alpha1.WorkflowRunStatus{ Mode: dagMode, - Steps: []v1alpha1.WorkflowStepStatus{{ - StepStatus: v1alpha1.StepStatus{ - Name: "s1", - Type: "success", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "s2", + Type: "pending", + Phase: v1alpha1.WorkflowStepPhasePending, + }, }, - }, { - StepStatus: v1alpha1.StepStatus{ - Name: "s3", - Type: "success", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + { + StepStatus: v1alpha1.StepStatus{ + Name: "s1", + Type: "success", + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, }, - }}, + { + StepStatus: v1alpha1.StepStatus{ + Name: "s3", + Type: "success", + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, + }, + }, })).Should(BeEquivalentTo("")) state, err = wf.ExecuteRunners(ctx, runners) @@ -2104,25 +2114,29 @@ var _ = Describe("Test Workflow", func() { cleanStepTimeStamp(&wr.Status) Expect(cmp.Diff(wr.Status, v1alpha1.WorkflowRunStatus{ Mode: dagMode, - Steps: []v1alpha1.WorkflowStepStatus{{ - StepStatus: v1alpha1.StepStatus{ - Name: "s1", - Type: "success", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + Steps: []v1alpha1.WorkflowStepStatus{ + { + StepStatus: v1alpha1.StepStatus{ + Name: "s2", + Type: "pending", + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, }, - }, { - StepStatus: v1alpha1.StepStatus{ - Name: "s3", - Type: "success", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + { + StepStatus: v1alpha1.StepStatus{ + Name: "s1", + Type: "success", + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, }, - }, { - StepStatus: v1alpha1.StepStatus{ - Name: "s2", - Type: "pending", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + { + StepStatus: v1alpha1.StepStatus{ + Name: "s3", + Type: "success", + Phase: v1alpha1.WorkflowStepPhaseSucceeded, + }, }, - }}, + }, })).Should(BeEquivalentTo("")) }) @@ -2288,14 +2302,18 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t return &testTaskRunner{ step: step, run: run, - checkPending: func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { + checkPending: func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { if step.Type != "pending" { - return false + return false, v1alpha1.StepStatus{} } if pending == true { - return true + return true, v1alpha1.StepStatus{ + Phase: v1alpha1.WorkflowStepPhasePending, + Name: step.Name, + Type: step.Type, + } } - return false + return false, v1alpha1.StepStatus{} }, } } @@ -2303,7 +2321,7 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t type testTaskRunner struct { step v1alpha1.WorkflowStep run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) - checkPending func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool + checkPending func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) } // Name return step name. @@ -2349,7 +2367,7 @@ func (tr *testTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptio } // Pending check task should be executed or not. -func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { +func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { return tr.checkPending(ctx, stepStatus) } diff --git a/pkg/hooks/data_passing.go b/pkg/hooks/data_passing.go index a043288..a7bdac8 100644 --- a/pkg/hooks/data_passing.go +++ b/pkg/hooks/data_passing.go @@ -17,9 +17,12 @@ limitations under the License. package hooks import ( + "encoding/json" + "fmt" "strings" "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" "github.com/kubevela/workflow/api/v1alpha1" wfContext "github.com/kubevela/workflow/pkg/context" @@ -42,24 +45,59 @@ func Input(ctx wfContext.Context, paramValue *value.Value, step v1alpha1.Workflo } // Output get data from task value. -func Output(ctx wfContext.Context, taskValue *value.Value, step v1alpha1.WorkflowStep, status v1alpha1.StepStatus) error { +func Output(ctx wfContext.Context, taskValue *value.Value, step v1alpha1.WorkflowStep, status v1alpha1.StepStatus, stepStatus map[string]v1alpha1.StepStatus) error { + errMsg := "" if wfTypes.IsStepFinish(status.Phase, status.Reason) { + SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, status) for _, output := range step.Outputs { v, err := taskValue.LookupByScript(output.ValueFrom) - if err != nil && !strings.Contains(err.Error(), "not found") { - return err + // if the error is not nil and the step is not skipped, return the error + if err != nil && status.Phase != v1alpha1.WorkflowStepPhaseSkipped { + errMsg += fmt.Sprintf("failed to get output from %s: %s\n", output.ValueFrom, err.Error()) } + // if the error is not nil, set the value to null if err != nil || v.Error() != nil { - v, err = taskValue.MakeValue("null") - if err != nil { - return err - } + v, _ = taskValue.MakeValue("null") } if err := ctx.SetVar(v, output.Name); err != nil { - return err + errMsg += fmt.Sprintf("failed to set output %s: %s\n", output.Name, err.Error()) } } } + if errMsg != "" { + return errors.New(errMsg) + } return nil } + +// SetAdditionalNameInStatus sets additional name from properties to status map +func SetAdditionalNameInStatus(stepStatus map[string]v1alpha1.StepStatus, name string, properties *runtime.RawExtension, status v1alpha1.StepStatus) { + if stepStatus == nil || properties == nil { + return + } + o := struct { + Name string `json:"name"` + Component string `json:"component"` + }{} + js, err := properties.MarshalJSON() + if err != nil { + return + } + if err := json.Unmarshal(js, &o); err != nil { + return + } + additionalName := "" + switch { + case o.Name != "": + additionalName = o.Name + case o.Component != "": + additionalName = o.Component + default: + return + } + if _, ok := stepStatus[additionalName]; !ok { + stepStatus[additionalName] = status + return + } +} diff --git a/pkg/hooks/data_passing_test.go b/pkg/hooks/data_passing_test.go index b0aabc5..3603398 100644 --- a/pkg/hooks/data_passing_test.go +++ b/pkg/hooks/data_passing_test.go @@ -66,6 +66,7 @@ func TestOutput(t *testing.T) { output: score: 99 `, nil, "") r.NoError(err) + stepStatus := make(map[string]v1alpha1.StepStatus) err = Output(wfCtx, taskValue, v1alpha1.WorkflowStep{ WorkflowStepBase: v1alpha1.WorkflowStepBase{ Properties: &runtime.RawExtension{ @@ -78,7 +79,7 @@ output: score: 99 }, }, v1alpha1.StepStatus{ Phase: v1alpha1.WorkflowStepPhaseSucceeded, - }) + }, stepStatus) r.NoError(err) result, err := wfCtx.GetVar("myscore") r.NoError(err) @@ -86,6 +87,7 @@ output: score: 99 r.NoError(err) r.Equal(s, `99 `) + r.Equal(stepStatus["mystep"].Phase, v1alpha1.WorkflowStepPhaseSucceeded) } func mockContext(t *testing.T) wfContext.Context { diff --git a/pkg/monitor/metrics/workflow.go b/pkg/monitor/metrics/workflow.go index cae0d77..e38fb63 100644 --- a/pkg/monitor/metrics/workflow.go +++ b/pkg/monitor/metrics/workflow.go @@ -44,47 +44,56 @@ var ( // WorkflowRunPhaseCounter report the number of workflow run phase WorkflowRunPhaseCounter = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "workflow_phase_number", + Name: "workflowrun_phase_number", Help: "workflow run phase number", }, []string{"phase"}) // WorkflowRunFinishedTimeHistogram report the time for finished workflow run WorkflowRunFinishedTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "workflow_finished_time_seconds", - Help: "workflow finished time distributions.", + Name: "workflowrun_finished_time_seconds", + Help: "workflow run finished time distributions.", Buckets: histogramBuckets, ConstLabels: prometheus.Labels{}, }, []string{"phase"}) // WorkflowRunInitializedCounter report the workflow run initialize execute number. WorkflowRunInitializedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "workflow_initialized_num", - Help: "workflow initialize times", + Name: "workflowrun_initialized_num", + Help: "workflow run initialize times", }, []string{}) - // WorkflowStepPhaseGauge report the number of workflow step state - WorkflowStepPhaseGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "workflow_step_phase_number", + // WorkflowRunStepPhaseGauge report the number of workflow run step state + WorkflowRunStepPhaseGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "workflowrun_step_phase_number", Help: "workflow step phase number", }, []string{"step_type", "phase"}) // WorkflowStepDurationHistogram report the step execution duration. - WorkflowStepDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "step_duration_ms", - Help: "step latency distributions.", + WorkflowRunStepDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "workflowrun_step_duration_ms", + Help: "workflow run step latency distributions.", Buckets: histogramBuckets, ConstLabels: prometheus.Labels{}, }, []string{"controller", "step_type"}) + + // ClientRequestHistogram report the client request execution duration. + ClientRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "client_request_time_seconds", + Help: "client request duration distributions.", + Buckets: histogramBuckets, + ConstLabels: prometheus.Labels{}, + }, []string{"verb", "Kind", "apiVersion", "unstructured"}) ) var collectorGroup = []prometheus.Collector{ GenerateTaskRunnersDurationHistogram, - WorkflowStepDurationHistogram, + WorkflowRunStepDurationHistogram, WorkflowRunReconcileTimeHistogram, WorkflowRunFinishedTimeHistogram, WorkflowRunInitializedCounter, WorkflowRunPhaseCounter, - WorkflowStepPhaseGauge, + WorkflowRunStepPhaseGauge, + ClientRequestHistogram, } func init() { diff --git a/pkg/monitor/watcher/workflow.go b/pkg/monitor/watcher/workflow.go index 454063f..20933b1 100644 --- a/pkg/monitor/watcher/workflow.go +++ b/pkg/monitor/watcher/workflow.go @@ -75,7 +75,7 @@ func (watcher *workflowRunMetricsWatcher) report() { metrics.WorkflowRunPhaseCounter.WithLabelValues(phase).Set(float64(watcher.phaseCounter[phase])) } for stepPhase := range watcher.stepPhaseDirty { - metrics.WorkflowStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")...).Set(float64(watcher.stepPhaseCounter[stepPhase])) + metrics.WorkflowRunStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")[:2]...).Set(float64(watcher.stepPhaseCounter[stepPhase])) } watcher.phaseDirty = map[string]struct{}{} watcher.stepPhaseDirty = map[string]struct{}{} diff --git a/pkg/providers/util/util.go b/pkg/providers/util/util.go index 8ae3da2..94eb1e8 100644 --- a/pkg/providers/util/util.go +++ b/pkg/providers/util/util.go @@ -17,6 +17,8 @@ package util import ( + "encoding/json" + wfContext "github.com/kubevela/workflow/pkg/context" "github.com/kubevela/workflow/pkg/cue/model" "github.com/kubevela/workflow/pkg/cue/model/value" @@ -78,12 +80,20 @@ func (p *provider) Log(ctx monitorContext.Context, wfCtx wfContext.Context, v *v if err != nil { return err } - s, err := data.String() + logCtx := ctx.Fork("cue logs") + if s, err := data.GetString(); err == nil { + logCtx.Info(s) + return nil + } + var tmp interface{} + if err := data.UnmarshalTo(&tmp); err != nil { + return err + } + b, err := json.Marshal(tmp) if err != nil { return err } - logCtx := ctx.Fork("cue logs") - logCtx.Info(s) + logCtx.Info(string(b)) return nil } diff --git a/pkg/providers/util/util_test.go b/pkg/providers/util/util_test.go index 484eaf0..676a5cd 100644 --- a/pkg/providers/util/util_test.go +++ b/pkg/providers/util/util_test.go @@ -203,6 +203,15 @@ data: "test" prd := &provider{} err = prd.Log(logCtx, nil, v, nil) r.NoError(err) + + v, err = value.NewValue(` +data: { + message: "test" +} + `, nil, "") + r.NoError(err) + err = prd.Log(logCtx, nil, v, nil) + r.NoError(err) } func TestInstall(t *testing.T) { diff --git a/pkg/stdlib/pkgs/util.cue b/pkg/stdlib/pkgs/util.cue index 16df0c5..e6ea756 100644 --- a/pkg/stdlib/pkgs/util.cue +++ b/pkg/stdlib/pkgs/util.cue @@ -20,5 +20,5 @@ #do: "log" #provider: "util" - data: {...} + data: {...} | string } diff --git a/pkg/steps/generator.go b/pkg/steps/generator.go index 28152e3..47faba3 100644 --- a/pkg/steps/generator.go +++ b/pkg/steps/generator.go @@ -104,8 +104,12 @@ func generateTaskRunner(ctx context.Context, workflowStep := v1alpha1.WorkflowStep{ WorkflowStepBase: subStep, } - options.ID = generateSubStepID(wr.Status, subStep.Name, step.Name) - subTask, err := generateTaskRunner(ctx, wr, workflowStep, taskDiscover, options) + o := &types.TaskGeneratorOptions{ + ID: generateSubStepID(wr.Status, subStep.Name, step.Name), + PackageDiscover: options.PackageDiscover, + ProcessContext: options.ProcessContext, + } + subTask, err := generateTaskRunner(ctx, wr, workflowStep, taskDiscover, o) if err != nil { return nil, err } diff --git a/pkg/tasks/builtin/step_group.go b/pkg/tasks/builtin/step_group.go index be9be56..e23e973 100644 --- a/pkg/tasks/builtin/step_group.go +++ b/pkg/tasks/builtin/step_group.go @@ -56,16 +56,17 @@ func (tr *stepGroupTaskRunner) Name() string { } // Pending check task should be executed or not. -func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { - return custom.CheckPending(ctx, tr.step, stepStatus) +func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { + return custom.CheckPending(ctx, tr.step, tr.id, stepStatus) } // Run make workflow step group. func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (status v1alpha1.StepStatus, operations *types.Operation, rErr error) { status = v1alpha1.StepStatus{ - ID: tr.id, - Name: tr.name, - Type: types.WorkflowStepTypeStepGroup, + ID: tr.id, + Name: tr.name, + Type: types.WorkflowStepTypeStepGroup, + Message: "", } pStatus := &status @@ -136,6 +137,8 @@ func getStepGroupStatus(status v1alpha1.StepStatus, stepStatus v1alpha1.Workflow status.Phase = v1alpha1.WorkflowStepPhaseRunning case subStepCounts[string(v1alpha1.WorkflowStepPhaseStopped)] > 0: status.Phase = v1alpha1.WorkflowStepPhaseStopped + case subStepCounts[string(v1alpha1.WorkflowStepPhasePending)] > 0: + status.Phase = v1alpha1.WorkflowStepPhasePending case subStepCounts[string(v1alpha1.WorkflowStepPhaseFailed)] > 0: status.Phase = v1alpha1.WorkflowStepPhaseFailed switch { diff --git a/pkg/tasks/builtin/step_group_test.go b/pkg/tasks/builtin/step_group_test.go index 6ccc56a..c8a1c8c 100644 --- a/pkg/tasks/builtin/step_group_test.go +++ b/pkg/tasks/builtin/step_group_test.go @@ -51,13 +51,15 @@ func TestStepGroupStep(t *testing.T) { r.Equal(runner.Name(), "test") // test pending - r.Equal(runner.Pending(nil, nil), true) + p, _ := runner.Pending(nil, nil) + r.Equal(p, true) ss := map[string]v1alpha1.StepStatus{ "depend": { Phase: v1alpha1.WorkflowStepPhaseSucceeded, }, } - r.Equal(runner.Pending(nil, ss), false) + p, _ = runner.Pending(nil, ss) + r.Equal(p, false) // test skip status, operations, err := runner.Run(nil, &types.TaskRunOptions{ diff --git a/pkg/tasks/builtin/suspend.go b/pkg/tasks/builtin/suspend.go index 797cccd..ab5dee2 100644 --- a/pkg/tasks/builtin/suspend.go +++ b/pkg/tasks/builtin/suspend.go @@ -59,10 +59,11 @@ func (tr *suspendTaskRunner) Name() string { // Run make workflow suspend. func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) { stepStatus = v1alpha1.StepStatus{ - ID: tr.id, - Name: tr.step.Name, - Type: types.WorkflowStepTypeSuspend, - Phase: v1alpha1.WorkflowStepPhaseRunning, + ID: tr.id, + Name: tr.step.Name, + Type: types.WorkflowStepTypeSuspend, + Phase: v1alpha1.WorkflowStepPhaseRunning, + Message: "", } operations = &types.Operation{Suspend: true} @@ -132,8 +133,8 @@ func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOp } // Pending check task should be executed or not. -func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { - return custom.CheckPending(ctx, tr.step, stepStatus) +func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { + return custom.CheckPending(ctx, tr.step, tr.id, stepStatus) } // GetSuspendStepDurationWaiting get suspend step wait duration @@ -160,7 +161,7 @@ func GetSuspendStepDurationWaiting(step v1alpha1.WorkflowStep) (time.Duration, e func handleOutput(ctx wfContext.Context, stepStatus *v1alpha1.StepStatus, operations *types.Operation, step v1alpha1.WorkflowStep, postStopHooks []types.TaskPostStopHook, pd *packages.PackageDiscover, id string, pCtx process.Context) { status := *stepStatus - if status.Phase != v1alpha1.WorkflowStepPhaseSkipped && len(step.Outputs) > 0 { + if len(step.Outputs) > 0 { contextValue, err := custom.MakeValueForContext(ctx, pd, step.Name, id, pCtx) if err != nil { status.Phase = v1alpha1.WorkflowStepPhaseFailed @@ -173,7 +174,7 @@ func handleOutput(ctx wfContext.Context, stepStatus *v1alpha1.StepStatus, operat } for _, hook := range postStopHooks { - if err := hook(ctx, contextValue, step, status); err != nil { + if err := hook(ctx, contextValue, step, status, nil); err != nil { status.Phase = v1alpha1.WorkflowStepPhaseFailed if status.Reason == "" { status.Reason = types.StatusReasonOutput diff --git a/pkg/tasks/builtin/suspend_test.go b/pkg/tasks/builtin/suspend_test.go index d89d2f1..e5b8206 100644 --- a/pkg/tasks/builtin/suspend_test.go +++ b/pkg/tasks/builtin/suspend_test.go @@ -37,13 +37,15 @@ func TestSuspendStep(t *testing.T) { r.Equal(runner.Name(), "test") // test pending - r.Equal(runner.Pending(nil, nil), true) + p, _ := runner.Pending(nil, nil) + r.Equal(p, true) ss := map[string]v1alpha1.StepStatus{ "depend": { Phase: v1alpha1.WorkflowStepPhaseSucceeded, }, } - r.Equal(runner.Pending(nil, ss), false) + p, _ = runner.Pending(nil, ss) + r.Equal(p, false) // test skip status, operations, err := runner.Run(nil, &types.TaskRunOptions{ diff --git a/pkg/tasks/custom/task.go b/pkg/tasks/custom/task.go index 010e214..21fb8c8 100644 --- a/pkg/tasks/custom/task.go +++ b/pkg/tasks/custom/task.go @@ -61,7 +61,7 @@ func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (types.T type taskRunner struct { name string run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) - checkPending func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool + checkPending func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) } // Name return step name. @@ -75,7 +75,7 @@ func (tr *taskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) } // Pending check task should be executed or not. -func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { +func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { return tr.checkPending(ctx, stepStatus) } @@ -118,10 +118,10 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error tRunner := new(taskRunner) tRunner.name = wfStep.Name - tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool { - return CheckPending(ctx, wfStep, stepStatus) + tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { + return CheckPending(ctx, wfStep, exec.wfStatus.ID, stepStatus) } - tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) { + tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) { if options.GetTracer == nil { options.GetTracer = func(id string, step v1alpha1.WorkflowStep) monitorContext.Context { return monitorContext.NewTraceContext(context.Background(), "") @@ -141,6 +141,23 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error var err error var paramFile string + defer func() { + if taskv == nil { + taskv, err = convertTemplate(ctx, t.pd, strings.Join([]string{templ, paramFile}, "\n"), wfStep.Name, exec.wfStatus.ID, options.PCtx) + if err != nil { + return + } + } + for _, hook := range options.PostStopHooks { + if err := hook(ctx, taskv, wfStep, exec.status(), options.StepStatus); err != nil { + exec.wfStatus.Message = err.Error() + stepStatus = exec.status() + operations = exec.operation() + return + } + } + }() + for _, hook := range options.PreCheckHooks { result, err := hook(wfStep, &types.PreCheckOptions{ PackageDiscover: t.pd, @@ -210,12 +227,6 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error exec.err(ctx, true, err, types.StatusReasonExecute) return exec.status(), exec.operation(), nil } - for _, hook := range options.PostStopHooks { - if err := hook(ctx, taskv, wfStep, exec.status()); err != nil { - exec.err(ctx, false, err, types.StatusReasonOutput) - return exec.status(), exec.operation(), nil - } - } return exec.status(), exec.operation(), nil } @@ -552,20 +563,28 @@ func NewTaskLoader(lt LoadTaskTemplate, pkgDiscover *packages.PackageDiscover, h } // CheckPending checks whether to pending task run -func CheckPending(ctx wfContext.Context, step v1alpha1.WorkflowStep, stepStatus map[string]v1alpha1.StepStatus) bool { +func CheckPending(ctx wfContext.Context, step v1alpha1.WorkflowStep, id string, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { + pStatus := v1alpha1.StepStatus{ + Phase: v1alpha1.WorkflowStepPhasePending, + Type: step.Type, + ID: id, + Name: step.Name, + } for _, depend := range step.DependsOn { + pStatus.Message = fmt.Sprintf("Pending on DependsOn: %s", depend) if status, ok := stepStatus[depend]; ok { if !types.IsStepFinish(status.Phase, status.Reason) { - return true + return true, pStatus } } else { - return true + return true, pStatus } } for _, input := range step.Inputs { + pStatus.Message = fmt.Sprintf("Pending on Input: %s", input.From) if _, err := ctx.GetVar(strings.Split(input.From, ".")...); err != nil { - return true + return true, pStatus } } - return false + return false, v1alpha1.StepStatus{} } diff --git a/pkg/tasks/custom/task_test.go b/pkg/tasks/custom/task_test.go index 9effb15..96f32e5 100644 --- a/pkg/tasks/custom/task_test.go +++ b/pkg/tasks/custom/task_test.go @@ -267,9 +267,9 @@ close({ case "input": r.Equal(err.Error(), "do preStartHook: get input from [podIP]: var(path=podIP) not exist") case "output-var-conflict": - r.Equal(status.Reason, types.StatusReasonOutput) + r.Contains(status.Message, "conflict") r.Equal(operation.Waiting, false) - r.Equal(status.Phase, v1alpha1.WorkflowStepPhaseFailed) + r.Equal(status.Phase, v1alpha1.WorkflowStepPhaseSucceeded) case "failed-after-retries": wfContext.CleanupMemoryStore("app-v1", "default") newCtx := newWorkflowContextForTest(t) @@ -454,14 +454,16 @@ func TestPendingInputCheck(t *testing.T) { r.NoError(err) run, err := gen(step, &types.TaskGeneratorOptions{}) r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), true) + p, _ := run.Pending(wfCtx, nil) + r.Equal(p, true) score, err := value.NewValue(` 100 `, nil, "") r.NoError(err) err = wfCtx.SetVar(score, "score") r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), false) + p, _ = run.Pending(wfCtx, nil) + r.Equal(p, false) } func TestPendingDependsOnCheck(t *testing.T) { @@ -489,13 +491,15 @@ func TestPendingDependsOnCheck(t *testing.T) { r.NoError(err) run, err := gen(step, &types.TaskGeneratorOptions{}) r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), true) + p, _ := run.Pending(wfCtx, nil) + r.Equal(p, true) ss := map[string]v1alpha1.StepStatus{ "depend": { Phase: v1alpha1.WorkflowStepPhaseSucceeded, }, } - r.Equal(run.Pending(wfCtx, ss), false) + p, _ = run.Pending(wfCtx, ss) + r.Equal(p, false) } func TestSkip(t *testing.T) { @@ -521,7 +525,8 @@ func TestSkip(t *testing.T) { r.NoError(err) runner, err := gen(step, &types.TaskGeneratorOptions{}) r.NoError(err) - status, operations, err := runner.Run(nil, &types.TaskRunOptions{ + wfCtx := newWorkflowContextForTest(t) + status, operations, err := runner.Run(wfCtx, &types.TaskRunOptions{ PreCheckHooks: []types.TaskPreCheckHook{ func(step v1alpha1.WorkflowStep, options *types.PreCheckOptions) (*types.PreCheckResult, error) { return &types.PreCheckResult{Skip: true}, nil diff --git a/pkg/types/types.go b/pkg/types/types.go index 5c4599e..69ecb06 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -36,7 +36,7 @@ import ( // TaskRunner is a task runner type TaskRunner interface { Name() string - Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) bool + Pending(ctx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) Run(ctx wfContext.Context, options *TaskRunOptions) (v1alpha1.StepStatus, *Operation, error) } @@ -87,7 +87,7 @@ type TaskPreCheckHook func(step v1alpha1.WorkflowStep, options *PreCheckOptions) type TaskPreStartHook func(ctx wfContext.Context, paramValue *value.Value, step v1alpha1.WorkflowStep) error // TaskPostStopHook run after task execution. -type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1alpha1.WorkflowStep, status v1alpha1.StepStatus) error +type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1alpha1.WorkflowStep, status v1alpha1.StepStatus, stepStatus map[string]v1alpha1.StepStatus) error // Operation is workflow operation object. type Operation struct { diff --git a/version/version.go b/version/version.go new file mode 100644 index 0000000..962118e --- /dev/null +++ b/version/version.go @@ -0,0 +1,46 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +import "github.com/hashicorp/go-version" + +// GitRevision is the commit of repo +var GitRevision = "UNKNOWN" + +// VelaVersion is the version of cli. +var VelaVersion = "UNKNOWN" + +// IsOfficialWorkflowVersion checks whether the provided version string follows a KubeVela Workflow version pattern +func IsOfficialWorkflowVersion(versionStr string) bool { + _, err := version.NewSemver(versionStr) + return err == nil +} + +// GetOfficialWorkflowVersion extracts the KubeVela Workflow version from the provided string +// More precisely, this method returns the segments and prerelease info w/o metadata +func GetOfficialWorkflowVersion(versionStr string) (string, error) { + s, err := version.NewSemver(versionStr) + if err != nil { + return "", err + } + v := s.String() + metadata := s.Metadata() + if metadata != "" { + metadata = "+" + metadata + } + return v[:len(v)-len(metadata)], nil +}