Skip to content

Commit

Permalink
feat: process events in batch when the mode is enabled
Browse files Browse the repository at this point in the history
Signed-off-by: Mykola Pelekh <[email protected]>
  • Loading branch information
mpelekh committed Nov 30, 2024
1 parent fe6bf84 commit ebe5962
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
57 changes: 38 additions & 19 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
type clusterCache struct {
syncStatus clusterCacheSync

apisMeta map[schema.GroupKind]*apiMeta
eventMetaCh chan eventMeta
serverVersion string
apiResources []kube.APIResourceInfo
apisMeta map[schema.GroupKind]*apiMeta
batchEventsProcessing bool
eventMetaCh chan eventMeta
serverVersion string
apiResources []kube.APIResourceInfo
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool

Expand Down Expand Up @@ -483,7 +484,9 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
opts[i](c)
}

c.invalidateEventMeta()
if c.batchEventsProcessing {
c.invalidateEventMeta()
}
c.apisMeta = nil
c.namespacedResources = nil
c.log.Info("Invalidated cluster")
Expand Down Expand Up @@ -867,13 +870,17 @@ func (c *clusterCache) sync() error {
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}
c.invalidateEventMeta()

if c.batchEventsProcessing {
c.invalidateEventMeta()
c.eventMetaCh = make(chan eventMeta)
}

c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
config := c.config
version, err := c.kubectl.GetServerVersion(config)
c.eventMetaCh = make(chan eventMeta)

if err != nil {
return err
Expand Down Expand Up @@ -910,7 +917,9 @@ func (c *clusterCache) sync() error {
return err
}

go c.processEvents()
if c.batchEventsProcessing {
go c.processEvents()
}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
Expand Down Expand Up @@ -1296,7 +1305,13 @@ func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstr
return
}

c.eventMetaCh <- eventMeta{event, un}
if c.batchEventsProcessing {
c.eventMetaCh <- eventMeta{event, un}
} else {
c.lock.Lock()
defer c.lock.Unlock()
c.processEvent(key, eventMeta{event, un})
}
}

func (c *clusterCache) processEvents() {
Expand Down Expand Up @@ -1342,21 +1357,25 @@ func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
}
}()

for _, em := range eventMetas {
key := kube.GetResourceKey(em.un)
existingNode, exists := c.resources[key]
if em.event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
}
} else {
c.onNodeUpdated(existingNode, c.newResource(em.un))
}
for _, evMeta := range eventMetas {
key := kube.GetResourceKey(evMeta.un)
c.processEvent(key, evMeta)
}

log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
}

func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
existingNode, exists := c.resources[key]
if evMeta.event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
}
} else {
c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
}
}

func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
c.setNode(newRes)
for _, h := range c.getResourceUpdatedHandlers() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,9 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc {
}
}
}

func SetBatchEventsProcessing(batchProcessing bool) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.batchEventsProcessing = batchProcessing
}
}
8 changes: 8 additions & 0 deletions pkg/cache/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,11 @@ func TestSetWatchResyncTimeout(t *testing.T) {
cache = NewClusterCache(&rest.Config{}, SetWatchResyncTimeout(timeout))
assert.Equal(t, timeout, cache.watchResyncTimeout)
}

func TestSetBatchEventsProcessing(t *testing.T) {
cache := NewClusterCache(&rest.Config{})
assert.False(t, cache.batchEventsProcessing)

cache.Invalidate(SetBatchEventsProcessing(true))
assert.True(t, cache.batchEventsProcessing)
}

0 comments on commit ebe5962

Please sign in to comment.