diff --git a/examples/watch/watch.go b/examples/watch/watch.go index d197b71..57b19eb 100644 --- a/examples/watch/watch.go +++ b/examples/watch/watch.go @@ -38,7 +38,7 @@ func main() { Path: "/api/v1/namespaces", MakerFn: func() interface{} { return &client.V1Namespace{} }, } - if resultChan, errChan, err := watch.Connect(context.Background()); err != nil { + if resultChan, errChan, err := watch.Connect(context.Background(), ""); err != nil { panic(err) } else { for obj := range resultChan { diff --git a/kubernetes/client/watch.go b/kubernetes/client/watch.go index 22d137f..ce42ed9 100644 --- a/kubernetes/client/watch.go +++ b/kubernetes/client/watch.go @@ -23,9 +23,14 @@ type WatchClient struct { MakerFn func() interface{} } -// Connect initiates a watch to the server. TODO: support watch from resource version -func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) { - url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true" +// Connect initiates a watch to the server. +func (w *WatchClient) Connect(ctx context.Context, resourceVersion string) (<-chan *Result, <-chan error, error) { + params := []string{"watch=true"} + if len(resourceVersion) != 0 { + params = append(params, "resourceVersion="+resourceVersion) + } + queryStr := "?" + strings.Join(params, "&") + url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + queryStr req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{}) if err != nil { return nil, nil, err diff --git a/kubernetes/client/watch_test.go b/kubernetes/client/watch_test.go index 1744f59..71a9c83 100644 --- a/kubernetes/client/watch_test.go +++ b/kubernetes/client/watch_test.go @@ -12,13 +12,15 @@ import ( func makerFn() interface{} { return &V1Namespace{} } type staticHandler struct { - Code int - Body string + Code int + Body string + QueryParams url.Values } func (s *staticHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(s.Code) res.Write([]byte(s.Body)) + s.QueryParams = req.URL.Query() } func TestFullError(t *testing.T) { @@ -43,7 +45,7 @@ func TestFullError(t *testing.T) { MakerFn: makerFn, } - if _, _, err := watch.Connect(context.Background()); err == nil { + if _, _, err := watch.Connect(context.Background(), ""); err == nil { t.Error("unexpected nil error") } } @@ -70,7 +72,7 @@ func TestFull(t *testing.T) { MakerFn: makerFn, } - resultChan, errChan, err := watch.Connect(context.Background()) + resultChan, errChan, err := watch.Connect(context.Background(), "") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -100,6 +102,41 @@ func TestFull(t *testing.T) { } } +func TestResourceVersion(t *testing.T) { + handler := &staticHandler{ + Code: 200, + Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`, + } + server := httptest.NewServer(handler) + defer server.Close() + + u, err := url.Parse(server.URL) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + cfg := &Configuration{} + cfg.Host = u.Host + cfg.Scheme = u.Scheme + + watch := WatchClient{ + Cfg: cfg, + Client: NewAPIClient(cfg), + MakerFn: makerFn, + } + + version := "12345" + _, _, err = watch.Connect(context.Background(), version) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + versionOut := handler.QueryParams["resourceVersion"][0] + if versionOut != version { + t.Errorf("unexpected resource version %s vs %s", version, versionOut) + } +} + func TestDecode(t *testing.T) { line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}` result, err := decode(line, makerFn)