Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions controllers/clustercache/cluster_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ type clusterAccessorLockedConnectionState struct {
// all typed objects except the ones for which caching has been disabled via DisableFor.
cachedClient client.Client

// uncachedClient to communicate with the workload cluster.
// It performs live GET/LIST calls directly against the API server with no caching.
uncachedClient client.Client

// cache is the cache used by the client.
// It manages informers that have been created e.g. by adding indexes to the cache,
// Get & List calls from the client or via the Watch method of the clusterAccessor.
Expand Down Expand Up @@ -297,11 +301,12 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
consecutiveFailures: 0,
}
ca.lockedState.connection = &clusterAccessorLockedConnectionState{
restConfig: connection.RESTConfig,
restClient: connection.RESTClient,
cachedClient: connection.CachedClient,
cache: connection.Cache,
watches: sets.Set[string]{},
restConfig: connection.RESTConfig,
restClient: connection.RESTClient,
cachedClient: connection.CachedClient,
uncachedClient: connection.UncachedClient,
cache: connection.Cache,
watches: sets.Set[string]{},
}

return nil
Expand Down Expand Up @@ -407,6 +412,18 @@ func (ca *clusterAccessor) GetReader(ctx context.Context) (client.Reader, error)
return ca.lockedState.connection.cachedClient, nil
}

// GetUncachedClient returns a live (uncached) client for the given cluster.
func (ca *clusterAccessor) GetUncachedClient(ctx context.Context) (client.Client, error) {
ca.rLock(ctx)
defer ca.rUnlock(ctx)

if ca.lockedState.connection == nil {
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
}

return ca.lockedState.connection.uncachedClient, nil
}

func (ca *clusterAccessor) GetRESTConfig(ctx context.Context) (*rest.Config, error) {
ca.rLock(ctx)
defer ca.rUnlock(ctx)
Expand Down
26 changes: 17 additions & 9 deletions controllers/clustercache/cluster_accessor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ import (
)

type createConnectionResult struct {
RESTConfig *rest.Config
RESTClient *rest.RESTClient
CachedClient client.Client
Cache *stoppableCache
RESTConfig *rest.Config
RESTClient *rest.RESTClient
CachedClient client.Client
UncachedClient client.Client
Cache *stoppableCache
}

func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnectionResult, error) {
Expand Down Expand Up @@ -97,6 +98,12 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
if err != nil {
return nil, errors.Wrapf(err, "error creating HTTP client and mapper (using in-cluster config)")
}

log.V(6).Info(fmt.Sprintf("Creating uncached client with updated REST config with host %q", restConfig.Host))
uncachedClient, err = createUncachedClient(ca.config.Scheme, restConfig, httpClient, mapper)
if err != nil {
return nil, errors.Wrapf(err, "error creating uncached client (using in-cluster config)")
}
}

log.V(6).Info("Creating cached client and cache")
Expand All @@ -106,10 +113,11 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
}

return &createConnectionResult{
RESTConfig: restConfig,
RESTClient: restClient,
CachedClient: cachedClient,
Cache: cache,
RESTConfig: restConfig,
RESTClient: restClient,
CachedClient: cachedClient,
UncachedClient: uncachedClient,
Cache: cache,
}, nil
}

Expand Down Expand Up @@ -208,7 +216,7 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
return nil, errors.Wrapf(err, "error creating uncached client")
}

return uncachedClient, nil
return newClientWithTimeout(uncachedClient, config.Timeout), nil
}

// createCachedClient creates a cached client for the given cluster, based on the rest.Config.
Expand Down
22 changes: 21 additions & 1 deletion controllers/clustercache/cluster_accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,13 @@ func TestConnect(t *testing.T) {
}, nil)
accessor := newClusterAccessor(context.Background(), clusterKey, config)

// Before connect, getting the uncached client should fail with ErrClusterNotConnected
_, err := accessor.GetUncachedClient(ctx)
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())

// Connect when kubeconfig Secret doesn't exist (should fail)
err := accessor.Connect(ctx)
err = accessor.Connect(ctx)
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(Equal("error creating REST config: error getting kubeconfig secret: Secret \"test-cluster-kubeconfig\" not found"))
g.Expect(accessor.Connected(ctx)).To(BeFalse())
Expand Down Expand Up @@ -136,6 +141,16 @@ func TestConnect(t *testing.T) {
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse())
g.Expect(accessor.lockedState.healthChecking.consecutiveFailures).To(Equal(0))

// After connect, getting the uncached client should succeed
r, err := accessor.GetUncachedClient(ctx)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r).ToNot(BeNil())

// List Nodes via the uncached client
nodeListUncached := &corev1.NodeList{}
g.Expect(r.List(ctx, nodeListUncached)).To(Succeed())
g.Expect(nodeListUncached.Items).To(BeEmpty())

// Get client and test Get & List
c, err := accessor.GetClient(ctx)
g.Expect(err).ToNot(HaveOccurred())
Expand All @@ -150,6 +165,11 @@ func TestConnect(t *testing.T) {
// Disconnect
accessor.Disconnect(ctx)
g.Expect(accessor.Connected(ctx)).To(BeFalse())

// After disconnect, getting the uncached client should fail with ErrClusterNotConnected
_, err = accessor.GetUncachedClient(ctx)
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
}

func TestDisconnect(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions controllers/clustercache/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type ClusterCache interface {
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
GetReader(ctx context.Context, cluster client.ObjectKey) (client.Reader, error)

// GetUncachedClient returns a live (uncached) client for the given cluster.
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error)

// GetRESTConfig returns a REST config for the given cluster.
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error)
Expand Down Expand Up @@ -392,6 +396,16 @@ func (cc *clusterCache) GetReader(ctx context.Context, cluster client.ObjectKey)
return accessor.GetReader(ctx)
}

// GetUncachedClient returns a live (uncached) client for the given cluster.
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
func (cc *clusterCache) GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
accessor := cc.getClusterAccessor(cluster)
if accessor == nil {
return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client")
}
return accessor.GetUncachedClient(ctx)
}

func (cc *clusterCache) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
accessor := cc.getClusterAccessor(cluster)
if accessor == nil {
Expand Down
5 changes: 3 additions & 2 deletions controllers/clustercache/cluster_cache_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func NewFakeClusterCache(workloadClient client.Client, clusterKey client.ObjectK
testCacheTracker.clusterAccessors[clusterKey] = &clusterAccessor{
lockedState: clusterAccessorLockedState{
connection: &clusterAccessorLockedConnectionState{
cachedClient: workloadClient,
watches: sets.Set[string]{}.Insert(watchObjects...),
cachedClient: workloadClient,
uncachedClient: workloadClient,
watches: sets.Set[string]{}.Insert(watchObjects...),
},
healthChecking: clusterAccessorLockedHealthCheckingState{
lastProbeTime: time.Now(),
Expand Down