From ca7256a68a8eaceaac650e6834d1c32ed110aff5 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Sat, 22 Jun 2019 20:51:05 -0700 Subject: [PATCH] Add a simple informer implementation. --- examples/informer/informer.go | 88 ++++++++++++++++ kubernetes/client/cache.go | 187 ++++++++++++++++++++++++++++++++++ kubernetes/client/watch.go | 4 + kubernetes/config/azure.go | 2 +- 4 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 examples/informer/informer.go create mode 100644 kubernetes/client/cache.go diff --git a/examples/informer/informer.go b/examples/informer/informer.go new file mode 100644 index 0000000..efaf5e7 --- /dev/null +++ b/examples/informer/informer.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/kubernetes-client/go/kubernetes/client" + "github.com/kubernetes-client/go/kubernetes/config" +) + +type handler struct{} + +func (h handler) OnAdd(obj interface{}) { + ns := obj.(*client.V1Namespace) + fmt.Printf("Added %s\n", ns.Metadata.Name) +} + +func (h handler) OnUpdate(oldObj, newObj interface{}) { + ns := newObj.(*client.V1Namespace) + fmt.Printf("Updated %s\n", ns.Metadata.Name) +} + +func (h handler) OnDelete(obj interface{}) { + ns := obj.(*client.V1Namespace) + fmt.Printf("Deleted %s\n", ns.Metadata.Name) +} + +func main() { + c, err := config.LoadKubeConfig() + if err != nil { + panic(err.Error()) + } + + // create the clientset + clientset := client.NewAPIClient(c) + + lister := func() ([]interface{}, string, error) { + namespaces, _, err := clientset.CoreV1Api.ListNamespace(context.Background(), nil) + if err != nil { + return nil, "", err + } + result := make([]interface{}, len(namespaces.Items)) + for ix := range namespaces.Items { + result[ix] = &namespaces.Items[ix] + } + return result, namespaces.Metadata.ResourceVersion, nil + } + + watcher := func(resourceVersion string) (<-chan *client.Result, <-chan error) { + watch := client.WatchClient{ + Cfg: c, + Client: clientset, + Path: "/api/v1/namespaces", + MakerFn: func() interface{} { + return &client.V1Namespace{} + }, + } + results, errors, err := watch.Connect(context.Background(), resourceVersion) + if err != nil { + fmt.Printf("err: %s\n", err.Error()) + } + return results, errors + } + + extractor := func(obj interface{}) *client.V1ObjectMeta { + return obj.(*client.V1Namespace).Metadata + } + + cache := client.Cache{ + Extractor: extractor, + Lister: lister, + Watcher: watcher, + } + cache.AddEventHandler(handler{}) + go cache.Run(make(chan bool)) + + for { + fmt.Printf("----------\n") + list := cache.List() + for ix := range list { + ns := list[ix].(*client.V1Namespace) + fmt.Printf("%s %#v\n", ns.Metadata.Name, ns.Metadata.Labels) + } + fmt.Printf("----------\n") + time.Sleep(5 * time.Second) + } +} diff --git a/kubernetes/client/cache.go b/kubernetes/client/cache.go new file mode 100644 index 0000000..21dd490 --- /dev/null +++ b/kubernetes/client/cache.go @@ -0,0 +1,187 @@ +package client + +import ( + "log" + "time" +) + +type objEntry struct { + metadata *V1ObjectMeta + obj interface{} +} + +// ObjectLister is a function that knows how to list objects. +type ObjectLister func() ([]interface{}, string, error) + +// ObjectWatcher is a function that knows how to perform a watch. +type ObjectWatcher func(resourceVersion string) (results <-chan *Result, errors <-chan error) + +// EventHandler is implemented by objects that want event notifications +type EventHandler interface { + OnAdd(obj interface{}) + OnUpdate(oldObj, newObj interface{}) + OnDelete(obj interface{}) +} + +// Informer is an interface for things that can provide notifications +type Informer interface { + AddEventHandler(handler EventHandler) +} + +// Lister is an interface for things that can list objects for all namespaces or by namespace +type Lister interface { + List() []interface{} + ByNamespace(namespace string) []interface{} +} + +// Validate that we implement the interfaces +var _ Lister = &Cache{} +var _ Informer = &Cache{} + +// Cache is an implementation of a List/Watch cache +type Cache struct { + Extractor func(interface{}) *V1ObjectMeta + Lister ObjectLister + Watcher ObjectWatcher + allObjects []objEntry + namespaceObjects map[string][]objEntry + eventHandlers []EventHandler +} + +func (c *Cache) AddEventHandler(handler EventHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +const maxSleep = 60 * time.Second + +func (c *Cache) Run(stop <-chan bool) { + sleep := 1 * time.Second + for { + select { + case <-stop: + return + default: + // pass + } + if err := c.ListWatch(); err != nil { + log.Printf("%s\n", err.Error()) + time.Sleep(sleep) + sleep = sleep * 2 + if sleep > maxSleep { + sleep = maxSleep + } + } else { + sleep = 1 + } + } +} + +func (c *Cache) ListWatch() error { + objects, resourceVersion, err := c.Lister() + if err != nil { + return err + } + for ix := range objects { + meta := c.Extractor(objects[ix]) + c.AddOrUpdate(meta, objects[ix]) + } + results, errors := c.Watcher(resourceVersion) + for { + select { + case result, ok := <-results: + if !ok { + return nil + } + c.ProcessResult(result) + case err := <-errors: + return err + } + } +} + +func (c *Cache) ProcessResult(res *Result) { + metadata := c.Extractor(res.Object) + + switch res.Type { + case Added, Modified: + c.AddOrUpdate(metadata, res.Object) + case Deleted: + c.Delete(metadata, res.Object) + } +} + +func (c *Cache) AddOrUpdate(metadata *V1ObjectMeta, obj interface{}) { + var oldObj interface{} + c.allObjects, oldObj = InsertOrUpdate(c.allObjects, metadata, obj) + if len(metadata.Namespace) > 0 { + c.namespaceObjects[metadata.Namespace], _ = + InsertOrUpdate(c.namespaceObjects[metadata.Namespace], metadata, obj) + } + for ix := range c.eventHandlers { + if oldObj == nil { + c.eventHandlers[ix].OnAdd(obj) + } else { + c.eventHandlers[ix].OnUpdate(oldObj, obj) + } + } +} + +func (c *Cache) Delete(metadata *V1ObjectMeta, obj interface{}) { + var deleted bool + c.allObjects, deleted = Delete(c.allObjects, metadata) + if len(metadata.Namespace) > 0 { + c.namespaceObjects[metadata.Namespace], _ = + Delete(c.namespaceObjects[metadata.Namespace], metadata) + } + if deleted { + for ix := range c.eventHandlers { + c.eventHandlers[ix].OnDelete(obj) + } + } +} + +func (c *Cache) List() []interface{} { + result := make([]interface{}, len(c.allObjects)) + for ix := range c.allObjects { + result[ix] = c.allObjects[ix].obj + } + return result +} + +func (c *Cache) ByNamespace(namespace string) []interface{} { + list := c.namespaceObjects[namespace] + result := make([]interface{}, len(list)) + for ix := range list { + result[ix] = list[ix].obj + } + return result +} + +func InsertOrUpdate(list []objEntry, metadata *V1ObjectMeta, obj interface{}) ([]objEntry, interface{}) { + ix := FindObject(list, metadata) + if ix == -1 { + return append(list, objEntry{metadata: metadata, obj: obj}), nil + } + oldObj := list[ix] + list[ix] = objEntry{metadata: metadata, obj: obj} + return list, oldObj +} + +func Delete(list []objEntry, metadata *V1ObjectMeta) ([]objEntry, bool) { + ix := FindObject(list, metadata) + if ix == -1 { + return list, false + } + return append(list[:ix], list[ix+1:]...), true +} + +func FindObject(list []objEntry, metadata *V1ObjectMeta) int { + for ix := range list { + entry := &list[ix] + if entry.metadata.Namespace == metadata.Namespace && + entry.metadata.Name == metadata.Name { + return ix + } + } + return -1 +} diff --git a/kubernetes/client/watch.go b/kubernetes/client/watch.go index ce42ed9..e553bc5 100644 --- a/kubernetes/client/watch.go +++ b/kubernetes/client/watch.go @@ -15,6 +15,10 @@ type Result struct { Object interface{} } +const Added = "ADDED" +const Modified = "MODIFIED" +const Deleted = "DELETED" + // WatchClient is a client for Watching the Kubernetes API type WatchClient struct { Cfg *Configuration diff --git a/kubernetes/config/azure.go b/kubernetes/config/azure.go index 8d1384b..b918a2d 100644 --- a/kubernetes/config/azure.go +++ b/kubernetes/config/azure.go @@ -62,7 +62,7 @@ func (l *KubeConfigLoader) refreshAzureToken() error { AccessToken: l.user.AuthProvider.Config["access-token"], RefreshToken: l.user.AuthProvider.Config["refresh-token"], ExpiresIn: json.Number(l.user.AuthProvider.Config["expires-in"]), - ExpiresOn: json.Number(l.user.AuthProvider.Config["expires-in"]), + ExpiresOn: json.Number(l.user.AuthProvider.Config["expires-on"]), } sptToken, err := adal.NewServicePrincipalTokenFromManualToken(*config, clientID, resource, token) if err := sptToken.Refresh(); err != nil {