Skip to content

Commit 7f1b996

Browse files
committed
Address review comments
- Refactor the the clusterConnectionService to use locks before reading the status - Rename the function that monitors the cluster connection status - Fix typo Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent 2b291f7 commit 7f1b996

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

pkg/cache/cluster.go

+28-16
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ type ClusterCache interface {
134134
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
135135
// OnEvent register event handler that is executed every time when new K8S event received
136136
OnEvent(handler OnEventHandler) Unsubscribe
137-
// UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status.
138-
UpdateClusterConnectionStatus(ctx context.Context)
137+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks the watch errors periodically and updates the cluster connection status.
138+
StartClusterConnectionStatusMonitoring(ctx context.Context)
139139
}
140140

141141
type WeightedSemaphore interface {
@@ -175,7 +175,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
175175
listRetryUseBackoff: false,
176176
listRetryFunc: ListRetryFuncNever,
177177
connectionStatus: ConnectionStatusUnknown,
178-
watchFails: newWatchFaiures(),
178+
watchFails: newWatchFailures(),
179179
}
180180
for i := range opts {
181181
opts[i](cache)
@@ -1233,10 +1233,10 @@ func skipAppRequeuing(key kube.ResourceKey) bool {
12331233
return ignoredRefreshResources[key.Group+"/"+key.Kind]
12341234
}
12351235

1236-
// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures.
1236+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures.
12371237
// If there are any watch errors, it will periodically ping the remote cluster
12381238
// and update the cluster connection status.
1239-
func (c *clusterCache) UpdateClusterConnectionStatus(ctx context.Context) {
1239+
func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
12401240
go c.clusterConnectionService(ctx)
12411241
}
12421242

@@ -1251,15 +1251,23 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12511251
watchErrors := c.watchFails.len()
12521252
// Ping the cluster for connection verification if there are watch failures or
12531253
// if the cluster has recovered back from watch failures.
1254-
if watchErrors > 0 || (watchErrors == 0 && c.connectionStatus == ConnectionStatusFailed) {
1254+
watchesRecovered := false
1255+
if watchErrors == 0 {
1256+
// If there are no watch failures check if the status needs to be updated.
1257+
c.lock.RLock()
1258+
if c.connectionStatus == ConnectionStatusFailed {
1259+
watchesRecovered = true
1260+
}
1261+
c.lock.RUnlock()
1262+
}
1263+
1264+
if watchErrors > 0 || watchesRecovered {
12551265
c.log.V(1).Info("verifying cluster connection", "watches", watchErrors)
12561266

12571267
_, err := c.kubectl.GetServerVersion(c.config)
12581268
if err != nil {
1259-
if c.connectionStatus != ConnectionStatusFailed {
1260-
c.updateConnectionStatus(ConnectionStatusFailed)
1261-
}
1262-
} else if c.connectionStatus != ConnectionStatusSuccessful {
1269+
c.updateConnectionStatus(ConnectionStatusFailed)
1270+
} else {
12631271
c.updateConnectionStatus(ConnectionStatusSuccessful)
12641272
}
12651273
}
@@ -1272,14 +1280,18 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12721280
}
12731281

12741282
func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) {
1275-
if c.connectionStatus == status {
1276-
return
1277-
}
1278-
1283+
invalidateCache := false
12791284
c.lock.Lock()
1280-
c.connectionStatus = status
1285+
if c.connectionStatus != status {
1286+
c.connectionStatus = status
1287+
invalidateCache = true
1288+
}
12811289
c.lock.Unlock()
12821290

1291+
if !invalidateCache {
1292+
return
1293+
}
1294+
12831295
c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status)
12841296

12851297
c.Invalidate()
@@ -1295,7 +1307,7 @@ type watchFailures struct {
12951307
mu sync.RWMutex
12961308
}
12971309

1298-
func newWatchFaiures() *watchFailures {
1310+
func newWatchFailures() *watchFailures {
12991311
return &watchFailures{
13001312
watches: make(map[string]bool),
13011313
}

pkg/cache/mocks/ClusterCache.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)