-
Notifications
You must be signed in to change notification settings - Fork 76
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a simple informer implementation.
- Loading branch information
1 parent
075b33a
commit ca7256a
Showing
4 changed files
with
280 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/kubernetes-client/go/kubernetes/client" | ||
"github.com/kubernetes-client/go/kubernetes/config" | ||
) | ||
|
||
type handler struct{} | ||
|
||
func (h handler) OnAdd(obj interface{}) { | ||
ns := obj.(*client.V1Namespace) | ||
fmt.Printf("Added %s\n", ns.Metadata.Name) | ||
} | ||
|
||
func (h handler) OnUpdate(oldObj, newObj interface{}) { | ||
ns := newObj.(*client.V1Namespace) | ||
fmt.Printf("Updated %s\n", ns.Metadata.Name) | ||
} | ||
|
||
func (h handler) OnDelete(obj interface{}) { | ||
ns := obj.(*client.V1Namespace) | ||
fmt.Printf("Deleted %s\n", ns.Metadata.Name) | ||
} | ||
|
||
func main() { | ||
c, err := config.LoadKubeConfig() | ||
if err != nil { | ||
panic(err.Error()) | ||
} | ||
|
||
// create the clientset | ||
clientset := client.NewAPIClient(c) | ||
|
||
lister := func() ([]interface{}, string, error) { | ||
namespaces, _, err := clientset.CoreV1Api.ListNamespace(context.Background(), nil) | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
result := make([]interface{}, len(namespaces.Items)) | ||
for ix := range namespaces.Items { | ||
result[ix] = &namespaces.Items[ix] | ||
} | ||
return result, namespaces.Metadata.ResourceVersion, nil | ||
} | ||
|
||
watcher := func(resourceVersion string) (<-chan *client.Result, <-chan error) { | ||
watch := client.WatchClient{ | ||
Cfg: c, | ||
Client: clientset, | ||
Path: "/api/v1/namespaces", | ||
MakerFn: func() interface{} { | ||
return &client.V1Namespace{} | ||
}, | ||
} | ||
results, errors, err := watch.Connect(context.Background(), resourceVersion) | ||
if err != nil { | ||
fmt.Printf("err: %s\n", err.Error()) | ||
} | ||
return results, errors | ||
} | ||
|
||
extractor := func(obj interface{}) *client.V1ObjectMeta { | ||
return obj.(*client.V1Namespace).Metadata | ||
} | ||
|
||
cache := client.Cache{ | ||
Extractor: extractor, | ||
Lister: lister, | ||
Watcher: watcher, | ||
} | ||
cache.AddEventHandler(handler{}) | ||
go cache.Run(make(chan bool)) | ||
|
||
for { | ||
fmt.Printf("----------\n") | ||
list := cache.List() | ||
for ix := range list { | ||
ns := list[ix].(*client.V1Namespace) | ||
fmt.Printf("%s %#v\n", ns.Metadata.Name, ns.Metadata.Labels) | ||
} | ||
fmt.Printf("----------\n") | ||
time.Sleep(5 * time.Second) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
package client | ||
|
||
import ( | ||
"log" | ||
"time" | ||
) | ||
|
||
type objEntry struct { | ||
metadata *V1ObjectMeta | ||
obj interface{} | ||
} | ||
|
||
// ObjectLister is a function that knows how to list objects. | ||
type ObjectLister func() ([]interface{}, string, error) | ||
|
||
// ObjectWatcher is a function that knows how to perform a watch. | ||
type ObjectWatcher func(resourceVersion string) (results <-chan *Result, errors <-chan error) | ||
|
||
// EventHandler is implemented by objects that want event notifications | ||
type EventHandler interface { | ||
OnAdd(obj interface{}) | ||
OnUpdate(oldObj, newObj interface{}) | ||
OnDelete(obj interface{}) | ||
} | ||
|
||
// Informer is an interface for things that can provide notifications | ||
type Informer interface { | ||
AddEventHandler(handler EventHandler) | ||
} | ||
|
||
// Lister is an interface for things that can list objects for all namespaces or by namespace | ||
type Lister interface { | ||
List() []interface{} | ||
ByNamespace(namespace string) []interface{} | ||
} | ||
|
||
// Validate that we implement the interfaces | ||
var _ Lister = &Cache{} | ||
var _ Informer = &Cache{} | ||
|
||
// Cache is an implementation of a List/Watch cache | ||
type Cache struct { | ||
Extractor func(interface{}) *V1ObjectMeta | ||
Lister ObjectLister | ||
Watcher ObjectWatcher | ||
allObjects []objEntry | ||
namespaceObjects map[string][]objEntry | ||
eventHandlers []EventHandler | ||
} | ||
|
||
func (c *Cache) AddEventHandler(handler EventHandler) { | ||
c.eventHandlers = append(c.eventHandlers, handler) | ||
} | ||
|
||
const maxSleep = 60 * time.Second | ||
|
||
func (c *Cache) Run(stop <-chan bool) { | ||
sleep := 1 * time.Second | ||
for { | ||
select { | ||
case <-stop: | ||
return | ||
default: | ||
// pass | ||
} | ||
if err := c.ListWatch(); err != nil { | ||
log.Printf("%s\n", err.Error()) | ||
time.Sleep(sleep) | ||
sleep = sleep * 2 | ||
if sleep > maxSleep { | ||
sleep = maxSleep | ||
} | ||
} else { | ||
sleep = 1 | ||
} | ||
} | ||
} | ||
|
||
func (c *Cache) ListWatch() error { | ||
objects, resourceVersion, err := c.Lister() | ||
if err != nil { | ||
return err | ||
} | ||
for ix := range objects { | ||
meta := c.Extractor(objects[ix]) | ||
c.AddOrUpdate(meta, objects[ix]) | ||
} | ||
results, errors := c.Watcher(resourceVersion) | ||
for { | ||
select { | ||
case result, ok := <-results: | ||
if !ok { | ||
return nil | ||
} | ||
c.ProcessResult(result) | ||
case err := <-errors: | ||
return err | ||
} | ||
} | ||
} | ||
|
||
func (c *Cache) ProcessResult(res *Result) { | ||
metadata := c.Extractor(res.Object) | ||
|
||
switch res.Type { | ||
case Added, Modified: | ||
c.AddOrUpdate(metadata, res.Object) | ||
case Deleted: | ||
c.Delete(metadata, res.Object) | ||
} | ||
} | ||
|
||
func (c *Cache) AddOrUpdate(metadata *V1ObjectMeta, obj interface{}) { | ||
var oldObj interface{} | ||
c.allObjects, oldObj = InsertOrUpdate(c.allObjects, metadata, obj) | ||
if len(metadata.Namespace) > 0 { | ||
c.namespaceObjects[metadata.Namespace], _ = | ||
InsertOrUpdate(c.namespaceObjects[metadata.Namespace], metadata, obj) | ||
} | ||
for ix := range c.eventHandlers { | ||
if oldObj == nil { | ||
c.eventHandlers[ix].OnAdd(obj) | ||
} else { | ||
c.eventHandlers[ix].OnUpdate(oldObj, obj) | ||
} | ||
} | ||
} | ||
|
||
func (c *Cache) Delete(metadata *V1ObjectMeta, obj interface{}) { | ||
var deleted bool | ||
c.allObjects, deleted = Delete(c.allObjects, metadata) | ||
if len(metadata.Namespace) > 0 { | ||
c.namespaceObjects[metadata.Namespace], _ = | ||
Delete(c.namespaceObjects[metadata.Namespace], metadata) | ||
} | ||
if deleted { | ||
for ix := range c.eventHandlers { | ||
c.eventHandlers[ix].OnDelete(obj) | ||
} | ||
} | ||
} | ||
|
||
func (c *Cache) List() []interface{} { | ||
result := make([]interface{}, len(c.allObjects)) | ||
for ix := range c.allObjects { | ||
result[ix] = c.allObjects[ix].obj | ||
} | ||
return result | ||
} | ||
|
||
func (c *Cache) ByNamespace(namespace string) []interface{} { | ||
list := c.namespaceObjects[namespace] | ||
result := make([]interface{}, len(list)) | ||
for ix := range list { | ||
result[ix] = list[ix].obj | ||
} | ||
return result | ||
} | ||
|
||
func InsertOrUpdate(list []objEntry, metadata *V1ObjectMeta, obj interface{}) ([]objEntry, interface{}) { | ||
ix := FindObject(list, metadata) | ||
if ix == -1 { | ||
return append(list, objEntry{metadata: metadata, obj: obj}), nil | ||
} | ||
oldObj := list[ix] | ||
list[ix] = objEntry{metadata: metadata, obj: obj} | ||
return list, oldObj | ||
} | ||
|
||
func Delete(list []objEntry, metadata *V1ObjectMeta) ([]objEntry, bool) { | ||
ix := FindObject(list, metadata) | ||
if ix == -1 { | ||
return list, false | ||
} | ||
return append(list[:ix], list[ix+1:]...), true | ||
} | ||
|
||
func FindObject(list []objEntry, metadata *V1ObjectMeta) int { | ||
for ix := range list { | ||
entry := &list[ix] | ||
if entry.metadata.Namespace == metadata.Namespace && | ||
entry.metadata.Name == metadata.Name { | ||
return ix | ||
} | ||
} | ||
return -1 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters