From fe6bf84945dfa6ae8008f5a69bec2c577dc6a6eb Mon Sep 17 00:00:00 2001 From: Mykola Pelekh Date: Thu, 7 Nov 2024 15:36:50 +0200 Subject: [PATCH 1/3] fix: avoid resources lock contention utilizing channel Signed-off-by: Mykola Pelekh --- pkg/cache/cluster.go | 123 ++++++++++++++++++++++++++-- pkg/cache/cluster_test.go | 141 +++++++++++++++----------------- pkg/cache/mocks/ClusterCache.go | 20 +++++ 3 files changed, 200 insertions(+), 84 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 3f662effc..6d839ea8e 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -57,6 +57,8 @@ const ( // Limit is required to avoid memory spikes during cache initialization. // The default limit of 50 is chosen based on experiments. defaultListSemaphoreWeight = 50 + // defaultEventProcessingInterval is the default interval for processing events + defaultEventProcessingInterval = 1 * time.Second ) const ( @@ -75,6 +77,11 @@ type apiMeta struct { watchCancel context.CancelFunc } +type eventMeta struct { + event watch.EventType + un *unstructured.Unstructured +} + // ClusterInfo holds cluster cache stats type ClusterInfo struct { // Server holds cluster API server URL @@ -96,6 +103,9 @@ type ClusterInfo struct { // OnEventHandler is a function that handles Kubernetes event type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured) +// OnProcessEventsHandler handles process events event +type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int) + // OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) @@ -137,6 +147,8 @@ type ClusterCache interface { OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe // OnEvent register event handler that is executed every time when new K8S event received OnEvent(handler OnEventHandler) Unsubscribe + // OnProcessEventsHandler register event handler that is executed every time when events were processed + OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe } type WeightedSemaphore interface { @@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa cache := &clusterCache{ settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}}, apisMeta: make(map[schema.GroupKind]*apiMeta), + eventMetaCh: nil, listPageSize: defaultListPageSize, listPageBufferSize: defaultListPageBufferSize, listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight), @@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa }, watchResyncTimeout: defaultWatchResyncTimeout, clusterSyncRetryTimeout: ClusterRetryTimeout, + eventProcessingInterval: defaultEventProcessingInterval, resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{}, eventHandlers: map[uint64]OnEventHandler{}, + processEventsHandlers: map[uint64]OnProcessEventsHandler{}, log: log, listRetryLimit: 1, listRetryUseBackoff: false, @@ -186,6 +201,7 @@ type clusterCache struct { syncStatus clusterCacheSync apisMeta map[schema.GroupKind]*apiMeta + eventMetaCh chan eventMeta serverVersion string apiResources []kube.APIResourceInfo // namespacedResources is a simple map which indicates a groupKind is namespaced @@ -195,6 +211,8 @@ type clusterCache struct { watchResyncTimeout time.Duration // sync retry timeout for cluster when sync error happens clusterSyncRetryTimeout time.Duration + // ticker interval for events processing + eventProcessingInterval time.Duration // size of a page for list operations pager. listPageSize int64 @@ -224,6 +242,7 @@ type clusterCache struct { populateResourceInfoHandler OnPopulateResourceInfoHandler resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler eventHandlers map[uint64]OnEventHandler + processEventsHandlers map[uint64]OnProcessEventsHandler openAPISchema openapi.Resources gvkParser *managedfields.GvkParser @@ -299,6 +318,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler { return handlers } +// OnProcessEventsHandler register event handler that is executed every time when events were processed +func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe { + c.handlersLock.Lock() + defer c.handlersLock.Unlock() + key := c.handlerKey + c.handlerKey++ + c.processEventsHandlers[key] = handler + return func() { + c.handlersLock.Lock() + defer c.handlersLock.Unlock() + delete(c.processEventsHandlers, key) + } +} +func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler { + c.handlersLock.Lock() + defer c.handlersLock.Unlock() + handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers)) + for _, h := range c.processEventsHandlers { + handlers = append(handlers, h) + } + return handlers +} + // GetServerVersion returns observed cluster version func (c *clusterCache) GetServerVersion() string { return c.serverVersion @@ -440,6 +482,8 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { for i := range opts { opts[i](c) } + + c.invalidateEventMeta() c.apisMeta = nil c.namespacedResources = nil c.log.Info("Invalidated cluster") @@ -669,7 +713,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object) } - c.processEvent(event.Type, obj) + c.recordEvent(event.Type, obj) if kube.IsCRD(obj) { var resources []kube.APIResourceInfo crd := v1.CustomResourceDefinition{} @@ -823,11 +867,13 @@ func (c *clusterCache) sync() error { for i := range c.apisMeta { c.apisMeta[i].watchCancel() } + c.invalidateEventMeta() c.apisMeta = make(map[schema.GroupKind]*apiMeta) c.resources = make(map[kube.ResourceKey]*Resource) c.namespacedResources = make(map[schema.GroupKind]bool) config := c.config version, err := c.kubectl.GetServerVersion(config) + c.eventMetaCh = make(chan eventMeta) if err != nil { return err @@ -864,6 +910,8 @@ func (c *clusterCache) sync() error { return err } + go c.processEvents() + // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} err = kube.RunAllAsync(len(apis), func(i int) error { @@ -926,6 +974,14 @@ func (c *clusterCache) sync() error { return nil } +// invalidateEventMeta closes the eventMeta channel if it is open +func (c *clusterCache) invalidateEventMeta() { + if c.eventMetaCh != nil { + close(c.eventMetaCh) + c.eventMetaCh = nil + } +} + // EnsureSynced checks cache state and synchronizes it if necessary func (c *clusterCache) EnsureSynced() error { syncStatus := &c.syncStatus @@ -1231,7 +1287,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure return managedObjs, nil } -func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) { +func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) { for _, h := range c.getEventHandlers() { h(event, un) } @@ -1240,16 +1296,65 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst return } + c.eventMetaCh <- eventMeta{event, un} +} + +func (c *clusterCache) processEvents() { + log := c.log.WithValues("functionName", "processItems") + log.V(1).Info("Start processing events") + c.lock.Lock() - defer c.lock.Unlock() - existingNode, exists := c.resources[key] - if event == watch.Deleted { - if exists { - c.onNodeRemoved(key) + ch := c.eventMetaCh + c.lock.Unlock() + + eventMetas := make([]eventMeta, 0) + ticker := time.NewTicker(c.eventProcessingInterval) + defer ticker.Stop() + + for { + select { + case evMeta, ok := <-ch: + if !ok { + log.V(2).Info("Event processing channel closed, finish processing") + return + } + eventMetas = append(eventMetas, evMeta) + case <-ticker.C: + if len(eventMetas) > 0 { + c.processEventsBatch(eventMetas) + eventMetas = eventMetas[:0] + } + } + } +} + +func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) { + log := c.log.WithValues("functionName", "processEventsBatch") + start := time.Now() + c.lock.Lock() + log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds()) + defer func() { + c.lock.Unlock() + duration := time.Since(start) + // Update the metric with the duration of the events processing + for _, handler := range c.getProcessEventsHandlers() { + handler(duration, len(eventMetas)) + } + }() + + for _, em := range eventMetas { + key := kube.GetResourceKey(em.un) + existingNode, exists := c.resources[key] + if em.event == watch.Deleted { + if exists { + c.onNodeRemoved(key) + } + } else { + c.onNodeUpdated(existingNode, c.newResource(em.un)) } - } else if event != watch.Deleted { - c.onNodeUpdated(existingNode, c.newResource(un)) } + + log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds()) } func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) { diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 26815c07e..d266d80c4 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -75,7 +75,11 @@ var ( ) func newCluster(t testing.TB, objs ...runtime.Object) *clusterCache { - cache := newClusterWithOptions(t, []UpdateSettingsFunc{}, objs...) + var opts []UpdateSettingsFunc + opts = append(opts, func(c *clusterCache) { + c.eventProcessingInterval = 1 * time.Millisecond + }) + cache := newClusterWithOptions(t, opts, objs...) t.Cleanup(func() { cache.Invalidate() @@ -205,34 +209,15 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, }) + cluster.recordEvent(watch.Added, pvc) - cluster.processEvent(watch.Added, pvc) - - cluster.lock.Lock() - defer cluster.lock.Unlock() - - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs - - assert.Len(t, refs, 0) - }) - - t.Run("STSTemplateNameNotMatching", func(t *testing.T) { - cluster := newCluster(t, sts) - err := cluster.EnsureSynced() - require.NoError(t, err) - - pvc := mustToUnstructured(&v1.PersistentVolumeClaim{ - TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, - ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, - }) - cluster.processEvent(watch.Added, pvc) - - cluster.lock.Lock() - defer cluster.lock.Unlock() - - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs + require.Eventually(t, func() bool { + cluster.lock.Lock() + defer cluster.lock.Unlock() - assert.Len(t, refs, 0) + refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs + return len(refs) == 0 + }, 5*time.Second, 10*time.Millisecond, "Expected PVC to not have owner reference") }) t.Run("MatchingSTSExists", func(t *testing.T) { @@ -244,14 +229,15 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, }) - cluster.processEvent(watch.Added, pvc) + cluster.recordEvent(watch.Added, pvc) - cluster.lock.Lock() - defer cluster.lock.Unlock() + require.Eventually(t, func() bool { + cluster.lock.Lock() + defer cluster.lock.Unlock() - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs - - assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}) + refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs + return assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}) + }, 5*time.Second, 10*time.Millisecond, "Expected PVC to have owner reference") }) } @@ -596,10 +582,12 @@ func TestChildDeletedEvent(t *testing.T) { err := cluster.EnsureSynced() require.NoError(t, err) - cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1())) + cluster.recordEvent(watch.Deleted, mustToUnstructured(testPod1())) - rsChildren := getChildren(cluster, mustToUnstructured(testRS())) - assert.Equal(t, []*Resource{}, rsChildren) + require.Eventually(t, func() bool { + rsChildren := getChildren(cluster, mustToUnstructured(testRS())) + return assert.Equal(t, []*Resource{}, rsChildren) + }, 5*time.Second, 10*time.Millisecond, "Expected no children for ReplicaSet") } func TestProcessNewChildEvent(t *testing.T) { @@ -620,46 +608,49 @@ func TestProcessNewChildEvent(t *testing.T) { uid: "2" resourceVersion: "123"`) - cluster.processEvent(watch.Added, newPod) + cluster.recordEvent(watch.Added, newPod) + + require.Eventually(t, func() bool { + rsChildren := getChildren(cluster, mustToUnstructured(testRS())) + sort.Slice(rsChildren, func(i, j int) bool { + return strings.Compare(rsChildren[i].Ref.Name, rsChildren[j].Ref.Name) < 0 + }) + return assert.Equal(t, []*Resource{{ + Ref: corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "helm-guestbook-pod-1", + APIVersion: "v1", + UID: "1", + }, + OwnerRefs: []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "helm-guestbook-rs", + UID: "2", + }}, + ResourceVersion: "123", + CreationTimestamp: &metav1.Time{ + Time: testCreationTime.Local(), + }, + }, { + Ref: corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "helm-guestbook-pod-1-new", + APIVersion: "v1", + UID: "5", + }, + OwnerRefs: []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "helm-guestbook-rs", + UID: "2", + }}, + ResourceVersion: "123", + }}, rsChildren) + }, 5*time.Second, 10*time.Millisecond, "Expected new child to be added to ReplicaSet") - rsChildren := getChildren(cluster, mustToUnstructured(testRS())) - sort.Slice(rsChildren, func(i, j int) bool { - return strings.Compare(rsChildren[i].Ref.Name, rsChildren[j].Ref.Name) < 0 - }) - assert.Equal(t, []*Resource{{ - Ref: corev1.ObjectReference{ - Kind: "Pod", - Namespace: "default", - Name: "helm-guestbook-pod-1", - APIVersion: "v1", - UID: "1", - }, - OwnerRefs: []metav1.OwnerReference{{ - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: "helm-guestbook-rs", - UID: "2", - }}, - ResourceVersion: "123", - CreationTimestamp: &metav1.Time{ - Time: testCreationTime.Local(), - }, - }, { - Ref: corev1.ObjectReference{ - Kind: "Pod", - Namespace: "default", - Name: "helm-guestbook-pod-1-new", - APIVersion: "v1", - UID: "5", - }, - OwnerRefs: []metav1.OwnerReference{{ - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: "helm-guestbook-rs", - UID: "2", - }}, - ResourceVersion: "123", - }}, rsChildren) } func TestWatchCacheUpdated(t *testing.T) { diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index c9fbc8f9c..a4afa3119 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -262,6 +262,26 @@ func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe return r0 } +// OnProcessEventsHandler provides a mock function with given fields: handler +func (_m *ClusterCache) OnProcessEventsHandler(handler cache.OnProcessEventsHandler) cache.Unsubscribe { + ret := _m.Called(handler) + + if len(ret) == 0 { + panic("no return value specified for OnProcessEventsHandler") + } + + var r0 cache.Unsubscribe + if rf, ok := ret.Get(0).(func(cache.OnProcessEventsHandler) cache.Unsubscribe); ok { + r0 = rf(handler) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Unsubscribe) + } + } + + return r0 +} + // OnResourceUpdated provides a mock function with given fields: handler func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler) cache.Unsubscribe { ret := _m.Called(handler) From ebe596295ffb4d2674f9e5f5e9a4c5642f8ac265 Mon Sep 17 00:00:00 2001 From: Mykola Pelekh Date: Wed, 27 Nov 2024 16:56:16 +0200 Subject: [PATCH 2/3] feat: process events in batch when the mode is enabled Signed-off-by: Mykola Pelekh --- pkg/cache/cluster.go | 57 +++++++++++++++++++++++++------------- pkg/cache/settings.go | 6 ++++ pkg/cache/settings_test.go | 8 ++++++ 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 6d839ea8e..6b28d7421 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -200,10 +200,11 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa type clusterCache struct { syncStatus clusterCacheSync - apisMeta map[schema.GroupKind]*apiMeta - eventMetaCh chan eventMeta - serverVersion string - apiResources []kube.APIResourceInfo + apisMeta map[schema.GroupKind]*apiMeta + batchEventsProcessing bool + eventMetaCh chan eventMeta + serverVersion string + apiResources []kube.APIResourceInfo // namespacedResources is a simple map which indicates a groupKind is namespaced namespacedResources map[schema.GroupKind]bool @@ -483,7 +484,9 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { opts[i](c) } - c.invalidateEventMeta() + if c.batchEventsProcessing { + c.invalidateEventMeta() + } c.apisMeta = nil c.namespacedResources = nil c.log.Info("Invalidated cluster") @@ -867,13 +870,17 @@ func (c *clusterCache) sync() error { for i := range c.apisMeta { c.apisMeta[i].watchCancel() } - c.invalidateEventMeta() + + if c.batchEventsProcessing { + c.invalidateEventMeta() + c.eventMetaCh = make(chan eventMeta) + } + c.apisMeta = make(map[schema.GroupKind]*apiMeta) c.resources = make(map[kube.ResourceKey]*Resource) c.namespacedResources = make(map[schema.GroupKind]bool) config := c.config version, err := c.kubectl.GetServerVersion(config) - c.eventMetaCh = make(chan eventMeta) if err != nil { return err @@ -910,7 +917,9 @@ func (c *clusterCache) sync() error { return err } - go c.processEvents() + if c.batchEventsProcessing { + go c.processEvents() + } // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} @@ -1296,7 +1305,13 @@ func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstr return } - c.eventMetaCh <- eventMeta{event, un} + if c.batchEventsProcessing { + c.eventMetaCh <- eventMeta{event, un} + } else { + c.lock.Lock() + defer c.lock.Unlock() + c.processEvent(key, eventMeta{event, un}) + } } func (c *clusterCache) processEvents() { @@ -1342,21 +1357,25 @@ func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) { } }() - for _, em := range eventMetas { - key := kube.GetResourceKey(em.un) - existingNode, exists := c.resources[key] - if em.event == watch.Deleted { - if exists { - c.onNodeRemoved(key) - } - } else { - c.onNodeUpdated(existingNode, c.newResource(em.un)) - } + for _, evMeta := range eventMetas { + key := kube.GetResourceKey(evMeta.un) + c.processEvent(key, evMeta) } log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds()) } +func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) { + existingNode, exists := c.resources[key] + if evMeta.event == watch.Deleted { + if exists { + c.onNodeRemoved(key) + } + } else { + c.onNodeUpdated(existingNode, c.newResource(evMeta.un)) + } +} + func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) { c.setNode(newRes) for _, h := range c.getResourceUpdatedHandlers() { diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index a7194d0ca..ee40ce3af 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -170,3 +170,9 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc { } } } + +func SetBatchEventsProcessing(batchProcessing bool) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.batchEventsProcessing = batchProcessing + } +} diff --git a/pkg/cache/settings_test.go b/pkg/cache/settings_test.go index 28ac963cb..b88b7dbc3 100644 --- a/pkg/cache/settings_test.go +++ b/pkg/cache/settings_test.go @@ -55,3 +55,11 @@ func TestSetWatchResyncTimeout(t *testing.T) { cache = NewClusterCache(&rest.Config{}, SetWatchResyncTimeout(timeout)) assert.Equal(t, timeout, cache.watchResyncTimeout) } + +func TestSetBatchEventsProcessing(t *testing.T) { + cache := NewClusterCache(&rest.Config{}) + assert.False(t, cache.batchEventsProcessing) + + cache.Invalidate(SetBatchEventsProcessing(true)) + assert.True(t, cache.batchEventsProcessing) +} From 6ffb7534de408de18c8072103e8a119ef69ffd30 Mon Sep 17 00:00:00 2001 From: Mykola Pelekh Date: Mon, 2 Dec 2024 13:27:37 +0200 Subject: [PATCH 3/3] test: update unit tests to verify batch events processing flag Signed-off-by: Mykola Pelekh --- pkg/cache/cluster_test.go | 100 +++++++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 34 deletions(-) diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index d266d80c4..2c6474b30 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -188,6 +188,12 @@ func TestEnsureSynced(t *testing.T) { } func TestStatefulSetOwnershipInferred(t *testing.T) { + var opts []UpdateSettingsFunc + opts = append(opts, func(c *clusterCache) { + c.batchEventsProcessing = true + c.eventProcessingInterval = 1 * time.Millisecond + }) + sts := &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1", Kind: kube.StatefulSetKind}, ObjectMeta: metav1.ObjectMeta{UID: "123", Name: "web", Namespace: "default"}, @@ -200,45 +206,71 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }, } - t.Run("STSTemplateNameNotMatching", func(t *testing.T) { - cluster := newCluster(t, sts) - err := cluster.EnsureSynced() - require.NoError(t, err) - - pvc := mustToUnstructured(&v1.PersistentVolumeClaim{ - TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, - ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, - }) - cluster.recordEvent(watch.Added, pvc) + tests := []struct { + name string + cluster *clusterCache + pvc *v1.PersistentVolumeClaim + expectedRefs []metav1.OwnerReference + expectNoOwner bool + }{ + { + name: "STSTemplateNameNotMatching", + cluster: newCluster(t, sts), + pvc: &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, + ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, + }, + expectNoOwner: true, + }, + { + name: "MatchingSTSExists", + cluster: newCluster(t, sts), + pvc: &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, + ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, + }, + expectedRefs: []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}, + }, + { + name: "STSTemplateNameNotMatchingWithBatchProcessing", + cluster: newClusterWithOptions(t, opts, sts), + pvc: &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, + ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, + }, + expectNoOwner: true, + }, + { + name: "MatchingSTSExistsWithBatchProcessing", + cluster: newClusterWithOptions(t, opts, sts), + pvc: &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, + ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, + }, + expectedRefs: []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}, + }, + } - require.Eventually(t, func() bool { - cluster.lock.Lock() - defer cluster.lock.Unlock() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cluster.EnsureSynced() + require.NoError(t, err) - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs - return len(refs) == 0 - }, 5*time.Second, 10*time.Millisecond, "Expected PVC to not have owner reference") - }) + pvc := mustToUnstructured(tc.pvc) + tc.cluster.recordEvent(watch.Added, pvc) - t.Run("MatchingSTSExists", func(t *testing.T) { - cluster := newCluster(t, sts) - err := cluster.EnsureSynced() - require.NoError(t, err) + require.Eventually(t, func() bool { + tc.cluster.lock.Lock() + defer tc.cluster.lock.Unlock() - pvc := mustToUnstructured(&v1.PersistentVolumeClaim{ - TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, - ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, + refs := tc.cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs + if tc.expectNoOwner { + return len(refs) == 0 + } + return assert.ElementsMatch(t, refs, tc.expectedRefs) + }, 5*time.Second, 10*time.Millisecond, "Expected PVC to have correct owner reference") }) - cluster.recordEvent(watch.Added, pvc) - - require.Eventually(t, func() bool { - cluster.lock.Lock() - defer cluster.lock.Unlock() - - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs - return assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}) - }, 5*time.Second, 10*time.Millisecond, "Expected PVC to have owner reference") - }) + } } func TestEnsureSyncedSingleNamespace(t *testing.T) {