@@ -135,7 +135,7 @@ type ClusterCache interface {
135
135
// OnEvent register event handler that is executed every time when new K8S event received
136
136
OnEvent (handler OnEventHandler ) Unsubscribe
137
137
// UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status.
138
- UpdateClusterConnectionStatus ()
138
+ UpdateClusterConnectionStatus (ctx context. Context )
139
139
}
140
140
141
141
type WeightedSemaphore interface {
@@ -1234,11 +1234,11 @@ func skipAppRequeuing(key kube.ResourceKey) bool {
1234
1234
// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures.
1235
1235
// If there are any watch errors, it will periodically ping the remote cluster
1236
1236
// and update the cluster connection status.
1237
- func (c * clusterCache ) UpdateClusterConnectionStatus () {
1238
- go c .clusterConnectionService ()
1237
+ func (c * clusterCache ) UpdateClusterConnectionStatus (ctx context. Context ) {
1238
+ go c .clusterConnectionService (ctx )
1239
1239
}
1240
1240
1241
- func (c * clusterCache ) clusterConnectionService () {
1241
+ func (c * clusterCache ) clusterConnectionService (ctx context. Context ) {
1242
1242
clusterConnectionTimeout := 10 * time .Second
1243
1243
ticker := time .NewTicker (clusterConnectionTimeout )
1244
1244
defer ticker .Stop ()
@@ -1261,6 +1261,9 @@ func (c *clusterCache) clusterConnectionService() {
1261
1261
c .updateConnectionStatus (ConnectionStatusSuccessful )
1262
1262
}
1263
1263
}
1264
+ case <- ctx .Done ():
1265
+ ticker .Stop ()
1266
+ return
1264
1267
}
1265
1268
}
1266
1269
0 commit comments