From c2f4b02aedf66e29eadf89e5469c80764ffe5dd0 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Tue, 5 Dec 2023 09:29:37 +0530 Subject: [PATCH 1/7] bug: Argo CD should update the correct cluster connectivity status Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 41 ++++++++++++++++++++++++++++++++ pkg/cache/cluster_test.go | 49 +++++++++++++++++++++------------------ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 80e3f261f..d6b0a2c6d 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -89,6 +89,8 @@ type ClusterInfo struct { SyncError error // APIResources holds list of API resources supported by the cluster APIResources []kube.APIResourceInfo + // ConnectionStatus indicates the status of the connection with the cluster. + ConnectionStatus ConnectionStatus } // OnEventHandler is a function that handles Kubernetes event @@ -170,6 +172,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa listRetryLimit: 1, listRetryUseBackoff: false, listRetryFunc: ListRetryFuncNever, + connectionStatus: ConnectionStatusUnknown, } for i := range opts { opts[i](cache) @@ -177,9 +180,21 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa return cache } +// ConnectionStatus indicates the status of the connection with the cluster. +type ConnectionStatus string + +const ( + ConnectionStatusSuccessful ConnectionStatus = "Successful" + ConnectionStatusFailed ConnectionStatus = "Failed" + ConnectionStatusUnknown ConnectionStatus = "Unknown" +) + type clusterCache struct { syncStatus clusterCacheSync + // connectionStatus indicates the status of the connection with the cluster. 
+ connectionStatus ConnectionStatus + apisMeta map[schema.GroupKind]*apiMeta serverVersion string apiResources []kube.APIResourceInfo @@ -616,6 +631,25 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo if errors.IsNotFound(err) { c.stopWatching(api.GroupKind, ns) } + var connectionUpdated bool + if err != nil { + if c.connectionStatus != ConnectionStatusFailed { + c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error()) + c.connectionStatus = ConnectionStatusFailed + connectionUpdated = true + } + } else if c.connectionStatus != ConnectionStatusSuccessful { + c.connectionStatus = ConnectionStatusSuccessful + connectionUpdated = true + } + + if connectionUpdated { + c.Invalidate() + if err := c.EnsureSynced(); err != nil { + return nil, err + } + } + return res, err }, }) @@ -810,8 +844,14 @@ func (c *clusterCache) sync() error { version, err := c.kubectl.GetServerVersion(config) if err != nil { + if c.connectionStatus != ConnectionStatusFailed { + c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error()) + c.connectionStatus = ConnectionStatusFailed + } return err } + + c.connectionStatus = ConnectionStatusSuccessful c.serverVersion = version apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings()) if err != nil { @@ -1186,6 +1226,7 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { LastCacheSyncTime: c.syncStatus.syncTime, SyncError: c.syncStatus.syncError, APIResources: c.apiResources, + ConnectionStatus: c.connectionStatus, } } diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 68221ab89..a0f464e7e 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -3,12 +3,13 @@ package cache import ( "context" "fmt" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" "sort" "strings" "testing" "time" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -148,6 +149,7 @@ func TestEnsureSynced(t *testing.T) { } cluster := newCluster(t, obj1, obj2) + assert.Equal(t, cluster.connectionStatus, ConnectionStatusUnknown) err := cluster.EnsureSynced() require.NoError(t, err) @@ -160,6 +162,7 @@ func TestEnsureSynced(t *testing.T) { names = append(names, k.Name) } assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names) + assert.Equal(t, cluster.connectionStatus, ConnectionStatusSuccessful) } func TestStatefulSetOwnershipInferred(t *testing.T) { @@ -492,23 +495,23 @@ metadata: func TestGetManagedLiveObjsFailedConversion(t *testing.T) { cronTabGroup := "stable.example.com" - testCases := []struct{ - name string - localConvertFails bool + testCases := []struct { + name string + localConvertFails bool expectConvertToVersionCalled bool - expectGetResourceCalled bool + expectGetResourceCalled bool }{ { - name: "local convert fails, so GetResource is called", - localConvertFails: true, + name: "local convert fails, so GetResource is called", + localConvertFails: true, expectConvertToVersionCalled: true, - expectGetResourceCalled: true, + expectGetResourceCalled: true, }, { - name: "local convert succeeds, so GetResource is not called", - localConvertFails: false, + name: "local convert succeeds, so GetResource is not called", + localConvertFails: false, expectConvertToVersionCalled: true, - expectGetResourceCalled: false, + expectGetResourceCalled: false, }, } @@ -557,7 +560,6 @@ metadata: return testCronTab(), nil }) - managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool { return true }) @@ -716,9 +718,10 @@ func TestGetClusterInfo(t *testing.T) { cluster.serverVersion = "v1.16" info := cluster.GetClusterInfo() assert.Equal(t, ClusterInfo{ - Server: cluster.config.Host, - APIResources: cluster.apiResources, - K8SVersion: cluster.serverVersion, + Server: cluster.config.Host, + 
APIResources: cluster.apiResources, + K8SVersion: cluster.serverVersion, + ConnectionStatus: ConnectionStatusUnknown, }, info) } @@ -816,25 +819,25 @@ func testPod() *corev1.Pod { func testCRD() *apiextensions.CustomResourceDefinition { return &apiextensions.CustomResourceDefinition{ - TypeMeta: metav1.TypeMeta{ + TypeMeta: metav1.TypeMeta{ APIVersion: "apiextensions.k8s.io/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: "crontabs.stable.example.com", }, - Spec: apiextensions.CustomResourceDefinitionSpec{ + Spec: apiextensions.CustomResourceDefinitionSpec{ Group: "stable.example.com", Versions: []apiextensions.CustomResourceDefinitionVersion{ { - Name: "v1", - Served: true, + Name: "v1", + Served: true, Storage: true, Schema: &apiextensions.CustomResourceValidation{ OpenAPIV3Schema: &apiextensions.JSONSchemaProps{ Type: "object", Properties: map[string]apiextensions.JSONSchemaProps{ "cronSpec": {Type: "string"}, - "image": {Type: "string"}, + "image": {Type: "string"}, "replicas": {Type: "integer"}, }, }, @@ -855,14 +858,14 @@ func testCRD() *apiextensions.CustomResourceDefinition { func testCronTab() *unstructured.Unstructured { return &unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": "stable.example.com/v1", - "kind": "CronTab", + "kind": "CronTab", "metadata": map[string]interface{}{ - "name": "test-crontab", + "name": "test-crontab", "namespace": "default", }, "spec": map[string]interface{}{ "cronSpec": "* * * * */5", - "image": "my-awesome-cron-image", + "image": "my-awesome-cron-image", }, }} } From 24d1408f51ad7faef161e090c5a053890aa64eca Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 3 Jan 2024 18:53:43 +0530 Subject: [PATCH 2/7] Use lock before updating connection status Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index d6b0a2c6d..41dcdd8ec 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ 
-635,11 +635,15 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo if err != nil { if c.connectionStatus != ConnectionStatusFailed { c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error()) + c.lock.Lock() c.connectionStatus = ConnectionStatusFailed + c.lock.Unlock() connectionUpdated = true } } else if c.connectionStatus != ConnectionStatusSuccessful { + c.lock.Lock() c.connectionStatus = ConnectionStatusSuccessful + c.lock.Unlock() connectionUpdated = true } From 4a32ce8d4be73771efa133a9f2531acf8bf7d63a Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 25 Jan 2024 18:57:58 +0530 Subject: [PATCH 3/7] update the connection status periodically using watch errors Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 116 ++++++++++++++++++++++++++------ pkg/cache/mocks/ClusterCache.go | 75 ++++++++++++++++++++- 2 files changed, 168 insertions(+), 23 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 41dcdd8ec..9854508f8 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -134,6 +134,8 @@ type ClusterCache interface { OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe // OnEvent register event handler that is executed every time when new K8S event received OnEvent(handler OnEventHandler) Unsubscribe + // UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status. + UpdateClusterConnectionStatus() } type WeightedSemaphore interface { @@ -173,6 +175,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa listRetryUseBackoff: false, listRetryFunc: ListRetryFuncNever, connectionStatus: ConnectionStatusUnknown, + watchFails: newWatchFaiures(), } for i := range opts { opts[i](cache) @@ -195,6 +198,9 @@ type clusterCache struct { // connectionStatus indicates the status of the connection with the cluster. 
connectionStatus ConnectionStatus + // watchFails is used to keep track of the failures while watching resources. + watchFails *watchFailures + apisMeta map[schema.GroupKind]*apiMeta serverVersion string apiResources []kube.APIResourceInfo @@ -610,6 +616,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc } func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) { + watchKey := api.GroupKind.String() kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) { defer func() { if r := recover(); r != nil { @@ -630,28 +637,14 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo res, err := resClient.Watch(ctx, options) if errors.IsNotFound(err) { c.stopWatching(api.GroupKind, ns) - } - var connectionUpdated bool - if err != nil { - if c.connectionStatus != ConnectionStatusFailed { - c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error()) - c.lock.Lock() - c.connectionStatus = ConnectionStatusFailed - c.lock.Unlock() - connectionUpdated = true - } - } else if c.connectionStatus != ConnectionStatusSuccessful { - c.lock.Lock() - c.connectionStatus = ConnectionStatusSuccessful - c.lock.Unlock() - connectionUpdated = true + c.watchFails.remove(watchKey) + return res, err } - if connectionUpdated { - c.Invalidate() - if err := c.EnsureSynced(); err != nil { - return nil, err - } + if err != nil { + c.watchFails.add(watchKey) + } else { + c.watchFails.remove(watchKey) } return res, err @@ -1239,3 +1232,86 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { func skipAppRequeuing(key kube.ResourceKey) bool { return ignoredRefreshResources[key.Group+"/"+key.Kind] } + +// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures. 
+// If there are any watch errors, it will periodically ping the remote cluster +// and update the cluster connection status. +func (c *clusterCache) UpdateClusterConnectionStatus() { + go c.clusterConnectionService() +} + +func (c *clusterCache) clusterConnectionService() { + clusterConnectionTimeout := 10 * time.Second + ticker := time.NewTicker(clusterConnectionTimeout) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + watchErrors := c.watchFails.len() + // Ping the cluster for connection verification if there are watch failures or + // if the cluster has recovered back from watch failures. + if watchErrors > 0 || (watchErrors == 0 && c.connectionStatus == ConnectionStatusFailed) { + c.log.V(1).Info("verifying cluster connection", "watches", watchErrors) + + _, err := c.kubectl.GetServerVersion(c.config) + if err != nil { + if c.connectionStatus != ConnectionStatusFailed { + c.updateConnectionStatus(ConnectionStatusFailed) + } + } else if c.connectionStatus != ConnectionStatusSuccessful { + c.updateConnectionStatus(ConnectionStatusSuccessful) + } + } + } + } + +} + +func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) { + if c.connectionStatus == status { + return + } + + c.lock.Lock() + c.connectionStatus = status + c.lock.Unlock() + + c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status) + + c.Invalidate() + if err := c.EnsureSynced(); err != nil { + c.log.Error(err, "failed to sync cache state after updating cluster connection status", "server", c.config.Host) + } +} + +// watchFailures is used to keep track of the failures while watching resources. It is updated +// whenever an error occurs during watch or when the watch recovers back from a failure. 
+type watchFailures struct { + watches map[string]bool + mu sync.RWMutex +} + +func newWatchFaiures() *watchFailures { + return &watchFailures{ + watches: make(map[string]bool), + } +} + +func (w *watchFailures) add(key string) { + w.mu.Lock() + defer w.mu.Unlock() + w.watches[key] = true +} + +func (w *watchFailures) remove(key string) { + w.mu.Lock() + defer w.mu.Unlock() + delete(w.watches, key) +} + +func (w *watchFailures) len() int { + w.mu.RLock() + defer w.mu.RUnlock() + return len(w.watches) +} diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index fa00346cb..7bb7f4ff1 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package mocks @@ -26,6 +26,10 @@ type ClusterCache struct { func (_m *ClusterCache) EnsureSynced() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for EnsureSynced") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -47,6 +51,10 @@ func (_m *ClusterCache) FindResources(namespace string, predicates ...func(*cach _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for FindResources") + } + var r0 map[kube.ResourceKey]*cache.Resource if rf, ok := ret.Get(0).(func(string, ...func(*cache.Resource) bool) map[kube.ResourceKey]*cache.Resource); ok { r0 = rf(namespace, predicates...) 
@@ -63,6 +71,10 @@ func (_m *ClusterCache) FindResources(namespace string, predicates ...func(*cach func (_m *ClusterCache) GetAPIResources() []kube.APIResourceInfo { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetAPIResources") + } + var r0 []kube.APIResourceInfo if rf, ok := ret.Get(0).(func() []kube.APIResourceInfo); ok { r0 = rf() @@ -79,6 +91,10 @@ func (_m *ClusterCache) GetAPIResources() []kube.APIResourceInfo { func (_m *ClusterCache) GetClusterInfo() cache.ClusterInfo { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetClusterInfo") + } + var r0 cache.ClusterInfo if rf, ok := ret.Get(0).(func() cache.ClusterInfo); ok { r0 = rf() @@ -93,6 +109,10 @@ func (_m *ClusterCache) GetClusterInfo() cache.ClusterInfo { func (_m *ClusterCache) GetGVKParser() *managedfields.GvkParser { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetGVKParser") + } + var r0 *managedfields.GvkParser if rf, ok := ret.Get(0).(func() *managedfields.GvkParser); ok { r0 = rf() @@ -109,7 +129,15 @@ func (_m *ClusterCache) GetGVKParser() *managedfields.GvkParser { func (_m *ClusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(*cache.Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error) { ret := _m.Called(targetObjs, isManaged) + if len(ret) == 0 { + panic("no return value specified for GetManagedLiveObjs") + } + var r0 map[kube.ResourceKey]*unstructured.Unstructured + var r1 error + if rf, ok := ret.Get(0).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)); ok { + return rf(targetObjs, isManaged) + } if rf, ok := ret.Get(0).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) map[kube.ResourceKey]*unstructured.Unstructured); ok { r0 = rf(targetObjs, isManaged) } else { @@ -118,7 +146,6 @@ func (_m *ClusterCache) GetManagedLiveObjs(targetObjs 
[]*unstructured.Unstructur } } - var r1 error if rf, ok := ret.Get(1).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) error); ok { r1 = rf(targetObjs, isManaged) } else { @@ -132,6 +159,10 @@ func (_m *ClusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructur func (_m *ClusterCache) GetOpenAPISchema() openapi.Resources { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetOpenAPISchema") + } + var r0 openapi.Resources if rf, ok := ret.Get(0).(func() openapi.Resources); ok { r0 = rf() @@ -148,6 +179,10 @@ func (_m *ClusterCache) GetOpenAPISchema() openapi.Resources { func (_m *ClusterCache) GetServerVersion() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetServerVersion") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -173,14 +208,21 @@ func (_m *ClusterCache) Invalidate(opts ...cache.UpdateSettingsFunc) { func (_m *ClusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) { ret := _m.Called(gk) + if len(ret) == 0 { + panic("no return value specified for IsNamespaced") + } + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(schema.GroupKind) (bool, error)); ok { + return rf(gk) + } if rf, ok := ret.Get(0).(func(schema.GroupKind) bool); ok { r0 = rf(gk) } else { r0 = ret.Get(0).(bool) } - var r1 error if rf, ok := ret.Get(1).(func(schema.GroupKind) error); ok { r1 = rf(gk) } else { @@ -199,6 +241,10 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe { ret := _m.Called(handler) + if len(ret) == 0 { + panic("no return value specified for OnEvent") + } + var r0 cache.Unsubscribe if rf, ok := ret.Get(0).(func(cache.OnEventHandler) cache.Unsubscribe); ok { r0 = rf(handler) @@ -215,6 +261,10 @@ func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe func (_m *ClusterCache) 
OnResourceUpdated(handler cache.OnResourceUpdatedHandler) cache.Unsubscribe { ret := _m.Called(handler) + if len(ret) == 0 { + panic("no return value specified for OnResourceUpdated") + } + var r0 cache.Unsubscribe if rf, ok := ret.Get(0).(func(cache.OnResourceUpdatedHandler) cache.Unsubscribe); ok { r0 = rf(handler) @@ -226,3 +276,22 @@ func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler return r0 } + +// UpdateClusterConnectionStatus provides a mock function with given fields: +func (_m *ClusterCache) UpdateClusterConnectionStatus() { + _m.Called() +} + +// NewClusterCache creates a new instance of ClusterCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewClusterCache(t interface { + mock.TestingT + Cleanup(func()) +}) *ClusterCache { + mock := &ClusterCache{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From 2b291f724e60fe3b3df364a7740bf14f7e057105 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 25 Jan 2024 19:27:41 +0530 Subject: [PATCH 4/7] use context while updating cluster status Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 11 +++++++---- pkg/cache/mocks/ClusterCache.go | 9 ++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 9854508f8..65c62f3fc 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -135,7 +135,7 @@ type ClusterCache interface { // OnEvent register event handler that is executed every time when new K8S event received OnEvent(handler OnEventHandler) Unsubscribe // UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status. 
- UpdateClusterConnectionStatus() + UpdateClusterConnectionStatus(ctx context.Context) } type WeightedSemaphore interface { @@ -1236,11 +1236,11 @@ func skipAppRequeuing(key kube.ResourceKey) bool { // UpdateClusterConnectionStatus starts a goroutine that checks for watch failures. // If there are any watch errors, it will periodically ping the remote cluster // and update the cluster connection status. -func (c *clusterCache) UpdateClusterConnectionStatus() { - go c.clusterConnectionService() +func (c *clusterCache) UpdateClusterConnectionStatus(ctx context.Context) { + go c.clusterConnectionService(ctx) } -func (c *clusterCache) clusterConnectionService() { +func (c *clusterCache) clusterConnectionService(ctx context.Context) { clusterConnectionTimeout := 10 * time.Second ticker := time.NewTicker(clusterConnectionTimeout) defer ticker.Stop() @@ -1263,6 +1263,9 @@ func (c *clusterCache) clusterConnectionService() { c.updateConnectionStatus(ConnectionStatusSuccessful) } } + case <-ctx.Done(): + ticker.Stop() + return } } diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index 7bb7f4ff1..8154555b0 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -3,7 +3,10 @@ package mocks import ( + context "context" + cache "github.com/argoproj/gitops-engine/pkg/cache" + kube "github.com/argoproj/gitops-engine/pkg/utils/kube" managedfields "k8s.io/apimachinery/pkg/util/managedfields" @@ -277,9 +280,9 @@ func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler return r0 } -// UpdateClusterConnectionStatus provides a mock function with given fields: -func (_m *ClusterCache) UpdateClusterConnectionStatus() { - _m.Called() +// UpdateClusterConnectionStatus provides a mock function with given fields: ctx +func (_m *ClusterCache) UpdateClusterConnectionStatus(ctx context.Context) { + _m.Called(ctx) } // NewClusterCache creates a new instance of ClusterCache. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. From 7f1b99649c0cf7a42d5c83388b8358c3598c6b5b Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Fri, 9 Feb 2024 09:52:01 +0530 Subject: [PATCH 5/7] Address review comments - Refactor the clusterConnectionService to use locks before reading the status - Rename the function that monitors the cluster connection status - Fix typo Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 44 +++++++++++++++++++++------------ pkg/cache/mocks/ClusterCache.go | 4 +-- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 65c62f3fc..a9ef07d3f 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -134,8 +134,8 @@ type ClusterCache interface { OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe // OnEvent register event handler that is executed every time when new K8S event received OnEvent(handler OnEventHandler) Unsubscribe - // UpdateClusterConnectionStatus checks the watch errors periodically and updates the cluster connection status. - UpdateClusterConnectionStatus(ctx context.Context) + // StartClusterConnectionStatusMonitoring starts a goroutine that checks the watch errors periodically and updates the cluster connection status. + StartClusterConnectionStatusMonitoring(ctx context.Context) } type WeightedSemaphore interface { @@ -175,7 +175,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa listRetryUseBackoff: false, listRetryFunc: ListRetryFuncNever, connectionStatus: ConnectionStatusUnknown, - watchFails: newWatchFaiures(), + watchFails: newWatchFailures(), } for i := range opts { opts[i](cache) @@ -1233,10 +1233,10 @@ func skipAppRequeuing(key kube.ResourceKey) bool { return ignoredRefreshResources[key.Group+"/"+key.Kind] } -// UpdateClusterConnectionStatus starts a goroutine that checks for watch failures. 
+// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures. // If there are any watch errors, it will periodically ping the remote cluster // and update the cluster connection status. -func (c *clusterCache) UpdateClusterConnectionStatus(ctx context.Context) { +func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) { go c.clusterConnectionService(ctx) } @@ -1251,15 +1251,23 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) { watchErrors := c.watchFails.len() // Ping the cluster for connection verification if there are watch failures or // if the cluster has recovered back from watch failures. - if watchErrors > 0 || (watchErrors == 0 && c.connectionStatus == ConnectionStatusFailed) { + watchesRecovered := false + if watchErrors == 0 { + // If there are no watch failures check if the status needs to be updated. + c.lock.RLock() + if c.connectionStatus == ConnectionStatusFailed { + watchesRecovered = true + } + c.lock.RUnlock() + } + + if watchErrors > 0 || watchesRecovered { c.log.V(1).Info("verifying cluster connection", "watches", watchErrors) _, err := c.kubectl.GetServerVersion(c.config) if err != nil { - if c.connectionStatus != ConnectionStatusFailed { - c.updateConnectionStatus(ConnectionStatusFailed) - } - } else if c.connectionStatus != ConnectionStatusSuccessful { + c.updateConnectionStatus(ConnectionStatusFailed) + } else { c.updateConnectionStatus(ConnectionStatusSuccessful) } } @@ -1272,14 +1280,18 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) { } func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) { - if c.connectionStatus == status { - return - } - + invalidateCache := false c.lock.Lock() - c.connectionStatus = status + if c.connectionStatus != status { + c.connectionStatus = status + invalidateCache = true + } c.lock.Unlock() + if !invalidateCache { + return + } + c.log.V(1).Info("updated cluster connection status", 
"server", c.config.Host, "status", status) c.Invalidate() @@ -1295,7 +1307,7 @@ type watchFailures struct { mu sync.RWMutex } -func newWatchFaiures() *watchFailures { +func newWatchFailures() *watchFailures { return &watchFailures{ watches: make(map[string]bool), } diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go index 8154555b0..27a95df63 100644 --- a/pkg/cache/mocks/ClusterCache.go +++ b/pkg/cache/mocks/ClusterCache.go @@ -280,8 +280,8 @@ func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler return r0 } -// UpdateClusterConnectionStatus provides a mock function with given fields: ctx -func (_m *ClusterCache) UpdateClusterConnectionStatus(ctx context.Context) { +// StartClusterConnectionStatusMonitoring provides a mock function with given fields: ctx +func (_m *ClusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) { _m.Called(ctx) } From c1292322c9b943223ed498dc7e33826980433b9c Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Mon, 19 Feb 2024 16:43:49 +0530 Subject: [PATCH 6/7] Update the cluster connection interval as configurable Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 30 ++++++++++++++++++------------ pkg/cache/settings.go | 7 +++++++ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index a9ef07d3f..ea536dc8a 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -57,6 +57,9 @@ const ( // Limit is required to avoid memory spikes during cache initialization. // The default limit of 50 is chosen based on experiments. defaultListSemaphoreWeight = 50 + + // The default interval for monitoring the cluster connection status. 
+ defaultClusterConnectionInterval = 10 * time.Second ) const ( @@ -166,16 +169,17 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa resyncTimeout: defaultClusterResyncTimeout, syncTime: nil, }, - watchResyncTimeout: defaultWatchResyncTimeout, - clusterSyncRetryTimeout: ClusterRetryTimeout, - resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{}, - eventHandlers: map[uint64]OnEventHandler{}, - log: log, - listRetryLimit: 1, - listRetryUseBackoff: false, - listRetryFunc: ListRetryFuncNever, - connectionStatus: ConnectionStatusUnknown, - watchFails: newWatchFailures(), + watchResyncTimeout: defaultWatchResyncTimeout, + clusterSyncRetryTimeout: ClusterRetryTimeout, + resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{}, + eventHandlers: map[uint64]OnEventHandler{}, + log: log, + listRetryLimit: 1, + listRetryUseBackoff: false, + listRetryFunc: ListRetryFuncNever, + connectionStatus: ConnectionStatusUnknown, + watchFails: newWatchFailures(), + clusterConnectionInterval: defaultClusterConnectionInterval, } for i := range opts { opts[i](cache) @@ -198,6 +202,9 @@ type clusterCache struct { // connectionStatus indicates the status of the connection with the cluster. connectionStatus ConnectionStatus + // clusterConnectionInterval is the interval used to monitor the cluster connection status. + clusterConnectionInterval time.Duration + // watchFails is used to keep track of the failures while watching resources. 
watchFails *watchFailures @@ -1241,8 +1248,7 @@ func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Contex } func (c *clusterCache) clusterConnectionService(ctx context.Context) { - clusterConnectionTimeout := 10 * time.Second - ticker := time.NewTicker(clusterConnectionTimeout) + ticker := time.NewTicker(c.clusterConnectionInterval) defer ticker.Stop() for { diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index a7194d0ca..f3546a535 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -170,3 +170,10 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc { } } } + +// SetClusterConnectionInterval sets the interval for monitoring the cluster connection status. +func SetClusterConnectionInterval(interval time.Duration) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.clusterConnectionInterval = interval + } +} From b3e1c67fec674c1a93e4e913049135fead679061 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 4 Apr 2024 16:04:11 +0530 Subject: [PATCH 7/7] Retry reaching the server if there are transient errors Signed-off-by: Chetan Banavikalmutt --- pkg/cache/cluster.go | 34 ++++++++++++++++++++++++---------- pkg/cache/settings.go | 9 ++++++++- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index ea536dc8a..46047ffc2 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -147,7 +147,7 @@ type WeightedSemaphore interface { Release(n int64) } -type ListRetryFunc func(err error) bool +type RetryFunc func(err error) bool // NewClusterCache creates new instance of cluster cache func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache { @@ -176,9 +176,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa log: log, listRetryLimit: 1, listRetryUseBackoff: false, - listRetryFunc: ListRetryFuncNever, + listRetryFunc: RetryFuncNever, connectionStatus: ConnectionStatusUnknown, 
watchFails: newWatchFailures(), + clusterStatusRetryFunc: RetryFuncNever, clusterConnectionInterval: defaultClusterConnectionInterval, } for i := range opts { @@ -208,6 +209,8 @@ type clusterCache struct { // watchFails is used to keep track of the failures while watching resources. watchFails *watchFailures + clusterStatusRetryFunc RetryFunc + apisMeta map[schema.GroupKind]*apiMeta serverVersion string apiResources []kube.APIResourceInfo @@ -228,7 +231,7 @@ type clusterCache struct { // retry options for list operations listRetryLimit int32 listRetryUseBackoff bool - listRetryFunc ListRetryFunc + listRetryFunc RetryFunc // lock is a rw lock which protects the fields of clusterInfo lock sync.RWMutex @@ -264,13 +267,13 @@ type clusterCacheSync struct { resyncTimeout time.Duration } -// ListRetryFuncNever never retries on errors -func ListRetryFuncNever(err error) bool { +// RetryFuncNever never retries on errors +func RetryFuncNever(err error) bool { return false } -// ListRetryFuncAlways always retries on errors -func ListRetryFuncAlways(err error) bool { +// RetryFuncAlways always retries on errors +func RetryFuncAlways(err error) bool { return true } @@ -1248,6 +1251,10 @@ func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Contex } func (c *clusterCache) clusterConnectionService(ctx context.Context) { + if c.clusterConnectionInterval <= 0 { + return + } + ticker := time.NewTicker(c.clusterConnectionInterval) defer ticker.Stop() @@ -1268,9 +1275,15 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) { } if watchErrors > 0 || watchesRecovered { - c.log.V(1).Info("verifying cluster connection", "watches", watchErrors) - - _, err := c.kubectl.GetServerVersion(c.config) + c.log.V(1).Info("verifying cluster connection", "server", c.config.Host) + // Retry fetching the server version to avoid invalidating the cache due to transient errors. 
+ err := retry.OnError(retry.DefaultBackoff, c.clusterStatusRetryFunc, func() error { + _, err := c.kubectl.GetServerVersion(c.config) + if err != nil && c.clusterStatusRetryFunc(err) { + c.log.V(1).Info("Error while fetching server version", "error", err.Error()) + } + return err + }) if err != nil { c.updateConnectionStatus(ConnectionStatusFailed) } else { @@ -1278,6 +1291,7 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) { } } case <-ctx.Done(): + c.log.V(1).Info("Stopping cluster connection status monitoring", "server", c.config.Host) ticker.Stop() return } diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index f3546a535..7ff0b9c2b 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -147,7 +147,7 @@ func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc { } // SetRetryOptions sets cluster list retry options -func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) UpdateSettingsFunc { +func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc RetryFunc) UpdateSettingsFunc { return func(cache *clusterCache) { // Max retries must be at least one if maxRetries < 1 { @@ -177,3 +177,10 @@ func SetClusterConnectionInterval(interval time.Duration) UpdateSettingsFunc { cache.clusterConnectionInterval = interval } } + +// SetClusterStatusRetryFunc sets the retry function for monitoring the cluster connection status. +func SetClusterStatusRetryFunc(retryFunc RetryFunc) UpdateSettingsFunc { + return func(cache *clusterCache) { + cache.clusterStatusRetryFunc = retryFunc + } +}