From 761ba6667f2b9df3adc2be7782a32592a5a2a75a Mon Sep 17 00:00:00 2001 From: Jan Kantert Date: Thu, 2 May 2024 21:30:26 +0200 Subject: [PATCH] add lock to watcher hash map to prevent concurrent access panics --- pkg/core/handler.go | 15 +++++++++--- pkg/core/handler_test.go | 36 ++++++++++++++--------------- pkg/core/watcher.go | 50 +++++++++++++++++++++++++--------------- 3 files changed, 62 insertions(+), 39 deletions(-) diff --git a/pkg/core/handler.go b/pkg/core/handler.go index 67be9198..20e13c4e 100644 --- a/pkg/core/handler.go +++ b/pkg/core/handler.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "sync" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -34,13 +35,21 @@ import ( type Handler struct { client.Client recorder record.EventRecorder - watchedConfigmaps map[types.NamespacedName]map[types.NamespacedName]bool - watchedSecrets map[types.NamespacedName]map[types.NamespacedName]bool + watchedConfigmaps WatcherList + watchedSecrets WatcherList } // NewHandler constructs a new instance of Handler func NewHandler(c client.Client, r record.EventRecorder) *Handler { - return &Handler{Client: c, recorder: r, watchedConfigmaps: make(map[types.NamespacedName]map[types.NamespacedName]bool), watchedSecrets: make(map[types.NamespacedName]map[types.NamespacedName]bool)} + return &Handler{Client: c, recorder: r, + watchedConfigmaps: WatcherList{ + watchers: make(map[types.NamespacedName]map[types.NamespacedName]bool), + watchersMutex: &sync.RWMutex{}, + }, + watchedSecrets: WatcherList{ + watchers: make(map[types.NamespacedName]map[types.NamespacedName]bool), + watchersMutex: &sync.RWMutex{}, + }} } // HandleDeployment is called by the deployment controller to reconcile deployments diff --git a/pkg/core/handler_test.go b/pkg/core/handler_test.go index 8c5291b1..89c18a5f 100644 --- a/pkg/core/handler_test.go +++ b/pkg/core/handler_test.go @@ -189,12 +189,12 @@ var _ = Describe("Wave controller Suite", func() { }) It("Is watched by the handler", func() { - Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedConfigmaps()[example2Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example2Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example2Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example2Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example3Name]).To(HaveKey(instanceName)) }) It("Sends an event when updating the hash", func() { @@ -240,12 +240,12 @@ var _ = Describe("Wave controller Suite", func() { }) It("Is is not longer watched by the handler", func() { - Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedConfigmaps()[example2Name]).NotTo(HaveKey(instanceName)) - Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example2Name]).NotTo(HaveKey(instanceName)) - Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example2Name]).NotTo(HaveKey(instanceName)) + Expect(h.GetWatchedConfigmaps().watchers[example3Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example1Name]).To(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example2Name]).NotTo(HaveKey(instanceName)) + Expect(h.GetWatchedSecrets().watchers[example3Name]).To(HaveKey(instanceName)) }) }) @@ -464,8 +464,8 @@ var _ = Describe("Wave controller Suite", func() { }) It("No longer is watched by the handler", func() { - Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) - Expect(len(h.GetWatchedSecrets())).To(Equal(0)) + Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0)) + Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0)) }) }) @@ -486,8 +486,8 @@ var _ = Describe("Wave controller Suite", func() { }) It("No longer is watched by the handler", func() { - Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) - Expect(len(h.GetWatchedSecrets())).To(Equal(0)) + Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0)) + Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0)) }) }) }) @@ -499,8 +499,8 @@ var _ = Describe("Wave controller Suite", func() { }) It("Is not watched by the handler", func() { - Expect(len(h.GetWatchedConfigmaps())).To(Equal(0)) - Expect(len(h.GetWatchedSecrets())).To(Equal(0)) + Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0)) + Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0)) }) It("Doesn't add a config hash to the Pod Template", func() { diff --git a/pkg/core/watcher.go b/pkg/core/watcher.go index 1b167d5e..c435ee2a 100644 --- a/pkg/core/watcher.go +++ b/pkg/core/watcher.go @@ -2,6 +2,7 @@ package core import ( "context" + "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -13,14 +14,18 @@ import ( var _ handler.EventHandler = &enqueueRequestForWatcher{} +type WatcherList struct { + watchers map[types.NamespacedName]map[types.NamespacedName]bool + watchersMutex *sync.RWMutex +} + type enqueueRequestForWatcher struct { - // watcherList - watcherList map[types.NamespacedName]map[types.NamespacedName]bool + WatcherList } -func EnqueueRequestForWatcher(watcherList map[types.NamespacedName]map[types.NamespacedName]bool) handler.EventHandler { +func EnqueueRequestForWatcher(watcherList WatcherList) handler.EventHandler { e := &enqueueRequestForWatcher{ - watcherList: watcherList, + WatcherList: watcherList, } return e } @@ -50,37 +55,42 @@ func (e *enqueueRequestForWatcher) Generic(ctx context.Context, evt event.Generi // all owners of object func (e *enqueueRequestForWatcher) queueOwnerReconcileRequest(object metav1.Object, q workqueue.RateLimitingInterface) { name := GetNamespacedNameFromObject(object) - if watchers, ok := e.watcherList[name]; ok { + e.watchersMutex.Lock() + if watchers, ok := e.watchers[name]; ok { for watcher := range watchers { request := reconcile.Request{NamespacedName: watcher} q.Add(request) } } + e.watchersMutex.Unlock() } -func (h *Handler) GetWatchedConfigmaps() map[types.NamespacedName]map[types.NamespacedName]bool { +func (h *Handler) GetWatchedConfigmaps() WatcherList { return h.watchedConfigmaps } -func (h *Handler) GetWatchedSecrets() map[types.NamespacedName]map[types.NamespacedName]bool { +func (h *Handler) GetWatchedSecrets() WatcherList { return h.watchedSecrets } func (h *Handler) watchChildrenForInstance(instance podController, configMaps configMetadataMap, secrets configMetadataMap) { instanceName := GetNamespacedNameFromObject(instance) + h.watchedConfigmaps.watchersMutex.Lock() for childName := range configMaps { - - if _, ok := h.watchedConfigmaps[childName]; !ok { - h.watchedConfigmaps[childName] = map[types.NamespacedName]bool{} + if _, ok := h.watchedConfigmaps.watchers[childName]; !ok { + h.watchedConfigmaps.watchers[childName] = map[types.NamespacedName]bool{} } - h.watchedConfigmaps[childName][instanceName] = true + h.watchedConfigmaps.watchers[childName][instanceName] = true } + h.watchedConfigmaps.watchersMutex.Unlock() + h.watchedSecrets.watchersMutex.Lock() for childName := range secrets { - if _, ok := h.watchedSecrets[childName]; !ok { - h.watchedSecrets[childName] = map[types.NamespacedName]bool{} + if _, ok := h.watchedSecrets.watchers[childName]; !ok { + h.watchedSecrets.watchers[childName] = map[types.NamespacedName]bool{} } - h.watchedSecrets[childName][instanceName] = true + h.watchedSecrets.watchers[childName][instanceName] = true } + h.watchedSecrets.watchersMutex.Unlock() } func (h *Handler) removeWatchesForInstance(instance podController) { @@ -88,16 +98,20 @@ func (h *Handler) removeWatchesForInstance(instance podController) { } func (h *Handler) RemoveWatches(instanceName types.NamespacedName) { - for child, watchers := range h.watchedConfigmaps { + h.watchedConfigmaps.watchersMutex.Lock() + for child, watchers := range h.watchedConfigmaps.watchers { delete(watchers, instanceName) if len(watchers) == 0 { - delete(h.watchedConfigmaps, child) + delete(h.watchedConfigmaps.watchers, child) } } - for child, watchers := range h.watchedSecrets { + h.watchedConfigmaps.watchersMutex.Unlock() + h.watchedSecrets.watchersMutex.Lock() + for child, watchers := range h.watchedSecrets.watchers { delete(watchers, instanceName) if len(watchers) == 0 { - delete(h.watchedSecrets, child) + delete(h.watchedSecrets.watchers, child) } } + h.watchedSecrets.watchersMutex.Unlock() }