feat: Drop unnecessary listing for the sake of watch reinitialization
Signed-off-by: Antoni Zawodny <[email protected]>
tosi3k committed Jul 30, 2024
1 parent 6b2984e · commit f7c5e6e
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions pkg/cache/cluster.go
@@ -191,7 +191,7 @@ type clusterCache struct {
 	// namespacedResources is a simple map which indicates a groupKind is namespaced
 	namespacedResources map[schema.GroupKind]bool
 
-	// maximum time we allow watches to run before relisting the group/kind and restarting the watch
+	// maximum time we allow watches to run before restarting them
 	watchResyncTimeout time.Duration
 	// sync retry timeout for cluster when sync error happens
 	clusterSyncRetryTimeout time.Duration
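
The updated comment reflects the new behavior: when the resync timeout elapses, the watch is simply re-established from the last observed resourceVersion rather than relisting the whole group/kind. The timeout itself is now enforced server-side through the watch request's TimeoutSeconds, as the hunks below show. A minimal standalone sketch of that mechanism (not the gitops-engine code; the client setup, GVR and namespace are placeholder assumptions):

package sketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
)

// watchWithServerSideTimeout is a hypothetical helper: it asks the kube-apiserver
// to close the watch after roughly resyncTimeout, so the caller can re-establish
// the watch from the last observed resourceVersion instead of relisting.
func watchWithServerSideTimeout(ctx context.Context, cfg *rest.Config, resyncTimeout time.Duration) error {
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return err
	}
	gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"}
	timeoutSeconds := int64(resyncTimeout.Seconds())
	w, err := client.Resource(gvr).Namespace("default").Watch(ctx, metav1.ListOptions{
		TimeoutSeconds: &timeoutSeconds,
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	// When the server-side timeout fires, the result channel closes and the
	// caller can reconnect from the last resourceVersion it saw.
	for event := range w.ResultChan() {
		fmt.Printf("event: %s\n", event.Type)
	}
	return nil
}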
@@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+	timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
 	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
 		defer func() {
 			if r := recover(); r != nil {
@@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				options.TimeoutSeconds = &timeoutSeconds
 				res, err := resClient.Watch(ctx, options)
 				if errors.IsNotFound(err) {
 					c.stopWatching(api.GroupKind, ns)
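
The timeoutSeconds value computed at the top of watchEvents is applied inside the WatchFunc handed to the RetryWatcher, so every watch request the watcher issues is bounded server-side while the watcher keeps resuming from the last delivered resourceVersion. An illustrative sketch of that pattern (helper name and signature are assumptions, not gitops-engine code); note that NewRetryWatcher does not accept "" or "0" as the starting version, which is why the caller still has to obtain an initial resourceVersion by listing once:

package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
	watchutil "k8s.io/client-go/tools/watch"
)

// newBoundedRetryWatcher is an illustrative helper: every watch request issued
// by the RetryWatcher carries the server-side timeout, while the watcher itself
// keeps resuming from the last delivered resourceVersion.
func newBoundedRetryWatcher(ctx context.Context, resClient dynamic.ResourceInterface, resourceVersion string, timeoutSeconds int64) (*watchutil.RetryWatcher, error) {
	return watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// Ask the kube-apiserver to terminate this particular watch request
			// after timeoutSeconds; the RetryWatcher transparently reconnects.
			options.TimeoutSeconds = &timeoutSeconds
			return resClient.Watch(ctx, options)
		},
	})
}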
@@ -633,30 +635,21 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			return err
 		}
 
-		defer func() {
-			w.Stop()
-			resourceVersion = ""
-		}()
-
-		var watchResyncTimeoutCh <-chan time.Time
-		if c.watchResyncTimeout > 0 {
-			shouldResync := time.NewTimer(c.watchResyncTimeout)
-			defer shouldResync.Stop()
-			watchResyncTimeoutCh = shouldResync.C
-		}
+		defer w.Stop()
 
 		for {
 			select {
 			// stop watching when parent context got cancelled
 			case <-ctx.Done():
 				return nil
 
-			// re-synchronize API state and restart watch periodically
-			case <-watchResyncTimeoutCh:
-				return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
-
 			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
 			case <-w.Done():
+				// The underlying retry watcher has stopped, possibly due to specifying an RV in
+				// the watch request that is stale (error code 410). This forces us to relist
+				// objects from the kube-apiserver to get a fresher RV and we invoke that relist
+				// by resetting the locally stored RV.
+				resourceVersion = ""
 				return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)
 
 			case event, ok := <-w.ResultChan():
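
The net effect of this hunk: the client-side resync timer is gone, and the deferred reset of resourceVersion is gone with it, so an ordinary watch restart (including one caused by the server-side timeout) resumes from the stored version. The version is cleared only when the retry watcher itself stops, e.g. after a 410 Gone, and that is now the only path that forces a LIST against the kube-apiserver. A hypothetical helper sketching that cheap-resume vs. expensive-relist split (the helper name and the direct List call are illustrative assumptions, not the actual code):

package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/dynamic"
)

// freshResourceVersion illustrates the split: resume from the version we already
// track, and only fall back to a LIST when that version has been reset
// (e.g. after the retry watcher stopped on a 410 Gone).
func freshResourceVersion(ctx context.Context, resClient dynamic.ResourceInterface, lastRV string) (string, error) {
	if lastRV != "" {
		// Cheap path: the watch can be re-established from lastRV, no LIST needed.
		return lastRV, nil
	}
	// Expensive path: relist to obtain a fresh resourceVersion from the kube-apiserver.
	list, err := resClient.List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", err
	}
	return list.GetResourceVersion(), nil
}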
@@ -666,8 +659,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 				obj, ok := event.Object.(*unstructured.Unstructured)
 				if !ok {
+					// We failed to cast the object received in the watch event to something
+					// that contains a resource version field. Because of that, we don't know
+					// from what RV we should reinitialize the watch connection, so in order to
+					// avoid any inconsistencies due to accidental skipping of a potential RV,
+					// we reset the locally stored RV to forcefully invoke the list API call to
+					// get it from the kube-apiserver.
+					resourceVersion = ""
 					return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
 				}
+				resourceVersion = obj.GetResourceVersion()
 
 				c.processEvent(event.Type, obj)
 				if kube.IsCRD(obj) {
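
With the RV recorded after every successfully processed event, the locally stored resourceVersion stays in lockstep with what the cache has actually seen, so the next reconnect can resume from it instead of relisting; only an event object that cannot be cast to *unstructured.Unstructured (and therefore exposes no RV) resets the version. A sketch of such an event loop (helper name and callback shape are assumptions, not gitops-engine code):

package sketch

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
)

// consumeEvents advances *lastRV with every event it hands to the callback,
// so a later reconnect can resume from that version instead of relisting.
func consumeEvents(w watch.Interface, lastRV *string, handle func(watch.EventType, *unstructured.Unstructured)) error {
	for event := range w.ResultChan() {
		obj, ok := event.Object.(*unstructured.Unstructured)
		if !ok {
			// No way to read a resourceVersion from this object, so drop the stored
			// version and force a fresh LIST on the next reconnect.
			*lastRV = ""
			return fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
		}
		*lastRV = obj.GetResourceVersion()
		handle(event.Type, obj)
	}
	return nil
}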
