From ebe596295ffb4d2674f9e5f5e9a4c5642f8ac265 Mon Sep 17 00:00:00 2001 From: Mykola Pelekh Date: Wed, 27 Nov 2024 16:56:16 +0200 Subject: [PATCH] feat: process events in batch when the mode is enabled Signed-off-by: Mykola Pelekh --- pkg/cache/cluster.go | 57 +++++++++++++++++++++++++------------- pkg/cache/settings.go | 6 ++++ pkg/cache/settings_test.go | 8 ++++++ 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 6d839ea8e..6b28d7421 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -200,10 +200,11 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa type clusterCache struct { syncStatus clusterCacheSync - apisMeta map[schema.GroupKind]*apiMeta - eventMetaCh chan eventMeta - serverVersion string - apiResources []kube.APIResourceInfo + apisMeta map[schema.GroupKind]*apiMeta + batchEventsProcessing bool + eventMetaCh chan eventMeta + serverVersion string + apiResources []kube.APIResourceInfo // namespacedResources is a simple map which indicates a groupKind is namespaced namespacedResources map[schema.GroupKind]bool @@ -483,7 +484,9 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { opts[i](c) } - c.invalidateEventMeta() + if c.batchEventsProcessing { + c.invalidateEventMeta() + } c.apisMeta = nil c.namespacedResources = nil c.log.Info("Invalidated cluster") @@ -867,13 +870,17 @@ func (c *clusterCache) sync() error { for i := range c.apisMeta { c.apisMeta[i].watchCancel() } - c.invalidateEventMeta() + + if c.batchEventsProcessing { + c.invalidateEventMeta() + c.eventMetaCh = make(chan eventMeta) + } + c.apisMeta = make(map[schema.GroupKind]*apiMeta) c.resources = make(map[kube.ResourceKey]*Resource) c.namespacedResources = make(map[schema.GroupKind]bool) config := c.config version, err := c.kubectl.GetServerVersion(config) - c.eventMetaCh = make(chan eventMeta) if err != nil { return err @@ -910,7 +917,9 @@ func (c *clusterCache) sync() error { return err } - go c.processEvents() + if c.batchEventsProcessing { + go c.processEvents() + } // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} @@ -1296,7 +1305,13 @@ func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstr return } - c.eventMetaCh <- eventMeta{event, un} + if c.batchEventsProcessing { + c.eventMetaCh <- eventMeta{event, un} + } else { + c.lock.Lock() + defer c.lock.Unlock() + c.processEvent(key, eventMeta{event, un}) + } } func (c *clusterCache) processEvents() { @@ -1342,21 +1357,25 @@ func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) { } }() - for _, em := range eventMetas { - key := kube.GetResourceKey(em.un) - existingNode, exists := c.resources[key] - if em.event == watch.Deleted { - if exists { - c.onNodeRemoved(key) - } - } else { - c.onNodeUpdated(existingNode, c.newResource(em.un)) - } + for _, evMeta := range eventMetas { + key := kube.GetResourceKey(evMeta.un) + c.processEvent(key, evMeta) } log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds()) } +func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) { + existingNode, exists := c.resources[key] + if evMeta.event == watch.Deleted { + if exists { + c.onNodeRemoved(key) + } + } else { + c.onNodeUpdated(existingNode, c.newResource(evMeta.un)) + } +} + func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) { c.setNode(newRes) for _, h := range c.getResourceUpdatedHandlers() { diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index a7194d0ca..ee40ce3af 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -170,3 +170,9 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc { } } } + +func SetBatchEventsProcessing(batchProcessing bool) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.batchEventsProcessing = batchProcessing + } +} diff --git a/pkg/cache/settings_test.go b/pkg/cache/settings_test.go index 28ac963cb..b88b7dbc3 100644 --- a/pkg/cache/settings_test.go +++ b/pkg/cache/settings_test.go @@ -55,3 +55,11 @@ func TestSetWatchResyncTimeout(t *testing.T) { cache = NewClusterCache(&rest.Config{}, SetWatchResyncTimeout(timeout)) assert.Equal(t, timeout, cache.watchResyncTimeout) } + +func TestSetBatchEventsProcessing(t *testing.T) { + cache := NewClusterCache(&rest.Config{}) + assert.False(t, cache.batchEventsProcessing) + + cache.Invalidate(SetBatchEventsProcessing(true)) + assert.True(t, cache.batchEventsProcessing) +}