add metrics and pending phase
Signed-off-by: FogDong <[email protected]>
FogDong committed Jul 18, 2022
1 parent 64d5dfe commit 4c1a4cb
Showing 36 changed files with 815 additions and 1,039 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -36,6 +36,7 @@ check-diff: reviewable ## Execute auto-gen code commands and ensure branch is cl
 .PHONY: manifests
 manifests: controller-gen ## Generate CustomResourceDefinition objects.
 	$(CONTROLLER_GEN) crd paths="./..." output:crd:artifacts:config=config/crd/bases
+	mv config/crd/bases/* charts/vela-workflow/crds/
 
 .PHONY: generate
 generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
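
Note: with this addition, `make manifests` not only regenerates the CRDs but also copies them into the Helm chart, keeping charts/vela-workflow/crds/ in sync with the Go types.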
2 changes: 2 additions & 0 deletions api/v1alpha1/types.go
@@ -222,6 +222,8 @@ const (
 	WorkflowStepPhaseStopped WorkflowStepPhase = "stopped"
 	// WorkflowStepPhaseRunning will make the controller continue the workflow.
 	WorkflowStepPhaseRunning WorkflowStepPhase = "running"
+	// WorkflowStepPhasePending will make the controller wait for the step to run.
+	WorkflowStepPhasePending WorkflowStepPhase = "pending"
 )
 
 // StepOutputs defines output variable of WorkflowStep
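
Note: a step in the new `pending` phase is parked rather than executed; the controller keeps requeuing it until the step is ready to run (for example, once the steps it depends on have finished). A minimal sketch of how an executor could gate on the phase; the helper below is hypothetical, for illustration only, and not part of this commit:

	// isStepReady reports whether a step should be executed in this
	// reconcile round. Hypothetical helper; the real gating logic
	// lives in the workflow executor.
	func isStepReady(phase v1alpha1.WorkflowStepPhase) bool {
		// Only the pending phase parks a step; every other phase is
		// handled by the executor's normal state machine.
		return phase != v1alpha1.WorkflowStepPhasePending
	}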
4 changes: 2 additions & 2 deletions charts/vela-workflow/README.md
@@ -80,8 +80,8 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow -
 | `logDebug` | Enable debug logs for development purpose | `false` |
 | `logFilePath` | If non-empty, write log files in this path | `""` |
 | `logFileMaxSize` | Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. | `1024` |
-| `kubeClient.qps` | The qps for reconcile clients, default is 50 | `50` |
-| `kubeClient.burst` | The burst for reconcile clients, default is 100 | `100` |
+| `kubeClient.qps` | The qps for reconcile clients, default is 50 | `500` |
+| `kubeClient.burst` | The burst for reconcile clients, default is 100 | `1000` |
 
 
 ## Uninstallation
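
For context: `kubeClient.qps` and `kubeClient.burst` bound the controller's client-side request rate against the API server; qps is the sustained rate, burst the short-lived ceiling. client-go enforces them with a token-bucket rate limiter. A rough sketch of the equivalent construction (shown for illustration; rest.Config builds this internally once QPS and Burst are set, as cmd/main.go does below):

	import "k8s.io/client-go/util/flowcontrol"

	// A bucket that refills at 500 tokens per second and holds at most
	// 1000, matching the new chart defaults above.
	var limiter = flowcontrol.NewTokenBucketRateLimiter(500, 1000)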
10 changes: 6 additions & 4 deletions charts/vela-workflow/templates/workflow-controller.yaml
@@ -114,10 +114,12 @@ spec:
- "--webhook-port={{ .Values.webhookService.port }}"
- "--webhook-cert-dir={{ .Values.admissionWebhooks.certificate.mountPath }}"
{{ end }}
# - "--health-addr=:{{ .Values.healthCheck.port }}"
# - "--concurrent-reconciles={{ .Values.concurrentReconciles }}"
# - "--kube-api-qps={{ .Values.kubeClient.qps }}"
# - "--kube-api-burst={{ .Values.kubeClient.burst }}"
- "--metrics-bind-address=:8080"
- "--leader-elect"
- "--health-probe-bind-address=:{{ .Values.healthCheck.port }}"
- "--concurrent-reconciles={{ .Values.concurrentReconciles }}"
- "--kube-api-qps={{ .Values.kubeClient.qps }}"
- "--kube-api-burst={{ .Values.kubeClient.burst }}"
- "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}"
- "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}"
- "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}"
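
Note that the health probe flag replacing the commented-out `--health-addr` line uses controller-runtime's standard name, `--health-probe-bind-address`, and that the previously commented reconcile and kube-client flags are now actually wired to the chart values.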
4 changes: 2 additions & 2 deletions charts/vela-workflow/values.yaml
@@ -144,5 +144,5 @@ admissionWebhooks:
 ## @param kubeClient.qps The qps for reconcile clients, default is 50
 ## @param kubeClient.burst The burst for reconcile clients, default is 100
 kubeClient:
-  qps: 50
-  burst: 100
+  qps: 500
+  burst: 1000
140 changes: 110 additions & 30 deletions cmd/main.go
@@ -17,30 +17,40 @@ limitations under the License.
 package main
 
 import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/http/pprof"
 	"os"
+	"strings"
+	"time"
 
 	"github.com/crossplane/crossplane-runtime/pkg/event"
 	flag "github.com/spf13/pflag"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apiserver/pkg/util/feature"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/klog"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/klogr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 
 	"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
 
 	"github.com/kubevela/workflow/api/v1alpha1"
 	"github.com/kubevela/workflow/controllers"
+	ctrlClient "github.com/kubevela/workflow/pkg/client"
 	"github.com/kubevela/workflow/pkg/cue/packages"
+	"github.com/kubevela/workflow/pkg/monitor/watcher"
 	"github.com/kubevela/workflow/pkg/types"
+	"github.com/kubevela/workflow/version"
 	//+kubebuilder:scaffold:imports
 )
 
 var (
-	scheme   = runtime.NewScheme()
-	setupLog = ctrl.Log.WithName("setup")
+	scheme = runtime.NewScheme()
 )
 
 func init() {
@@ -55,46 +65,108 @@ func main() {
 	var metricsAddr string
 	var enableLeaderElection bool
 	var probeAddr string
+	var qps float64
+	var burst int
+	var webhookPort int
+	var leaderElectionResourceLock string
+	var leaseDuration time.Duration
+	var renewDeadline time.Duration
+	var retryPeriod time.Duration
+	var pprofAddr string
+	var controllerArgs controllers.Args
 
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
 		"Enable leader election for controller manager. "+
 			"Enabling this will ensure there is only one active controller manager.")
+	flag.StringVar(&leaderElectionResourceLock, "leader-election-resource-lock", "configmapsleases", "The resource lock to use for leader election")
+	flag.DurationVar(&leaseDuration, "leader-election-lease-duration", 15*time.Second,
+		"The duration that non-leader candidates will wait to force acquire leadership")
+	flag.DurationVar(&renewDeadline, "leader-election-renew-deadline", 10*time.Second,
+		"The duration that the acting controlplane will retry refreshing leadership before giving up")
+	flag.DurationVar(&retryPeriod, "leader-election-retry-period", 2*time.Second,
+		"The duration the LeaderElector clients should wait between tries of actions")
+	flag.IntVar(&webhookPort, "webhook-port", 9443, "admission webhook listen address")
+	flag.IntVar(&controllerArgs.ConcurrentReconciles, "concurrent-reconciles", 4, "concurrent-reconciles is the concurrent reconcile number of the controller. The default value is 4")
+	flag.Float64Var(&qps, "kube-api-qps", 50, "the qps for reconcile clients. Low qps may lead to low throughput. High qps may give stress to api-server. Raise this value if concurrent-reconciles is set to be high.")
+	flag.IntVar(&burst, "kube-api-burst", 100, "the burst for reconcile clients. Recommend setting it qps*2.")
+	flag.StringVar(&pprofAddr, "pprof-addr", "", "The address for pprof to use while exporting profiling results. The default value is empty which means do not expose it. Set it to address like :6666 to expose it.")
+	flag.IntVar(&types.MaxWorkflowWaitBackoffTime, "max-workflow-wait-backoff-time", 60, "Set the max workflow wait backoff time, default is 60")
+	flag.IntVar(&types.MaxWorkflowFailedBackoffTime, "max-workflow-failed-backoff-time", 300, "Set the max workflow failed backoff time, default is 300")
+	flag.IntVar(&types.MaxWorkflowStepErrorRetryTimes, "max-workflow-step-error-retry-times", 10, "Set the max workflow step error retry times, default is 10")
 	feature.DefaultMutableFeatureGate.AddFlag(flag.CommandLine)
 
 	flag.Parse()
 	klog.InitFlags(nil)
 
-	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
-		Scheme:                 scheme,
-		MetricsBindAddress:     metricsAddr,
-		Port:                   9443,
-		HealthProbeBindAddress: probeAddr,
-		LeaderElection:         enableLeaderElection,
-		LeaderElectionID:       "0ef1568c.core.oam.dev",
-		// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
-		// when the Manager ends. This requires the binary to immediately end when the
-		// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
-		// speeds up voluntary leader transitions as the new leader don't have to wait
-		// LeaseDuration time first.
-		//
-		// In the default scaffold provided, the program ends immediately after
-		// the manager stops, so would be fine to enable this option. However,
-		// if you are doing or is intended to do any operation such as perform cleanups
-		// after the manager stops then its usage might be unsafe.
-		// LeaderElectionReleaseOnCancel: true,
+
+	if pprofAddr != "" {
+		// Start pprof server if enabled
+		mux := http.NewServeMux()
+		mux.HandleFunc("/debug/pprof/", pprof.Index)
+		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+		pprofServer := http.Server{
+			Addr:    pprofAddr,
+			Handler: mux,
+		}
+		klog.InfoS("Starting debug HTTP server", "addr", pprofServer.Addr)
+
+		go func() {
+			go func() {
+				ctx := context.Background()
+				<-ctx.Done()
+
+				ctx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Minute)
+				defer cancelFunc()
+
+				if err := pprofServer.Shutdown(ctx); err != nil {
+					klog.Error(err, "Failed to shutdown debug HTTP server")
+				}
+			}()
+
+			if err := pprofServer.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) {
+				klog.Error(err, "Failed to start debug HTTP server")
+				panic(err)
+			}
+		}()
+	}
+
+	ctrl.SetLogger(klogr.New())
+
+	klog.InfoS("KubeVela Workflow information", "version", version.VelaVersion, "revision", version.GitRevision)
+
+	restConfig := ctrl.GetConfigOrDie()
+	restConfig.QPS = float32(qps)
+	restConfig.Burst = burst
+	klog.InfoS("Kubernetes Config Loaded",
+		"QPS", restConfig.QPS,
+		"Burst", restConfig.Burst,
+	)
+
+	leaderElectionID := fmt.Sprintf("workflow-%s", strings.ToLower(strings.ReplaceAll(version.VelaVersion, ".", "-")))
+	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
+		Scheme:                     scheme,
+		MetricsBindAddress:         metricsAddr,
+		Port:                       webhookPort,
+		HealthProbeBindAddress:     probeAddr,
+		LeaderElection:             enableLeaderElection,
+		LeaderElectionID:           leaderElectionID,
+		LeaderElectionResourceLock: leaderElectionResourceLock,
+		LeaseDuration:              &leaseDuration,
+		RenewDeadline:              &renewDeadline,
+		RetryPeriod:                &retryPeriod,
+		NewClient:                  ctrlClient.DefaultNewControllerClient,
 	})
 	if err != nil {
-		setupLog.Error(err, "unable to start manager")
+		klog.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
 
 	pd, err := packages.NewPackageDiscover(mgr.GetConfig())
 	if err != nil {
-		setupLog.Error(err, "Failed to create CRD discovery for CUE package client")
+		klog.Error(err, "Failed to create CRD discovery for CUE package client")
 		if !packages.IsCUEParseErr(err) {
 			os.Exit(1)
 		}
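
Two usage notes on the hunk above: with `--pprof-addr=:6666` set, the handlers registered on the mux serve the standard pprof endpoints, so a CPU profile can be collected with `go tool pprof http://localhost:6666/debug/pprof/profile`. And the leader election ID is now derived from the release version: VelaVersion `v1.0.0`, for example, yields the ID `workflow-v1-0-0`.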
Expand All @@ -105,24 +177,32 @@ func main() {
 		Scheme:          mgr.GetScheme(),
 		PackageDiscover: pd,
 		Recorder:        event.NewAPIRecorder(mgr.GetEventRecorderFor("WorkflowRun")),
+		Args:            controllerArgs,
 	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "WorkflowRun")
+		klog.Error(err, "unable to create controller", "controller", "WorkflowRun")
 		os.Exit(1)
 	}
 	//+kubebuilder:scaffold:builder
 
 	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up health check")
+		klog.Error(err, "unable to set up health check")
 		os.Exit(1)
 	}
 	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up ready check")
+		klog.Error(err, "unable to set up ready check")
 		os.Exit(1)
 	}
 
-	setupLog.Info("starting manager")
+	klog.Info("Start the vela workflow monitor")
+	informer, err := mgr.GetCache().GetInformer(context.Background(), &v1alpha1.WorkflowRun{})
+	if err != nil {
+		klog.ErrorS(err, "Unable to get informer for application")
+	}
+	watcher.StartWorkflowRunMetricsWatcher(informer)
+
+	klog.Info("starting manager")
 	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
-		setupLog.Error(err, "problem running manager")
+		klog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
 }
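
The watcher started above is the metrics half of this commit: it observes WorkflowRun events from the manager's informer cache and exports statistics through the controller-runtime metrics registry, which is served on `--metrics-bind-address=:8080`. A rough sketch of the shape of such a watcher; the metric name, labels, and handler body are assumptions for illustration, not the actual implementation in pkg/monitor/watcher:

	package metricswatcher

	import (
		"github.com/prometheus/client_golang/prometheus"
		toolscache "k8s.io/client-go/tools/cache"
		"sigs.k8s.io/controller-runtime/pkg/cache"
		"sigs.k8s.io/controller-runtime/pkg/metrics"

		"github.com/kubevela/workflow/api/v1alpha1"
	)

	// workflowRunPhaseGauge counts WorkflowRun objects per phase.
	// The metric and label names here are illustrative.
	var workflowRunPhaseGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "workflowrun_phase_number",
		Help: "Number of WorkflowRun objects in each phase.",
	}, []string{"phase"})

	func init() {
		// controller-runtime serves every collector registered in
		// metrics.Registry on the --metrics-bind-address endpoint.
		metrics.Registry.MustRegister(workflowRunPhaseGauge)
	}

	// StartWatcher subscribes to informer events and keeps the gauge
	// up to date as WorkflowRuns change phase.
	func StartWatcher(informer cache.Informer) {
		informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
			UpdateFunc: func(oldObj, newObj interface{}) {
				oldRun, ok1 := oldObj.(*v1alpha1.WorkflowRun)
				newRun, ok2 := newObj.(*v1alpha1.WorkflowRun)
				if !ok1 || !ok2 || oldRun.Status.Phase == newRun.Status.Phase {
					return
				}
				// Move the object from its old phase bucket to the new one.
				workflowRunPhaseGauge.WithLabelValues(string(oldRun.Status.Phase)).Dec()
				workflowRunPhaseGauge.WithLabelValues(string(newRun.Status.Phase)).Inc()
			},
		})
	}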