diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 3f662effc..7e2e621fc 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -57,6 +57,8 @@ const (
 	// Limit is required to avoid memory spikes during cache initialization.
 	// The default limit of 50 is chosen based on experiments.
 	defaultListSemaphoreWeight = 50
+	// defaultEventProcessingInterval is the default interval for processing events
+	defaultEventProcessingInterval = 100 * time.Millisecond
 )
 
 const (
@@ -75,6 +77,11 @@ type apiMeta struct {
 	watchCancel context.CancelFunc
 }
 
+type eventMeta struct {
+	event watch.EventType
+	un    *unstructured.Unstructured
+}
+
 // ClusterInfo holds cluster cache stats
 type ClusterInfo struct {
 	// Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
 // OnEventHandler is a function that handles Kubernetes event
 type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)
 
+// OnProcessEventsHandler is a function that is called after a batch of events has been processed
+type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)
+
 // OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
 type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)
 
@@ -137,6 +147,8 @@ type ClusterCache interface {
 	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
 	// OnEvent register event handler that is executed every time when new K8S event received
 	OnEvent(handler OnEventHandler) Unsubscribe
+	// OnProcessEventsHandler registers a handler that is executed every time a batch of events has been processed
+	OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
 }
 
 type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 	cache := &clusterCache{
 		settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
 		apisMeta:           make(map[schema.GroupKind]*apiMeta),
+		eventMetaCh:        nil,
 		listPageSize:       defaultListPageSize,
 		listPageBufferSize: defaultListPageBufferSize,
 		listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 		},
 		watchResyncTimeout:      defaultWatchResyncTimeout,
 		clusterSyncRetryTimeout: ClusterRetryTimeout,
+		eventProcessingInterval: defaultEventProcessingInterval,
 		resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
 		eventHandlers:           map[uint64]OnEventHandler{},
+		processEventsHandlers:   map[uint64]OnProcessEventsHandler{},
 		log:                     log,
 		listRetryLimit:          1,
 		listRetryUseBackoff:     false,
@@ -185,9 +200,11 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 type clusterCache struct {
 	syncStatus clusterCacheSync
 
-	apisMeta      map[schema.GroupKind]*apiMeta
-	serverVersion string
-	apiResources  []kube.APIResourceInfo
+	apisMeta              map[schema.GroupKind]*apiMeta
+	batchEventsProcessing bool
+	eventMetaCh           chan eventMeta
+	serverVersion         string
+	apiResources          []kube.APIResourceInfo
 	// namespacedResources is a simple map which indicates a groupKind is namespaced
 	namespacedResources map[schema.GroupKind]bool
 
@@ -195,6 +212,8 @@ type clusterCache struct {
 	watchResyncTimeout time.Duration
 	// sync retry timeout for cluster when sync error happens
 	clusterSyncRetryTimeout time.Duration
+	// ticker interval for events processing
+	eventProcessingInterval time.Duration
 	// size of a page for list operations pager.
 	listPageSize int64
@@ -224,6 +243,7 @@ type clusterCache struct {
 	populateResourceInfoHandler OnPopulateResourceInfoHandler
 	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
 	eventHandlers               map[uint64]OnEventHandler
+	processEventsHandlers       map[uint64]OnProcessEventsHandler
 	openAPISchema               openapi.Resources
 	gvkParser                   *managedfields.GvkParser
 
@@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
 	return handlers
 }
 
+// OnProcessEventsHandler registers a handler that is executed every time a batch of events has been processed
+func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
+	c.handlersLock.Lock()
+	defer c.handlersLock.Unlock()
+	key := c.handlerKey
+	c.handlerKey++
+	c.processEventsHandlers[key] = handler
+	return func() {
+		c.handlersLock.Lock()
+		defer c.handlersLock.Unlock()
+		delete(c.processEventsHandlers, key)
+	}
+}
+func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
+	c.handlersLock.Lock()
+	defer c.handlersLock.Unlock()
+	handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
+	for _, h := range c.processEventsHandlers {
+		handlers = append(handlers, h)
+	}
+	return handlers
+}
+
 // GetServerVersion returns observed cluster version
 func (c *clusterCache) GetServerVersion() string {
 	return c.serverVersion
@@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
 	for i := range opts {
 		opts[i](c)
 	}
+
+	if c.batchEventsProcessing {
+		c.invalidateEventMeta()
+	}
 	c.apisMeta = nil
 	c.namespacedResources = nil
 	c.log.Info("Invalidated cluster")
@@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 						return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
 					}
 
-					c.processEvent(event.Type, obj)
+					c.recordEvent(event.Type, obj)
 					if kube.IsCRD(obj) {
 						var resources []kube.APIResourceInfo
 						crd := v1.CustomResourceDefinition{}
@@ -823,6 +870,12 @@ func (c *clusterCache) sync() error {
 	for i := range c.apisMeta {
 		c.apisMeta[i].watchCancel()
 	}
+
+	if c.batchEventsProcessing {
+		c.invalidateEventMeta()
+		c.eventMetaCh = make(chan eventMeta)
+	}
+
 	c.apisMeta = make(map[schema.GroupKind]*apiMeta)
 	c.resources = make(map[kube.ResourceKey]*Resource)
 	c.namespacedResources = make(map[schema.GroupKind]bool)
@@ -864,6 +917,10 @@ func (c *clusterCache) sync() error {
 		return err
 	}
 
+	if c.batchEventsProcessing {
+		go c.processEvents()
+	}
+
 	// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
 	lock := sync.Mutex{}
 	err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -926,6 +983,14 @@ func (c *clusterCache) sync() error {
 	return nil
 }
 
+// invalidateEventMeta closes the eventMeta channel if it is open
+func (c *clusterCache) invalidateEventMeta() {
+	if c.eventMetaCh != nil {
+		close(c.eventMetaCh)
+		c.eventMetaCh = nil
+	}
+}
+
 // EnsureSynced checks cache state and synchronizes it if necessary
 func (c *clusterCache) EnsureSynced() error {
 	syncStatus := &c.syncStatus
@@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
 	return managedObjs, nil
 }
 
-func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) {
+func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
 	for _, h := range c.getEventHandlers() {
 		h(event, un)
 	}
@@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
 		return
 	}
 
+	if c.batchEventsProcessing {
+		c.eventMetaCh <- eventMeta{event, un}
+	} else {
+		c.lock.Lock()
+		defer c.lock.Unlock()
+		c.processEvent(key, eventMeta{event, un})
+	}
+}
+
+func (c *clusterCache) processEvents() {
+	log := c.log.WithValues("functionName", "processEvents")
+	log.V(1).Info("Start processing events")
+
 	c.lock.Lock()
-	defer c.lock.Unlock()
+	ch := c.eventMetaCh
+	c.lock.Unlock()
 
+	eventMetas := make([]eventMeta, 0)
+	ticker := time.NewTicker(c.eventProcessingInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case evMeta, ok := <-ch:
+			if !ok {
+				log.V(2).Info("Event processing channel closed, finish processing")
+				return
+			}
+			eventMetas = append(eventMetas, evMeta)
+		case <-ticker.C:
+			if len(eventMetas) > 0 {
+				c.processEventsBatch(eventMetas)
+				eventMetas = eventMetas[:0]
+			}
+		}
+	}
+}
+
+func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
+	log := c.log.WithValues("functionName", "processEventsBatch")
+	start := time.Now()
+	c.lock.Lock()
+	log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
+	defer func() {
+		c.lock.Unlock()
+		duration := time.Since(start)
+		// Notify registered handlers about the duration and size of the processed batch
+		for _, handler := range c.getProcessEventsHandlers() {
+			handler(duration, len(eventMetas))
+		}
+	}()
+
+	for _, evMeta := range eventMetas {
+		key := kube.GetResourceKey(evMeta.un)
+		c.processEvent(key, evMeta)
+	}
+
+	log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
+}
+
+func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
 	existingNode, exists := c.resources[key]
-	if event == watch.Deleted {
+	if evMeta.event == watch.Deleted {
 		if exists {
 			c.onNodeRemoved(key)
 		}
-	} else if event != watch.Deleted {
-		c.onNodeUpdated(existingNode, c.newResource(un))
+	} else {
+		c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
 	}
 }
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 26815c07e..8f3263da6 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -184,6 +184,12 @@ func TestEnsureSynced(t *testing.T) {
 }
 
 func TestStatefulSetOwnershipInferred(t *testing.T) {
+	var opts []UpdateSettingsFunc
+	opts = append(opts, func(c *clusterCache) {
+		c.batchEventsProcessing = true
+		c.eventProcessingInterval = 1 * time.Millisecond
+	})
+
 	sts := &appsv1.StatefulSet{
 		TypeMeta:   metav1.TypeMeta{APIVersion: "apps/v1", Kind: kube.StatefulSetKind},
 		ObjectMeta: metav1.ObjectMeta{UID: "123", Name: "web", Namespace: "default"},
@@ -196,63 +202,71 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 		},
 	}
 
-	t.Run("STSTemplateNameNotMatching", func(t *testing.T) {
-		cluster := newCluster(t, sts)
-		err := cluster.EnsureSynced()
-		require.NoError(t, err)
-
-		pvc := mustToUnstructured(&v1.PersistentVolumeClaim{
-			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
-			ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
-		})
-
-		cluster.processEvent(watch.Added, pvc)
-
-		cluster.lock.Lock()
-		defer cluster.lock.Unlock()
-
-		refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
-
-		assert.Len(t, refs, 0)
-	})
-
-	t.Run("STSTemplateNameNotMatching", func(t *testing.T) {
-		cluster := newCluster(t, sts)
-		err := cluster.EnsureSynced()
-		require.NoError(t, err)
-
-		pvc := mustToUnstructured(&v1.PersistentVolumeClaim{
-			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
-			ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
-		})
-		cluster.processEvent(watch.Added, pvc)
-
-		cluster.lock.Lock()
-		defer cluster.lock.Unlock()
+	tests := []struct {
+		name          string
+		cluster       *clusterCache
+		pvc           *v1.PersistentVolumeClaim
+		expectedRefs  []metav1.OwnerReference
+		expectNoOwner bool
+	}{
+		{
+			name:    "STSTemplateNameNotMatching",
+			cluster: newCluster(t, sts),
+			pvc: &v1.PersistentVolumeClaim{
+				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
+				ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
+			},
+			expectNoOwner: true,
+		},
+		{
+			name:    "MatchingSTSExists",
+			cluster: newCluster(t, sts),
+			pvc: &v1.PersistentVolumeClaim{
+				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
+				ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
+			},
+			expectedRefs: []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}},
+		},
+		{
+			name:    "STSTemplateNameNotMatchingWithBatchProcessing",
+			cluster: newClusterWithOptions(t, opts, sts),
+			pvc: &v1.PersistentVolumeClaim{
+				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
+				ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
+			},
+			expectNoOwner: true,
+		},
+		{
+			name:    "MatchingSTSExistsWithBatchProcessing",
+			cluster: newClusterWithOptions(t, opts, sts),
+			pvc: &v1.PersistentVolumeClaim{
+				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
+				ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
+			},
+			expectedRefs: []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}},
+		},
+	}
 
-		refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			err := tc.cluster.EnsureSynced()
+			require.NoError(t, err)
 
-		assert.Len(t, refs, 0)
-	})
+			pvc := mustToUnstructured(tc.pvc)
+			tc.cluster.recordEvent(watch.Added, pvc)
 
-	t.Run("MatchingSTSExists", func(t *testing.T) {
-		cluster := newCluster(t, sts)
-		err := cluster.EnsureSynced()
-		require.NoError(t, err)
+			require.Eventually(t, func() bool {
+				tc.cluster.lock.Lock()
+				defer tc.cluster.lock.Unlock()
 
-		pvc := mustToUnstructured(&v1.PersistentVolumeClaim{
-			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
-			ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
+				refs := tc.cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
+				if tc.expectNoOwner {
+					return len(refs) == 0
+				}
+				return assert.ElementsMatch(t, refs, tc.expectedRefs)
+			}, 5*time.Second, 10*time.Millisecond, "Expected PVC to have correct owner reference")
"Expected PVC to have correct owner reference") }) - cluster.processEvent(watch.Added, pvc) - - cluster.lock.Lock() - defer cluster.lock.Unlock() - - refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs - - assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}}) - }) + } } func TestEnsureSyncedSingleNamespace(t *testing.T) { @@ -596,7 +610,7 @@ func TestChildDeletedEvent(t *testing.T) { err := cluster.EnsureSynced() require.NoError(t, err) - cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1())) + cluster.recordEvent(watch.Deleted, mustToUnstructured(testPod1())) rsChildren := getChildren(cluster, mustToUnstructured(testRS())) assert.Equal(t, []*Resource{}, rsChildren) @@ -620,7 +634,7 @@ func TestProcessNewChildEvent(t *testing.T) { uid: "2" resourceVersion: "123"`) - cluster.processEvent(watch.Added, newPod) + cluster.recordEvent(watch.Added, newPod) rsChildren := getChildren(cluster, mustToUnstructured(testRS())) sort.Slice(rsChildren, func(i, j int) bool { diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index c9fbc8f9c..a4afa3119 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -262,6 +262,26 @@ func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe return r0 } +// OnProcessEventsHandler provides a mock function with given fields: handler +func (_m *ClusterCache) OnProcessEventsHandler(handler cache.OnProcessEventsHandler) cache.Unsubscribe { + ret := _m.Called(handler) + + if len(ret) == 0 { + panic("no return value specified for OnProcessEventsHandler") + } + + var r0 cache.Unsubscribe + if rf, ok := ret.Get(0).(func(cache.OnProcessEventsHandler) cache.Unsubscribe); ok { + r0 = rf(handler) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Unsubscribe) + } + } + + return r0 +} + // OnResourceUpdated provides a mock function with given fields: handler func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler) cache.Unsubscribe { ret := _m.Called(handler) diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index a7194d0ca..241c75d3b 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -170,3 +170,17 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc { } } } + +// SetBatchEventsProcessing allows to set whether to process events in batch +func SetBatchEventsProcessing(batchProcessing bool) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.batchEventsProcessing = batchProcessing + } +} + +// SetEventProcessingInterval allows to set the interval for processing events +func SetEventProcessingInterval(interval time.Duration) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.eventProcessingInterval = interval + } +} diff --git a/pkg/cache/settings_test.go b/pkg/cache/settings_test.go index 28ac963cb..fdc1a7412 100644 --- a/pkg/cache/settings_test.go +++ b/pkg/cache/settings_test.go @@ -55,3 +55,20 @@ func TestSetWatchResyncTimeout(t *testing.T) { cache = NewClusterCache(&rest.Config{}, SetWatchResyncTimeout(timeout)) assert.Equal(t, timeout, cache.watchResyncTimeout) } + +func TestSetBatchEventsProcessing(t *testing.T) { + cache := NewClusterCache(&rest.Config{}) + assert.False(t, cache.batchEventsProcessing) + + cache.Invalidate(SetBatchEventsProcessing(true)) + assert.True(t, cache.batchEventsProcessing) +} + +func TestSetEventsProcessingInterval(t *testing.T) { + cache := NewClusterCache(&rest.Config{}) + 
+	assert.Equal(t, defaultEventProcessingInterval, cache.eventProcessingInterval)
+
+	interval := 1 * time.Second
+	cache.Invalidate(SetEventProcessingInterval(interval))
+	assert.Equal(t, interval, cache.eventProcessingInterval)
+}
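
For reviewers, a minimal sketch of how a consumer might wire up the options and handler introduced in this patch. `NewClusterCache`, `SetBatchEventsProcessing`, `SetEventProcessingInterval`, `OnProcessEventsHandler`, and `EnsureSynced` come from this diff and the existing package API; the empty `rest.Config`, the 50ms interval, and the `fmt.Printf` reporting are illustrative placeholders, not part of this change (a real consumer would load a kubeconfig and feed a metric instead).

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/rest"

	"github.com/argoproj/gitops-engine/pkg/cache"
)

func main() {
	// Kubeconfig loading is elided; any *rest.Config works here.
	config := &rest.Config{}

	// Enable batched event processing and shorten the flush interval
	// from the 100ms default introduced in this change.
	clusterCache := cache.NewClusterCache(config,
		cache.SetBatchEventsProcessing(true),
		cache.SetEventProcessingInterval(50*time.Millisecond),
	)

	// Observe how long each batch held the cache lock and how many
	// events it contained; a real consumer would update a metric here.
	unsubscribe := clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
		fmt.Printf("processed %d events in %v\n", processedEventsNumber, duration)
	})
	defer unsubscribe()

	if err := clusterCache.EnsureSynced(); err != nil {
		fmt.Println("sync failed:", err)
	}
}
```

With batching disabled (the default), `recordEvent` still takes the cache lock once per watch event, so existing consumers keep the current behavior; the handler simply reports batches of size 1 only when batching is turned on.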