diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 80e3f261f..46047ffc2 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -57,6 +57,9 @@ const (
 	// Limit is required to avoid memory spikes during cache initialization.
 	// The default limit of 50 is chosen based on experiments.
 	defaultListSemaphoreWeight = 50
+
+	// The default interval for monitoring the cluster connection status.
+	defaultClusterConnectionInterval = 10 * time.Second
 )
 
 const (
@@ -89,6 +92,8 @@ type ClusterInfo struct {
 	SyncError error
 	// APIResources holds list of API resources supported by the cluster
 	APIResources []kube.APIResourceInfo
+	// ConnectionStatus indicates the status of the connection with the cluster.
+	ConnectionStatus ConnectionStatus
 }
 
 // OnEventHandler is a function that handles Kubernetes event
@@ -132,6 +137,8 @@ type ClusterCache interface {
 	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
 	// OnEvent register event handler that is executed every time when new K8S event received
 	OnEvent(handler OnEventHandler) Unsubscribe
+	// StartClusterConnectionStatusMonitoring starts a goroutine that periodically checks for watch errors and updates the cluster connection status.
+	StartClusterConnectionStatusMonitoring(ctx context.Context)
 }
 
 type WeightedSemaphore interface {
 	Acquire(ctx context.Context, n int64) error
 	TryAcquire(n int64) bool
 	Release(n int64)
 }
 
-type ListRetryFunc func(err error) bool
+type RetryFunc func(err error) bool
 
 // NewClusterCache creates new instance of cluster cache
 func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
@@ -162,14 +169,18 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 			resyncTimeout: defaultClusterResyncTimeout,
 			syncTime:      nil,
 		},
-		watchResyncTimeout:      defaultWatchResyncTimeout,
-		clusterSyncRetryTimeout: ClusterRetryTimeout,
-		resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
-		eventHandlers:           map[uint64]OnEventHandler{},
-		log:                     log,
-		listRetryLimit:          1,
-		listRetryUseBackoff:     false,
-		listRetryFunc:           ListRetryFuncNever,
+		watchResyncTimeout:        defaultWatchResyncTimeout,
+		clusterSyncRetryTimeout:   ClusterRetryTimeout,
+		resourceUpdatedHandlers:   map[uint64]OnResourceUpdatedHandler{},
+		eventHandlers:             map[uint64]OnEventHandler{},
+		log:                       log,
+		listRetryLimit:            1,
+		listRetryUseBackoff:       false,
+		listRetryFunc:             RetryFuncNever,
+		connectionStatus:          ConnectionStatusUnknown,
+		watchFails:                newWatchFailures(),
+		clusterStatusRetryFunc:    RetryFuncNever,
+		clusterConnectionInterval: defaultClusterConnectionInterval,
 	}
 	for i := range opts {
 		opts[i](cache)
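With ListRetryFunc generalized to RetryFunc, a single predicate shape now drives both the list retries and the cluster-status probe retries introduced below. As a sketch of how a caller might plug one in — retryTransient and newCache are illustrative names, not part of this change, and config is assumed to be a *rest.Config the caller already has:

    package example

    import (
        "github.com/argoproj/gitops-engine/pkg/cache"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/rest"
    )

    // retryTransient retries only errors that typically clear up on their own:
    // timeouts and API-server throttling. Everything else fails fast.
    func retryTransient(err error) bool {
        return apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err)
    }

    func newCache(config *rest.Config) cache.ClusterCache {
        return cache.NewClusterCache(config,
            cache.SetRetryOptions(3, true, retryTransient),  // list retries, with backoff
            cache.SetClusterStatusRetryFunc(retryTransient), // connection-status probe retries
        )
    }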
@@ -177,9 +188,29 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 	return cache
 }
 
+// ConnectionStatus indicates the status of the connection with the cluster.
+type ConnectionStatus string
+
+const (
+	ConnectionStatusSuccessful ConnectionStatus = "Successful"
+	ConnectionStatusFailed     ConnectionStatus = "Failed"
+	ConnectionStatusUnknown    ConnectionStatus = "Unknown"
+)
+
 type clusterCache struct {
 	syncStatus clusterCacheSync
 
+	// connectionStatus indicates the status of the connection with the cluster.
+	connectionStatus ConnectionStatus
+
+	// clusterConnectionInterval is the interval used to monitor the cluster connection status.
+	clusterConnectionInterval time.Duration
+
+	// watchFails is used to keep track of the failures while watching resources.
+	watchFails *watchFailures
+
+	clusterStatusRetryFunc RetryFunc
+
 	apisMeta      map[schema.GroupKind]*apiMeta
 	serverVersion string
 	apiResources  []kube.APIResourceInfo
@@ -200,7 +231,7 @@ type clusterCache struct {
 	// retry options for list operations
 	listRetryLimit      int32
 	listRetryUseBackoff bool
-	listRetryFunc       ListRetryFunc
+	listRetryFunc       RetryFunc
 
 	// lock is a rw lock which protects the fields of clusterInfo
 	lock sync.RWMutex
@@ -236,13 +267,13 @@ type clusterCacheSync struct {
 	resyncTimeout time.Duration
 }
 
-// ListRetryFuncNever never retries on errors
-func ListRetryFuncNever(err error) bool {
+// RetryFuncNever never retries on errors
+func RetryFuncNever(err error) bool {
 	return false
 }
 
-// ListRetryFuncAlways always retries on errors
-func ListRetryFuncAlways(err error) bool {
+// RetryFuncAlways always retries on errors
+func RetryFuncAlways(err error) bool {
 	return true
 }
@@ -595,6 +626,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+	watchKey := api.GroupKind.String()
 	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
 		defer func() {
 			if r := recover(); r != nil {
@@ -615,7 +647,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			res, err := resClient.Watch(ctx, options)
 			if errors.IsNotFound(err) {
 				c.stopWatching(api.GroupKind, ns)
+				c.watchFails.remove(watchKey)
+				return res, err
 			}
+
+			if err != nil {
+				c.watchFails.add(watchKey)
+			} else {
+				c.watchFails.remove(watchKey)
+			}
+
 			return res, err
 		},
 	})
@@ -810,8 +851,14 @@ func (c *clusterCache) sync() error {
 	version, err := c.kubectl.GetServerVersion(config)
 	if err != nil {
+		if c.connectionStatus != ConnectionStatusFailed {
+			c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error())
+			c.connectionStatus = ConnectionStatusFailed
+		}
 		return err
 	}
+
+	c.connectionStatus = ConnectionStatusSuccessful
 	c.serverVersion = version
 	apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
 	if err != nil {
@@ -1186,6 +1233,7 @@
 		LastCacheSyncTime: c.syncStatus.syncTime,
 		SyncError:         c.syncStatus.syncError,
 		APIResources:      c.apiResources,
+		ConnectionStatus:  c.connectionStatus,
 	}
 }
 
@@ -1194,3 +1242,111 @@
 func skipAppRequeuing(key kube.ResourceKey) bool {
 	return ignoredRefreshResources[key.Group+"/"+key.Kind]
 }
+
+// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures.
+// If there are any watch errors, it will periodically ping the remote cluster
+// and update the cluster connection status.
+func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
+	go c.clusterConnectionService(ctx)
+}
+
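The monitor goroutine runs until the supplied context is cancelled, so the caller owns its lifetime. A minimal wiring sketch under that assumption — startMonitoring is an illustrative helper, not part of the API, and the "context" and cache imports from the earlier sketch are assumed:

    // startMonitoring performs the initial sync and then starts the connection
    // monitor; the returned stop function cancels the monitor goroutine.
    func startMonitoring(cc cache.ClusterCache) (stop func(), err error) {
        if err := cc.EnsureSynced(); err != nil {
            return nil, err
        }
        ctx, cancel := context.WithCancel(context.Background())
        cc.StartClusterConnectionStatusMonitoring(ctx)
        return cancel, nil
    }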
+func (c *clusterCache) clusterConnectionService(ctx context.Context) {
+	if c.clusterConnectionInterval <= 0 {
+		return
+	}
+
+	ticker := time.NewTicker(c.clusterConnectionInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			watchErrors := c.watchFails.len()
+			// Ping the cluster for connection verification if there are watch failures or
+			// if the cluster has recovered from watch failures.
+			watchesRecovered := false
+			if watchErrors == 0 {
+				// If there are no watch failures, check whether the status needs to be updated.
+				c.lock.RLock()
+				if c.connectionStatus == ConnectionStatusFailed {
+					watchesRecovered = true
+				}
+				c.lock.RUnlock()
+			}
+
+			if watchErrors > 0 || watchesRecovered {
+				c.log.V(1).Info("verifying cluster connection", "server", c.config.Host)
+				// Retry fetching the server version to avoid invalidating the cache due to transient errors.
+				err := retry.OnError(retry.DefaultBackoff, c.clusterStatusRetryFunc, func() error {
+					_, err := c.kubectl.GetServerVersion(c.config)
+					if err != nil && c.clusterStatusRetryFunc(err) {
+						c.log.V(1).Info("Error while fetching server version", "error", err.Error())
+					}
+					return err
+				})
+				if err != nil {
+					c.updateConnectionStatus(ConnectionStatusFailed)
+				} else {
+					c.updateConnectionStatus(ConnectionStatusSuccessful)
+				}
+			}
+		case <-ctx.Done():
+			c.log.V(1).Info("Stopping cluster connection status monitoring", "server", c.config.Host)
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) {
+	invalidateCache := false
+	c.lock.Lock()
+	if c.connectionStatus != status {
+		c.connectionStatus = status
+		invalidateCache = true
+	}
+	c.lock.Unlock()
+
+	if !invalidateCache {
+		return
+	}
+
+	c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status)
+
+	c.Invalidate()
+	if err := c.EnsureSynced(); err != nil {
+		c.log.Error(err, "failed to sync cache state after updating cluster connection status", "server", c.config.Host)
+	}
+}
+
+// watchFailures is used to keep track of the failures while watching resources. It is updated
+// whenever an error occurs during watch or when the watch recovers from a failure.
+type watchFailures struct {
+	watches map[string]bool
+	mu      sync.RWMutex
+}
+
+func newWatchFailures() *watchFailures {
+	return &watchFailures{
+		watches: make(map[string]bool),
+	}
+}
+
+func (w *watchFailures) add(key string) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.watches[key] = true
+}
+
+func (w *watchFailures) remove(key string) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	delete(w.watches, key)
+}
+
+func (w *watchFailures) len() int {
+	w.mu.RLock()
+	defer w.mu.RUnlock()
+	return len(w.watches)
+}
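Consumers can read the new field off GetClusterInfo once the monitor is running. How a caller reacts is up to them; the branch bodies in this sketch are illustrative comments only:

    func handleStatus(cc cache.ClusterCache) {
        switch cc.GetClusterInfo().ConnectionStatus {
        case cache.ConnectionStatusSuccessful:
            // last probe (or sync) reached the API server; proceed as usual
        case cache.ConnectionStatusFailed:
            // GetServerVersion failed even after retries; treat the cluster as unreachable
        case cache.ConnectionStatusUnknown:
            // constructor default: no sync has completed yet
        }
    }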
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 68221ab89..a0f464e7e 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -3,12 +3,13 @@ package cache
 import (
 	"context"
 	"fmt"
-	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
 	"sort"
 	"strings"
 	"testing"
 	"time"
 
+	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
+
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	appsv1 "k8s.io/api/apps/v1"
@@ -148,6 +149,7 @@ func TestEnsureSynced(t *testing.T) {
 	}
 
 	cluster := newCluster(t, obj1, obj2)
+	assert.Equal(t, cluster.connectionStatus, ConnectionStatusUnknown)
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 
@@ -160,6 +162,7 @@
 		names = append(names, k.Name)
 	}
 	assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names)
+	assert.Equal(t, cluster.connectionStatus, ConnectionStatusSuccessful)
 }
 
 func TestStatefulSetOwnershipInferred(t *testing.T) {
@@ -492,23 +495,23 @@ func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
 	cronTabGroup := "stable.example.com"
 
-	testCases := []struct{
-		name string
-		localConvertFails bool
+	testCases := []struct {
+		name                         string
+		localConvertFails            bool
 		expectConvertToVersionCalled bool
-		expectGetResourceCalled bool
+		expectGetResourceCalled      bool
 	}{
 		{
-			name: "local convert fails, so GetResource is called",
-			localConvertFails: true,
+			name:                          "local convert fails, so GetResource is called",
+			localConvertFails:             true,
 			expectConvertToVersionCalled: true,
-			expectGetResourceCalled: true,
+			expectGetResourceCalled:       true,
 		},
 		{
-			name: "local convert succeeds, so GetResource is not called",
-			localConvertFails: false,
+			name:                          "local convert succeeds, so GetResource is not called",
+			localConvertFails:             false,
 			expectConvertToVersionCalled: true,
-			expectGetResourceCalled: false,
+			expectGetResourceCalled:       false,
 		},
 	}
@@ -557,7 +560,6 @@ metadata:
 			return testCronTab(), nil
 		})
 
-
 	managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool {
 		return true
 	})
@@ -716,9 +718,10 @@ func TestGetClusterInfo(t *testing.T) {
 	cluster.serverVersion = "v1.16"
 	info := cluster.GetClusterInfo()
 	assert.Equal(t, ClusterInfo{
-		Server:       cluster.config.Host,
-		APIResources: cluster.apiResources,
-		K8SVersion:   cluster.serverVersion,
+		Server:           cluster.config.Host,
+		APIResources:     cluster.apiResources,
+		K8SVersion:       cluster.serverVersion,
+		ConnectionStatus: ConnectionStatusUnknown,
 	}, info)
 }
 
@@ -816,25 +819,25 @@ func testPod() *corev1.Pod {
 func testCRD() *apiextensions.CustomResourceDefinition {
 	return &apiextensions.CustomResourceDefinition{
-		TypeMeta:   metav1.TypeMeta{
+		TypeMeta: metav1.TypeMeta{
 			APIVersion: "apiextensions.k8s.io/v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "crontabs.stable.example.com",
 		},
-		Spec:       apiextensions.CustomResourceDefinitionSpec{
+		Spec: apiextensions.CustomResourceDefinitionSpec{
 			Group: "stable.example.com",
 			Versions: []apiextensions.CustomResourceDefinitionVersion{
 				{
-					Name: "v1",
-					Served: true,
+					Name:    "v1",
+					Served:  true,
 					Storage: true,
 					Schema: &apiextensions.CustomResourceValidation{
 						OpenAPIV3Schema: &apiextensions.JSONSchemaProps{
 							Type: "object",
 							Properties: map[string]apiextensions.JSONSchemaProps{
 								"cronSpec": {Type: "string"},
-								"image": {Type: "string"},
+								"image":    {Type: "string"},
 								"replicas": {Type: "integer"},
 							},
 						},
@@ -855,14 +858,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
 func testCronTab() *unstructured.Unstructured {
 	return &unstructured.Unstructured{Object: map[string]interface{}{
 		"apiVersion": "stable.example.com/v1",
-		"kind": "CronTab",
+		"kind":       "CronTab",
 		"metadata": map[string]interface{}{
-			"name": "test-crontab",
+			"name":      "test-crontab",
 			"namespace": "default",
 		},
 		"spec": map[string]interface{}{
 			"cronSpec": "* * * * */5",
-			"image": "my-awesome-cron-image",
+			"image":    "my-awesome-cron-image",
 		},
 	}}
 }
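The mock below is regenerated with mockery v2.40.1, which panics on unstubbed calls and adds a NewClusterCache constructor that wires expectation checks into t.Cleanup. A test stubbing the new method might look roughly like this sketch (assumes the usual testify, context, and gitops-engine imports):

    func TestConnectionStatusSurfaced(t *testing.T) {
        c := mocks.NewClusterCache(t) // AssertExpectations runs via t.Cleanup
        c.On("StartClusterConnectionStatusMonitoring", mock.Anything).Return()
        c.On("GetClusterInfo").Return(cache.ClusterInfo{ConnectionStatus: cache.ConnectionStatusSuccessful})

        c.StartClusterConnectionStatusMonitoring(context.Background())
        assert.Equal(t, cache.ConnectionStatusSuccessful, c.GetClusterInfo().ConnectionStatus)
    }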
diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go
index fa00346cb..27a95df63 100644
--- a/pkg/cache/mocks/ClusterCache.go
+++ b/pkg/cache/mocks/ClusterCache.go
@@ -1,9 +1,12 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
+// Code generated by mockery v2.40.1. DO NOT EDIT.
 
 package mocks
 
 import (
+	context "context"
+
 	cache "github.com/argoproj/gitops-engine/pkg/cache"
+
 	kube "github.com/argoproj/gitops-engine/pkg/utils/kube"
 
 	managedfields "k8s.io/apimachinery/pkg/util/managedfields"
@@ -26,6 +29,10 @@ type ClusterCache struct {
 func (_m *ClusterCache) EnsureSynced() error {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for EnsureSynced")
+	}
+
 	var r0 error
 	if rf, ok := ret.Get(0).(func() error); ok {
 		r0 = rf()
@@ -47,6 +54,10 @@ func (_m *ClusterCache) FindResources(namespace string, predicates ...func(*cach
 	_ca = append(_ca, _va...)
 	ret := _m.Called(_ca...)
 
+	if len(ret) == 0 {
+		panic("no return value specified for FindResources")
+	}
+
 	var r0 map[kube.ResourceKey]*cache.Resource
 	if rf, ok := ret.Get(0).(func(string, ...func(*cache.Resource) bool) map[kube.ResourceKey]*cache.Resource); ok {
 		r0 = rf(namespace, predicates...)
@@ -63,6 +74,10 @@ func (_m *ClusterCache) FindResources(namespace string, predicates ...func(*cach
 func (_m *ClusterCache) GetAPIResources() []kube.APIResourceInfo {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetAPIResources")
+	}
+
 	var r0 []kube.APIResourceInfo
 	if rf, ok := ret.Get(0).(func() []kube.APIResourceInfo); ok {
 		r0 = rf()
@@ -79,6 +94,10 @@ func (_m *ClusterCache) GetAPIResources() []kube.APIResourceInfo {
 func (_m *ClusterCache) GetClusterInfo() cache.ClusterInfo {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetClusterInfo")
+	}
+
 	var r0 cache.ClusterInfo
 	if rf, ok := ret.Get(0).(func() cache.ClusterInfo); ok {
 		r0 = rf()
@@ -93,6 +112,10 @@ func (_m *ClusterCache) GetClusterInfo() cache.ClusterInfo {
 func (_m *ClusterCache) GetGVKParser() *managedfields.GvkParser {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetGVKParser")
+	}
+
 	var r0 *managedfields.GvkParser
 	if rf, ok := ret.Get(0).(func() *managedfields.GvkParser); ok {
 		r0 = rf()
@@ -109,7 +132,15 @@ func (_m *ClusterCache) GetGVKParser() *managedfields.GvkParser {
 func (_m *ClusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(*cache.Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
 	ret := _m.Called(targetObjs, isManaged)
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetManagedLiveObjs")
+	}
+
 	var r0 map[kube.ResourceKey]*unstructured.Unstructured
+	var r1 error
+	if rf, ok := ret.Get(0).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)); ok {
+		return rf(targetObjs, isManaged)
+	}
 	if rf, ok := ret.Get(0).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) map[kube.ResourceKey]*unstructured.Unstructured); ok {
 		r0 = rf(targetObjs, isManaged)
 	} else {
@@ -118,7 +149,6 @@ func (_m *ClusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructur
 		}
 	}
 
-	var r1 error
 	if rf, ok := ret.Get(1).(func([]*unstructured.Unstructured, func(*cache.Resource) bool) error); ok {
 		r1 = rf(targetObjs, isManaged)
 	} else {
@@ -132,6 +162,10 @@ func (_m *ClusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructur
 func (_m *ClusterCache) GetOpenAPISchema() openapi.Resources {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetOpenAPISchema")
+	}
+
 	var r0 openapi.Resources
 	if rf, ok := ret.Get(0).(func() openapi.Resources); ok {
 		r0 = rf()
@@ -148,6 +182,10 @@ func (_m *ClusterCache) GetOpenAPISchema() openapi.Resources {
 func (_m *ClusterCache) GetServerVersion() string {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetServerVersion")
+	}
+
 	var r0 string
 	if rf, ok := ret.Get(0).(func() string); ok {
 		r0 = rf()
@@ -173,14 +211,21 @@ func (_m *ClusterCache) Invalidate(opts ...cache.UpdateSettingsFunc) {
 func (_m *ClusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
 	ret := _m.Called(gk)
 
+	if len(ret) == 0 {
+		panic("no return value specified for IsNamespaced")
+	}
+
 	var r0 bool
+	var r1 error
+	if rf, ok := ret.Get(0).(func(schema.GroupKind) (bool, error)); ok {
+		return rf(gk)
+	}
 	if rf, ok := ret.Get(0).(func(schema.GroupKind) bool); ok {
 		r0 = rf(gk)
 	} else {
 		r0 = ret.Get(0).(bool)
 	}
 
-	var r1 error
 	if rf, ok := ret.Get(1).(func(schema.GroupKind) error); ok {
 		r1 = rf(gk)
 	} else {
@@ -199,6 +244,10 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach
 func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe {
 	ret := _m.Called(handler)
 
+	if len(ret) == 0 {
+		panic("no return value specified for OnEvent")
+	}
+
 	var r0 cache.Unsubscribe
 	if rf, ok := ret.Get(0).(func(cache.OnEventHandler) cache.Unsubscribe); ok {
 		r0 = rf(handler)
@@ -215,6 +264,10 @@ func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe
 func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler) cache.Unsubscribe {
 	ret := _m.Called(handler)
 
+	if len(ret) == 0 {
+		panic("no return value specified for OnResourceUpdated")
+	}
+
 	var r0 cache.Unsubscribe
 	if rf, ok := ret.Get(0).(func(cache.OnResourceUpdatedHandler) cache.Unsubscribe); ok {
 		r0 = rf(handler)
@@ -226,3 +279,22 @@ func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler
 
 	return r0
 }
+
+// StartClusterConnectionStatusMonitoring provides a mock function with given fields: ctx
+func (_m *ClusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
+	_m.Called(ctx)
+}
+
+// NewClusterCache creates a new instance of ClusterCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewClusterCache(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *ClusterCache {
+	mock := &ClusterCache{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go
index a7194d0ca..7ff0b9c2b 100644
--- a/pkg/cache/settings.go
+++ b/pkg/cache/settings.go
@@ -147,7 +147,7 @@ func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc {
 }
 
 // SetRetryOptions sets cluster list retry options
-func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) UpdateSettingsFunc {
+func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc RetryFunc) UpdateSettingsFunc {
 	return func(cache *clusterCache) {
 		// Max retries must be at least one
 		if maxRetries < 1 {
@@ -170,3 +170,17 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc {
 		}
 	}
 }
+
+// SetClusterConnectionInterval sets the interval for monitoring the cluster connection status.
+func SetClusterConnectionInterval(interval time.Duration) UpdateSettingsFunc {
+	return func(cache *clusterCache) {
+		cache.clusterConnectionInterval = interval
+	}
+}
+
+// SetClusterStatusRetryFunc sets the retry function for monitoring the cluster connection status.
+func SetClusterStatusRetryFunc(retryFunc RetryFunc) UpdateSettingsFunc {
+	return func(cache *clusterCache) {
+		cache.clusterStatusRetryFunc = retryFunc
+	}
+}
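Taken together, the two new settings control the probe cadence and its retry policy. Since clusterConnectionService returns immediately for a non-positive interval, monitoring can also effectively be disabled. A closing configuration sketch — the 30-second cadence is arbitrary, and newTunedCache/newUnmonitoredCache are illustrative helpers:

    func newTunedCache(config *rest.Config) cache.ClusterCache {
        return cache.NewClusterCache(config,
            cache.SetClusterConnectionInterval(30*time.Second),     // probe cadence while watches are failing
            cache.SetClusterStatusRetryFunc(cache.RetryFuncAlways), // retry every probe error with the default backoff
        )
    }

    // Passing a non-positive interval short-circuits clusterConnectionService,
    // so the monitor goroutine exits immediately even if it is started:
    func newUnmonitoredCache(config *rest.Config) cache.ClusterCache {
        return cache.NewClusterCache(config, cache.SetClusterConnectionInterval(0))
    }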