diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 6d839ea8e..d47601d4b 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -200,10 +200,11 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa type clusterCache struct { syncStatus clusterCacheSync - apisMeta map[schema.GroupKind]*apiMeta - eventMetaCh chan eventMeta - serverVersion string - apiResources []kube.APIResourceInfo + apisMeta map[schema.GroupKind]*apiMeta + batchProcessingEnabled bool + eventMetaCh chan eventMeta + serverVersion string + apiResources []kube.APIResourceInfo // namespacedResources is a simple map which indicates a groupKind is namespaced namespacedResources map[schema.GroupKind]bool @@ -483,7 +484,9 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { opts[i](c) } - c.invalidateEventMeta() + if c.batchProcessingEnabled { + c.invalidateEventMeta() + } c.apisMeta = nil c.namespacedResources = nil c.log.Info("Invalidated cluster") @@ -867,13 +870,17 @@ func (c *clusterCache) sync() error { for i := range c.apisMeta { c.apisMeta[i].watchCancel() } - c.invalidateEventMeta() + + if c.batchProcessingEnabled { + c.invalidateEventMeta() + c.eventMetaCh = make(chan eventMeta) + } + c.apisMeta = make(map[schema.GroupKind]*apiMeta) c.resources = make(map[kube.ResourceKey]*Resource) c.namespacedResources = make(map[schema.GroupKind]bool) config := c.config version, err := c.kubectl.GetServerVersion(config) - c.eventMetaCh = make(chan eventMeta) if err != nil { return err @@ -910,7 +917,9 @@ func (c *clusterCache) sync() error { return err } - go c.processEvents() + if c.batchProcessingEnabled { + go c.processEvents() + } // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} @@ -1296,7 +1305,13 @@ func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstr return } - c.eventMetaCh <- eventMeta{event, un} + if c.batchProcessingEnabled { + c.eventMetaCh <- eventMeta{event, un} + } else { + c.lock.Lock() + defer c.lock.Unlock() + c.processEvent(key, eventMeta{event, un}) + } } func (c *clusterCache) processEvents() { @@ -1342,21 +1357,25 @@ func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) { } }() - for _, em := range eventMetas { - key := kube.GetResourceKey(em.un) - existingNode, exists := c.resources[key] - if em.event == watch.Deleted { - if exists { - c.onNodeRemoved(key) - } - } else { - c.onNodeUpdated(existingNode, c.newResource(em.un)) - } + for _, evMeta := range eventMetas { + key := kube.GetResourceKey(evMeta.un) + c.processEvent(key, evMeta) } log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds()) } +func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) { + existingNode, exists := c.resources[key] + if evMeta.event == watch.Deleted { + if exists { + c.onNodeRemoved(key) + } + } else { + c.onNodeUpdated(existingNode, c.newResource(evMeta.un)) + } +} + func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) { c.setNode(newRes) for _, h := range c.getResourceUpdatedHandlers() {