Skip to content

Commit

Permalink
add fetching into discovery client for OpenAPI v3
Browse files Browse the repository at this point in the history
reflect latest struct changes

use correct discovery openapi test data layout

make the OpenAPIv3 interface less blue

field grouping

add copyrights

implement cached discovery client

add cached discovery tests

address review feedback

Kubernetes-commit: 075866b3e3ea029c243d82d8d6eb99e96d9c49d3
  • Loading branch information
Alexander Zielenski authored and k8s-publishing-bot committed Mar 22, 2022
1 parent 92adc4d commit 018cf8a
Show file tree
Hide file tree
Showing 18 changed files with 680 additions and 11 deletions.
21 changes: 21 additions & 0 deletions discovery/cached/disk/cached_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/openapi"
cachedopenapi "k8s.io/client-go/openapi/cached"
restclient "k8s.io/client-go/rest"
)

Expand All @@ -56,6 +58,9 @@ type CachedDiscoveryClient struct {
invalidated bool
// fresh is true if all used cache files were ours
fresh bool

///
openapiClient openapi.Client
}

var _ discovery.CachedDiscoveryInterface = &CachedDiscoveryClient{}
Expand Down Expand Up @@ -233,6 +238,21 @@ func (d *CachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
return d.delegate.OpenAPISchema()
}

// OpenAPIV3 retrieves and parses the OpenAPIV3 specs exposed by the server
func (d *CachedDiscoveryClient) OpenAPIV3() openapi.Client {
// Must take lock since Invalidate call may modify openapiClient
d.mutex.Lock()
defer d.mutex.Unlock()

if d.openapiClient == nil {
// Delegate is discovery client created with special HTTP client which
// respects E-Tag cache responses to serve cache from disk.
d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
}

return d.openapiClient
}

// Fresh is supposed to tell the caller whether or not to retry if the cache
// fails to find something (false = retry, true = no need to retry).
func (d *CachedDiscoveryClient) Fresh() bool {
Expand All @@ -250,6 +270,7 @@ func (d *CachedDiscoveryClient) Invalidate() {
d.ourFiles = map[string]struct{}{}
d.fresh = true
d.invalidated = true
d.openapiClient = nil
}

// NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps
Expand Down
86 changes: 86 additions & 0 deletions discovery/cached/disk/cached_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,24 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

openapi_v2 "github.com/google/gnostic/openapiv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/openapi"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
testutil "k8s.io/client-go/util/testing"
)

func TestCachedDiscoveryClient_Fresh(t *testing.T) {
Expand Down Expand Up @@ -123,6 +128,83 @@ func TestNewCachedDiscoveryClient_PathPerm(t *testing.T) {
assert.NoError(err)
}

// Tests that schema instances returned by openapi cached and returned after
// successive calls
func TestOpenAPIDiskCache(t *testing.T) {
// Create discovery cache dir (unused)
discoCache, err := ioutil.TempDir("", "")
require.NoError(t, err)
os.RemoveAll(discoCache)
defer os.RemoveAll(discoCache)

// Create http cache dir
httpCache, err := ioutil.TempDir("", "")
require.NoError(t, err)
os.RemoveAll(httpCache)
defer os.RemoveAll(httpCache)

// Start test OpenAPI server
fakeServer, err := testutil.NewFakeOpenAPIV3Server("../../testdata")
require.NoError(t, err)
defer fakeServer.HttpServer.Close()

require.Greater(t, len(fakeServer.ServedDocuments), 0)

client, err := NewCachedDiscoveryClientForConfig(
&rest.Config{Host: fakeServer.HttpServer.URL},
discoCache,
httpCache,
1*time.Nanosecond,
)
require.NoError(t, err)

openapiClient := client.OpenAPIV3()

// Ensure initial Paths call hits server
_, err = openapiClient.Paths()
require.NoError(t, err)
assert.Equal(t, 1, fakeServer.RequestCounters["/openapi/v3"])

// Ensure Paths call does hits server again
// This is expected since openapiClient is the same instance, so Paths()
// should be cached in memory.
paths, err := openapiClient.Paths()
require.NoError(t, err)
assert.Equal(t, 1, fakeServer.RequestCounters["/openapi/v3"])

require.Greater(t, len(paths), 0)
i := 0
for k, v := range paths {
i++

_, err = v.Schema()
assert.NoError(t, err)

path := "/openapi/v3/" + strings.TrimPrefix(k, "/")
assert.Equal(t, 1, fakeServer.RequestCounters[path])

// Ensure schema call is served from memory
_, err = v.Schema()
assert.NoError(t, err)
assert.Equal(t, 1, fakeServer.RequestCounters[path])

client.Invalidate()

// Refetch the schema from a new openapi client to try to force a new
// http request
newPaths, err := client.OpenAPIV3().Paths()
if !assert.NoError(t, err) {
continue
}

// Ensure schema call is still served from disk
_, err = newPaths[k].Schema()
assert.NoError(t, err)
assert.Equal(t, 1+i, fakeServer.RequestCounters["/openapi/v3"])
assert.Equal(t, 1, fakeServer.RequestCounters[path])
}
}

type fakeDiscoveryClient struct {
groupCalls int
resourceCalls int
Expand Down Expand Up @@ -207,3 +289,7 @@ func (c *fakeDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
c.openAPICalls = c.openAPICalls + 1
return &openapi_v2.Document{}, nil
}

func (d *fakeDiscoveryClient) OpenAPIV3() openapi.Client {
panic("unimplemented")
}
16 changes: 16 additions & 0 deletions discovery/cached/memory/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/openapi"
cachedopenapi "k8s.io/client-go/openapi/cached"
restclient "k8s.io/client-go/rest"
)

Expand All @@ -49,6 +51,7 @@ type memCacheClient struct {
groupToServerResources map[string]*cacheEntry
groupList *metav1.APIGroupList
cacheValid bool
openapiClient openapi.Client
}

// Error Constants
Expand Down Expand Up @@ -143,6 +146,18 @@ func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) {
return d.delegate.OpenAPISchema()
}

func (d *memCacheClient) OpenAPIV3() openapi.Client {
// Must take lock since Invalidate call may modify openapiClient
d.lock.Lock()
defer d.lock.Unlock()

if d.openapiClient == nil {
d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
}

return d.openapiClient
}

func (d *memCacheClient) Fresh() bool {
d.lock.RLock()
defer d.lock.RUnlock()
Expand All @@ -160,6 +175,7 @@ func (d *memCacheClient) Invalidate() {
d.cacheValid = false
d.groupToServerResources = nil
d.groupList = nil
d.openapiClient = nil
}

// refreshLocked refreshes the state of cache. The caller must hold d.lock for
Expand Down
62 changes: 62 additions & 0 deletions discovery/cached/memory/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
errorsutil "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/fake"
"k8s.io/client-go/rest"
testutil "k8s.io/client-go/util/testing"
)

type resourceMapEntry struct {
Expand Down Expand Up @@ -390,3 +395,60 @@ func TestPartialRetryableFailure(t *testing.T) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}

// Tests that schema instances returned by openapi cached and returned after
// successive calls
func TestOpenAPIMemCache(t *testing.T) {
fakeServer, err := testutil.NewFakeOpenAPIV3Server("../../testdata")
require.NoError(t, err)
defer fakeServer.HttpServer.Close()

require.Greater(t, len(fakeServer.ServedDocuments), 0)

client := NewMemCacheClient(
discovery.NewDiscoveryClientForConfigOrDie(
&rest.Config{Host: fakeServer.HttpServer.URL},
),
)
openapiClient := client.OpenAPIV3()

paths, err := openapiClient.Paths()
require.NoError(t, err)

for k, v := range paths {
original, err := v.Schema()
if !assert.NoError(t, err) {
continue
}

pathsAgain, err := openapiClient.Paths()
if !assert.NoError(t, err) {
continue
}

schemaAgain, err := pathsAgain[k].Schema()
if !assert.NoError(t, err) {
continue
}

assert.True(t, reflect.ValueOf(paths).Pointer() == reflect.ValueOf(pathsAgain).Pointer())
assert.True(t, reflect.ValueOf(original).Pointer() == reflect.ValueOf(schemaAgain).Pointer())

// Invalidate and try again. This time pointers should not be equal
client.Invalidate()

pathsAgain, err = client.OpenAPIV3().Paths()
if !assert.NoError(t, err) {
continue
}

schemaAgain, err = pathsAgain[k].Schema()
if !assert.NoError(t, err) {
continue
}

assert.True(t, reflect.ValueOf(paths).Pointer() != reflect.ValueOf(pathsAgain).Pointer())
assert.True(t, reflect.ValueOf(original).Pointer() != reflect.ValueOf(schemaAgain).Pointer())
assert.Equal(t, original, schemaAgain)
}
}
17 changes: 14 additions & 3 deletions discovery/discovery_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/openapi"
restclient "k8s.io/client-go/rest"
)

const (
// defaultRetries is the number of times a resource discovery is repeated if an api group disappears on the fly (e.g. CustomResourceDefinitions).
defaultRetries = 2
// protobuf mime type
mimePb = "application/[email protected]+protobuf"
openAPIV2mimePb = "application/[email protected]+protobuf"

// defaultTimeout is the maximum amount of time per request when no timeout has been set on a RESTClient.
// Defaults to 32s in order to have a distinguishable length of time, relative to other timeouts that exist.
defaultTimeout = 32 * time.Second
Expand All @@ -60,6 +62,7 @@ type DiscoveryInterface interface {
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
OpenAPIV3SchemaInterface
}

// CachedDiscoveryInterface is a DiscoveryInterface with cache invalidation and freshness.
Expand Down Expand Up @@ -121,6 +124,10 @@ type OpenAPISchemaInterface interface {
OpenAPISchema() (*openapi_v2.Document, error)
}

type OpenAPIV3SchemaInterface interface {
OpenAPIV3() openapi.Client
}

// DiscoveryClient implements the functions that discover server-supported API groups,
// versions and resources.
type DiscoveryClient struct {
Expand Down Expand Up @@ -399,9 +406,9 @@ func (d *DiscoveryClient) ServerVersion() (*version.Info, error) {
return &info, nil
}

// OpenAPISchema fetches the open api schema using a rest client and parses the proto.
// OpenAPISchema fetches the open api v2 schema using a rest client and parses the proto.
func (d *DiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
data, err := d.restClient.Get().AbsPath("/openapi/v2").SetHeader("Accept", mimePb).Do(context.TODO()).Raw()
data, err := d.restClient.Get().AbsPath("/openapi/v2").SetHeader("Accept", openAPIV2mimePb).Do(context.TODO()).Raw()
if err != nil {
if errors.IsForbidden(err) || errors.IsNotFound(err) || errors.IsNotAcceptable(err) {
// single endpoint not found/registered in old server, try to fetch old endpoint
Expand All @@ -422,6 +429,10 @@ func (d *DiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
return document, nil
}

func (d *DiscoveryClient) OpenAPIV3() openapi.Client {
return openapi.NewClient(d.restClient)
}

// withRetries retries the given recovery function in case the groups supported by the server change after ServerGroup() returns.
func withRetries(maxRetries int, f func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var result []*metav1.APIResourceList
Expand Down
Loading

0 comments on commit 018cf8a

Please sign in to comment.