Skip to content

Commit 8f8fe3d

Browse files
authored
Merge pull request #3118 from sttts/sttts-openapi-v3
✨ Implement cluster-aware OpenAPI v3
2 parents dcd4a44 + e1e70a3 commit 8f8fe3d

File tree

7 files changed

+671
-3
lines changed

7 files changed

+671
-3
lines changed

Diff for: pkg/server/config.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import (
6363
"github.com/kcp-dev/kcp/pkg/informer"
6464
"github.com/kcp-dev/kcp/pkg/server/bootstrap"
6565
kcpfilters "github.com/kcp-dev/kcp/pkg/server/filters"
66+
"github.com/kcp-dev/kcp/pkg/server/openapiv3"
6667
kcpserveroptions "github.com/kcp-dev/kcp/pkg/server/options"
6768
"github.com/kcp-dev/kcp/pkg/server/options/batteries"
6869
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
@@ -112,8 +113,10 @@ type ExtraConfig struct {
112113
ExternalLogicalClusterAdminConfig *rest.Config // client config connecting to the front proxy
113114

114115
// misc
115-
preHandlerChainMux *handlerChainMuxes
116-
quotaAdmissionStopCh chan struct{}
116+
preHandlerChainMux *handlerChainMuxes
117+
quotaAdmissionStopCh chan struct{}
118+
openAPIv3Controller *openapiv3.Controller
119+
openAPIv3ServiceCache *openapiv3.ServiceCache
117120

118121
// URL getters depending on genericspiserver.ExternalAddress which is initialized on server run
119122
ShardBaseURL func() string
@@ -389,6 +392,7 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) {
389392
// to give handlers below one mux.Handle func to call.
390393
c.preHandlerChainMux = &handlerChainMuxes{}
391394
c.GenericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, genericConfig *genericapiserver.Config) (secure http.Handler) {
395+
apiHandler = openapiv3.WithOpenAPIv3(apiHandler, c.openAPIv3ServiceCache) // will be initialized further down after apiextensions-apiserver
392396
apiHandler = WithWildcardListWatchGuard(apiHandler)
393397
apiHandler = WithRequestIdentity(apiHandler)
394398
apiHandler = authorization.WithSubjectAccessReviewAuditAnnotations(apiHandler)
@@ -518,8 +522,10 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) {
518522
if err != nil {
519523
return nil, err
520524
}
525+
apiextGenericConfig := *c.GenericConfig
526+
apiextGenericConfig.SkipOpenAPIInstallation = true // we run our own OpenAPI service
521527
c.ApiExtensions, err = controlplaneapiserver.CreateAPIExtensionsConfig(
522-
*c.GenericConfig,
528+
apiextGenericConfig,
523529
informerfactoryhack.Wrap(c.KubeSharedInformerFactory),
524530
admissionPluginInitializers,
525531
opts.GenericControlPlane,
@@ -551,6 +557,9 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) {
551557
c.ApiExtensions.ExtraConfig.Informers = c.ApiExtensionsSharedInformerFactory
552558
c.ApiExtensions.ExtraConfig.TableConverterProvider = NewTableConverterProvider()
553559

560+
c.openAPIv3Controller = openapiv3.NewController(c.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions())
561+
c.openAPIv3ServiceCache = openapiv3.NewServiceCache(c.GenericConfig.OpenAPIV3Config, c.ApiExtensions.ExtraConfig.ClusterAwareCRDLister, c.openAPIv3Controller, openapiv3.DefaultServiceCacheSize)
562+
554563
c.MiniAggregator = &miniaggregator.MiniAggregatorConfig{
555564
GenericConfig: *c.GenericConfig,
556565
}

Diff for: pkg/server/controllers.go

+12
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ import (
7878
"github.com/kcp-dev/kcp/pkg/reconciler/tenancy/workspacemounts"
7979
"github.com/kcp-dev/kcp/pkg/reconciler/tenancy/workspacetype"
8080
"github.com/kcp-dev/kcp/pkg/reconciler/topology/partitionset"
81+
"github.com/kcp-dev/kcp/pkg/server/openapiv3"
8182
initializingworkspacesbuilder "github.com/kcp-dev/kcp/pkg/virtual/initializingworkspaces/builder"
8283
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
8384
tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
@@ -151,6 +152,17 @@ func (s *Server) installClusterRoleAggregationController(ctx context.Context, co
151152
})
152153
}
153154

155+
func (s *Server) installOpenAPIv3Controller(ctx context.Context, config *rest.Config) error {
156+
controllerName := openapiv3.ControllerName
157+
158+
return s.registerController(&controllerWrapper{
159+
Name: controllerName,
160+
Runner: func(ctx context.Context) {
161+
s.openAPIv3Controller.Run(ctx)
162+
},
163+
})
164+
}
165+
154166
func (s *Server) installKubeNamespaceController(ctx context.Context, config *rest.Config) error {
155167
controllerName := "kube-namespace-controller"
156168
config = rest.AddUserAgent(rest.CopyConfig(config), controllerName)

Diff for: pkg/server/openapiv3/controller.go

+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
Copyright 2024 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package openapiv3
18+
19+
import (
20+
"context"
21+
"crypto/sha512"
22+
"encoding/json"
23+
"fmt"
24+
"sync"
25+
"time"
26+
27+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
28+
kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
29+
kcpapiextensionsv1listers "github.com/kcp-dev/client-go/apiextensions/listers/apiextensions/v1"
30+
"github.com/kcp-dev/logicalcluster/v3"
31+
32+
apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
33+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
34+
"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
35+
"k8s.io/apimachinery/pkg/api/errors"
36+
"k8s.io/apimachinery/pkg/labels"
37+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
38+
"k8s.io/apimachinery/pkg/util/wait"
39+
"k8s.io/client-go/tools/cache"
40+
"k8s.io/client-go/util/workqueue"
41+
"k8s.io/klog/v2"
42+
"k8s.io/kube-openapi/pkg/cached"
43+
"k8s.io/kube-openapi/pkg/spec3"
44+
45+
"github.com/kcp-dev/kcp/pkg/logging"
46+
)
47+
48+
const ControllerName = "kcp-openapiv3"
49+
50+
type CRDSpecGetter interface {
51+
GetCRDSpecs(clusterName logicalcluster.Name, name string) (specs map[string]cached.Data[*spec3.OpenAPI], err error)
52+
}
53+
54+
// Controller watches CustomResourceDefinitions and publishes OpenAPI v3.
55+
type Controller struct {
56+
crdLister kcpapiextensionsv1listers.CustomResourceDefinitionClusterLister
57+
crdsSynced cache.InformerSynced
58+
59+
queue workqueue.RateLimitingInterface
60+
61+
// specs per version, logical cluster and per CRD name
62+
lock sync.Mutex
63+
byClusterNameVersion map[logicalcluster.Name]map[string]map[string]cached.Data[*spec3.OpenAPI]
64+
}
65+
66+
// NewController creates a new Controller with input CustomResourceDefinition informer.
67+
func NewController(crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer) *Controller {
68+
c := &Controller{
69+
crdLister: crdInformer.Lister(),
70+
crdsSynced: crdInformer.Informer().HasSynced,
71+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_v3_controller"),
72+
byClusterNameVersion: map[logicalcluster.Name]map[string]map[string]cached.Data[*spec3.OpenAPI]{},
73+
}
74+
75+
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ //nolint:errcheck
76+
AddFunc: c.addCustomResourceDefinition,
77+
UpdateFunc: c.updateCustomResourceDefinition,
78+
DeleteFunc: c.deleteCustomResourceDefinition,
79+
})
80+
81+
return c
82+
}
83+
84+
func (c *Controller) Run(ctx context.Context) {
85+
defer utilruntime.HandleCrash()
86+
defer c.queue.ShutDown()
87+
88+
log := logging.WithReconciler(klog.FromContext(ctx), ControllerName)
89+
ctx = klog.NewContext(ctx, log)
90+
log.Info("Starting controller")
91+
defer log.Info("Shutting down controller")
92+
93+
if !cache.WaitForCacheSync(ctx.Done(), c.crdsSynced) {
94+
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
95+
return
96+
}
97+
98+
crds, err := c.crdLister.List(labels.Everything())
99+
if err != nil {
100+
utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
101+
return
102+
}
103+
for _, crd := range crds {
104+
c.processCRD(crd)
105+
}
106+
107+
go wait.Until(func() { c.startWorker(ctx) }, time.Second, ctx.Done())
108+
109+
<-ctx.Done()
110+
}
111+
112+
func (c *Controller) startWorker(ctx context.Context) {
113+
for c.processNextWorkItem(ctx) {
114+
}
115+
}
116+
117+
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
118+
// Wait until there is a new item in the working queue
119+
k, quit := c.queue.Get()
120+
if quit {
121+
return false
122+
}
123+
key := k.(string)
124+
125+
log := logging.WithQueueKey(klog.FromContext(ctx), key)
126+
ctx = klog.NewContext(ctx, log)
127+
log.V(4).Info("processing key")
128+
129+
// No matter what, tell the queue we're done with this key, to unblock
130+
// other workers.
131+
defer c.queue.Done(key)
132+
133+
// log slow aggregations
134+
start := time.Now()
135+
defer func() {
136+
elapsed := time.Since(start)
137+
if elapsed > time.Second {
138+
log.Info("slow openapi v3 aggregation", "duration", elapsed)
139+
}
140+
}()
141+
142+
if requeue, err := c.process(ctx, key); err != nil {
143+
utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err))
144+
c.queue.AddRateLimited(key)
145+
return true
146+
} else if requeue {
147+
// only requeue if we didn't error, but we still want to requeue
148+
c.queue.Add(key)
149+
return true
150+
}
151+
c.queue.Forget(key)
152+
return true
153+
}
154+
155+
func (c *Controller) process(ctx context.Context, key string) (bool, error) {
156+
c.lock.Lock()
157+
defer c.lock.Unlock()
158+
159+
clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
160+
if err != nil {
161+
utilruntime.HandleError(err)
162+
return false, nil
163+
}
164+
crd, err := c.crdLister.Cluster(clusterName).Get(name)
165+
if err != nil && !errors.IsNotFound(err) {
166+
return false, err
167+
}
168+
169+
if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
170+
delete(c.byClusterNameVersion[clusterName], name)
171+
if len(c.byClusterNameVersion[clusterName]) == 0 {
172+
delete(c.byClusterNameVersion, clusterName)
173+
}
174+
return false, nil
175+
}
176+
177+
c.processCRD(crd)
178+
179+
return false, nil
180+
}
181+
182+
func (c *Controller) processCRD(crd *apiextensionsv1.CustomResourceDefinition) {
183+
clusterName := logicalcluster.From(crd)
184+
185+
// remove old instance
186+
delete(c.byClusterNameVersion[clusterName], crd.Name)
187+
188+
// add new instance with all versions
189+
for _, v := range crd.Spec.Versions {
190+
if !v.Served {
191+
delete(c.byClusterNameVersion[clusterName][crd.Name], v.Name)
192+
continue
193+
}
194+
195+
spec := cached.NewStaticSource[*spec3.OpenAPI](func() cached.Result[*spec3.OpenAPI] {
196+
spec, err := builder.BuildOpenAPIV3(crd, v.Name, builder.Options{V2: false})
197+
if err != nil {
198+
return cached.NewResultErr[*spec3.OpenAPI](err)
199+
}
200+
bs, err := json.Marshal(spec)
201+
if err != nil {
202+
return cached.NewResultErr[*spec3.OpenAPI](err)
203+
}
204+
return cached.NewResultOK[*spec3.OpenAPI](spec, fmt.Sprintf("%X", sha512.Sum512(bs)))
205+
})
206+
if c.byClusterNameVersion[clusterName] == nil {
207+
c.byClusterNameVersion[clusterName] = map[string]map[string]cached.Data[*spec3.OpenAPI]{}
208+
}
209+
if c.byClusterNameVersion[clusterName][crd.Name] == nil {
210+
c.byClusterNameVersion[clusterName][crd.Name] = map[string]cached.Data[*spec3.OpenAPI]{}
211+
}
212+
c.byClusterNameVersion[clusterName][crd.Name][v.Name] = spec
213+
}
214+
}
215+
216+
func (c *Controller) addCustomResourceDefinition(obj interface{}) {
217+
castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
218+
c.enqueue(castObj)
219+
}
220+
221+
func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
222+
castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
223+
c.enqueue(castNewObj)
224+
}
225+
226+
func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
227+
castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
228+
if !ok {
229+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
230+
if !ok {
231+
return
232+
}
233+
castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
234+
if !ok {
235+
return
236+
}
237+
}
238+
c.enqueue(castObj)
239+
}
240+
241+
func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
242+
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
243+
if err != nil {
244+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
245+
return
246+
}
247+
c.queue.Add(key)
248+
}
249+
250+
func (c *Controller) GetCRDSpecs(clusterName logicalcluster.Name, name string) (specs map[string]cached.Data[*spec3.OpenAPI], err error) {
251+
c.lock.Lock()
252+
defer c.lock.Unlock()
253+
254+
specs, ok := c.byClusterNameVersion[clusterName][name]
255+
if !ok {
256+
return nil, fmt.Errorf("CRD %s|%s not found", clusterName, name)
257+
}
258+
259+
return specs, nil
260+
}

0 commit comments

Comments
 (0)