diff --git a/charts/vela-workflow/README.md b/charts/vela-workflow/README.md index 6fd808c..469a105 100644 --- a/charts/vela-workflow/README.md +++ b/charts/vela-workflow/README.md @@ -38,14 +38,15 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow - ### KubeVela workflow parameters -| Name | Description | Value | -| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` | -| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` | -| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` | -| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` | -| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` | -| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` | +| Name | Description | Value | +| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- | +| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` | +| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` | +| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install 
[kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` | +| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` | +| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` | +| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` | +| `workflow.groupByLabel` | The label used to group workflow record | `pipeline.oam.dev/name` | ### KubeVela workflow backup parameters @@ -56,7 +57,6 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow - | `backup.strategy` | The backup strategy for workflow record | `BackupFinishedRecord` | | `backup.ignoreStrategy` | The ignore strategy for backup | `IgnoreLatestFailedRecord` | | `backup.cleanOnBackup` | Enable auto clean after backup workflow record | `false` | -| `backup.groupByLabel` | The label used to group workflow record | `""` | | `backup.persistType` | The persist type for workflow record | `""` | | `backup.configSecretName` | The secret name of backup config | `backup-config` | | `backup.configSecretNamespace` | The secret name of backup config namespace | `vela-system` | diff --git a/charts/vela-workflow/templates/workflow-controller.yaml b/charts/vela-workflow/templates/workflow-controller.yaml index eb8eab1..0dab479 100644 --- a/charts/vela-workflow/templates/workflow-controller.yaml +++ b/charts/vela-workflow/templates/workflow-controller.yaml @@ -141,10 +141,10 @@ spec: - "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}" - "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}" - "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}" + - "--group-by-label={{ .Values.workflow.groupByLabel }}" {{ if .Values.backup.enable }} - "--backup-strategy={{ .Values.backup.strategy }}" - 
"--backup-ignore-strategy={{ .Values.backup.ignoreStrategy }}" - - "--backup-group-by-label={{ .Values.backup.groupByLabel }}" - "--backup-clean-on-backup={{ .Values.backup.cleanOnBackup }}" - "--backup-persist-type={{ .Values.backup.persisType }}" - "--backup-config-secret-name={{ .Values.backup.configSecretName }}" diff --git a/charts/vela-workflow/values.yaml b/charts/vela-workflow/values.yaml index d9c43b7..de62443 100644 --- a/charts/vela-workflow/values.yaml +++ b/charts/vela-workflow/values.yaml @@ -20,6 +20,7 @@ ignoreWorkflowWithoutControllerRequirement: false ## @param workflow.backoff.maxTime.waitState The max backoff time of workflow in a wait condition ## @param workflow.backoff.maxTime.failedState The max backoff time of workflow in a failed condition ## @param workflow.step.errorRetryTimes The max retry times of a failed workflow step +## @param workflow.groupByLabel The label used to group workflow record workflow: enableSuspendOnFailure: false enablePatchStatusAtOnce: false @@ -30,6 +31,7 @@ workflow: failedState: 300 step: errorRetryTimes: 10 + groupByLabel: "pipeline.oam.dev/name" ## @section KubeVela workflow backup parameters @@ -37,7 +39,6 @@ workflow: ## @param backup.strategy The backup strategy for workflow record ## @param backup.ignoreStrategy The ignore strategy for backup ## @param backup.cleanOnBackup Enable auto clean after backup workflow record -## @param backup.groupByLabel The label used to group workflow record ## @param backup.persistType The persist type for workflow record ## @param backup.configSecretName The secret name of backup config ## @param backup.configSecretNamespace The secret name of backup config namespace @@ -46,7 +47,6 @@ backup: strategy: BackupFinishedRecord ignoreStrategy: IgnoreLatestFailedRecord cleanOnBackup: false - groupByLabel: "" persistType: "" configSecretName: "backup-config" configSecretNamespace: "vela-system" diff --git a/cmd/main.go b/cmd/main.go index aee4e96..509a617 100644 --- a/cmd/main.go 
+++ b/cmd/main.go @@ -57,6 +57,7 @@ import ( "github.com/kubevela/workflow/pkg/features" "github.com/kubevela/workflow/pkg/monitor/watcher" "github.com/kubevela/workflow/pkg/types" + "github.com/kubevela/workflow/pkg/utils" "github.com/kubevela/workflow/pkg/webhook" "github.com/kubevela/workflow/version" //+kubebuilder:scaffold:imports @@ -82,7 +83,7 @@ func main() { var qps float64 var logFileMaxSize uint64 var burst, webhookPort int - var leaseDuration, renewDeadline, retryPeriod time.Duration + var leaseDuration, renewDeadline, retryPeriod, recycleDuration time.Duration var controllerArgs controllers.Args flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -100,6 +101,9 @@ func main() { "The duration that the acting controlplane will retry refreshing leadership before giving up") flag.DurationVar(&retryPeriod, "leader-election-retry-period", 2*time.Second, "The duration the LeaderElector clients should wait between tries of actions") + flag.DurationVar(&recycleDuration, "recycle-duration", 30*24*time.Hour, + "The recycle duration of a completed and is not the latest record in a set of workflowruns") + flag.BoolVar(&useWebhook, "use-webhook", false, "Enable Admission Webhook") flag.StringVar(&certDir, "webhook-cert-dir", "/k8s-webhook-server/serving-certs", "Admission webhook cert/key dir.") flag.IntVar(&webhookPort, "webhook-port", 9443, "admission webhook listen address") @@ -115,7 +119,7 @@ func main() { flag.StringVar(&backupStrategy, "backup-strategy", "BackupFinishedRecord", "Set the strategy for backup workflow records, default is RemainLatestFailedRecord") flag.StringVar(&backupIgnoreStrategy, "backup-ignore-strategy", "", "Set the strategy for ignore backup workflow records, default is IgnoreLatestFailedRecord") flag.StringVar(&backupPersistType, "backup-persist-type", "", "Set the persist type for backup workflow records, default is empty") - flag.StringVar(&groupByLabel, "backup-group-by-label", 
"", "Set the label for group by, default is empty") + flag.StringVar(&groupByLabel, "group-by-label", "pipeline.oam.dev/name", "Set the label for group by, default is pipeline.oam.dev/name") flag.BoolVar(&backupCleanOnBackup, "backup-clean-on-backup", false, "Set the auto clean for backup workflow records, default is false") flag.StringVar(&backupConfigSecretName, "backup-config-secret-name", "backup-config", "Set the secret name for backup workflow configs, default is backup-config") flag.StringVar(&backupConfigSecretNamespace, "backup-config-secret-namespace", "vela-system", "Set the secret namespace for backup workflow configs, default is backup-config") @@ -210,6 +214,14 @@ func main() { os.Exit(1) } + kubeClient := mgr.GetClient() + if groupByLabel != "" { + if err := mgr.Add(utils.NewRecycleCronJob(kubeClient, recycleDuration, "0 0 * * *", groupByLabel)); err != nil { + klog.Error(err, "unable to start recycle cronjob") + os.Exit(1) + } + } + pd, err := packages.NewPackageDiscover(mgr.GetConfig()) if err != nil { klog.Error(err, "Failed to create CRD discovery for CUE package client") @@ -228,8 +240,6 @@ func main() { } } - kubeClient := mgr.GetClient() - if err = (&controllers.WorkflowRunReconciler{ Client: kubeClient, Scheme: mgr.GetScheme(), diff --git a/go.mod b/go.mod index 8e93dff..b0cab0d 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/onsi/gomega v1.20.2 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 + github.com/robfig/cron/v3 v3.0.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 golang.org/x/time v0.0.0-20220922220347-f3bd1da661af diff --git a/go.sum b/go.sum index bf0a887..da6dc4a 100644 --- a/go.sum +++ b/go.sum @@ -1112,6 +1112,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= 
github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/utils/recycle.go b/pkg/utils/recycle.go new file mode 100644 index 0000000..8634a40 --- /dev/null +++ b/pkg/utils/recycle.go @@ -0,0 +1,123 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package utils
+
+import (
+	"context"
+	"sort"
+	"time"
+
+	"github.com/kubevela/workflow/api/v1alpha1"
+	"github.com/robfig/cron/v3"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/retry"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+// recycleCronJob is a manager.Runnable that, on a cron schedule, deletes
+// finished WorkflowRuns carrying the grouping label once their end time is
+// more than duration in the past. The first run of every group (in the order
+// produced by sorting the WorkflowRunList) is always kept.
+type recycleCronJob struct {
+	cli      client.Client // lists and deletes WorkflowRuns
+	duration time.Duration // minimum age, measured from Status.EndTime, before a finished run is deleted
+	cron     string        // schedule spec handed to the cron scheduler
+	label    string        // label key whose value defines a group of runs
+}
+
+// NewRecycleCronJob returns a manager.Runnable that recycles finished
+// WorkflowRuns.
+//
+// cli is used to list and delete WorkflowRuns, duration is how long a finished
+// run is kept after its end time, cron is the schedule spec on which a recycle
+// pass runs, and label is the label key used to group runs.
+func NewRecycleCronJob(cli client.Client, duration time.Duration, cron, label string) manager.Runnable {
+	return &recycleCronJob{
+		cli:      cli,
+		duration: duration,
+		label:    label,
+		cron:     cron,
+	}
+}
+
+// Start registers the recycle pass with a cron scheduler, starts the
+// scheduler, and blocks until ctx is done. It returns an error only when the
+// schedule spec cannot be registered.
+func (r *recycleCronJob) Start(ctx context.Context) error {
+	c, err := r.start(ctx)
+	if err != nil {
+		return err
+	}
+	// Stop scheduling new passes on shutdown. The value returned by Stop is
+	// discarded, so a pass that is already running is not waited for.
+	defer c.Stop()
+	<-ctx.Done()
+	return nil
+}
+
+// start builds the cron scheduler, registers the recycle pass under r.cron and
+// starts the scheduler. Each scheduled pass is attempted up to three times
+// with exponential backoff (one minute base, factor 5); every failed attempt
+// is logged.
+func (r *recycleCronJob) start(ctx context.Context) (*cron.Cron, error) {
+	// Recover keeps a panicking pass from taking the whole process down.
+	c := cron.New(cron.WithChain(
+		cron.Recover(cron.DefaultLogger),
+	))
+	if _, err := c.AddFunc(r.cron, func() {
+		err := retry.OnError(wait.Backoff{
+			Steps:    3,
+			Duration: 1 * time.Minute,
+			Factor:   5.0,
+			Jitter:   0.1,
+		}, func(err error) bool {
+			// always retry
+			return true
+		}, func() error {
+			if err := r.run(ctx); err != nil {
+				klog.Errorf("Failed to recycle workflow run: %v", err)
+				return err
+			}
+			klog.Info("Recycle workflow run successfully")
+			return nil
+		})
+		if err != nil {
+			klog.Errorf("Failed to recycle workflow runs after 3 tries: %v", err)
+		}
+	}); err != nil {
+		return nil, err
+	}
+	c.Start()
+	return c, nil
+}
+
+// run performs one recycle pass: it lists every WorkflowRun that has the
+// grouping label, groups the runs by the label's value, skips the first run of
+// each group, and deletes every other run that is finished and whose end time
+// is more than r.duration in the past.
+//
+// A failure to build the selector or to list runs is returned. A failure to
+// delete an individual run is only logged, so the remaining runs are still
+// processed.
+func (r *recycleCronJob) run(ctx context.Context) error {
+	runs := &v1alpha1.WorkflowRunList{}
+	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
+		MatchExpressions: []metav1.LabelSelectorRequirement{{Key: r.label, Operator: metav1.LabelSelectorOpExists}}})
+	if err != nil {
+		return err
+	}
+	listOpt := &client.ListOptions{LabelSelector: selector}
+	if err := r.cli.List(ctx, runs, listOpt); err != nil {
+		return err
+	}
+	// The first element of each group is kept, so the sort order decides which
+	// run survives. NOTE(review): presumably WorkflowRunList sorts newest
+	// first; confirm against its sort.Interface implementation.
+	sort.Sort(runs)
+	// NOTE(review): groups are keyed by the label value only, so runs from
+	// different namespaces that share a value would land in the same group;
+	// confirm this is intended.
+	groups := make(map[string][]v1alpha1.WorkflowRun)
+	for _, item := range runs.Items {
+		if v, ok := item.Labels[r.label]; ok {
+			groups[v] = append(groups[v], item)
+		}
+	}
+	for _, group := range groups {
+		// Index 0 is the run that is kept for the group.
+		for i := 1; i < len(group); i++ {
+			run := &group[i]
+			if !run.Status.Finished || time.Since(run.Status.EndTime.Time) <= r.duration {
+				continue
+			}
+			if err := r.cli.Delete(ctx, run); err != nil {
+				klog.Errorf("Failed to delete workflowRun %s/%s, error: %v", run.Namespace, run.Name, err)
+				// Do not report success for a run that could not be deleted.
+				continue
+			}
+			klog.Infof("Successfully recycled completed workflowRun %s/%s", run.Namespace, run.Name)
+		}
+	}
+	return nil
+}
diff --git a/pkg/utils/recycle_test.go b/pkg/utils/recycle_test.go
new file mode 100644
index 0000000..1c7350e
--- /dev/null
+++ b/pkg/utils/recycle_test.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2023 The KubeVela Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/kubevela/workflow/api/v1alpha1"
+)
+
+// TestRecycleCronJob creates six WorkflowRuns in the "default" namespace (one
+// without the grouping label, one left unfinished), lets the recycle job run
+// on an "@every 1s" schedule, and checks that three runs remain afterwards.
+func TestRecycleCronJob(t *testing.T) {
+	r := require.New(t)
+	// Fixture setup, cleanup and verification use a context that never
+	// expires: the runner's context is already done by the time Start returns,
+	// so reusing it would hand a cancelled context to List and Delete.
+	bg := context.Background()
+	for i := 1; i < 7; i++ {
+		run := &v1alpha1.WorkflowRun{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("workflow-test-%d", i),
+				Namespace: "default",
+			},
+		}
+		// Run 5 carries no grouping label.
+		if i%5 != 0 {
+			run.Labels = map[string]string{
+				"pipeline.oam.dev/name": "test",
+			}
+		}
+		r.NoError(cli.Create(bg, run))
+		// Run 6 is left unfinished; run i is given an end time i days ago.
+		run.Status.Finished = i%6 != 0
+		run.Status.EndTime = metav1.Time{Time: time.Now().AddDate(0, 0, -i)}
+		r.NoError(cli.Status().Update(bg, run))
+		t.Cleanup(func() {
+			// Best-effort cleanup: runs removed by the recycle job are already
+			// gone, so the error is deliberately ignored.
+			_ = cli.Delete(bg, run)
+		})
+	}
+
+	// The timeout only bounds the runner. It is long enough for the
+	// "@every 1s" schedule to fire and for a pass to finish before shutdown.
+	ctx, cancel := context.WithTimeout(bg, 3*time.Second)
+	defer cancel()
+	runner := NewRecycleCronJob(cli, time.Hour, "@every 1s", "pipeline.oam.dev/name")
+	r.NoError(runner.Start(ctx))
+
+	runs := &v1alpha1.WorkflowRunList{}
+	r.NoError(cli.List(bg, runs))
+	r.Equal(3, len(runs.Items))
+}