Skip to content

Commit 040dd12

Browse files
committed
Add a retry to handle transient network errors
Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent e4a82cf commit 040dd12

File tree

1 file changed

+37
-13
lines changed

1 file changed

+37
-13
lines changed

controller/cache/cache.go

+37 -13
Original file line number | Diff line number | Diff line change
@@ -176,15 +176,17 @@ func NewLiveStateCache(
176176
resourceTracking argo.ResourceTracking) LiveStateCache {
177177

178178
return &liveStateCache{
179-
appInformer: appInformer,
180-
db: db,
181-
clusters: make(map[string]clustercache.ClusterCache),
182-
onObjectUpdated: onObjectUpdated,
183-
kubectl: kubectl,
184-
settingsMgr: settingsMgr,
185-
metricsServer: metricsServer,
186-
clusterSharding: clusterSharding,
187-
resourceTracking: resourceTracking,
179+
appInformer: appInformer,
180+
db: db,
181+
clusters: make(map[string]clustercache.ClusterCache),
182+
clusterStatusCancel: make(map[string]context.CancelFunc),
183+
onObjectUpdated: onObjectUpdated,
184+
kubectl: kubectl,
185+
settingsMgr: settingsMgr,
186+
metricsServer: metricsServer,
187+
clusterSharding: clusterSharding,
188+
clusterFilter: clusterFilter,
189+
resourceTracking: resourceTracking,
188190
}
189191
}
190192

@@ -210,9 +212,10 @@ type liveStateCache struct {
210212
resourceTracking argo.ResourceTracking
211213
ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts
212214

213-
clusters map[string]clustercache.ClusterCache
214-
cacheSettings cacheSettings
215-
lock sync.RWMutex
215+
clusterStatusCancel map[string]context.CancelFunc
216+
clusters map[string]clustercache.ClusterCache
217+
cacheSettings cacheSettings
218+
lock sync.RWMutex
216219
}
217220

218221
func (c *liveStateCache) loadCacheSettings() (*cacheSettings, error) {
@@ -520,11 +523,20 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
520523
clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))),
521524
clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError),
522525
clustercache.SetRespectRBAC(respectRBAC),
526+
clustercache.SetClusterStatusRetryFunc(isTransientNetworkErr),
527+
clustercache.SetClusterConnectionInterval(10 * time.Second),
523528
}
524529

525530
clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)
526531

527-
clusterCache.StartClusterConnectionStatusMonitoring(context.Background())
532+
// Make sure to check if the monitoring interval is disabled
533+
534+
ctx, cancel := context.WithCancel(context.Background())
535+
if c.clusterStatusCancel == nil {
536+
c.clusterStatusCancel = make(map[string]context.CancelFunc)
537+
}
538+
c.clusterStatusCancel[server] = cancel
539+
clusterCache.StartClusterConnectionStatusMonitoring(ctx)
528540

529541
_ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) {
530542
toNotify := make(map[string]bool)
@@ -779,6 +791,12 @@ func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *a
779791
if !c.canHandleCluster(newCluster) {
780792
cluster.Invalidate()
781793
c.lock.Lock()
794+
cancel, ok := c.clusterStatusCancel[newCluster.Server]
795+
if ok {
796+
// stop the cluster status monitoring goroutine
797+
cancel()
798+
delete(c.clusterStatusCancel, newCluster.Server)
799+
}
782800
delete(c.clusters, newCluster.Server)
783801
c.lock.Unlock()
784802
return
@@ -820,6 +838,12 @@ func (c *liveStateCache) handleDeleteEvent(clusterServer string) {
820838
if ok {
821839
cluster.Invalidate()
822840
c.lock.Lock()
841+
cancel, ok := c.clusterStatusCancel[clusterServer]
842+
if ok {
843+
// stop the cluster status monitoring goroutine
844+
cancel()
845+
delete(c.clusterStatusCancel, clusterServer)
846+
}
823847
delete(c.clusters, clusterServer)
824848
c.lock.Unlock()
825849
}

0 commit comments

Comments
 (0)