Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: feature flag batch event processing [Work in Progress] #645

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 133 additions & 9 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
// Limit is required to avoid memory spikes during cache initialization.
// The default limit of 50 is chosen based on experiments.
defaultListSemaphoreWeight = 50
// defaultEventProcessingInterval is the default interval for processing events
defaultEventProcessingInterval = 1 * time.Second
)

const (
Expand All @@ -75,6 +77,11 @@ type apiMeta struct {
watchCancel context.CancelFunc
}

// eventMeta pairs a watch event type with the object the event refers to, so
// that a received event can be passed around (e.g. queued on a channel) and
// applied to the cache later.
type eventMeta struct {
// event is the type of the watch event.
event watch.EventType
// un is the object carried by the event.
un *unstructured.Unstructured
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
// Server holds cluster API server URL
Expand All @@ -96,6 +103,9 @@ type ClusterInfo struct {
// OnEventHandler is a function that handles Kubernetes event
type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

// OnProcessEventsHandler is a callback invoked after a batch of events has been processed.
// duration is the time elapsed since the batch was started (measured from before the cache
// lock was requested, so it includes lock wait time) and processedEventsNumber is the number
// of events contained in that batch.
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)

// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)

Expand Down Expand Up @@ -137,6 +147,8 @@ type ClusterCache interface {
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
// OnEvent register event handler that is executed every time when new K8S event received
OnEvent(handler OnEventHandler) Unsubscribe
// OnProcessEventsHandler register event handler that is executed every time when events were processed
OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
}

type WeightedSemaphore interface {
Expand All @@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
cache := &clusterCache{
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
eventMetaCh: nil,
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
Expand All @@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
},
watchResyncTimeout: defaultWatchResyncTimeout,
clusterSyncRetryTimeout: ClusterRetryTimeout,
eventProcessingInterval: defaultEventProcessingInterval,
resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
eventHandlers: map[uint64]OnEventHandler{},
processEventsHandlers: map[uint64]OnProcessEventsHandler{},
log: log,
listRetryLimit: 1,
listRetryUseBackoff: false,
Expand All @@ -185,16 +200,20 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
type clusterCache struct {
syncStatus clusterCacheSync

apisMeta map[schema.GroupKind]*apiMeta
serverVersion string
apiResources []kube.APIResourceInfo
apisMeta map[schema.GroupKind]*apiMeta
batchEventsProcessing bool
eventMetaCh chan eventMeta
serverVersion string
apiResources []kube.APIResourceInfo
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool

// maximum time we allow watches to run before relisting the group/kind and restarting the watch
watchResyncTimeout time.Duration
// sync retry timeout for cluster when sync error happens
clusterSyncRetryTimeout time.Duration
// ticker interval for events processing
eventProcessingInterval time.Duration

// size of a page for list operations pager.
listPageSize int64
Expand Down Expand Up @@ -224,6 +243,7 @@ type clusterCache struct {
populateResourceInfoHandler OnPopulateResourceInfoHandler
resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler
eventHandlers map[uint64]OnEventHandler
processEventsHandlers map[uint64]OnProcessEventsHandler
openAPISchema openapi.Resources
gvkParser *managedfields.GvkParser

Expand Down Expand Up @@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
return handlers
}

// OnProcessEventsHandler registers a handler that is invoked each time a batch
// of events has been processed. The returned Unsubscribe removes the handler
// again; both registration and removal are guarded by handlersLock.
func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()

	// Take the current counter value as this subscription's id and advance
	// the counter so the next subscription gets a fresh one.
	id := c.handlerKey
	c.handlerKey = id + 1
	c.processEventsHandlers[id] = handler

	unsubscribe := func() {
		c.handlersLock.Lock()
		delete(c.processEventsHandlers, id)
		c.handlersLock.Unlock()
	}
	return unsubscribe
}
// getProcessEventsHandlers returns a copy of the currently registered
// process-events handlers, taken while holding handlersLock, so callers can
// invoke the handlers without keeping the lock. The order of the returned
// handlers follows map iteration and is therefore unspecified.
func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()

	snapshot := make([]OnProcessEventsHandler, len(c.processEventsHandlers))
	next := 0
	for key := range c.processEventsHandlers {
		snapshot[next] = c.processEventsHandlers[key]
		next++
	}
	return snapshot
}

// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
return c.serverVersion
Expand Down Expand Up @@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
for i := range opts {
opts[i](c)
}

if c.batchEventsProcessing {
c.invalidateEventMeta()
}
c.apisMeta = nil
c.namespacedResources = nil
c.log.Info("Invalidated cluster")
Expand Down Expand Up @@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}

c.processEvent(event.Type, obj)
c.recordEvent(event.Type, obj)
if kube.IsCRD(obj) {
var resources []kube.APIResourceInfo
crd := v1.CustomResourceDefinition{}
Expand Down Expand Up @@ -823,6 +870,12 @@ func (c *clusterCache) sync() error {
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}

if c.batchEventsProcessing {
c.invalidateEventMeta()
c.eventMetaCh = make(chan eventMeta)
}

c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
Expand Down Expand Up @@ -864,6 +917,10 @@ func (c *clusterCache) sync() error {
return err
}

if c.batchEventsProcessing {
go c.processEvents()
}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
Expand Down Expand Up @@ -926,6 +983,14 @@ func (c *clusterCache) sync() error {
return nil
}

// invalidateEventMeta shuts down the event channel: when a channel is set it
// is closed and the field is reset to nil. Calling it with no channel set is
// a no-op, so repeated calls never close the same channel twice.
func (c *clusterCache) invalidateEventMeta() {
	if c.eventMetaCh == nil {
		return
	}
	close(c.eventMetaCh)
	c.eventMetaCh = nil
}

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
syncStatus := &c.syncStatus
Expand Down Expand Up @@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
return managedObjs, nil
}

func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) {
func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
for _, h := range c.getEventHandlers() {
h(event, un)
}
Expand All @@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
return
}

if c.batchEventsProcessing {
c.eventMetaCh <- eventMeta{event, un}
} else {
c.lock.Lock()
defer c.lock.Unlock()
c.processEvent(key, eventMeta{event, un})
}
}

// processEvents is the batching loop for watch events. It reads events from
// the event channel into an in-memory buffer and, on every tick of
// eventProcessingInterval, hands the buffered events to processEventsBatch.
//
// The loop ends when the event channel is closed. Events that are still
// buffered at that moment are not processed.
func (c *clusterCache) processEvents() {
	log := c.log.WithValues("functionName", "processItems")
	log.V(1).Info("Start processing events")

	// Capture the channel once under the lock so the loop keeps reading from
	// the channel that was current when it started. The lock is released
	// explicitly and must NOT also be released via defer: unlocking twice
	// either crashes the process or releases a lock held by another goroutine.
	c.lock.Lock()
	ch := c.eventMetaCh
	c.lock.Unlock()

	// A nil channel never becomes ready and can never be observed as closed,
	// so the loop below would spin on the ticker forever. This can happen if
	// the channel was reset before this goroutine got to read it.
	if ch == nil {
		log.V(2).Info("Event processing channel is not set, finish processing")
		return
	}

	var eventMetas []eventMeta
	ticker := time.NewTicker(c.eventProcessingInterval)
	defer ticker.Stop()

	for {
		select {
		case evMeta, ok := <-ch:
			if !ok {
				log.V(2).Info("Event processing channel closed, finish processing")
				return
			}
			eventMetas = append(eventMetas, evMeta)
		case <-ticker.C:
			if len(eventMetas) > 0 {
				c.processEventsBatch(eventMetas)
				// Keep the backing array for the next batch.
				eventMetas = eventMetas[:0]
			}
		}
	}
}

// processEventsBatch applies every event in eventMetas to the cache while
// holding the cache lock for the whole batch. After the lock has been
// released, each registered process-events handler is called with the total
// elapsed time (including the time spent waiting for the lock) and the number
// of events in the batch.
func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
	logger := c.log.WithValues("functionName", "processEventsBatch")
	startedAt := time.Now()

	c.lock.Lock()
	logger.V(1).Info("Lock acquired (ms)", "duration", time.Since(startedAt).Milliseconds())

	// Runs last: release the lock first, then report to the handlers so they
	// are never invoked while the cache lock is held.
	defer func() {
		c.lock.Unlock()
		elapsed := time.Since(startedAt)
		for _, notify := range c.getProcessEventsHandlers() {
			notify(elapsed, len(eventMetas))
		}
	}()

	for i := range eventMetas {
		c.processEvent(kube.GetResourceKey(eventMetas[i].un), eventMetas[i])
	}

	logger.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(startedAt).Milliseconds())
}

func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
existingNode, exists := c.resources[key]
if event == watch.Deleted {
if evMeta.event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
}
} else if event != watch.Deleted {
c.onNodeUpdated(existingNode, c.newResource(un))
} else {
c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
}
}

Expand Down
Loading
Loading