Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 89 additions & 76 deletions pkg/sqlcache/informer/factory/informer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/rancher/steve/pkg/sqlcache/sqltypes"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -46,21 +45,16 @@ type CacheFactory struct {
}

type guardedInformer struct {
// This mutex ensures the initialization of informer, as well as controlling a graceful shutdown
mutex sync.RWMutex
informer *informer.Informer
// informerMutex ensures informer is only set by one goroutine even if
// multiple concurrent calls to CacheFor are made
informerMutex *sync.Mutex

// stopMutex ensures no CacheFor call can be made for a given GVK when
// a Stop call is ongoing.
//
// CacheFactory.informersMutex is not enough because part of the code
// might still have an old cache from a previous CacheFor call.
stopMutex *sync.RWMutex
// informerDone is non-nil when the informer is started, and closed when it finishes
informerDone chan struct{}

ctx context.Context
cancel context.CancelFunc
wg wait.Group
// wg represents all active cache users, and is decreased when DoneWithCache is called
wg sync.WaitGroup
}

type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, typeGuidance map[string]string, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)
Expand Down Expand Up @@ -134,6 +128,14 @@ func NewCacheFactoryWithContext(ctx context.Context, opts CacheFactoryOptions) (
}, nil
}

// logCacheInitializationDuration logs that informer creation for gvk has
// begun and returns a completion callback. Invoking the returned function
// logs that creation finished along with the elapsed time, making it
// suitable for use as `defer logCacheInitializationDuration(gvk)()`.
func logCacheInitializationDuration(gvk schema.GroupVersionKind) func() {
	begin := time.Now()
	log.Infof("CacheFor STARTS creating informer for %v", gvk)
	return func() {
		elapsed := time.Since(begin)
		log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, elapsed)
	}
}

// CacheFor returns an informer for given GVK, using sql store indexed with fields, using the specified client. For virtual fields, they must be added by the transform function
// and specified by fields to be used for later fields.
//
Expand All @@ -155,80 +157,86 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
if !ok {
giCtx, giCancel := context.WithCancel(f.ctx)
gi = &guardedInformer{
informer: nil,
informerMutex: &sync.Mutex{},
stopMutex: &sync.RWMutex{},
ctx: giCtx,
cancel: giCancel,
informer: nil,
ctx: giCtx,
cancel: giCancel,
}
f.informers[gvk] = gi
}
f.informersMutex.Unlock()

// Prevent Stop() to be called for that GVK
gi.stopMutex.RLock()
gi.mutex.RLock()
if gi.informer == nil {
// Initialize informer needs to acquire a write lock, so we need to release it temporarily
gi.mutex.RUnlock()

defer logCacheInitializationDuration(gvk)()
if err := f.initializeInformer(gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable); err != nil {
log.Errorf("CacheFor failed to initialize informer for %v: %v", gvk, err)
return nil, fmt.Errorf("setting up informer for %v: %w", gvk, err)
}

gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable)
gi.mutex.RLock()
}
defer gi.mutex.RUnlock()

gvkCache, err := f.waitForCacheReady(ctx, gvk, gi)
if err != nil {
gi.stopMutex.RUnlock()
return nil, err
}

// from this point, it's responsibility of the caller to call DoneWithCache(gvkCache) once it's done
gi.wg.Add(1)
return gvkCache, nil
}

func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {
// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
gi.informerMutex.Lock()
func (f *CacheFactory) initializeInformer(gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) error {
gi.mutex.Lock()
defer gi.mutex.Unlock()

// Then: if the informer really was not created yet (first time here or previous times have errored out)
// actually create the informer
if gi.informer == nil {
start := time.Now()
log.Infof("CacheFor STARTS creating informer for %v", gvk)
defer func() {
log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Since(start))
}()

_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
// search for "func NewInformer(ctx"
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
gi.informerMutex.Unlock()
return nil, err
}
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
// search for "func NewInformer(ctx"
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
log.Errorf("creating informer for %v: %v", gvk, err)
return err
}

err = i.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
if !watchable && errors.IsMethodNotSupported(err) {
// expected, continue without logging
return
}
cache.DefaultWatchErrorHandler(gi.ctx, r, err)
})
if err != nil {
gi.informerMutex.Unlock()
return nil, err
if err := i.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
if !watchable && errors.IsMethodNotSupported(err) {
// expected, continue without logging
return
}

gi.wg.StartWithChannel(gi.ctx.Done(), i.Run)

gi.informer = i
cache.DefaultWatchErrorHandler(gi.ctx, r, err)
}); err != nil {
return fmt.Errorf("setting watch error handler: %w", err)
}
gi.informerMutex.Unlock()

gi.informerDone = make(chan struct{})
go func() {
defer close(gi.informerDone)
i.Run(gi.ctx.Done())
}()
gi.informer = i
return nil
}

// waitForCacheReady will block until the informer has synced, or any of the context is canceled (the one from the request or the informer's own context)
func (f *CacheFactory) waitForCacheReady(ctx context.Context, gvk schema.GroupVersionKind, gi *guardedInformer) (*Cache, error) {
// We don't want to get stuck in WaitForCachesSync if the request from
// the client has been canceled.
waitCh := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(ctx)
go func() {
select {
case <-ctx.Done():
case <-gi.ctx.Done():
cancel()
}
close(waitCh)
}()

if !cache.WaitForCacheSync(waitCh, gi.informer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), gi.informer.HasSynced) {
if gi.ctx.Err() != nil {
return nil, fmt.Errorf("cache context canceled while waiting for SQL cache sync for %v: %w", gvk, gi.ctx.Err())
}
Expand All @@ -251,7 +259,7 @@ func (f *CacheFactory) DoneWithCache(cache *Cache) {
return
}

cache.gi.stopMutex.RUnlock()
cache.gi.wg.Done()
}

// Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
Expand All @@ -263,35 +271,40 @@ func (f *CacheFactory) Stop(gvk schema.GroupVersionKind) error {
}

f.informersMutex.Lock()
defer f.informersMutex.Unlock()

gi, ok := f.informers[gvk]
f.informersMutex.Unlock()
if !ok {
return nil
}
delete(f.informers, gvk)

// We must stop informers here to unblock those stuck in WaitForCacheSync
// which is blocking DoneWithCache call.
// Stop any ongoing WaitForCacheSync or Cache users, which will eventually call DoneWithCache as a result
gi.cancel()

// Prevent other CacheFor calls for that GVK
gi.stopMutex.Lock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also implicitly waited for all Cache users to finish, whereas it is now done explicitly by using a WaitGroup

defer gi.stopMutex.Unlock()
// Prevent any new operations in this guarded informer
gi.mutex.Lock()
defer gi.mutex.Unlock()

// Wait for all informers to have exited
// Wait for all remaining DoneWithCache
gi.wg.Wait()

// Since we hold the lock on gi.stopMutex, we do not need to also hold
// onto gi.informersMutex
if gi.informer != nil {
// Wait for informer to finish
if gi.informerDone != nil {
<-gi.informerDone
}

// Prepare for the next run
gi.ctx, gi.cancel = context.WithCancel(f.ctx)

if inf := gi.informer; inf != nil {
// Discard gi.informer regardless of the result of dropping, as it's not retried anyway
gi.informer = nil

// DropAll needs its own context because the context from the informer
// is canceled
err := gi.informer.DropAll(context.Background())
if err != nil {
if err := inf.DropAll(f.ctx); err != nil {
log.Errorf("error running informer.DropAll for %v: %v", gvk, err)
return fmt.Errorf("dropall %q: %w", gvk, err)
}
}

return nil
}
Loading