From 42a6024e61ff9fc6b6c8e40b877eb95929c32f93 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Tue, 26 Mar 2019 09:33:26 -0700 Subject: [PATCH] Add end to end HTTP Connect tests. --- kubernetes/client/watch.go | 6 +- kubernetes/client/watch_test.go | 101 +++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/kubernetes/client/watch.go b/kubernetes/client/watch.go index 111d0b5..22d137f 100644 --- a/kubernetes/client/watch.go +++ b/kubernetes/client/watch.go @@ -23,6 +23,7 @@ type WatchClient struct { MakerFn func() interface{} } +// Connect initiates a watch to the server. TODO: support watch from resource version func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) { url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true" req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{}) @@ -36,8 +37,8 @@ func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error if res.StatusCode != 200 { return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status) } - resultChan := make(chan *Result) - errChan := make(chan error) + resultChan := make(chan *Result, 1) + errChan := make(chan error, 1) processWatch(res.Body, w.MakerFn, resultChan, errChan) return resultChan, errChan, nil } @@ -67,6 +68,7 @@ func decode(line string, makerFn func() interface{}) (*Result, error) { if len(line) == 0 { return nil, nil } + // TODO: support protocol buffer encoding? decoder := json.NewDecoder(strings.NewReader(line)) result := &Result{} for decoder.More() { diff --git a/kubernetes/client/watch_test.go b/kubernetes/client/watch_test.go index 3c34815..1744f59 100644 --- a/kubernetes/client/watch_test.go +++ b/kubernetes/client/watch_test.go @@ -1,17 +1,113 @@ package client import ( + "context" + "net/http" + "net/http/httptest" + "net/url" "strings" "testing" ) func makerFn() interface{} { return &V1Namespace{} } +type staticHandler struct { + Code int + Body string +} + +func (s *staticHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(s.Code) + res.Write([]byte(s.Body)) +} + +func TestFullError(t *testing.T) { + server := httptest.NewServer(&staticHandler{ + Code: 404, + Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`, + }) + defer server.Close() + + u, err := url.Parse(server.URL) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + cfg := &Configuration{} + cfg.Host = u.Host + cfg.Scheme = u.Scheme + + watch := WatchClient{ + Cfg: cfg, + Client: NewAPIClient(cfg), + MakerFn: makerFn, + } + + if _, _, err := watch.Connect(context.Background()); err == nil { + t.Error("unexpected nil error") + } +} + +func TestFull(t *testing.T) { + server := httptest.NewServer(&staticHandler{ + Code: 200, + Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`, + }) + defer server.Close() + + u, err := url.Parse(server.URL) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + cfg := &Configuration{} + cfg.Host = u.Host + cfg.Scheme = u.Scheme + + watch := WatchClient{ + Cfg: cfg, + Client: NewAPIClient(cfg), + MakerFn: makerFn, + } + + resultChan, errChan, err := watch.Connect(context.Background()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + out := []*Result{} + outErrs := []error{} + + for r := range resultChan { + out = append(out, r) + } + + for e := range errChan { + outErrs = append(outErrs, e) + } + + if len(out) != 1 { + t.Errorf("unexpected results: %v", out) + t.FailNow() + } + for ix, val := range []string{"ADDED"} { + if out[ix].Type != val { + t.Errorf("unexpected value (%d): %v", ix, out[ix]) + } + } + if len(outErrs) != 0 { + t.Errorf("unexpected errors: %v", outErrs) + } +} + func TestDecode(t *testing.T) { line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}` result, err := decode(line, makerFn) if err != nil { - t.Errorf("Unexpeceted non-nil: %v", err) + t.Errorf("Unexpected non-nil: %v", err) + } + if result.Type != "ADDED" { + t.Errorf("Unexpected event type: %s expected 'ADDED'", result.Type) } ns, ok := result.Object.(*V1Namespace) if !ok { @@ -52,4 +148,7 @@ func TestMultiDecode(t *testing.T) { t.Errorf("unexpected value (%d): %v", ix, out[ix]) } } + if len(outErrs) != 0 { + t.Errorf("unexpected errors: %v", outErrs) + } }