Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/sqlcache/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ intended to be used as a way of enforcing RBAC.
fields := [][]string{{"metadata", "name"}, {"metadata", "namespace"}}
opts := &informer.ListOptions{}
// gvk should be of type k8s.io/apimachinery/pkg/runtime/schema.GroupVersionKind
c, err := cacheFactory.CacheFor(fields, client, gvk)
c, err := cacheFactory.CacheFor(context.Context,
getFieldsForGVKFunc, // See pkg/stores/sqlproxy/proxy_store:fieldsForGVK for an example
tableClient,
gvk,
schema,
controllerschema.IsListWatchable(schema))
if err != nil {
panic(err)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/sqlcache/informer/factory/informer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type guardedInformer struct {
wg wait.Group
}

// This function is used by factory.CacheFor() to pull in table info when the function needs to create new DB tables.
type GetFieldsFuncType func() (fields [][]string, typeGuidance map[string]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, isNamespaced bool, transform cache.TransformFunc)

type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, typeGuidance map[string]string, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)

type Cache struct {
Expand Down Expand Up @@ -144,7 +147,8 @@ func NewCacheFactoryWithContext(ctx context.Context, opts CacheFactoryOptions) (
// a context for a single cache to be able to stop that cache (eg: on schema refresh) without impacting the other caches.
//
// Don't forget to call DoneWithCache with the given informer once done with it.
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {

func (f *CacheFactory) CacheFor(ctx context.Context, getFieldsFunc GetFieldsFuncType, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, watchable bool) (*Cache, error) {
// Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache
// If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create
// informers for the same GVK at the same type
Expand All @@ -168,15 +172,15 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
// Prevent Stop() to be called for that GVK
gi.stopMutex.RLock()

gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable)
gvkCache, err := f.cacheForLocked(ctx, gi, getFieldsFunc, client, gvk, watchable)
if err != nil {
gi.stopMutex.RUnlock()
return nil, err
}
return gvkCache, nil
}

func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {
func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, getFieldsFunc GetFieldsFuncType, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, watchable bool) (*Cache, error) {
// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
gi.informerMutex.Lock()

Expand All @@ -191,9 +195,10 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer,

_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
fields, typeGuidance, externalUpdateInfo, selfUpdateInfo, isNamespaced, transformFunc := getFieldsFunc()
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
// search for "func NewInformer(ctx"
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transformFunc, gvk, f.dbClient, shouldEncrypt, typeGuidance, isNamespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
gi.informerMutex.Unlock()
return nil, err
Expand Down
Loading
Loading