diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index 050223de..32980f8e 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -34,11 +34,12 @@ import ( // Add creates a new DaemonSet Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) + r := newReconciler(mgr) + return add(mgr, r, r.handler) } // newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { +func newReconciler(mgr manager.Manager) *ReconcileDaemonSet { return &ReconcileDaemonSet{ scheme: mgr.GetScheme(), handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), @@ -46,7 +47,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { } // add adds a new Controller to mgr with r as the reconcile.Reconciler -func add(mgr manager.Manager, r reconcile.Reconciler) error { +func add(mgr manager.Manager, r reconcile.Reconciler, h *core.Handler) error { // Create a new controller c, err := controller.New("daemonset-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -58,16 +59,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - handler := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.DaemonSet{}) - // Watch ConfigMaps owned by a DaemonSet - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), handler) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), core.EnqueueRequestForWatcher(h.GetWatchedConfigmaps())) if err != nil { return err } // Watch Secrets owned by a DaemonSet - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), core.EnqueueRequestForWatcher(h.GetWatchedSecrets())) if err != nil { return err } @@ -95,6 +94,7 @@ func (r *ReconcileDaemonSet) Reconcile(ctx context.Context, request reconcile.Re err := r.handler.Get(ctx, request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { + r.handler.RemoveWatches(request.NamespacedName) // Object not found, return. Created objects are automatically garbage collected. return reconcile.Result{}, nil } diff --git a/pkg/controller/daemonset/daemonset_controller_suite_test.go b/pkg/controller/daemonset/daemonset_controller_suite_test.go index dbeae73b..4ad3d13e 100644 --- a/pkg/controller/daemonset/daemonset_controller_suite_test.go +++ b/pkg/controller/daemonset/daemonset_controller_suite_test.go @@ -63,14 +63,16 @@ var _ = AfterSuite(func() { // SetupTestReconcile returns a reconcile.Reconcile implementation that delegates to inner and // writes the request to requests after Reconcile is finished. -func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { +func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request, chan reconcile.Request) { + requestsStart := make(chan reconcile.Request) requests := make(chan reconcile.Request) fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + requestsStart <- req result, err := inner.Reconcile(ctx, req) requests <- req return result, err }) - return fn, requests + return fn, requestsStart, requests } // Run runs the webhook server. diff --git a/pkg/controller/daemonset/daemonset_controller_test.go b/pkg/controller/daemonset/daemonset_controller_test.go index 205e719e..efb9d3a8 100644 --- a/pkg/controller/daemonset/daemonset_controller_test.go +++ b/pkg/controller/daemonset/daemonset_controller_test.go @@ -18,7 +18,6 @@ package daemonset import ( "context" - "fmt" "time" . "github.com/onsi/ginkgo/v2" @@ -28,8 +27,6 @@ import ( "github.com/wave-k8s/wave/test/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -44,12 +41,12 @@ var _ = Describe("DaemonSet controller Suite", func() { var m utils.Matcher var daemonset *appsv1.DaemonSet + var requestsStart <-chan reconcile.Request var requests <-chan reconcile.Request const timeout = time.Second * 5 const consistentlyTimeout = time.Second - var ownerRef metav1.OwnerReference var cm1 *corev1.ConfigMap var cm2 *corev1.ConfigMap var cm3 *corev1.ConfigMap @@ -67,9 +64,28 @@ var _ = Describe("DaemonSet controller Suite", func() { }, } // wait for reconcile for creating the DaemonSet + Eventually(requestsStart, timeout).Should(Receive(Equal(request))) Eventually(requests, timeout).Should(Receive(Equal(request))) } + var consistentlyDaemonSetNotReconciled = func(obj core.Object) { + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }, + } + // wait for reconcile for creating the DaemonSet + Consistently(requestsStart, consistentlyTimeout).ShouldNot(Receive(Equal(request))) + } + + var clearReconciled = func() { + for len(requestsStart) > 0 { + <-requestsStart + <-requests + } + } + BeforeEach(func() { // Reset the Prometheus Registry before each test to avoid errors metrics.Registry = prometheus.NewRegistry() @@ -86,8 +102,9 @@ var _ = Describe("DaemonSet controller Suite", func() { m = utils.Matcher{Client: c} var recFn reconcile.Reconciler - recFn, requests = SetupTestReconcile(newReconciler(mgr)) - Expect(add(mgr, recFn)).NotTo(HaveOccurred()) + r := newReconciler(mgr) + recFn, requestsStart, requests = SetupTestReconcile(r) + Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -116,41 +133,13 @@ var _ = Describe("DaemonSet controller Suite", func() { daemonset = utils.ExampleDaemonSet.DeepCopy() // Create a daemonset and wait for it to be reconciled + clearReconciled() m.Create(daemonset).Should(Succeed()) waitForDaemonSetReconciled(daemonset) - - ownerRef = utils.GetOwnerRefDaemonSet(daemonset) }) AfterEach(func() { - // Make sure to delete any finalizers (if the daemonset exists) - Eventually(func() error { - key := types.NamespacedName{Namespace: daemonset.GetNamespace(), Name: daemonset.GetName()} - err := c.Get(context.TODO(), key, daemonset) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - daemonset.SetFinalizers([]string{}) - return c.Update(context.TODO(), daemonset) - }, timeout).Should(Succeed()) - - Eventually(func() error { - key := types.NamespacedName{Namespace: daemonset.GetNamespace(), Name: daemonset.GetName()} - err := c.Get(context.TODO(), key, daemonset) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - if len(daemonset.GetFinalizers()) > 0 { - return fmt.Errorf("Finalizers not upated") - } - return nil - }, timeout).Should(Succeed()) + testCancel() utils.DeleteAll(cfg, timeout, &appsv1.DaemonSetList{}, @@ -158,8 +147,6 @@ var _ = Describe("DaemonSet controller Suite", func() { &corev1.SecretList{}, &corev1.EventList{}, ) - - testCancel() }) Context("When a DaemonSet is reconciled", func() { @@ -174,25 +161,16 @@ var _ = Describe("DaemonSet controller Suite", func() { obj.SetAnnotations(annotations) return obj } - + clearReconciled() m.Update(daemonset, addAnnotation).Should(Succeed()) + // Two runs since we the controller retriggers itself by changing the object + waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet m.Get(daemonset, timeout).Should(Succeed()) }) - It("Adds OwnerReferences to all children", func() { - for _, obj := range []core.Object{cm1, cm2, cm3, s1, s2, s3} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Adds a finalizer to the DaemonSet", func() { - Eventually(daemonset, timeout).Should(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) - It("Adds a config hash to the Pod Template", func() { Eventually(daemonset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -224,7 +202,7 @@ var _ = Describe("DaemonSet controller Suite", func() { ss.Spec.Template.Spec.Containers = []corev1.Container{containers[0]} return ss } - + clearReconciled() m.Update(daemonset, removeContainer2).Should(Succeed()) waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) @@ -233,16 +211,22 @@ var _ = Describe("DaemonSet controller Suite", func() { m.Get(daemonset, timeout).Should(Succeed()) }) - It("Removes the OwnerReference from the orphaned ConfigMap", func() { - Eventually(cm2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) + It("Updates the config hash in the Pod Template", func() { + Eventually(func() string { + return daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) - It("Removes the OwnerReference from the orphaned Secret", func() { - Eventually(s2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) + It("Changes to the removed children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Updates the config hash in the Pod Template", func() { - Eventually(daemonset, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + m.Update(cm2, modifyCM).Should(Succeed()) + consistentlyDaemonSetNotReconciled(daemonset) }) }) @@ -250,6 +234,7 @@ var _ = Describe("DaemonSet controller Suite", func() { var originalHash string BeforeEach(func() { + m.Get(daemonset, timeout).Should(Succeed()) Eventually(daemonset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) originalHash = daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] }) @@ -261,8 +246,9 @@ var _ = Describe("DaemonSet controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm1, modifyCM).Should(Succeed()) - + waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet @@ -270,7 +256,9 @@ var _ = Describe("DaemonSet controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(daemonset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -281,8 +269,9 @@ var _ = Describe("DaemonSet controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm2, modifyCM).Should(Succeed()) - + waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet @@ -290,7 +279,9 @@ var _ = Describe("DaemonSet controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(daemonset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -304,8 +295,9 @@ var _ = Describe("DaemonSet controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s1, modifyS).Should(Succeed()) - + waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet @@ -313,7 +305,9 @@ var _ = Describe("DaemonSet controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(daemonset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -327,8 +321,9 @@ var _ = Describe("DaemonSet controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s2, modifyS).Should(Succeed()) - + waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet @@ -336,7 +331,9 @@ var _ = Describe("DaemonSet controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(daemonset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return daemonset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) }) @@ -347,23 +344,27 @@ var _ = Describe("DaemonSet controller Suite", func() { obj.SetAnnotations(make(map[string]string)) return obj } + clearReconciled() m.Update(daemonset, removeAnnotations).Should(Succeed()) waitForDaemonSetReconciled(daemonset) - waitForDaemonSetReconciled(daemonset) m.Get(daemonset).Should(Succeed()) Eventually(daemonset, timeout).ShouldNot(utils.WithAnnotations(HaveKey(core.RequiredAnnotation))) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Removes the config hash annotation", func() { + m.Consistently(daemonset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Removes the DaemonSet's finalizer", func() { - m.Get(daemonset).Should(Succeed()) - Eventually(daemonset, timeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() + + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDaemonSetNotReconciled(daemonset) }) }) @@ -371,22 +372,25 @@ var _ = Describe("DaemonSet controller Suite", func() { BeforeEach(func() { // Make sure the cache has synced before we run the test Eventually(daemonset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) + clearReconciled() m.Delete(daemonset).Should(Succeed()) waitForDaemonSetReconciled(daemonset) + }) - // Get the updated DaemonSet - m.Get(daemonset, timeout).Should(Succeed()) - Eventually(daemonset, timeout).ShouldNot(utils.WithDeletionTimestamp(BeNil())) + It("Not longer exists", func() { + m.Get(daemonset).Should(MatchError(MatchRegexp(`not found`))) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) + + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm } - }) + clearReconciled() - It("Removes the DaemonSet's finalizer", func() { - // Removing the finalizer causes the daemonset to be deleted - m.Get(daemonset, timeout).ShouldNot(Succeed()) + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDaemonSetNotReconciled(daemonset) }) }) }) @@ -397,18 +401,20 @@ var _ = Describe("DaemonSet controller Suite", func() { m.Get(daemonset, timeout).Should(Succeed()) }) - It("Doesn't add any OwnerReferences to any children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - m.Consistently(obj, consistentlyTimeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Doesn't add a config hash to the Pod Template", func() { + m.Consistently(daemonset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Doesn't add a finalizer to the DaemonSet", func() { - m.Consistently(daemonset, consistentlyTimeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) + It("Changes to children no do not trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Doesn't add a config hash to the Pod Template", func() { - m.Consistently(daemonset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDaemonSetNotReconciled(daemonset) }) }) }) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 29da1a8a..7ceeb38e 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -34,11 +34,12 @@ import ( // Add creates a new Deployment Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) + r := newReconciler(mgr) + return add(mgr, r, r.handler) } // newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { +func newReconciler(mgr manager.Manager) *ReconcileDeployment { return &ReconcileDeployment{ scheme: mgr.GetScheme(), handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), @@ -46,7 +47,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { } // add adds a new Controller to mgr with r as the reconcile.Reconciler -func add(mgr manager.Manager, r reconcile.Reconciler) error { +func add(mgr manager.Manager, r reconcile.Reconciler, h *core.Handler) error { // Create a new controller c, err := controller.New("deployment-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -59,16 +60,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - handler := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}) - - // Watch ConfigMaps owned by a Deployment - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), handler) + // Watch ConfigMaps owned by a DaemonSet + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), core.EnqueueRequestForWatcher(h.GetWatchedConfigmaps())) if err != nil { return err } - // Watch Secrets owned by a Deployment - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler) + // Watch Secrets owned by a DaemonSet + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), core.EnqueueRequestForWatcher(h.GetWatchedSecrets())) if err != nil { return err } @@ -96,6 +95,7 @@ func (r *ReconcileDeployment) Reconcile(ctx context.Context, request reconcile.R err := r.handler.Get(ctx, request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { + r.handler.RemoveWatches(request.NamespacedName) // Object not found, return. Created objects are automatically garbage collected. return reconcile.Result{}, nil } diff --git a/pkg/controller/deployment/deployment_controller_suite_test.go b/pkg/controller/deployment/deployment_controller_suite_test.go index 3a3e391f..0da71857 100644 --- a/pkg/controller/deployment/deployment_controller_suite_test.go +++ b/pkg/controller/deployment/deployment_controller_suite_test.go @@ -66,14 +66,16 @@ var _ = AfterSuite(func() { // SetupTestReconcile returns a reconcile.Reconcile implementation that delegates to inner and // writes the request to requests after Reconcile is finished. -func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { +func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request, chan reconcile.Request) { + requestsStart := make(chan reconcile.Request) requests := make(chan reconcile.Request) fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + requestsStart <- req result, err := inner.Reconcile(ctx, req) requests <- req return result, err }) - return fn, requests + return fn, requestsStart, requests } // Run runs the webhook server. diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 4f0b8edf..ba887726 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -18,7 +18,6 @@ package deployment import ( "context" - "fmt" "time" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -30,8 +29,6 @@ import ( "github.com/wave-k8s/wave/test/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,12 +42,12 @@ var _ = Describe("Deployment controller Suite", func() { var m utils.Matcher var deployment *appsv1.Deployment + var requestsStart <-chan reconcile.Request var requests <-chan reconcile.Request const timeout = time.Second * 5 const consistentlyTimeout = time.Second - var ownerRef metav1.OwnerReference var cm1 *corev1.ConfigMap var cm2 *corev1.ConfigMap var cm3 *corev1.ConfigMap @@ -73,10 +70,29 @@ var _ = Describe("Deployment controller Suite", func() { Namespace: obj.GetNamespace(), }, } - // wait for reconcile for creating the Deployment + // wait for reconcile for creating the DaemonSet + Eventually(requestsStart, timeout).Should(Receive(Equal(request))) Eventually(requests, timeout).Should(Receive(Equal(request))) } + var consistentlyDeploymentNotReconciled = func(obj core.Object) { + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }, + } + // wait for reconcile for creating the DaemonSet + Consistently(requestsStart, consistentlyTimeout).ShouldNot(Receive(Equal(request))) + } + + var clearReconciled = func() { + for len(requestsStart) > 0 { + <-requestsStart + <-requests + } + } + BeforeEach(func() { // Reset the Prometheus Registry before each test to avoid errors metrics.Registry = prometheus.NewRegistry() @@ -93,8 +109,9 @@ var _ = Describe("Deployment controller Suite", func() { m = utils.Matcher{Client: c} var recFn reconcile.Reconciler - recFn, requests = SetupTestReconcile(newReconciler(mgr)) - Expect(add(mgr, recFn)).NotTo(HaveOccurred()) + r := newReconciler(mgr) + recFn, requestsStart, requests = SetupTestReconcile(r) + Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -141,41 +158,13 @@ var _ = Describe("Deployment controller Suite", func() { deployment = utils.ExampleDeployment.DeepCopy() // Create a deployment and wait for it to be reconciled + clearReconciled() m.Create(deployment).Should(Succeed()) waitForDeploymentReconciled(deployment) - - ownerRef = utils.GetOwnerRefDeployment(deployment) }) AfterEach(func() { - // Make sure to delete any finalizers (if the deployment exists) - Eventually(func() error { - key := types.NamespacedName{Namespace: deployment.GetNamespace(), Name: deployment.GetName()} - err := c.Get(context.TODO(), key, deployment) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - deployment.SetFinalizers([]string{}) - return c.Update(context.TODO(), deployment) - }, timeout).Should(Succeed()) - - Eventually(func() error { - key := types.NamespacedName{Namespace: deployment.GetNamespace(), Name: deployment.GetName()} - err := c.Get(context.TODO(), key, deployment) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - if len(deployment.GetFinalizers()) > 0 { - return fmt.Errorf("Finalizers not upated") - } - return nil - }, timeout).Should(Succeed()) + testCancel() utils.DeleteAll(cfg, timeout, &appsv1.DeploymentList{}, @@ -183,8 +172,6 @@ var _ = Describe("Deployment controller Suite", func() { &corev1.SecretList{}, &corev1.EventList{}, ) - - testCancel() }) Context("When a Deployment is reconciled", func() { @@ -199,25 +186,16 @@ var _ = Describe("Deployment controller Suite", func() { obj.SetAnnotations(annotations) return obj } - + clearReconciled() m.Update(deployment, addAnnotation).Should(Succeed()) + // Two runs since we the controller retriggers itself by changing the object + waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment m.Get(deployment, timeout).Should(Succeed()) }) - It("Adds OwnerReferences to all children", func() { - for _, obj := range []core.Object{cm1, cm2, cm3, s1, s2, s3} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Adds a finalizer to the Deployment", func() { - Eventually(deployment, timeout).Should(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) - It("Adds a config hash to the Pod Template", func() { Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -251,7 +229,7 @@ var _ = Describe("Deployment controller Suite", func() { dep.Spec.Template.Spec.Containers = []corev1.Container{containers[0]} return dep } - + clearReconciled() m.Update(deployment, removeContainer2).Should(Succeed()) waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) @@ -260,16 +238,22 @@ var _ = Describe("Deployment controller Suite", func() { m.Get(deployment, timeout).Should(Succeed()) }) - It("Removes the OwnerReference from the orphaned ConfigMap", func() { - Eventually(cm2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) + It("Updates the config hash in the Pod Template", func() { + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) - It("Removes the OwnerReference from the orphaned Secret", func() { - Eventually(s2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) + It("Changes to the removed children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + m.Update(cm2, modifyCM).Should(Succeed()) + consistentlyDeploymentNotReconciled(deployment) }) }) @@ -277,6 +261,7 @@ var _ = Describe("Deployment controller Suite", func() { var originalHash string BeforeEach(func() { + m.Get(deployment, timeout).Should(Succeed()) Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) originalHash = deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] }) @@ -288,8 +273,9 @@ var _ = Describe("Deployment controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm1, modifyCM).Should(Succeed()) - + waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment @@ -297,7 +283,9 @@ var _ = Describe("Deployment controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -308,8 +296,9 @@ var _ = Describe("Deployment controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm2, modifyCM).Should(Succeed()) - + waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment @@ -317,7 +306,9 @@ var _ = Describe("Deployment controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -331,8 +322,9 @@ var _ = Describe("Deployment controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s1, modifyS).Should(Succeed()) - + waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment @@ -340,7 +332,9 @@ var _ = Describe("Deployment controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -354,8 +348,9 @@ var _ = Describe("Deployment controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s2, modifyS).Should(Succeed()) - + waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment @@ -363,7 +358,9 @@ var _ = Describe("Deployment controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) }) @@ -374,23 +371,27 @@ var _ = Describe("Deployment controller Suite", func() { obj.SetAnnotations(make(map[string]string)) return obj } + clearReconciled() m.Update(deployment, removeAnnotations).Should(Succeed()) waitForDeploymentReconciled(deployment) - waitForDeploymentReconciled(deployment) - m.Get(deployment).Should(Succeed()) Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKey(core.RequiredAnnotation))) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Removes the config hash annotation", func() { + m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Removes the Deployment's finalizer", func() { - m.Get(deployment).Should(Succeed()) - Eventually(deployment, timeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() + + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDeploymentNotReconciled(deployment) }) }) @@ -398,22 +399,24 @@ var _ = Describe("Deployment controller Suite", func() { BeforeEach(func() { // Make sure the cache has synced before we run the test Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) + clearReconciled() m.Delete(deployment).Should(Succeed()) waitForDeploymentReconciled(deployment) - - // Get the updated Deployment - m.Get(deployment, timeout).Should(Succeed()) - Eventually(deployment, timeout).ShouldNot(utils.WithDeletionTimestamp(BeNil())) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Not longer exists", func() { + m.Get(deployment).Should(MatchError(MatchRegexp(`not found`))) }) - It("Removes the Deployment's finalizer", func() { - // Removing the finalizer causes the deployment to be deleted - m.Get(deployment, timeout).ShouldNot(Succeed()) + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() + + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDeploymentNotReconciled(deployment) }) }) }) @@ -424,18 +427,20 @@ var _ = Describe("Deployment controller Suite", func() { m.Get(deployment, timeout).Should(Succeed()) }) - It("Doesn't add any OwnerReferences to any children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - m.Consistently(obj, consistentlyTimeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Doesn't add a config hash to the Pod Template", func() { + m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Doesn't add a finalizer to the Deployment", func() { - m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) + It("Changes to children no do not trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Doesn't add a config hash to the Pod Template", func() { - m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyDeploymentNotReconciled(deployment) }) }) }) diff --git a/pkg/controller/statefulset/statefulset_controller.go b/pkg/controller/statefulset/statefulset_controller.go index 29f2f139..0febb435 100644 --- a/pkg/controller/statefulset/statefulset_controller.go +++ b/pkg/controller/statefulset/statefulset_controller.go @@ -34,11 +34,12 @@ import ( // Add creates a new StatefulSet Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) + r := newReconciler(mgr) + return add(mgr, r, r.handler) } // newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { +func newReconciler(mgr manager.Manager) *ReconcileStatefulSet { return &ReconcileStatefulSet{ scheme: mgr.GetScheme(), handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), @@ -46,7 +47,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { } // add adds a new Controller to mgr with r as the reconcile.Reconciler -func add(mgr manager.Manager, r reconcile.Reconciler) error { +func add(mgr manager.Manager, r reconcile.Reconciler, h *core.Handler) error { // Create a new controller c, err := controller.New("statefulset-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -59,16 +60,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - handler := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.StatefulSet{}) - - // Watch ConfigMaps owned by a StatefulSet - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), handler) + // Watch ConfigMaps owned by a DaemonSet + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), core.EnqueueRequestForWatcher(h.GetWatchedConfigmaps())) if err != nil { return err } - // Watch Secrets owned by a StatefulSet - err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler) + // Watch Secrets owned by a DaemonSet + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), core.EnqueueRequestForWatcher(h.GetWatchedSecrets())) if err != nil { return err } @@ -96,6 +95,7 @@ func (r *ReconcileStatefulSet) Reconcile(ctx context.Context, request reconcile. err := r.handler.Get(ctx, request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { + r.handler.RemoveWatches(request.NamespacedName) // Object not found, return. Created objects are automatically garbage collected. return reconcile.Result{}, nil } diff --git a/pkg/controller/statefulset/statefulset_controller_suite_test.go b/pkg/controller/statefulset/statefulset_controller_suite_test.go index 62ef11d9..93ddfd9f 100644 --- a/pkg/controller/statefulset/statefulset_controller_suite_test.go +++ b/pkg/controller/statefulset/statefulset_controller_suite_test.go @@ -67,14 +67,16 @@ var _ = AfterSuite(func() { // SetupTestReconcile returns a reconcile.Reconcile implementation that delegates to inner and // writes the request to requests after Reconcile is finished. -func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { +func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request, chan reconcile.Request) { + requestsStart := make(chan reconcile.Request) requests := make(chan reconcile.Request) fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + requestsStart <- req result, err := inner.Reconcile(ctx, req) requests <- req return result, err }) - return fn, requests + return fn, requestsStart, requests } // Run runs the webhook server. diff --git a/pkg/controller/statefulset/statefulset_controller_test.go b/pkg/controller/statefulset/statefulset_controller_test.go index 6ac2b99d..578aa0d2 100644 --- a/pkg/controller/statefulset/statefulset_controller_test.go +++ b/pkg/controller/statefulset/statefulset_controller_test.go @@ -18,7 +18,6 @@ package statefulset import ( "context" - "fmt" "time" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -30,8 +29,6 @@ import ( "github.com/wave-k8s/wave/test/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,12 +42,12 @@ var _ = Describe("StatefulSet controller Suite", func() { var m utils.Matcher var statefulset *appsv1.StatefulSet + var requestsStart <-chan reconcile.Request var requests <-chan reconcile.Request const timeout = time.Second * 5 const consistentlyTimeout = time.Second - var ownerRef metav1.OwnerReference var cm1 *corev1.ConfigMap var cm2 *corev1.ConfigMap var cm3 *corev1.ConfigMap @@ -68,9 +65,28 @@ var _ = Describe("StatefulSet controller Suite", func() { }, } // wait for reconcile for creating the StatefulSet + Eventually(requestsStart, timeout).Should(Receive(Equal(request))) Eventually(requests, timeout).Should(Receive(Equal(request))) } + var consistentlyStatefulSetNotReconciled = func(obj core.Object) { + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }, + } + // wait for reconcile for creating the DaemonSet + Consistently(requestsStart, consistentlyTimeout).ShouldNot(Receive(Equal(request))) + } + + var clearReconciled = func() { + for len(requestsStart) > 0 { + <-requestsStart + <-requests + } + } + BeforeEach(func() { // Reset the Prometheus Registry before each test to avoid errors metrics.Registry = prometheus.NewRegistry() @@ -89,8 +105,9 @@ var _ = Describe("StatefulSet controller Suite", func() { m = utils.Matcher{Client: c} var recFn reconcile.Reconciler - recFn, requests = SetupTestReconcile(newReconciler(mgr)) - Expect(add(mgr, recFn)).NotTo(HaveOccurred()) + r := newReconciler(mgr) + recFn, requestsStart, requests = SetupTestReconcile(r) + Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -119,42 +136,12 @@ var _ = Describe("StatefulSet controller Suite", func() { statefulset = utils.ExampleStatefulSet.DeepCopy() // Create a statefulset and wait for it to be reconciled + clearReconciled() m.Create(statefulset).Should(Succeed()) waitForStatefulSetReconciled(statefulset) - - ownerRef = utils.GetOwnerRefStatefulSet(statefulset) }) AfterEach(func() { - // Make sure to delete any finalizers (if the statefulset exists) - Eventually(func() error { - key := types.NamespacedName{Namespace: statefulset.GetNamespace(), Name: statefulset.GetName()} - err := c.Get(context.TODO(), key, statefulset) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - statefulset.SetFinalizers([]string{}) - return c.Update(context.TODO(), statefulset) - }, timeout).Should(Succeed()) - - Eventually(func() error { - key := types.NamespacedName{Namespace: statefulset.GetNamespace(), Name: statefulset.GetName()} - err := c.Get(context.TODO(), key, statefulset) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - if len(statefulset.GetFinalizers()) > 0 { - return fmt.Errorf("Finalizers not upated") - } - return nil - }, timeout).Should(Succeed()) - testCancel() utils.DeleteAll(cfg, timeout, @@ -177,25 +164,16 @@ var _ = Describe("StatefulSet controller Suite", func() { obj.SetAnnotations(annotations) return obj } - + clearReconciled() m.Update(statefulset, addAnnotation).Should(Succeed()) + // Two runs since we the controller retriggers itself by changing the object + waitForStatefulSetReconciled(statefulset) waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) - It("Adds OwnerReferences to all children", func() { - for _, obj := range []core.Object{cm1, cm2, cm3, s1, s2, s3} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Adds a finalizer to the StatefulSet", func() { - Eventually(statefulset, timeout).Should(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) - It("Adds a config hash to the Pod Template", func() { Eventually(statefulset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -229,7 +207,7 @@ var _ = Describe("StatefulSet controller Suite", func() { ss.Spec.Template.Spec.Containers = []corev1.Container{containers[0]} return ss } - + clearReconciled() m.Update(statefulset, removeContainer2).Should(Succeed()) waitForStatefulSetReconciled(statefulset) waitForStatefulSetReconciled(statefulset) @@ -238,18 +216,22 @@ var _ = Describe("StatefulSet controller Suite", func() { m.Get(statefulset, timeout).Should(Succeed()) }) - It("Removes the OwnerReference from the orphaned ConfigMap", func() { - m.Get(cm2, timeout).Should(Succeed()) - Eventually(cm2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) + It("Updates the config hash in the Pod Template", func() { + Eventually(func() string { + return statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) - It("Removes the OwnerReference from the orphaned Secret", func() { - m.Get(s2, timeout).Should(Succeed()) - Eventually(s2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) + It("Changes to the removed children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Updates the config hash in the Pod Template", func() { - Eventually(statefulset, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + m.Update(cm2, modifyCM).Should(Succeed()) + consistentlyStatefulSetNotReconciled(statefulset) }) }) @@ -257,6 +239,7 @@ var _ = Describe("StatefulSet controller Suite", func() { var originalHash string BeforeEach(func() { + m.Get(statefulset, timeout).Should(Succeed()) Eventually(statefulset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) originalHash = statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] }) @@ -268,15 +251,19 @@ var _ = Describe("StatefulSet controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm1, modifyCM).Should(Succeed()) waitForStatefulSetReconciled(statefulset) + waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) It("Updates the config hash in the Pod Template", func() { - Eventually(statefulset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -287,15 +274,19 @@ var _ = Describe("StatefulSet controller Suite", func() { cm.Data["key1"] = modified return cm } + clearReconciled() m.Update(cm2, modifyCM).Should(Succeed()) waitForStatefulSetReconciled(statefulset) + waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) It("Updates the config hash in the Pod Template", func() { - Eventually(statefulset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -309,15 +300,19 @@ var _ = Describe("StatefulSet controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s1, modifyS).Should(Succeed()) waitForStatefulSetReconciled(statefulset) + waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) It("Updates the config hash in the Pod Template", func() { - Eventually(statefulset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -331,15 +326,19 @@ var _ = Describe("StatefulSet controller Suite", func() { s.StringData["key1"] = modified return s } + clearReconciled() m.Update(s2, modifyS).Should(Succeed()) waitForStatefulSetReconciled(statefulset) + waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) It("Updates the config hash in the Pod Template", func() { - Eventually(statefulset, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(core.ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return statefulset.Spec.Template.GetAnnotations()[core.ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) }) @@ -350,23 +349,27 @@ var _ = Describe("StatefulSet controller Suite", func() { obj.SetAnnotations(make(map[string]string)) return obj } + clearReconciled() m.Update(statefulset, removeAnnotations).Should(Succeed()) waitForStatefulSetReconciled(statefulset) - waitForStatefulSetReconciled(statefulset) - m.Get(statefulset, timeout).Should(Succeed()) Eventually(statefulset, timeout).ShouldNot(utils.WithAnnotations(HaveKey(core.RequiredAnnotation))) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Removes the config hash annotation", func() { + m.Consistently(statefulset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Removes the StatefulSet's finalizer", func() { - m.Get(statefulset, timeout).Should(Succeed()) - Eventually(statefulset, timeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() + + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyStatefulSetNotReconciled(statefulset) }) }) @@ -374,22 +377,24 @@ var _ = Describe("StatefulSet controller Suite", func() { BeforeEach(func() { // Make sure the cache has synced before we run the test Eventually(statefulset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) + clearReconciled() m.Delete(statefulset).Should(Succeed()) waitForStatefulSetReconciled(statefulset) - - // Get the updated StatefulSet - m.Get(statefulset, timeout).Should(Succeed()) - Eventually(statefulset, timeout).ShouldNot(utils.WithDeletionTimestamp(BeNil())) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Not longer exists", func() { + m.Get(statefulset).Should(MatchError(MatchRegexp(`not found`))) }) - It("Removes the StatefulSet's finalizer", func() { - // Removing the finalizer causes the statefulset to be deleted - m.Get(statefulset, timeout).ShouldNot(Succeed()) + It("Changes to children no longer trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() + + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyStatefulSetNotReconciled(statefulset) }) }) }) @@ -400,18 +405,20 @@ var _ = Describe("StatefulSet controller Suite", func() { m.Get(statefulset, timeout).Should(Succeed()) }) - It("Doesn't add any OwnerReferences to any children", func() { - for _, obj := range []core.Object{cm1, cm2, s1, s2} { - m.Consistently(obj, consistentlyTimeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Doesn't add a config hash to the Pod Template", func() { + m.Consistently(statefulset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) }) - It("Doesn't add a finalizer to the StatefulSet", func() { - m.Consistently(statefulset, consistentlyTimeout).ShouldNot(utils.WithFinalizers(ContainElement(core.FinalizerString))) - }) + It("Changes to children no do not trigger a reconcile", func() { + modifyCM := func(obj client.Object) client.Object { + cm, _ := obj.(*corev1.ConfigMap) + cm.Data["key1"] = "modified" + return cm + } + clearReconciled() - It("Doesn't add a config hash to the Pod Template", func() { - m.Consistently(statefulset, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(core.ConfigHashAnnotation))) + m.Update(cm1, modifyCM).Should(Succeed()) + consistentlyStatefulSetNotReconciled(statefulset) }) }) }) diff --git a/pkg/core/delete.go b/pkg/core/delete.go index a0b4d4a9..f40252bc 100644 --- a/pkg/core/delete.go +++ b/pkg/core/delete.go @@ -21,13 +21,12 @@ import ( "fmt" "reflect" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// handleDelete removes all existing Owner References pointing to the object +// deleteOwnerReferencesAndFinalizer removes all existing Owner References pointing to the object // before removing the object's Finalizer -func (h *Handler) handleDelete(obj podController) (reconcile.Result, error) { +func (h *Handler) deleteOwnerReferencesAndFinalizer(obj podController) (reconcile.Result, error) { // Fetch all children with an OwnerReference pointing to the object existing, err := h.getExistingChildren(obj) if err != nil { @@ -51,9 +50,3 @@ func (h *Handler) handleDelete(obj podController) (reconcile.Result, error) { } return reconcile.Result{}, nil } - -// toBeDeleted checks whether the object has been marked for deletion -func toBeDeleted(obj metav1.Object) bool { - // IsZero means that the object hasn't been marked for deletion - return !obj.GetDeletionTimestamp().IsZero() -} diff --git a/pkg/core/delete_test.go b/pkg/core/delete_test.go index e678465f..fee1b95b 100644 --- a/pkg/core/delete_test.go +++ b/pkg/core/delete_test.go @@ -17,8 +17,6 @@ limitations under the License. package core import ( - "context" - "fmt" "sync" "time" @@ -29,15 +27,13 @@ import ( "github.com/wave-k8s/wave/test/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" ) -var _ = Describe("Wave owner references Suite", func() { +var _ = Describe("Wave migration Suite", func() { var c client.Client var h *Handler var m utils.Matcher @@ -91,7 +87,7 @@ var _ = Describe("Wave owner references Suite", func() { ) }) - Context("handleDelete", func() { + Context("When a legacy deployment is reconciled", func() { var cm1 *corev1.ConfigMap var cm2 *corev1.ConfigMap var s1 *corev1.Secret @@ -112,47 +108,15 @@ var _ = Describe("Wave owner references Suite", func() { f := deploymentObject.GetFinalizers() f = append(f, FinalizerString) - f = append(f, "keep.me.around/finalizer") m.Update(deploymentObject, func(obj client.Object) client.Object { obj.SetFinalizers(f) return obj }, timeout).Should(Succeed()) - _, err := h.handleDelete(podControllerDeployment) + _, err := h.handlePodController(podControllerDeployment) Expect(err).NotTo(HaveOccurred()) }) - AfterEach(func() { - // Make sure to delete any finalizers (if the deployment exists) - Eventually(func() error { - key := types.NamespacedName{Namespace: deploymentObject.GetNamespace(), Name: deploymentObject.GetName()} - err := c.Get(context.TODO(), key, deploymentObject) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - deploymentObject.SetFinalizers([]string{}) - return c.Update(context.TODO(), deploymentObject) - }, timeout).Should(Succeed()) - - Eventually(func() error { - key := types.NamespacedName{Namespace: deploymentObject.GetNamespace(), Name: deploymentObject.GetName()} - err := c.Get(context.TODO(), key, deploymentObject) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - if len(deploymentObject.GetFinalizers()) > 0 { - return fmt.Errorf("Finalizers not upated") - } - return nil - }, timeout).Should(Succeed()) - }) - It("removes owner references from all children", func() { for _, obj := range []Object{cm1, cm2, s1, s2} { m.Get(obj, timeout).Should(Succeed()) @@ -166,18 +130,4 @@ var _ = Describe("Wave owner references Suite", func() { }) }) - // Waiting for toBeDeleted to be implemented - Context("toBeDeleted", func() { - It("returns true if deletion timestamp is non-nil", func() { - t := metav1.NewTime(time.Now()) - deploymentObject.SetDeletionTimestamp(&t) - Expect(toBeDeleted(deploymentObject)).To(BeTrue()) - }) - - It("returns false if the deleteion timestamp is nil", func() { - Expect(toBeDeleted(deploymentObject)).To(BeFalse()) - }) - - }) - }) diff --git a/pkg/core/finalizer.go b/pkg/core/finalizer.go index 56b6ee77..9b5f3b68 100644 --- a/pkg/core/finalizer.go +++ b/pkg/core/finalizer.go @@ -16,21 +16,6 @@ limitations under the License. package core -// addFinalizer adds the wave finalizer to the given PodController -func addFinalizer(obj podController) { - finalizers := obj.GetFinalizers() - for _, finalizer := range finalizers { - if finalizer == FinalizerString { - // podController already contains the finalizer - return - } - } - - //podController doesn't contain the finalizer, so add it - finalizers = append(finalizers, FinalizerString) - obj.SetFinalizers(finalizers) -} - // removeFinalizer removes the wave finalizer from the given podController func removeFinalizer(obj podController) { finalizers := obj.GetFinalizers() diff --git a/pkg/core/finalizer_test.go b/pkg/core/finalizer_test.go index a8859f20..3d09d653 100644 --- a/pkg/core/finalizer_test.go +++ b/pkg/core/finalizer_test.go @@ -32,23 +32,6 @@ var _ = Describe("Wave finalizer Suite", func() { podControllerDeployment = &deployment{deploymentObject} }) - Context("addFinalizer", func() { - It("adds the wave finalizer to the deployment", func() { - addFinalizer(podControllerDeployment) - - Expect(deploymentObject.GetFinalizers()).To(ContainElement(FinalizerString)) - }) - - It("leaves existing finalizers in place", func() { - f := deploymentObject.GetFinalizers() - f = append(f, "kubernetes") - deploymentObject.SetFinalizers(f) - addFinalizer(podControllerDeployment) - - Expect(deploymentObject.GetFinalizers()).To(ContainElement("kubernetes")) - }) - }) - Context("removeFinalizer", func() { It("removes the wave finalizer from the deployment", func() { f := deploymentObject.GetFinalizers() diff --git a/pkg/core/handler.go b/pkg/core/handler.go index eb6045d9..588eddf6 100644 --- a/pkg/core/handler.go +++ b/pkg/core/handler.go @@ -23,6 +23,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -32,12 +33,14 @@ import ( // Handler performs the main business logic of the Wave controller type Handler struct { client.Client - recorder record.EventRecorder + recorder record.EventRecorder + watchedConfigmaps map[types.NamespacedName]map[types.NamespacedName]bool + watchedSecrets map[types.NamespacedName]map[types.NamespacedName]bool } // NewHandler constructs a new instance of Handler func NewHandler(c client.Client, r record.EventRecorder) *Handler { - return &Handler{Client: c, recorder: r} + return &Handler{Client: c, recorder: r, watchedConfigmaps: make(map[types.NamespacedName]map[types.NamespacedName]bool), watchedSecrets: make(map[types.NamespacedName]map[types.NamespacedName]bool)} } // HandleDeployment is called by the deployment controller to reconcile deployments @@ -57,41 +60,28 @@ func (h *Handler) HandleDaemonSet(instance *appsv1.DaemonSet) (reconcile.Result, // handlePodController reconciles the state of a podController func (h *Handler) handlePodController(instance podController) (reconcile.Result, error) { - log := logf.Log.WithName("wave") + log := logf.Log.WithName("wave").WithValues("namespace", instance.GetNamespace(), "name", instance.GetName()) + + // To cleanup legacy ownerReferences and finalizer + if hasFinalizer(instance) { + log.V(0).Info("Removing old finalizer") + return h.deleteOwnerReferencesAndFinalizer(instance) + } // If the required annotation isn't present, ignore the instance if !hasRequiredAnnotation(instance) { - // Perform deletion logic if the finalizer is present on the object - if hasFinalizer(instance) { - log.V(0).Info("Required annotation removed from instance, cleaning up orphans", "namespace", instance.GetNamespace(), "name", instance.GetName()) - return h.handleDelete(instance) - } + h.removeWatchesForInstance(instance) return reconcile.Result{}, nil } - // If the instance is marked for deletion, run cleanup process - if toBeDeleted(instance) { - log.V(0).Info("Instance marked for deletion, cleaning up orphans", "namespace", instance.GetNamespace(), "name", instance.GetName()) - return h.handleDelete(instance) - } - - // Get all children that have an OwnerReference pointing to this instance - existing, err := h.getExistingChildren(instance) - if err != nil { - return reconcile.Result{}, fmt.Errorf("error fetching existing children: %v", err) - } - // Get all children that the instance currently references current, err := h.getCurrentChildren(instance) if err != nil { return reconcile.Result{}, fmt.Errorf("error fetching current children: %v", err) } - // Reconcile the OwnerReferences on the existing and current children - err = h.updateOwnerReferences(instance, existing, current) - if err != nil { - return reconcile.Result{}, fmt.Errorf("error updating OwnerReferences: %v", err) - } + h.removeWatchesForInstance(instance) + h.watchChildrenForInstance(instance, current) hash, err := calculateConfigHash(current) if err != nil { @@ -101,11 +91,10 @@ func (h *Handler) handlePodController(instance podController) (reconcile.Result, // Update the desired state of the Deployment in a DeepCopy copy := instance.DeepCopyPodController() setConfigHash(copy, hash) - addFinalizer(copy) // If the desired state doesn't match the existing state, update it if !reflect.DeepEqual(instance, copy) { - log.V(0).Info("Updating instance hash", "namespace", instance.GetNamespace(), "name", instance.GetName(), "hash", hash) + log.V(0).Info("Updating instance hash", "hash", hash) h.recorder.Eventf(copy.GetApiObject(), corev1.EventTypeNormal, "ConfigChanged", "Configuration hash updated to %s", hash) err := h.Update(context.TODO(), copy.GetApiObject()) @@ -113,6 +102,5 @@ func (h *Handler) handlePodController(instance podController) (reconcile.Result, return reconcile.Result{}, fmt.Errorf("error updating instance %s/%s: %v", instance.GetNamespace(), instance.GetName(), err) } } - return reconcile.Result{}, nil } diff --git a/pkg/core/handler_test.go b/pkg/core/handler_test.go index dfcc2a1a..8c5291b1 100644 --- a/pkg/core/handler_test.go +++ b/pkg/core/handler_test.go @@ -18,7 +18,6 @@ package core import ( "context" - "fmt" "sync" "time" @@ -29,8 +28,6 @@ import ( "github.com/wave-k8s/wave/test/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,7 +46,6 @@ var _ = Describe("Wave controller Suite", func() { const timeout = time.Second * 5 const consistentlyTimeout = time.Second - var ownerRef metav1.OwnerReference var cm1 *corev1.ConfigMap var cm2 *corev1.ConfigMap var cm3 *corev1.ConfigMap @@ -62,6 +58,10 @@ var _ = Describe("Wave controller Suite", func() { var s4 *corev1.Secret var s5 *corev1.Secret var s6 *corev1.Secret + var example1Name types.NamespacedName + var example2Name types.NamespacedName + var example3Name types.NamespacedName + var instanceName types.NamespacedName var modified = "modified" @@ -122,45 +122,32 @@ var _ = Describe("Wave controller Suite", func() { deployment = utils.ExampleDeployment.DeepCopy() + example1Name = types.NamespacedName{ + Name: "example1", + Namespace: deployment.GetNamespace(), + } + example2Name = types.NamespacedName{ + Name: "example2", + Namespace: deployment.GetNamespace(), + } + example3Name = types.NamespacedName{ + Name: "example3", + Namespace: deployment.GetNamespace(), + } + instanceName = types.NamespacedName{ + Name: deployment.GetName(), + Namespace: deployment.GetNamespace(), + } + // Create a deployment and wait for it to be reconciled m.Create(deployment).Should(Succeed()) _, err = h.HandleDeployment(deployment) Expect(err).NotTo(HaveOccurred()) m.Get(deployment).Should(Succeed()) - ownerRef = utils.GetOwnerRefDeployment(deployment) }) AfterEach(func() { - // Make sure to delete any finalizers (if the deployment exists) - Eventually(func() error { - key := types.NamespacedName{Namespace: deployment.GetNamespace(), Name: deployment.GetName()} - err := c.Get(context.TODO(), key, deployment) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - deployment.SetFinalizers([]string{}) - return c.Update(context.TODO(), deployment) - }, timeout).Should(Succeed()) - - Eventually(func() error { - key := types.NamespacedName{Namespace: deployment.GetNamespace(), Name: deployment.GetName()} - err := c.Get(context.TODO(), key, deployment) - if err != nil && errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - if len(deployment.GetFinalizers()) > 0 { - return fmt.Errorf("Finalizers not upated") - } - return nil - }, timeout).Should(Succeed()) - close(stopMgr) mgrStopped.Wait() @@ -197,21 +184,19 @@ var _ = Describe("Wave controller Suite", func() { m.Get(deployment, timeout).Should(Succeed()) }) - It("Adds OwnerReferences to all children", func() { - for _, obj := range []Object{cm1, cm2, cm3, s1, s2, s3} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Adds a finalizer to the Deployment", func() { - Eventually(deployment, timeout).Should(utils.WithFinalizers(ContainElement(FinalizerString))) - }) - It("Adds a config hash to the Pod Template", func() { Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(ConfigHashAnnotation))) }) + It("Is watched by the handler", func() { + Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps()[example2Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example2Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName)) + }) + It("Sends an event when updating the hash", func() { Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(ConfigHashAnnotation))) @@ -248,16 +233,19 @@ var _ = Describe("Wave controller Suite", func() { m.Get(deployment, timeout).Should(Succeed()) }) - It("Removes the OwnerReference from the orphaned ConfigMap", func() { - Eventually(cm2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) + It("Updates the config hash in the Pod Template", func() { + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) - It("Removes the OwnerReference from the orphaned Secret", func() { - Eventually(s2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) - - It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + It("Is is not longer watched by the handler", func() { + Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps()[example2Name]).NotTo(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example2Name]).NotTo(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName)) }) }) @@ -285,7 +273,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -305,7 +295,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -326,7 +318,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -371,7 +365,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -395,7 +391,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -419,7 +417,9 @@ var _ = Describe("Wave controller Suite", func() { }) It("Updates the config hash in the Pod Template", func() { - Eventually(deployment, timeout).ShouldNot(utils.WithPodTemplateAnnotations(HaveKeyWithValue(ConfigHashAnnotation, originalHash))) + Eventually(func() string { + return deployment.Spec.Template.GetAnnotations()[ConfigHashAnnotation] + }, timeout).ShouldNot(Equal(originalHash)) }) }) @@ -463,16 +463,9 @@ var _ = Describe("Wave controller Suite", func() { Eventually(deployment, timeout).ShouldNot(utils.WithAnnotations(HaveKey(RequiredAnnotation))) }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []Object{cm1, cm2, s1, s2} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Removes the Deployment's finalizer", func() { - m.Get(deployment, timeout).Should(Succeed()) - Eventually(deployment, timeout).ShouldNot(utils.WithFinalizers(ContainElement(FinalizerString))) + It("No longer is watched by the handler", func() { + Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) + Expect(len(h.GetWatchedSecrets())).To(Equal(0)) }) }) @@ -482,22 +475,19 @@ var _ = Describe("Wave controller Suite", func() { Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(ConfigHashAnnotation))) m.Delete(deployment).Should(Succeed()) - m.Get(deployment).Should(Succeed()) - Eventually(deployment, timeout).ShouldNot(utils.WithDeletionTimestamp(BeNil())) - _, err := h.HandleDeployment(deployment) - Expect(err).NotTo(HaveOccurred()) + // This test is suboptimal this the controller is not part of it here + // Controller will call RemoveWatches if Delete cannot be found during reconcile + h.RemoveWatches(instanceName) + }) - It("Removes the OwnerReference from the all children", func() { - for _, obj := range []Object{cm1, cm2, s1, s2} { - m.Get(obj, timeout).Should(Succeed()) - Eventually(obj, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } + It("Not longer exists", func() { + m.Get(deployment).Should(MatchError(MatchRegexp(`not found`))) }) - It("Removes the Deployment's finalizer", func() { - // Removing the finalizer causes the deployment to be deleted - m.Get(deployment, timeout).ShouldNot(Succeed()) + It("No longer is watched by the handler", func() { + Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) + Expect(len(h.GetWatchedSecrets())).To(Equal(0)) }) }) }) @@ -508,19 +498,15 @@ var _ = Describe("Wave controller Suite", func() { m.Get(deployment, timeout).Should(Succeed()) }) - It("Doesn't add any OwnerReferences to any children", func() { - for _, obj := range []Object{cm1, cm2, s1, s2} { - m.Consistently(obj, consistentlyTimeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - }) - - It("Doesn't add a finalizer to the Deployment", func() { - m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithFinalizers(ContainElement(FinalizerString))) + It("Is not watched by the handler", func() { + Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) + Expect(len(h.GetWatchedSecrets())).To(Equal(0)) }) It("Doesn't add a config hash to the Pod Template", func() { m.Consistently(deployment, consistentlyTimeout).ShouldNot(utils.WithAnnotations(ContainElement(ConfigHashAnnotation))) }) + }) }) diff --git a/pkg/core/owner_references.go b/pkg/core/owner_references.go index c21d4f8e..40ae9139 100644 --- a/pkg/core/owner_references.go +++ b/pkg/core/owner_references.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "reflect" - "strings" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,98 +50,6 @@ func (h *Handler) removeOwnerReferences(obj podController, children []Object) er return nil } -// updateOwnerReferences determines which children need to have their -// OwnerReferences added/updated and which need to have their OwnerReferences -// removed and then performs all updates -func (h *Handler) updateOwnerReferences(owner podController, existing []Object, current []configObject) error { - // Add an owner reference to each child object - errChan := make(chan error) - for _, obj := range current { - go func(child Object) { - errChan <- h.updateOwnerReference(owner, child) - }(obj.object) - } - - // Return any errors encountered updating the child objects - errs := []string{} - for range current { - err := <-errChan - if err != nil { - errs = append(errs, err.Error()) - } - } - if len(errs) > 0 { - return fmt.Errorf("error(s) encountered updating children: %s", strings.Join(errs, ", ")) - } - - // Get the orphaned children and remove their OwnerReferences - orphans := getOrphans(existing, current) - err := h.removeOwnerReferences(owner, orphans) - if err != nil { - return fmt.Errorf("error removing Owner References: %v", err) - } - - return nil -} - -// updateOwnerReference ensures that the child object has an OwnerReference -// pointing to the owner -func (h *Handler) updateOwnerReference(owner podController, child Object) error { - ownerRef := getOwnerReference(owner) - for _, ref := range child.GetOwnerReferences() { - // Owner Reference already exists, do nothing - if reflect.DeepEqual(ref, ownerRef) { - return nil - } - } - - // Append the new OwnerReference and update the child - h.recorder.Eventf(child, corev1.EventTypeNormal, "AddWatch", "Adding watch for %s %s", kindOf(child), child.GetName()) - ownerRefs := append(child.GetOwnerReferences(), ownerRef) - child.SetOwnerReferences(ownerRefs) - err := h.Update(context.TODO(), child) - if err != nil { - return fmt.Errorf("error updating child: %v", err) - } - return nil -} - -// getOrphans creates a slice of orphaned child objects that need their -// OwnerReferences removing -func getOrphans(existing []Object, current []configObject) []Object { - orphans := []Object{} - for _, child := range existing { - if !isIn(current, child) { - orphans = append(orphans, child) - } - } - return orphans -} - -// getOwnerReference constructs an OwnerReference pointing to the object given -func getOwnerReference(obj podController) metav1.OwnerReference { - t := true - f := false - return metav1.OwnerReference{ - APIVersion: "apps/v1", - Kind: kindOf(obj), - Name: obj.GetName(), - UID: obj.GetUID(), - BlockOwnerDeletion: &t, - Controller: &f, - } -} - -// isIn checks whether a child object exists within a slice of objects -func isIn(list []configObject, child Object) bool { - for _, obj := range list { - if obj.object.GetUID() == child.GetUID() { - return true - } - } - return false -} - // kindOf returns the Kind of the given object as a string func kindOf(obj Object) string { switch obj.(type) { diff --git a/pkg/core/owner_references_test.go b/pkg/core/owner_references_test.go index af2c6c62..4055ed42 100644 --- a/pkg/core/owner_references_test.go +++ b/pkg/core/owner_references_test.go @@ -162,225 +162,4 @@ var _ = Describe("Wave owner references Suite", func() { }) }) - Context("updateOwnerReferences", func() { - BeforeEach(func() { - for _, obj := range []Object{cm2, s1, s2} { - m.Update(obj, func(obj client.Object) client.Object { - obj.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) - return obj - }, timeout).Should(Succeed()) - Eventually(obj, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - } - - existing := []Object{cm2, cm3, s1, s2} - current := []configObject{ - {object: cm1, allKeys: true}, - {object: s1, allKeys: true}, - {object: s3, allKeys: false, keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - }, - }, - } - err := h.updateOwnerReferences(podControllerDeployment, existing, current) - Expect(err).NotTo(HaveOccurred()) - }) - - It("removes owner references from those not in current", func() { - Eventually(cm2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - Eventually(cm3, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - Eventually(s2, timeout).ShouldNot(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) - - It("adds owner references to those in current", func() { - Eventually(cm1, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - Eventually(s1, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - Eventually(s3, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) - }) - - Context("updateOwnerReference", func() { - BeforeEach(func() { - // Add an OwnerReference to cm2 - m.Update(cm2, func(obj client.Object) client.Object { - cm2 := obj.(*corev1.ConfigMap) - cm2.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) - - return cm2 - }, timeout).Should(Succeed()) - Eventually(cm2, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - - m.Get(cm1, timeout).Should(Succeed()) - m.Get(cm2, timeout).Should(Succeed()) - }) - - It("adds an OwnerReference if not present", func() { - // Add an OwnerReference to cm1 - otherRef := ownerRef - otherRef.UID = cm1.GetUID() - m.Update(cm1, func(obj client.Object) client.Object { - cm1 := obj.(*corev1.ConfigMap) - cm1.SetOwnerReferences([]metav1.OwnerReference{otherRef}) - - return cm1 - }, timeout).Should(Succeed()) - Eventually(cm1, timeout).Should(utils.WithOwnerReferences(ContainElement(otherRef))) - - m.Get(cm1, timeout).Should(Succeed()) - Expect(h.updateOwnerReference(podControllerDeployment, cm1)).NotTo(HaveOccurred()) - Eventually(cm1, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - }) - - It("doesn't update the child object if there is already and OwnerReference present", func() { - // Add an OwnerReference to cm2 - m.Update(cm2, func(obj client.Object) client.Object { - cm2 := obj.(*corev1.ConfigMap) - cm2.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) - - return cm2 - }, timeout).Should(Succeed()) - Eventually(cm2, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - - // Get the original version - m.Get(cm2, timeout).Should(Succeed()) - originalVersion := cm2.GetResourceVersion() - Expect(h.updateOwnerReference(podControllerDeployment, cm2)).NotTo(HaveOccurred()) - - // Compare current version - m.Get(cm2, timeout).Should(Succeed()) - Expect(cm2.GetResourceVersion()).To(Equal(originalVersion)) - }) - - It("sends events for adding each owner reference", func() { - m.Get(cm1, timeout).Should(Succeed()) - Expect(h.updateOwnerReference(podControllerDeployment, cm1)).NotTo(HaveOccurred()) - Eventually(cm1, timeout).Should(utils.WithOwnerReferences(ContainElement(ownerRef))) - - cmMessage := "Adding watch for ConfigMap example1" - eventMessage := func(event *corev1.Event) string { - return event.Message - } - - Eventually(func() *corev1.EventList { - events := &corev1.EventList{} - m.Client.List(context.TODO(), events) - return events - }, timeout).Should(utils.WithItems(ContainElement(WithTransform(eventMessage, Equal(cmMessage))))) - }) - }) - - Context("getOrphans", func() { - It("returns an empty list when current and existing match", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: cm2, allKeys: true}, - {object: s1, allKeys: true}, - {object: s2, allKeys: true}, - } - existing := []Object{cm1, cm2, s1, s2} - Expect(getOrphans(existing, current)).To(BeEmpty()) - }) - - It("returns an empty list when existing is a subset of current", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: cm2, allKeys: true}, - {object: s1, allKeys: true}, - {object: s2, allKeys: true}, - } - existing := []Object{cm1, s2} - Expect(getOrphans(existing, current)).To(BeEmpty()) - }) - - It("returns the correct objects when current is a subset of existing", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: s2, allKeys: true}, - } - existing := []Object{cm1, cm2, s1, s2} - orphans := getOrphans(existing, current) - Expect(orphans).To(ContainElement(cm2)) - Expect(orphans).To(ContainElement(s1)) - }) - - Context("when current contains multiple singleField entries", func() { - It("returns an empty list when current and existing match", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: cm2, allKeys: false, keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - }, - }, - {object: s1, allKeys: true}, - {object: s2, allKeys: true}, - } - existing := []Object{cm1, cm2, s1, s2} - Expect(getOrphans(existing, current)).To(BeEmpty()) - }) - - It("returns an empty list when existing is a subset of current", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: cm2, allKeys: false, keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - }, - }, - {object: s1, allKeys: true}, - {object: s2, allKeys: true}, - } - existing := []Object{cm1, s2} - Expect(getOrphans(existing, current)).To(BeEmpty()) - }) - - It("returns the correct objects when current is a subset of existing", func() { - current := []configObject{ - {object: cm1, allKeys: true}, - {object: s2, allKeys: false, keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - }, - }, - } - existing := []Object{cm1, cm2, s1, s2} - orphans := getOrphans(existing, current) - Expect(orphans).To(ContainElement(cm2)) - Expect(orphans).To(ContainElement(s1)) - }) - }) - }) - - Context("getOwnerReference", func() { - var ref metav1.OwnerReference - BeforeEach(func() { - ref = getOwnerReference(podControllerDeployment) - }) - - It("sets the APIVersion", func() { - Expect(ref.APIVersion).To(Equal("apps/v1")) - }) - - It("sets the Kind", func() { - Expect(ref.Kind).To(Equal("Deployment")) - }) - - It("sets the UID", func() { - Expect(ref.UID).To(Equal(deploymentObject.UID)) - }) - - It("sets the Name", func() { - Expect(ref.Name).To(Equal(deploymentObject.Name)) - }) - - It("sets Controller to false", func() { - Expect(ref.Controller).NotTo(BeNil()) - Expect(*ref.Controller).To(BeFalse()) - }) - - It("sets BlockOwnerDeletion to true", func() { - Expect(ref.BlockOwnerDeletion).NotTo(BeNil()) - Expect(*ref.BlockOwnerDeletion).To(BeTrue()) - }) - }) }) diff --git a/pkg/core/watcher.go b/pkg/core/watcher.go new file mode 100644 index 00000000..7c4aaedc --- /dev/null +++ b/pkg/core/watcher.go @@ -0,0 +1,122 @@ +package core + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ handler.EventHandler = &enqueueRequestForWatcher{} + +type enqueueRequestForWatcher struct { + // watcherList + watcherList map[types.NamespacedName]map[types.NamespacedName]bool +} + +func EnqueueRequestForWatcher(watcherList map[types.NamespacedName]map[types.NamespacedName]bool) handler.EventHandler { + e := &enqueueRequestForWatcher{ + watcherList: watcherList, + } + return e +} + +// Create implements EventHandler. +func (e *enqueueRequestForWatcher) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { + e.queueOwnerReconcileRequest(evt.Object, q) +} + +// Update implements EventHandler. +func (e *enqueueRequestForWatcher) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + e.queueOwnerReconcileRequest(evt.ObjectOld, q) + e.queueOwnerReconcileRequest(evt.ObjectNew, q) +} + +// Delete implements EventHandler. +func (e *enqueueRequestForWatcher) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + e.queueOwnerReconcileRequest(evt.Object, q) +} + +// Generic implements EventHandler. +func (e *enqueueRequestForWatcher) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { + e.queueOwnerReconcileRequest(evt.Object, q) +} + +// queueOwnerReconcileRequest looks the object up in our watchList and queues reconcile.Request to reconcile +// all owners of object +func (e *enqueueRequestForWatcher) queueOwnerReconcileRequest(object metav1.Object, q workqueue.RateLimitingInterface) { + name := types.NamespacedName{ + Name: object.GetName(), + Namespace: object.GetNamespace(), + } + if watchers, ok := e.watcherList[name]; ok { + for watcher := range watchers { + request := reconcile.Request{NamespacedName: watcher} + q.Add(request) + } + } +} + +func (h *Handler) GetWatchedConfigmaps() map[types.NamespacedName]map[types.NamespacedName]bool { + return h.watchedConfigmaps +} + +func (h *Handler) GetWatchedSecrets() map[types.NamespacedName]map[types.NamespacedName]bool { + return h.watchedSecrets +} + +func (h *Handler) watchChildrenForInstance(instance podController, children []configObject) { + instanceName := types.NamespacedName{ + Name: instance.GetName(), + Namespace: instance.GetNamespace(), + } + for _, child := range children { + childName := types.NamespacedName{ + Name: child.object.GetName(), + Namespace: child.object.GetNamespace(), + } + + switch child.object.(type) { + case *corev1.ConfigMap: + if _, ok := h.watchedConfigmaps[childName]; !ok { + h.watchedConfigmaps[childName] = map[types.NamespacedName]bool{} + } + h.watchedConfigmaps[childName][instanceName] = true + case *corev1.Secret: + if _, ok := h.watchedSecrets[childName]; !ok { + h.watchedSecrets[childName] = map[types.NamespacedName]bool{} + } + h.watchedSecrets[childName][instanceName] = true + default: + panic(child.object) + } + } +} + +func (h *Handler) removeWatchesForInstance(instance podController) { + instanceName := types.NamespacedName{ + Name: instance.GetName(), + Namespace: instance.GetNamespace(), + } + h.RemoveWatches(instanceName) +} + +func (h *Handler) RemoveWatches(instanceName types.NamespacedName) { + for child, watchers := range h.watchedConfigmaps { + delete(watchers, instanceName) + if len(watchers) == 0 { + delete(h.watchedConfigmaps, child) + } + } + for child, watchers := range h.watchedSecrets { + delete(watchers, instanceName) + if len(watchers) == 0 { + delete(h.watchedSecrets, child) + } + } +}