Skip to content

Commit

Permalink
Merge pull request #24 from brendandburns/watch
Browse files Browse the repository at this point in the history
Implement Watch
k8s-ci-robot authored Mar 27, 2019
2 parents 1c33110 + 42a6024 commit 3ce0a05
Showing 3 changed files with 306 additions and 0 deletions.
52 changes: 52 additions & 0 deletions examples/watch/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Note: the example only works with the code within the same release/branch.
package main

import (
"context"
"fmt"

"github.com/kubernetes-client/go/kubernetes/client"
"github.com/kubernetes-client/go/kubernetes/config"
)

func main() {
c, err := config.LoadKubeConfig()
if err != nil {
panic(err.Error())
}
cl := client.NewAPIClient(c)

watch := client.WatchClient{
Cfg: c,
Client: cl,
Path: "/api/v1/namespaces",
MakerFn: func() interface{} { return &client.V1Namespace{} },
}
if resultChan, errChan, err := watch.Connect(context.Background()); err != nil {
panic(err)
} else {
for obj := range resultChan {
fmt.Printf("%s\n%#v\n", obj.Type, obj.Object)
}
for err := range errChan {
fmt.Printf("ERROR: %#v", err)
}
fmt.Printf("Closed!\n")
}
}
100 changes: 100 additions & 0 deletions kubernetes/client/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package client

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"strings"
)

// Result is a watch result
type Result struct {
Type string
Object interface{}
}

// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration
Client *APIClient
Path string
MakerFn func() interface{}
}

// Connect initiates a watch to the server. TODO: support watch from resource version
func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) {
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true"
req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{})
if err != nil {
return nil, nil, err
}
res, err := w.Client.callAPI(req)
if err != nil {
return nil, nil, err
}
if res.StatusCode != 200 {
return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status)
}
resultChan := make(chan *Result, 1)
errChan := make(chan error, 1)
processWatch(res.Body, w.MakerFn, resultChan, errChan)
return resultChan, errChan, nil
}

func processWatch(stream io.Reader, makerFn func() interface{}, resultChan chan<- *Result, errChan chan<- error) {
scanner := bufio.NewScanner(stream)
go func() {
defer close(resultChan)
defer close(errChan)
for scanner.Scan() {
watchObj, err := decode(scanner.Text(), makerFn)
if err != nil {
errChan <- err
return
}
if watchObj != nil {
resultChan <- watchObj
}
}
if err := scanner.Err(); err != nil {
errChan <- err
}
}()
}

func decode(line string, makerFn func() interface{}) (*Result, error) {
if len(line) == 0 {
return nil, nil
}
// TODO: support protocol buffer encoding?
decoder := json.NewDecoder(strings.NewReader(line))
result := &Result{}
for decoder.More() {
name, err := decoder.Token()
if err != nil {
return nil, err
}
if name == "type" {
token, err := decoder.Token()
if err != nil {
return nil, err
}
var ok bool
result.Type, ok = token.(string)
if !ok {
return nil, fmt.Errorf("Error casting %v to string", token)
}
}
if name == "object" {
obj := makerFn()
if err := decoder.Decode(&obj); err != nil {
return nil, err
}
result.Object = obj
return result, nil
}
}
return nil, nil
}
154 changes: 154 additions & 0 deletions kubernetes/client/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package client

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)

func makerFn() interface{} { return &V1Namespace{} }

type staticHandler struct {
Code int
Body string
}

func (s *staticHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(s.Code)
res.Write([]byte(s.Body))
}

func TestFullError(t *testing.T) {
server := httptest.NewServer(&staticHandler{
Code: 404,
Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`,
})
defer server.Close()

u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

cfg := &Configuration{}
cfg.Host = u.Host
cfg.Scheme = u.Scheme

watch := WatchClient{
Cfg: cfg,
Client: NewAPIClient(cfg),
MakerFn: makerFn,
}

if _, _, err := watch.Connect(context.Background()); err == nil {
t.Error("unexpected nil error")
}
}

func TestFull(t *testing.T) {
server := httptest.NewServer(&staticHandler{
Code: 200,
Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`,
})
defer server.Close()

u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

cfg := &Configuration{}
cfg.Host = u.Host
cfg.Scheme = u.Scheme

watch := WatchClient{
Cfg: cfg,
Client: NewAPIClient(cfg),
MakerFn: makerFn,
}

resultChan, errChan, err := watch.Connect(context.Background())
if err != nil {
t.Errorf("unexpected error: %v", err)
}

out := []*Result{}
outErrs := []error{}

for r := range resultChan {
out = append(out, r)
}

for e := range errChan {
outErrs = append(outErrs, e)
}

if len(out) != 1 {
t.Errorf("unexpected results: %v", out)
t.FailNow()
}
for ix, val := range []string{"ADDED"} {
if out[ix].Type != val {
t.Errorf("unexpected value (%d): %v", ix, out[ix])
}
}
if len(outErrs) != 0 {
t.Errorf("unexpected errors: %v", outErrs)
}
}

func TestDecode(t *testing.T) {
line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}`
result, err := decode(line, makerFn)
if err != nil {
t.Errorf("Unexpected non-nil: %v", err)
}
if result.Type != "ADDED" {
t.Errorf("Unexpected event type: %s expected 'ADDED'", result.Type)
}
ns, ok := result.Object.(*V1Namespace)
if !ok {
t.Errorf("Cast failed: %v", result.Object)
}
if ns.Kind != "Namespace" || ns.Metadata.Name != "kube-system" {
t.Errorf("Unexpected value %#v", *ns)
}
}

func TestMultiDecode(t *testing.T) {
lines := `
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"MODIFIED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-2","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"DELETED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-3","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
`

results := make(chan *Result)
errs := make(chan error)
processWatch(strings.NewReader(lines), makerFn, results, errs)

out := []*Result{}
outErrs := []error{}

for r := range results {
out = append(out, r)
}

for e := range errs {
outErrs = append(outErrs, e)
}

if len(out) != 3 {
t.Errorf("unexpected results: %v", out)
}
for ix, val := range []string{"ADDED", "MODIFIED", "DELETED"} {
if out[ix].Type != val {
t.Errorf("unexpected value (%d): %v", ix, out[ix])
}
}
if len(outErrs) != 0 {
t.Errorf("unexpected errors: %v", outErrs)
}
}

0 comments on commit 3ce0a05

Please sign in to comment.