Skip to content

Commit 966df30

Browse files
author
Phillip Wittrock
authored
Merge pull request #5 from pwittrock/master
Update sample to use kubebuilder controller library
2 parents 416d4ef + c5e577f commit 966df30

File tree

6 files changed

+79
-371
lines changed

6 files changed

+79
-371
lines changed

samples/controller/controller.go

Lines changed: 30 additions & 232 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package main
1818

1919
import (
2020
"fmt"
21-
"time"
2221

2322
"github.com/golang/glog"
2423
appsv1 "k8s.io/api/apps/v1"
@@ -27,21 +26,16 @@ import (
2726
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2827
"k8s.io/apimachinery/pkg/runtime/schema"
2928
"k8s.io/apimachinery/pkg/util/runtime"
30-
"k8s.io/apimachinery/pkg/util/wait"
31-
kubeinformers "k8s.io/client-go/informers"
32-
"k8s.io/client-go/kubernetes"
3329
"k8s.io/client-go/kubernetes/scheme"
34-
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
35-
appslisters "k8s.io/client-go/listers/apps/v1"
36-
"k8s.io/client-go/tools/cache"
3730
"k8s.io/client-go/tools/record"
38-
"k8s.io/client-go/util/workqueue"
3931

32+
"github.com/kubernetes-sigs/kubebuilder/pkg/controller"
33+
"github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers"
34+
"github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates"
35+
"github.com/kubernetes-sigs/kubebuilder/pkg/controller/types"
4036
samplev1alpha1 "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/apis/samplecontroller/v1alpha1"
41-
clientset "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned"
4237
samplescheme "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned/scheme"
43-
informers "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/informers/externalversions"
44-
listers "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/listers/samplecontroller/v1alpha1"
38+
"github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/inject/args"
4539
)
4640

4741
const controllerAgentName = "sample-controller"
@@ -63,196 +57,53 @@ const (
6357

6458
// Controller is the controller implementation for Foo resources
6559
type Controller struct {
66-
// kubeclientset is a standard kubernetes clientset
67-
kubeclientset kubernetes.Interface
68-
// sampleclientset is a clientset for our own API group
69-
sampleclientset clientset.Interface
60+
args.InjectArgs
7061

71-
deploymentsLister appslisters.DeploymentLister
72-
deploymentsSynced cache.InformerSynced
73-
foosLister listers.FooLister
74-
foosSynced cache.InformerSynced
75-
76-
// workqueue is a rate limited work queue. This is used to queue work to be
77-
// processed instead of performing it as soon as a change happens. This
78-
// means we can ensure we only process a fixed amount of resources at a
79-
// time, and makes it easy to ensure we are never processing the same item
80-
// simultaneously in two different workers.
81-
workqueue workqueue.RateLimitingInterface
8262
// recorder is an event recorder for recording Event resources to the
8363
// Kubernetes API.
8464
recorder record.EventRecorder
8565
}
8666

8767
// NewController returns a new sample controller
88-
func NewController(
89-
kubeclientset kubernetes.Interface,
90-
sampleclientset clientset.Interface,
91-
kubeInformerFactory kubeinformers.SharedInformerFactory,
92-
sampleInformerFactory informers.SharedInformerFactory) *Controller {
93-
94-
// obtain references to shared index informers for the Deployment and Foo
95-
// types.
96-
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
97-
fooInformer := sampleInformerFactory.Samplecontroller().V1alpha1().Foos()
98-
99-
// Create event broadcaster
100-
// Add sample-controller types to the default Kubernetes Scheme so Events can be
101-
// logged for sample-controller types.
68+
func NewController(iargs args.InjectArgs) *controller.GenericController {
10269
samplescheme.AddToScheme(scheme.Scheme)
103-
glog.V(4).Info("Creating event broadcaster")
104-
eventBroadcaster := record.NewBroadcaster()
105-
eventBroadcaster.StartLogging(glog.Infof)
106-
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
107-
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
10870

109-
controller := &Controller{
110-
kubeclientset: kubeclientset,
111-
sampleclientset: sampleclientset,
112-
deploymentsLister: deploymentInformer.Lister(),
113-
deploymentsSynced: deploymentInformer.Informer().HasSynced,
114-
foosLister: fooInformer.Lister(),
115-
foosSynced: fooInformer.Informer().HasSynced,
116-
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
117-
recorder: recorder,
71+
c := &Controller{
72+
recorder: iargs.CreateRecorder(controllerAgentName),
73+
}
74+
75+
genericController := &controller.GenericController{
76+
Name: controllerAgentName,
77+
InformerRegistry: iargs.ControllerManager,
78+
Reconcile: c.syncHandler,
11879
}
11980

12081
glog.Info("Setting up event handlers")
121-
// Set up an event handler for when Foo resources change
122-
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
123-
AddFunc: controller.enqueueFoo,
124-
UpdateFunc: func(old, new interface{}) {
125-
controller.enqueueFoo(new)
126-
},
127-
})
82+
genericController.Watch(&samplev1alpha1.Foo{})
83+
12884
// Set up an event handler for when Deployment resources change. This
12985
// handler will lookup the owner of the given Deployment, and if it is
13086
// owned by a Foo resource will enqueue that Foo resource for
13187
// processing. This way, we don't need to implement custom logic for
13288
// handling Deployment resources. More info on this pattern:
13389
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
134-
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
135-
AddFunc: controller.handleObject,
136-
UpdateFunc: func(old, new interface{}) {
137-
newDepl := new.(*appsv1.Deployment)
138-
oldDepl := old.(*appsv1.Deployment)
139-
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
140-
// Periodic resync will send update events for all known Deployments.
141-
// Two different versions of the same Deployment will always have different RVs.
142-
return
143-
}
144-
controller.handleObject(new)
145-
},
146-
DeleteFunc: controller.handleObject,
147-
})
90+
genericController.WatchControllerOf(&appsv1.Deployment{}, eventhandlers.Path{c.LookupFoo},
91+
predicates.ResourceVersionChanged)
14892

149-
return controller
93+
return genericController
15094
}
15195

152-
// Run will set up the event handlers for types we are interested in, as well
153-
// as syncing informer caches and starting workers. It will block until stopCh
154-
// is closed, at which point it will shutdown the workqueue and wait for
155-
// workers to finish processing their current work items.
156-
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
157-
defer runtime.HandleCrash()
158-
defer c.workqueue.ShutDown()
159-
160-
// Start the informer factories to begin populating the informer caches
161-
glog.Info("Starting Foo controller")
162-
163-
// Wait for the caches to be synced before starting workers
164-
glog.Info("Waiting for informer caches to sync")
165-
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
166-
return fmt.Errorf("failed to wait for caches to sync")
167-
}
168-
169-
glog.Info("Starting workers")
170-
// Launch two workers to process Foo resources
171-
for i := 0; i < threadiness; i++ {
172-
go wait.Until(c.runWorker, time.Second, stopCh)
173-
}
174-
175-
glog.Info("Started workers")
176-
<-stopCh
177-
glog.Info("Shutting down workers")
178-
179-
return nil
180-
}
181-
182-
// runWorker is a long-running function that will continually call the
183-
// processNextWorkItem function in order to read and process a message on the
184-
// workqueue.
185-
func (c *Controller) runWorker() {
186-
for c.processNextWorkItem() {
187-
}
188-
}
189-
190-
// processNextWorkItem will read a single work item off the workqueue and
191-
// attempt to process it, by calling the syncHandler.
192-
func (c *Controller) processNextWorkItem() bool {
193-
obj, shutdown := c.workqueue.Get()
194-
195-
if shutdown {
196-
return false
197-
}
198-
199-
// We wrap this block in a func so we can defer c.workqueue.Done.
200-
err := func(obj interface{}) error {
201-
// We call Done here so the workqueue knows we have finished
202-
// processing this item. We also must remember to call Forget if we
203-
// do not want this work item being re-queued. For example, we do
204-
// not call Forget if a transient error occurs, instead the item is
205-
// put back on the workqueue and attempted again after a back-off
206-
// period.
207-
defer c.workqueue.Done(obj)
208-
var key string
209-
var ok bool
210-
// We expect strings to come off the workqueue. These are of the
211-
// form namespace/name. We do this as the delayed nature of the
212-
// workqueue means the items in the informer cache may actually be
213-
// more up to date that when the item was initially put onto the
214-
// workqueue.
215-
if key, ok = obj.(string); !ok {
216-
// As the item in the workqueue is actually invalid, we call
217-
// Forget here else we'd go into a loop of attempting to
218-
// process a work item that is invalid.
219-
c.workqueue.Forget(obj)
220-
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
221-
return nil
222-
}
223-
// Run the syncHandler, passing it the namespace/name string of the
224-
// Foo resource to be synced.
225-
if err := c.syncHandler(key); err != nil {
226-
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
227-
}
228-
// Finally, if no error occurs we Forget this item so it does not
229-
// get queued again until another change happens.
230-
c.workqueue.Forget(obj)
231-
glog.Infof("Successfully synced '%s'", key)
232-
return nil
233-
}(obj)
234-
235-
if err != nil {
236-
runtime.HandleError(err)
237-
return true
238-
}
239-
240-
return true
96+
// LookupFoo looksup a Foo from the lister
97+
func (c Controller) LookupFoo(r types.ReconcileKey) (interface{}, error) {
98+
return c.Informers.Samplecontroller().V1alpha1().Foos().Lister().Foos(r.Namespace).Get(r.Name)
24199
}
242100

243101
// syncHandler compares the actual state with the desired, and attempts to
244102
// converge the two. It then updates the Status block of the Foo resource
245103
// with the current status of the resource.
246-
func (c *Controller) syncHandler(key string) error {
247-
// Convert the namespace/name string into a distinct namespace and name
248-
namespace, name, err := cache.SplitMetaNamespaceKey(key)
249-
if err != nil {
250-
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
251-
return nil
252-
}
253-
254-
// Get the Foo resource with this namespace/name
255-
foo, err := c.foosLister.Foos(namespace).Get(name)
104+
func (c *Controller) syncHandler(key types.ReconcileKey) error {
105+
namespace, name := key.Namespace, key.Name
106+
foo, err := c.Informers.Samplecontroller().V1alpha1().Foos().Lister().Foos(namespace).Get(name)
256107
if err != nil {
257108
// The Foo resource may no longer exist, in which case we stop
258109
// processing.
@@ -274,10 +125,10 @@ func (c *Controller) syncHandler(key string) error {
274125
}
275126

276127
// Get the deployment with the name specified in Foo.spec
277-
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
128+
deployment, err := c.KubernetesInformers.Apps().V1().Deployments().Lister().Deployments(foo.Namespace).Get(deploymentName)
278129
// If the resource doesn't exist, we'll create it
279130
if errors.IsNotFound(err) {
280-
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
131+
deployment, err = c.KubernetesClientSet.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
281132
}
282133

283134
// If an error occurs during Get/Create, we'll requeue the item so we can
@@ -300,7 +151,7 @@ func (c *Controller) syncHandler(key string) error {
300151
// should update the Deployment resource.
301152
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
302153
glog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
303-
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
154+
deployment, err = c.KubernetesClientSet.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
304155
}
305156

306157
// If an error occurs during Update, we'll requeue the item so we can
@@ -331,63 +182,10 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
331182
// update the Status block of the Foo resource. UpdateStatus will not
332183
// allow changes to the Spec of the resource, which is ideal for ensuring
333184
// nothing other than resource status has been updated.
334-
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
185+
_, err := c.Clientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
335186
return err
336187
}
337188

338-
// enqueueFoo takes a Foo resource and converts it into a namespace/name
339-
// string which is then put onto the work queue. This method should *not* be
340-
// passed resources of any type other than Foo.
341-
func (c *Controller) enqueueFoo(obj interface{}) {
342-
var key string
343-
var err error
344-
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
345-
runtime.HandleError(err)
346-
return
347-
}
348-
c.workqueue.AddRateLimited(key)
349-
}
350-
351-
// handleObject will take any resource implementing metav1.Object and attempt
352-
// to find the Foo resource that 'owns' it. It does this by looking at the
353-
// objects metadata.ownerReferences field for an appropriate OwnerReference.
354-
// It then enqueues that Foo resource to be processed. If the object does not
355-
// have an appropriate OwnerReference, it will simply be skipped.
356-
func (c *Controller) handleObject(obj interface{}) {
357-
var object metav1.Object
358-
var ok bool
359-
if object, ok = obj.(metav1.Object); !ok {
360-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
361-
if !ok {
362-
runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
363-
return
364-
}
365-
object, ok = tombstone.Obj.(metav1.Object)
366-
if !ok {
367-
runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
368-
return
369-
}
370-
glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
371-
}
372-
glog.V(4).Infof("Processing object: %s", object.GetName())
373-
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
374-
// If this object is not owned by a Foo, we should not do anything more
375-
// with it.
376-
if ownerRef.Kind != "Foo" {
377-
return
378-
}
379-
380-
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
381-
if err != nil {
382-
glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
383-
return
384-
}
385-
386-
c.enqueueFoo(foo)
387-
return
388-
}
389-
}
390-
391189
// newDeployment creates a new Deployment for a Foo resource. It also sets
392190
// the appropriate OwnerReferences on the resource so handleObject can discover
393191
// the Foo resource that 'owns' it.

0 commit comments

Comments
 (0)