Skip to content

Commit cdf4bda

Browse files
committedApr 4, 2024
Address review comments
- Refactor the the clusterConnectionService to use locks before reading the status - Rename the function that monitors the cluster connection status - Fix typo Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
1 parent ed5229e commit cdf4bda

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed
 

‎pkg/cache/cluster.go

+28-16
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ type ClusterCache interface {
134134
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
135135
// OnEvent register event handler that is executed every time when new K8S event received
136136
OnEvent(handler OnEventHandler) Unsubscribe
137-
// UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status.
138-
UpdateClusterConnectionStatus(ctx context.Context)
137+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks the watch errors periodically and updates the cluster connection status.
138+
StartClusterConnectionStatusMonitoring(ctx context.Context)
139139
}
140140

141141
type WeightedSemaphore interface {
@@ -175,7 +175,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
175175
listRetryUseBackoff: false,
176176
listRetryFunc: ListRetryFuncNever,
177177
connectionStatus: ConnectionStatusUnknown,
178-
watchFails: newWatchFaiures(),
178+
watchFails: newWatchFailures(),
179179
}
180180
for i := range opts {
181181
opts[i](cache)
@@ -1232,10 +1232,10 @@ func skipAppRequeuing(key kube.ResourceKey) bool {
12321232
return ignoredRefreshResources[key.Group+"/"+key.Kind]
12331233
}
12341234

1235-
// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures.
1235+
// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures.
12361236
// If there are any watch errors, it will periodically ping the remote cluster
12371237
// and update the cluster connection status.
1238-
func (c *clusterCache) UpdateClusterConnectionStatus(ctx context.Context) {
1238+
func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
12391239
go c.clusterConnectionService(ctx)
12401240
}
12411241

@@ -1250,15 +1250,23 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12501250
watchErrors := c.watchFails.len()
12511251
// Ping the cluster for connection verification if there are watch failures or
12521252
// if the cluster has recovered back from watch failures.
1253-
if watchErrors > 0 || (watchErrors == 0 && c.connectionStatus == ConnectionStatusFailed) {
1253+
watchesRecovered := false
1254+
if watchErrors == 0 {
1255+
// If there are no watch failures check if the status needs to be updated.
1256+
c.lock.RLock()
1257+
if c.connectionStatus == ConnectionStatusFailed {
1258+
watchesRecovered = true
1259+
}
1260+
c.lock.RUnlock()
1261+
}
1262+
1263+
if watchErrors > 0 || watchesRecovered {
12541264
c.log.V(1).Info("verifying cluster connection", "watches", watchErrors)
12551265

12561266
_, err := c.kubectl.GetServerVersion(c.config)
12571267
if err != nil {
1258-
if c.connectionStatus != ConnectionStatusFailed {
1259-
c.updateConnectionStatus(ConnectionStatusFailed)
1260-
}
1261-
} else if c.connectionStatus != ConnectionStatusSuccessful {
1268+
c.updateConnectionStatus(ConnectionStatusFailed)
1269+
} else {
12621270
c.updateConnectionStatus(ConnectionStatusSuccessful)
12631271
}
12641272
}
@@ -1271,14 +1279,18 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
12711279
}
12721280

12731281
func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) {
1274-
if c.connectionStatus == status {
1275-
return
1276-
}
1277-
1282+
invalidateCache := false
12781283
c.lock.Lock()
1279-
c.connectionStatus = status
1284+
if c.connectionStatus != status {
1285+
c.connectionStatus = status
1286+
invalidateCache = true
1287+
}
12801288
c.lock.Unlock()
12811289

1290+
if !invalidateCache {
1291+
return
1292+
}
1293+
12821294
c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status)
12831295

12841296
c.Invalidate()
@@ -1294,7 +1306,7 @@ type watchFailures struct {
12941306
mu sync.RWMutex
12951307
}
12961308

1297-
func newWatchFaiures() *watchFailures {
1309+
func newWatchFailures() *watchFailures {
12981310
return &watchFailures{
12991311
watches: make(map[string]bool),
13001312
}

‎pkg/cache/mocks/ClusterCache.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)