Skip to content

Commit 76535b7

Browse files
committed
Address review comments
- Refactor the the clusterConnectionService to use locks before reading the status - Rename the function that monitors the cluster connection status - Fix typo Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent 532e636 commit 76535b7

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

pkg/cache/cluster.go

+28-16
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ type ClusterCache interface {
134134
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
135135
// OnEvent register event handler that is executed every time when new K8S event received
136136
OnEvent(handler OnEventHandler) Unsubscribe
137-
// UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status.
138-
UpdateClusterConnectionStatus(ctx context.Context)
137+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks the watch errors periodically and updates the cluster connection status.
138+
StartClusterConnectionStatusMonitoring(ctx context.Context)
139139
}
140140

141141
type WeightedSemaphore interface {
@@ -175,7 +175,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
175175
listRetryUseBackoff: false,
176176
listRetryFunc: ListRetryFuncNever,
177177
connectionStatus: ConnectionStatusUnknown,
178-
watchFails: newWatchFaiures(),
178+
watchFails: newWatchFailures(),
179179
}
180180
for i := range opts {
181181
opts[i](cache)
@@ -1231,10 +1231,10 @@ func skipAppRequeuing(key kube.ResourceKey) bool {
12311231
return ignoredRefreshResources[key.Group+"/"+key.Kind]
12321232
}
12331233

1234-
// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures.
1234+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures.
12351235
// If there are any watch errors, it will periodically ping the remote cluster
12361236
// and update the cluster connection status.
1237-
func (c *clusterCache) UpdateClusterConnectionStatus(ctx context.Context) {
1237+
func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
12381238
go c.clusterConnectionService(ctx)
12391239
}
12401240

@@ -1249,15 +1249,23 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12491249
watchErrors := c.watchFails.len()
12501250
// Ping the cluster for connection verification if there are watch failures or
12511251
// if the cluster has recovered back from watch failures.
1252-
if watchErrors > 0 || (watchErrors == 0 && c.connectionStatus == ConnectionStatusFailed) {
1252+
watchesRecovered := false
1253+
if watchErrors == 0 {
1254+
// If there are no watch failures check if the status needs to be updated.
1255+
c.lock.RLock()
1256+
if c.connectionStatus == ConnectionStatusFailed {
1257+
watchesRecovered = true
1258+
}
1259+
c.lock.RUnlock()
1260+
}
1261+
1262+
if watchErrors > 0 || watchesRecovered {
12531263
c.log.V(1).Info("verifying cluster connection", "watches", watchErrors)
12541264

12551265
_, err := c.kubectl.GetServerVersion(c.config)
12561266
if err != nil {
1257-
if c.connectionStatus != ConnectionStatusFailed {
1258-
c.updateConnectionStatus(ConnectionStatusFailed)
1259-
}
1260-
} else if c.connectionStatus != ConnectionStatusSuccessful {
1267+
c.updateConnectionStatus(ConnectionStatusFailed)
1268+
} else {
12611269
c.updateConnectionStatus(ConnectionStatusSuccessful)
12621270
}
12631271
}
@@ -1270,14 +1278,18 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12701278
}
12711279

12721280
func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) {
1273-
if c.connectionStatus == status {
1274-
return
1275-
}
1276-
1281+
invalidateCache := false
12771282
c.lock.Lock()
1278-
c.connectionStatus = status
1283+
if c.connectionStatus != status {
1284+
c.connectionStatus = status
1285+
invalidateCache = true
1286+
}
12791287
c.lock.Unlock()
12801288

1289+
if !invalidateCache {
1290+
return
1291+
}
1292+
12811293
c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status)
12821294

12831295
c.Invalidate()
@@ -1293,7 +1305,7 @@ type watchFailures struct {
12931305
mu sync.RWMutex
12941306
}
12951307

1296-
func newWatchFaiures() *watchFailures {
1308+
func newWatchFailures() *watchFailures {
12971309
return &watchFailures{
12981310
watches: make(map[string]bool),
12991311
}

pkg/cache/mocks/ClusterCache.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)