Skip to content

Commit 6f2d5be

Browse files
committed
clustercache: add optional uncached client support for workload clusters
Signed-off-by: Bharath Nallapeta <[email protected]>
1 parent cd8a2c8 commit 6f2d5be

File tree

5 files changed

+77
-17
lines changed

5 files changed

+77
-17
lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ type clusterAccessorLockedConnectionState struct {
191191
// all typed objects except the ones for which caching has been disabled via DisableFor.
192192
cachedClient client.Client
193193

194+
// uncachedClient to communicate with the workload cluster.
195+
// It performs live GET/LIST calls directly against the API server with no caching.
196+
uncachedClient client.Client
197+
194198
// cache is the cache used by the client.
195199
// It manages informers that have been created e.g. by adding indexes to the cache,
196200
// Get & List calls from the client or via the Watch method of the clusterAccessor.
@@ -297,11 +301,12 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
297301
consecutiveFailures: 0,
298302
}
299303
ca.lockedState.connection = &clusterAccessorLockedConnectionState{
300-
restConfig: connection.RESTConfig,
301-
restClient: connection.RESTClient,
302-
cachedClient: connection.CachedClient,
303-
cache: connection.Cache,
304-
watches: sets.Set[string]{},
304+
restConfig: connection.RESTConfig,
305+
restClient: connection.RESTClient,
306+
cachedClient: connection.CachedClient,
307+
uncachedClient: connection.UncachedClient,
308+
cache: connection.Cache,
309+
watches: sets.Set[string]{},
305310
}
306311

307312
return nil
@@ -407,6 +412,18 @@ func (ca *clusterAccessor) GetReader(ctx context.Context) (client.Reader, error)
407412
return ca.lockedState.connection.cachedClient, nil
408413
}
409414

415+
// GetUncachedClient returns a live (uncached) client for the given cluster.
416+
func (ca *clusterAccessor) GetUncachedClient(ctx context.Context) (client.Client, error) {
417+
ca.rLock(ctx)
418+
defer ca.rUnlock(ctx)
419+
420+
if ca.lockedState.connection == nil {
421+
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
422+
}
423+
424+
return ca.lockedState.connection.uncachedClient, nil
425+
}
426+
410427
func (ca *clusterAccessor) GetRESTConfig(ctx context.Context) (*rest.Config, error) {
411428
ca.rLock(ctx)
412429
defer ca.rUnlock(ctx)

controllers/clustercache/cluster_accessor_client.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ import (
4242
)
4343

4444
type createConnectionResult struct {
45-
RESTConfig *rest.Config
46-
RESTClient *rest.RESTClient
47-
CachedClient client.Client
48-
Cache *stoppableCache
45+
RESTConfig *rest.Config
46+
RESTClient *rest.RESTClient
47+
CachedClient client.Client
48+
UncachedClient client.Client
49+
Cache *stoppableCache
4950
}
5051

5152
func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnectionResult, error) {
@@ -97,6 +98,12 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
9798
if err != nil {
9899
return nil, errors.Wrapf(err, "error creating HTTP client and mapper (using in-cluster config)")
99100
}
101+
102+
log.V(6).Info(fmt.Sprintf("Creating uncached client with updated REST config with host %q", restConfig.Host))
103+
uncachedClient, err = createUncachedClient(ca.config.Scheme, restConfig, httpClient, mapper)
104+
if err != nil {
105+
return nil, errors.Wrapf(err, "error creating uncached client (using in-cluster config)")
106+
}
100107
}
101108

102109
log.V(6).Info("Creating cached client and cache")
@@ -106,10 +113,11 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
106113
}
107114

108115
return &createConnectionResult{
109-
RESTConfig: restConfig,
110-
RESTClient: restClient,
111-
CachedClient: cachedClient,
112-
Cache: cache,
116+
RESTConfig: restConfig,
117+
RESTClient: restClient,
118+
CachedClient: cachedClient,
119+
UncachedClient: uncachedClient,
120+
Cache: cache,
113121
}, nil
114122
}
115123

@@ -208,7 +216,7 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
208216
return nil, errors.Wrapf(err, "error creating uncached client")
209217
}
210218

211-
return uncachedClient, nil
219+
return newClientWithTimeout(uncachedClient, config.Timeout), nil
212220
}
213221

214222
// createCachedClient creates a cached client for the given cluster, based on the rest.Config.

controllers/clustercache/cluster_accessor_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ func TestConnect(t *testing.T) {
7676
}, nil)
7777
accessor := newClusterAccessor(context.Background(), clusterKey, config)
7878

79+
// Before connect, getting the uncached client should fail with ErrClusterNotConnected
80+
_, err := accessor.GetUncachedClient(ctx)
81+
g.Expect(err).To(HaveOccurred())
82+
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
83+
7984
// Connect when kubeconfig Secret doesn't exist (should fail)
80-
err := accessor.Connect(ctx)
85+
err = accessor.Connect(ctx)
8186
g.Expect(err).To(HaveOccurred())
8287
g.Expect(err.Error()).To(Equal("error creating REST config: error getting kubeconfig secret: Secret \"test-cluster-kubeconfig\" not found"))
8388
g.Expect(accessor.Connected(ctx)).To(BeFalse())
@@ -136,6 +141,16 @@ func TestConnect(t *testing.T) {
136141
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse())
137142
g.Expect(accessor.lockedState.healthChecking.consecutiveFailures).To(Equal(0))
138143

144+
// After connect, getting the uncached client should succeed
145+
r, err := accessor.GetUncachedClient(ctx)
146+
g.Expect(err).ToNot(HaveOccurred())
147+
g.Expect(r).ToNot(BeNil())
148+
149+
// List Nodes via the uncached client
150+
nodeListUncached := &corev1.NodeList{}
151+
g.Expect(r.List(ctx, nodeListUncached)).To(Succeed())
152+
g.Expect(nodeListUncached.Items).To(BeEmpty())
153+
139154
// Get client and test Get & List
140155
c, err := accessor.GetClient(ctx)
141156
g.Expect(err).ToNot(HaveOccurred())
@@ -150,6 +165,11 @@ func TestConnect(t *testing.T) {
150165
// Disconnect
151166
accessor.Disconnect(ctx)
152167
g.Expect(accessor.Connected(ctx)).To(BeFalse())
168+
169+
// After disconnect, getting the uncached client should fail with ErrClusterNotConnected
170+
_, err = accessor.GetUncachedClient(ctx)
171+
g.Expect(err).To(HaveOccurred())
172+
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
153173
}
154174

155175
func TestDisconnect(t *testing.T) {

controllers/clustercache/cluster_cache.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ type ClusterCache interface {
134134
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
135135
GetReader(ctx context.Context, cluster client.ObjectKey) (client.Reader, error)
136136

137+
// GetUncachedClient returns a live (uncached) client for the given cluster.
138+
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
139+
GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error)
140+
137141
// GetRESTConfig returns a REST config for the given cluster.
138142
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
139143
GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error)
@@ -392,6 +396,16 @@ func (cc *clusterCache) GetReader(ctx context.Context, cluster client.ObjectKey)
392396
return accessor.GetReader(ctx)
393397
}
394398

399+
// GetUncachedClient returns a live (uncached) client for the given cluster.
400+
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
401+
func (cc *clusterCache) GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
402+
accessor := cc.getClusterAccessor(cluster)
403+
if accessor == nil {
404+
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
405+
}
406+
return accessor.GetUncachedClient(ctx)
407+
}
408+
395409
func (cc *clusterCache) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
396410
accessor := cc.getClusterAccessor(cluster)
397411
if accessor == nil {

controllers/clustercache/cluster_cache_fake.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ func NewFakeClusterCache(workloadClient client.Client, clusterKey client.ObjectK
3232
testCacheTracker.clusterAccessors[clusterKey] = &clusterAccessor{
3333
lockedState: clusterAccessorLockedState{
3434
connection: &clusterAccessorLockedConnectionState{
35-
cachedClient: workloadClient,
36-
watches: sets.Set[string]{}.Insert(watchObjects...),
35+
cachedClient: workloadClient,
36+
uncachedClient: workloadClient,
37+
watches: sets.Set[string]{}.Insert(watchObjects...),
3738
},
3839
healthChecking: clusterAccessorLockedHealthCheckingState{
3940
lastProbeTime: time.Now(),

0 commit comments

Comments
 (0)