diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go
index 7895a7680..e4c31c256 100644
--- a/pkg/sqlcache/informer/factory/informer_factory.go
+++ b/pkg/sqlcache/informer/factory/informer_factory.go
@@ -17,7 +17,6 @@ import (
 	"github.com/rancher/steve/pkg/sqlcache/sqltypes"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/tools/cache"
 )
@@ -46,21 +45,16 @@ type CacheFactory struct {
 }
 
 type guardedInformer struct {
+	// mutex guards the initialization of the informer and coordinates a graceful shutdown
+	mutex    sync.RWMutex
 	informer *informer.Informer
-	// informerMutex ensures informer is only set by one goroutine even if
-	// multiple concurrent calls to CacheFor are made
-	informerMutex *sync.Mutex
-
-	// stopMutex ensures no CacheFor call can be made for a given GVK when
-	// a Stop call is ongoing.
-	//
-	// CacheFactory.informersMutex is not enough because part of the code
-	// might still have an old cache from a previous CacheFor call.
-	stopMutex *sync.RWMutex
+	// informerDone is non-nil once the informer has been started, and is closed when it finishes
+	informerDone chan struct{}
 	ctx    context.Context
 	cancel context.CancelFunc
-	wg     wait.Group
+	// wg tracks all active cache users; it is decremented when DoneWithCache is called
+	wg sync.WaitGroup
 }
 
 type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, typeGuidance map[string]string, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)
@@ -134,6 +128,14 @@ func NewCacheFactoryWithContext(ctx context.Context, opts CacheFactoryOptions) (
 	}, nil
 }
 
+func logCacheInitializationDuration(gvk schema.GroupVersionKind) func() {
+	start := time.Now()
+	log.Infof("CacheFor STARTS creating informer for %v", gvk)
+	return func() {
+		log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Since(start))
+	}
+}
+
 // CacheFor returns an informer for given GVK, using sql store indexed with fields, using the specified client. For virtual fields, they must be added by the transform function
 // and specified by fields to be used for later fields.
 //
@@ -155,80 +157,86 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
 	if !ok {
 		giCtx, giCancel := context.WithCancel(f.ctx)
 		gi = &guardedInformer{
-			informer:      nil,
-			informerMutex: &sync.Mutex{},
-			stopMutex:     &sync.RWMutex{},
-			ctx:           giCtx,
-			cancel:        giCancel,
+			informer: nil,
+			ctx:      giCtx,
+			cancel:   giCancel,
 		}
 		f.informers[gvk] = gi
 	}
 	f.informersMutex.Unlock()
 
-	// Prevent Stop() to be called for that GVK
-	gi.stopMutex.RLock()
+	gi.mutex.RLock()
+	if gi.informer == nil {
+		// initializeInformer needs to acquire the write lock, so temporarily release the read lock
+		gi.mutex.RUnlock()
+
+		defer logCacheInitializationDuration(gvk)()
+		if err := f.initializeInformer(gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable); err != nil {
+			log.Errorf("CacheFor failed to initialize informer for %v: %v", gvk, err)
+			return nil, fmt.Errorf("setting up informer for %v: %w", gvk, err)
+		}
 
-	gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable)
+		gi.mutex.RLock()
+	}
+	defer gi.mutex.RUnlock()
+
+	gvkCache, err := f.waitForCacheReady(ctx, gvk, gi)
 	if err != nil {
-		gi.stopMutex.RUnlock()
 		return nil, err
 	}
+
+	// from this point on, it is the caller's responsibility to call DoneWithCache(gvkCache) once done
+	gi.wg.Add(1)
 	return gvkCache, nil
 }
 
-func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {
-	// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
-	gi.informerMutex.Lock()
+func (f *CacheFactory) initializeInformer(gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) error {
+	gi.mutex.Lock()
+	defer gi.mutex.Unlock()
 
-	// Then: if the informer really was not created yet (first time here or previous times have errored out)
-	// actually create the informer
-	if gi.informer == nil {
-		start := time.Now()
-		log.Infof("CacheFor STARTS creating informer for %v", gvk)
-		defer func() {
-			log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Since(start))
-		}()
-
-		_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
-		shouldEncrypt := f.encryptAll || encryptResourceAlways
-		// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
-		// search for "func NewInformer(ctx"
-		i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
-		if err != nil {
-			gi.informerMutex.Unlock()
-			return nil, err
-		}
+	_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
+	shouldEncrypt := f.encryptAll || encryptResourceAlways
+	// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
+	// search for "func NewInformer(ctx"
+	i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
+	if err != nil {
+		log.Errorf("creating informer for %v: %v", gvk, err)
+		return err
+	}
 
-		err = i.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-			if !watchable && errors.IsMethodNotSupported(err) {
-				// expected, continue without logging
-				return
-			}
-			cache.DefaultWatchErrorHandler(gi.ctx, r, err)
-		})
-		if err != nil {
-			gi.informerMutex.Unlock()
-			return nil, err
+	if err := i.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+		if !watchable && errors.IsMethodNotSupported(err) {
+			// expected, continue without logging
+			return
 		}
-
-		gi.wg.StartWithChannel(gi.ctx.Done(), i.Run)
-
-		gi.informer = i
+		cache.DefaultWatchErrorHandler(gi.ctx, r, err)
+	}); err != nil {
+		return fmt.Errorf("setting watch error handler: %w", err)
 	}
-	gi.informerMutex.Unlock()
+	gi.informerDone = make(chan struct{})
+	go func() {
+		defer close(gi.informerDone)
+		i.Run(gi.ctx.Done())
+	}()
+	gi.informer = i
+	return nil
+}
+
+// waitForCacheReady blocks until the informer has synced or either context is canceled (the request's or the informer's own)
+func (f *CacheFactory) waitForCacheReady(ctx context.Context, gvk schema.GroupVersionKind, gi *guardedInformer) (*Cache, error) {
 	// We don't want to get stuck in WaitForCachesSync if the request from
 	// the client has been canceled.
-	waitCh := make(chan struct{}, 1)
+	ctx, cancel := context.WithCancel(ctx)
 	go func() {
 		select {
 		case <-ctx.Done():
 		case <-gi.ctx.Done():
+			cancel()
 		}
-		close(waitCh)
 	}()
-	if !cache.WaitForCacheSync(waitCh, gi.informer.HasSynced) {
+	if !cache.WaitForCacheSync(ctx.Done(), gi.informer.HasSynced) {
 		if gi.ctx.Err() != nil {
 			return nil, fmt.Errorf("cache context canceled while waiting for SQL cache sync for %v: %w", gvk, gi.ctx.Err())
 		}
@@ -251,7 +259,7 @@ func (f *CacheFactory) DoneWithCache(cache *Cache) {
 		return
 	}
 
-	cache.gi.stopMutex.RUnlock()
+	cache.gi.wg.Done()
 }
 
 // Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
@@ -263,35 +271,40 @@ func (f *CacheFactory) Stop(gvk schema.GroupVersionKind) error {
 	}
 
 	f.informersMutex.Lock()
-	defer f.informersMutex.Unlock()
-
 	gi, ok := f.informers[gvk]
+	f.informersMutex.Unlock()
 	if !ok {
 		return nil
 	}
-	delete(f.informers, gvk)
 
-	// We must stop informers here to unblock those stuck in WaitForCacheSync
-	// which is blocking DoneWithCache call.
+	// Stop any ongoing WaitForCacheSync and any active Cache users, which will eventually call DoneWithCache as a result
 	gi.cancel()
 
-	// Prevent other CacheFor calls for that GVK
-	gi.stopMutex.Lock()
-	defer gi.stopMutex.Unlock()
+	// Prevent any new operations on this guarded informer
+	gi.mutex.Lock()
+	defer gi.mutex.Unlock()
 
-	// Wait for all informers to have exited
+	// Wait for all remaining cache users to call DoneWithCache
	gi.wg.Wait()
 
-	// Since we hold the lock on gi.stopMutex, we do not need to also hold
-	// onto gi.informersMutex
-	if gi.informer != nil {
+	// Wait for the informer goroutine to finish
+	if gi.informerDone != nil {
+		<-gi.informerDone
+	}
+
+	// Prepare for the next run
+	gi.ctx, gi.cancel = context.WithCancel(f.ctx)
+
+	if inf := gi.informer; inf != nil {
+		// Discard gi.informer regardless of the result of DropAll, as it is not retried anyway
+		gi.informer = nil
+
 		// DropAll needs its own context because the context from the informer
 		// is canceled
-		err := gi.informer.DropAll(context.Background())
-		if err != nil {
+		if err := inf.DropAll(f.ctx); err != nil {
+			log.Errorf("error running informer.DropAll for %v: %v", gvk, err)
 			return fmt.Errorf("dropall %q: %w", gvk, err)
 		}
 	}
-
 	return nil
 }
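Reviewer note: this patch changes the caller contract of `CacheFor`, since every successful call now increments the per-GVK `WaitGroup`. A minimal sketch of the intended pairing, assuming the `CacheFor` parameter order shown in this diff; the `useCache` function, its arguments, and the nil/true values passed are hypothetical, not code from this repo:

```go
package example

import (
	"context"

	"github.com/rancher/steve/pkg/sqlcache/informer/factory"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
)

// useCache is a hypothetical caller of the factory API touched by this diff.
func useCache(ctx context.Context, f *factory.CacheFactory, client dynamic.ResourceInterface,
	gvk schema.GroupVersionKind, fields [][]string, transform cache.TransformFunc) error {
	// CacheFor blocks until the informer has synced (waitForCacheReady) and,
	// on success, increments the per-GVK WaitGroup (gi.wg.Add(1)).
	c, err := f.CacheFor(ctx, fields, nil, nil, transform, client, gvk, nil, true, true)
	if err != nil {
		return err
	}
	// DoneWithCache is mandatory after every successful CacheFor: it
	// decrements gi.wg, which Stop(gvk) drains before dropping the tables.
	defer f.DoneWithCache(c)

	// ... use c ...
	return nil
}
```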
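The concurrency protocol itself (double-checked initialization under a single `RWMutex`, a `WaitGroup` of active users, and a done channel for the worker goroutine) is easy to get wrong, so here is a self-contained sketch of the same protocol with the repo types stripped out. Every name in it is hypothetical; it only illustrates the lifecycle that `guardedInformer` follows above, including Stop's cancel-before-write-lock ordering, which is what unblocks callers waiting under the read lock in `waitForCacheReady`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// guarded mirrors the shape of guardedInformer: one RWMutex for double-checked
// initialization and shutdown, a WaitGroup counting active users, and a
// channel closed when the background goroutine exits.
type guarded struct {
	mu     sync.RWMutex
	done   chan struct{} // non-nil once started, closed when the worker exits
	ctx    context.Context
	cancel context.CancelFunc
	users  sync.WaitGroup
}

// acquire plays the role of CacheFor: initialize once, then register a user.
func (g *guarded) acquire() {
	g.mu.RLock()
	if g.done == nil {
		// Upgrade: initialization needs the write lock, so the read lock is
		// released temporarily (the same dance as in CacheFor above).
		g.mu.RUnlock()
		g.mu.Lock()
		if g.done == nil { // re-check: another goroutine may have won the race
			g.ctx, g.cancel = context.WithCancel(context.Background())
			g.done = make(chan struct{})
			go func(done chan struct{}, ctx context.Context) {
				defer close(done) // lets stop() observe that the worker exited
				<-ctx.Done()      // stands in for informer.Run
			}(g.done, g.ctx)
		}
		g.mu.Unlock()
		g.mu.RLock()
	}
	defer g.mu.RUnlock()
	g.users.Add(1) // caller must pair every acquire() with release()
}

// release plays the role of DoneWithCache.
func (g *guarded) release() { g.users.Done() }

// stop plays the role of Stop: cancel first (to unblock anyone waiting under
// the read lock), then take the write lock, drain users, and join the worker.
func (g *guarded) stop() {
	g.mu.RLock()
	cancel, done := g.cancel, g.done
	g.mu.RUnlock()
	if cancel == nil {
		return // never started
	}
	cancel()
	g.mu.Lock()
	defer g.mu.Unlock()
	g.users.Wait() // every acquire() has been release()d
	<-done         // the worker goroutine has exited
	g.done = nil   // ready for the next acquire(), like "prepare for the next run"
}

func main() {
	g := &guarded{}
	g.acquire()
	go func() { time.Sleep(10 * time.Millisecond); g.release() }()
	g.stop()
	fmt.Println("stopped cleanly")
}
```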