Skip to content

Commit a9c3cf3

Browse files
committed
clustercache: add optional uncached client support for workload clusters
Signed-off-by: Bharath Nallapeta <[email protected]>
1 parent cd8a2c8 commit a9c3cf3

File tree

5 files changed

+79
-17
lines changed

5 files changed

+79
-17
lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ type clusterAccessorLockedConnectionState struct {
191191
// all typed objects except the ones for which caching has been disabled via DisableFor.
192192
cachedClient client.Client
193193

194+
// uncachedClient to communicate with the workload cluster.
195+
// It performs live GET/LIST calls directly against the API server with no caching.
196+
uncachedClient client.Client
197+
194198
// cache is the cache used by the client.
195199
// It manages informers that have been created e.g. by adding indexes to the cache,
196200
// Get & List calls from the client or via the Watch method of the clusterAccessor.
@@ -297,11 +301,12 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
297301
consecutiveFailures: 0,
298302
}
299303
ca.lockedState.connection = &clusterAccessorLockedConnectionState{
300-
restConfig: connection.RESTConfig,
301-
restClient: connection.RESTClient,
302-
cachedClient: connection.CachedClient,
303-
cache: connection.Cache,
304-
watches: sets.Set[string]{},
304+
restConfig: connection.RESTConfig,
305+
restClient: connection.RESTClient,
306+
cachedClient: connection.CachedClient,
307+
uncachedClient: connection.UncachedClient,
308+
cache: connection.Cache,
309+
watches: sets.Set[string]{},
305310
}
306311

307312
return nil
@@ -407,6 +412,18 @@ func (ca *clusterAccessor) GetReader(ctx context.Context) (client.Reader, error)
407412
return ca.lockedState.connection.cachedClient, nil
408413
}
409414

415+
// GetUncachedClient returns a live (uncached) client for the given cluster.
416+
func (ca *clusterAccessor) GetUncachedClient(ctx context.Context) (client.Client, error) {
417+
ca.rLock(ctx)
418+
defer ca.rUnlock(ctx)
419+
420+
if ca.lockedState.connection == nil {
421+
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
422+
}
423+
424+
return ca.lockedState.connection.uncachedClient, nil
425+
}
426+
410427
func (ca *clusterAccessor) GetRESTConfig(ctx context.Context) (*rest.Config, error) {
411428
ca.rLock(ctx)
412429
defer ca.rUnlock(ctx)

controllers/clustercache/cluster_accessor_client.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ import (
4242
)
4343

4444
type createConnectionResult struct {
45-
RESTConfig *rest.Config
46-
RESTClient *rest.RESTClient
47-
CachedClient client.Client
48-
Cache *stoppableCache
45+
RESTConfig *rest.Config
46+
RESTClient *rest.RESTClient
47+
CachedClient client.Client
48+
UncachedClient client.Client
49+
Cache *stoppableCache
4950
}
5051

5152
func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnectionResult, error) {
@@ -97,6 +98,12 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
9798
if err != nil {
9899
return nil, errors.Wrapf(err, "error creating HTTP client and mapper (using in-cluster config)")
99100
}
101+
102+
log.V(6).Info(fmt.Sprintf("Creating uncached client with updated REST config with host %q", restConfig.Host))
103+
uncachedClient, err = createUncachedClient(ca.config.Scheme, restConfig, httpClient, mapper)
104+
if err != nil {
105+
return nil, errors.Wrapf(err, "error creating uncached client (using in-cluster config)")
106+
}
100107
}
101108

102109
log.V(6).Info("Creating cached client and cache")
@@ -106,10 +113,11 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
106113
}
107114

108115
return &createConnectionResult{
109-
RESTConfig: restConfig,
110-
RESTClient: restClient,
111-
CachedClient: cachedClient,
112-
Cache: cache,
116+
RESTConfig: restConfig,
117+
RESTClient: restClient,
118+
CachedClient: cachedClient,
119+
UncachedClient: uncachedClient,
120+
Cache: cache,
113121
}, nil
114122
}
115123

@@ -208,7 +216,7 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
208216
return nil, errors.Wrapf(err, "error creating uncached client")
209217
}
210218

211-
return uncachedClient, nil
219+
return newClientWithTimeout(uncachedClient, config.Timeout), nil
212220
}
213221

214222
// createCachedClient creates a cached client for the given cluster, based on the rest.Config.

controllers/clustercache/cluster_accessor_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ func TestConnect(t *testing.T) {
7676
}, nil)
7777
accessor := newClusterAccessor(context.Background(), clusterKey, config)
7878

79+
// Before connect, getting the uncached client should fail with ErrClusterNotConnected
80+
_, err := accessor.GetUncachedClient(ctx)
81+
g.Expect(err).To(HaveOccurred())
82+
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
83+
7984
// Connect when kubeconfig Secret doesn't exist (should fail)
80-
err := accessor.Connect(ctx)
85+
err = accessor.Connect(ctx)
8186
g.Expect(err).To(HaveOccurred())
8287
g.Expect(err.Error()).To(Equal("error creating REST config: error getting kubeconfig secret: Secret \"test-cluster-kubeconfig\" not found"))
8388
g.Expect(accessor.Connected(ctx)).To(BeFalse())
@@ -136,6 +141,16 @@ func TestConnect(t *testing.T) {
136141
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse())
137142
g.Expect(accessor.lockedState.healthChecking.consecutiveFailures).To(Equal(0))
138143

144+
// After connect, getting the uncached client should succeed
145+
r, err := accessor.GetUncachedClient(ctx)
146+
g.Expect(err).ToNot(HaveOccurred())
147+
g.Expect(r).ToNot(BeNil())
148+
149+
// List Nodes via the uncached client
150+
nodeListUncached := &corev1.NodeList{}
151+
g.Expect(r.List(ctx, nodeListUncached)).To(Succeed())
152+
g.Expect(nodeListUncached.Items).To(BeEmpty())
153+
139154
// Get client and test Get & List
140155
c, err := accessor.GetClient(ctx)
141156
g.Expect(err).ToNot(HaveOccurred())
@@ -150,6 +165,11 @@ func TestConnect(t *testing.T) {
150165
// Disconnect
151166
accessor.Disconnect(ctx)
152167
g.Expect(accessor.Connected(ctx)).To(BeFalse())
168+
169+
// After disconnect, getting the uncached client should fail with ErrClusterNotConnected
170+
_, err = accessor.GetUncachedClient(ctx)
171+
g.Expect(err).To(HaveOccurred())
172+
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
153173
}
154174

155175
func TestDisconnect(t *testing.T) {

controllers/clustercache/cluster_cache.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ type ClusterCache interface {
166166
GetClusterSource(controllerName string, mapFunc func(ctx context.Context, cluster client.Object) []ctrl.Request, opts ...GetClusterSourceOption) source.Source
167167
}
168168

169+
// UncachedClientProvider provides live (uncached) access to the workload cluster API server.
170+
// This is an optional extension interface; callers can type assert a ClusterCache to this interface to retrieve an uncached client for a given workload cluster.
171+
type UncachedClientProvider interface {
172+
GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error)
173+
}
174+
169175
// HealthCheckingState holds the health checking state for a Cluster.
170176
type HealthCheckingState struct {
171177
// LastProbeTime is the time when a health probe was executed last.
@@ -392,6 +398,16 @@ func (cc *clusterCache) GetReader(ctx context.Context, cluster client.ObjectKey)
392398
return accessor.GetReader(ctx)
393399
}
394400

401+
// GetUncachedClient returns a live (uncached) client for the given cluster.
402+
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
403+
func (cc *clusterCache) GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
404+
accessor := cc.getClusterAccessor(cluster)
405+
if accessor == nil {
406+
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
407+
}
408+
return accessor.GetUncachedClient(ctx)
409+
}
410+
395411
func (cc *clusterCache) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
396412
accessor := cc.getClusterAccessor(cluster)
397413
if accessor == nil {

controllers/clustercache/cluster_cache_fake.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ func NewFakeClusterCache(workloadClient client.Client, clusterKey client.ObjectK
3232
testCacheTracker.clusterAccessors[clusterKey] = &clusterAccessor{
3333
lockedState: clusterAccessorLockedState{
3434
connection: &clusterAccessorLockedConnectionState{
35-
cachedClient: workloadClient,
36-
watches: sets.Set[string]{}.Insert(watchObjects...),
35+
cachedClient: workloadClient,
36+
uncachedClient: workloadClient,
37+
watches: sets.Set[string]{}.Insert(watchObjects...),
3738
},
3839
healthChecking: clusterAccessorLockedHealthCheckingState{
3940
lastProbeTime: time.Now(),

0 commit comments

Comments
 (0)