Skip to content

Commit 0afe997

Browse files
authored
Merge pull request #119 from Peefy/feat-depends-on
feat: add dependsOn field for flux kcl controller
2 parents 769591f + 4cb4ff6 commit 0afe997

File tree

2 files changed

+75
-2
lines changed

2 files changed

+75
-2
lines changed

cmd/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"os"
21+
"time"
2122

2223
flag "github.com/spf13/pflag"
2324
"k8s.io/apimachinery/pkg/runtime"
@@ -65,6 +66,7 @@ func main() {
6566
var (
6667
metricsAddr string
6768
eventsAddr string
69+
requeueDependency time.Duration
6870
enableLeaderElection bool
6971
httpRetry int
7072
defaultServiceAccount string
@@ -80,6 +82,7 @@ func main() {
8082

8183
flag.StringVar(&metricsAddr, "metrics-addr", ":8083", "The address the metric endpoint binds to.")
8284
flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
85+
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.")
8386
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
8487
"Enable leader election for controller manager. "+
8588
"Enabling this will ensure there is only one active controller manager.")
@@ -134,7 +137,8 @@ func main() {
134137
StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), pollingOpts),
135138
DisallowedFieldManagers: disallowedFieldManagers,
136139
}).SetupWithManager(mgr, controller.KCLRunReconcilerOptions{
137-
HTTPRetry: httpRetry,
140+
DependencyRequeueInterval: requeueDependency,
141+
HTTPRetry: httpRetry,
138142
}); err != nil {
139143
setupLog.Error(err, "unable to create controller", "controller", "KCLRun")
140144
os.Exit(1)

internal/controller/kclrun_controller.go

+70-1
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,14 @@ type KCLRunReconciler struct {
8888
DefaultServiceAccount string
8989
DisallowedFieldManagers []string
9090
artifactFetcher *fetch.ArchiveFetcher
91+
requeueDependency time.Duration
9192

9293
statusManager string
9394
}
9495

9596
type KCLRunReconcilerOptions struct {
96-
HTTPRetry int
97+
DependencyRequeueInterval time.Duration
98+
HTTPRetry int
9799
}
98100

99101
// SetupWithManager sets up the controller with the Manager.
@@ -105,6 +107,7 @@ func (r *KCLRunReconciler) SetupWithManager(mgr ctrl.Manager, opts KCLRunReconci
105107
fetch.WithUntar(tar.WithMaxUntarSize(tar.UnlimitedUntarSize)),
106108
fetch.WithHostnameOverwrite(os.Getenv("SOURCE_CONTROLLER_LOCALHOST")),
107109
)
110+
r.requeueDependency = opts.DependencyRequeueInterval
108111
r.statusManager = "gotk-flux-kcl-controller"
109112
// New controller
110113
return ctrl.NewControllerManagedBy(mgr).
@@ -214,6 +217,27 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
214217
return ctrl.Result{}, err
215218
}
216219
artifact := source.GetArtifact()
220+
221+
// Requeue the reconciliation if the source artifact is not found.
222+
if artifact == nil {
223+
msg := fmt.Sprintf("Source artifact not found, retrying in %s", r.requeueDependency.String())
224+
conditions.MarkFalse(&obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg)
225+
log.Info(msg)
226+
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
227+
}
228+
229+
// Check dependencies and requeue the reconciliation if the check fails.
230+
if len(obj.Spec.DependsOn) > 0 {
231+
if err := r.checkDependencies(ctx, &obj, source); err != nil {
232+
conditions.MarkFalse(&obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
233+
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
234+
log.Info(msg)
235+
r.event(&obj, artifact.Revision, eventv1.EventSeverityInfo, msg, nil)
236+
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
237+
}
238+
log.Info("All dependencies are ready, proceeding with reconciliation")
239+
}
240+
217241
progressingMsg := fmt.Sprintf("new revision detected %s", artifact.Revision)
218242
log.Info(progressingMsg)
219243
conditions.MarkUnknown(&obj, meta.ReadyCondition, meta.ProgressingReason, "Reconciliation in progress")
@@ -368,6 +392,51 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
368392
return ctrl.Result{}, nil
369393
}
370394

395+
func (r *KCLRunReconciler) checkDependencies(ctx context.Context,
396+
obj *v1alpha1.KCLRun,
397+
source sourcev1.Source) error {
398+
for _, d := range obj.Spec.DependsOn {
399+
if d.Namespace == "" {
400+
d.Namespace = obj.GetNamespace()
401+
}
402+
dName := types.NamespacedName{
403+
Namespace: d.Namespace,
404+
Name: d.Name,
405+
}
406+
var k v1alpha1.KCLRun
407+
err := r.Get(ctx, dName, &k)
408+
if err != nil {
409+
return fmt.Errorf("dependency '%s' not found: %w", dName, err)
410+
}
411+
412+
if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration {
413+
return fmt.Errorf("dependency '%s' is not ready", dName)
414+
}
415+
416+
if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) {
417+
return fmt.Errorf("dependency '%s' is not ready", dName)
418+
}
419+
420+
srcNamespace := k.Spec.SourceRef.Namespace
421+
if srcNamespace == "" {
422+
srcNamespace = k.GetNamespace()
423+
}
424+
dSrcNamespace := obj.Spec.SourceRef.Namespace
425+
if dSrcNamespace == "" {
426+
dSrcNamespace = obj.GetNamespace()
427+
}
428+
429+
if k.Spec.SourceRef.Name == obj.Spec.SourceRef.Name &&
430+
srcNamespace == dSrcNamespace &&
431+
k.Spec.SourceRef.Kind == obj.Spec.SourceRef.Kind &&
432+
!source.GetArtifact().HasRevision(k.Status.LastAppliedRevision) {
433+
return fmt.Errorf("dependency '%s' revision is not up to date", dName)
434+
}
435+
}
436+
437+
return nil
438+
}
439+
371440
func (r *KCLRunReconciler) getSource(ctx context.Context,
372441
obj *v1alpha1.KCLRun) (sourcev1.Source, error) {
373442
var src sourcev1.Source

0 commit comments

Comments
 (0)