diff --git a/augerctl/README.md b/augerctl/README.md index 9e8ea2b..8980ea3 100644 --- a/augerctl/README.md +++ b/augerctl/README.md @@ -86,6 +86,14 @@ augerctl get leases -n kube-system kubectl get leases -n kube-system -o yaml ``` +Watch all leases with namespace `kube-system` + +``` bash +augerctl get leases -n kube-system -w +# Nearly equivalent +kubectl get leases -n kube-system -w -o yaml +``` + List a single resource of type `apiservices.apiregistration.k8s.io` and name `v1.apps` ``` bash diff --git a/augerctl/command/get_command.go b/augerctl/command/get_command.go index 7051974..4e16827 100644 --- a/augerctl/command/get_command.go +++ b/augerctl/command/get_command.go @@ -31,6 +31,9 @@ type getFlagpole struct { Output string ChunkSize int64 Prefix string + + Watch bool + WatchOnly bool } var ( @@ -50,6 +53,11 @@ var ( # Nearly equivalent kubectl get leases -n kube-system -o yaml + # Watch all leases with namespace "kube-system" + augerctl get leases -n kube-system -w + # Nearly equivalent + kubectl get leases -n kube-system -w -o yaml + # List a single resource of type "apiservices.apiregistration.k8s.io" and name "v1.apps" augerctl get apiservices.apiregistration.k8s.io v1.apps # Nearly equivalent @@ -89,6 +97,8 @@ func newCtlGetCommand(f *flagpole) *cobra.Command { cmd.Flags().Int64Var(&flags.ChunkSize, "chunk-size", 500, "chunk size of the list pager") cmd.Flags().StringVar(&flags.Prefix, "prefix", "/registry", "prefix to prepend to the resource") + cmd.Flags().BoolVarP(&flags.Watch, "watch", "w", false, "after listing/getting the requested object, watch for changes") + cmd.Flags().BoolVar(&flags.WatchOnly, "watch-only", false, "watch for changes to the requested object(s), without listing/getting first") return cmd } @@ -124,13 +134,30 @@ func getCommand(ctx context.Context, etcdclient client.Client, flags *getFlagpol client.WithResponse(printer.Print), } - // TODO: Support watch + if flags.Watch { + if !flags.WatchOnly { + rev, err := etcdclient.Get(ctx, flags.Prefix, + opOpts..., + ) + if err != nil { + return err + } + opOpts = append(opOpts, client.WithRevision(rev)) + } - _, err := etcdclient.Get(ctx, flags.Prefix, - opOpts..., - ) - if err != nil { - return err + err := etcdclient.Watch(ctx, flags.Prefix, + opOpts..., + ) + if err != nil { + return err + } + } else { + _, err := etcdclient.Get(ctx, flags.Prefix, + opOpts..., + ) + if err != nil { + return err + } } return nil diff --git a/augerctl/command/printer_json.go b/augerctl/command/printer_json.go index aa0c92a..ce4d26c 100644 --- a/augerctl/command/printer_json.go +++ b/augerctl/command/printer_json.go @@ -30,6 +30,9 @@ type jsonPrinter struct { func (p *jsonPrinter) Print(kv *client.KeyValue) error { value := kv.Value + if kv.PrevValue != nil { + value = kv.PrevValue + } inMediaType, _, err := encoding.DetectAndExtract(value) if err != nil { return err diff --git a/augerctl/command/printer_yaml.go b/augerctl/command/printer_yaml.go index eab9ee1..c01a913 100644 --- a/augerctl/command/printer_yaml.go +++ b/augerctl/command/printer_yaml.go @@ -32,6 +32,9 @@ type yamlPrinter struct { func (p *yamlPrinter) Print(kv *client.KeyValue) error { value := kv.Value + if kv.PrevValue != nil { + value = kv.PrevValue + } inMediaType, _, err := encoding.DetectAndExtract(value) if err != nil { _, err0 := fmt.Fprintf(p.w, "---\n# %s | raw | %v\n# %s\n", kv.Key, err, value) diff --git a/pkg/client/client.go b/pkg/client/client.go index 1cc53d4..837f053 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -31,6 +31,9 @@ type Client interface { // Get is a method that retrieves a key-value pair from the etcd server. // It returns the revision of the key-value pair Get(ctx context.Context, prefix string, opOpts ...OpOption) (rev int64, err error) + + // Watch is a method that watches for changes to a key-value pair on the etcd server. + Watch(ctx context.Context, prefix string, opOpts ...OpOption) error } // client is the etcd client. @@ -89,6 +92,13 @@ func WithChunkSize(chunkSize int64) OpOption { } } +// WithRevision sets the revision for the target. +func WithRevision(revision int64) OpOption { + return func(o *op) { + o.revision = revision + } +} + func opOption(opts []OpOption) op { var opt op for _, o := range opts { @@ -101,6 +111,9 @@ func opOption(opts []OpOption) op { type KeyValue struct { Key []byte Value []byte + + // For delete event + PrevValue []byte } func iterateList(kvs []*mvccpb.KeyValue, callback func(kv *KeyValue) error) error { diff --git a/pkg/client/client_watch.go b/pkg/client/client_watch.go new file mode 100644 index 0000000..720aa12 --- /dev/null +++ b/pkg/client/client_watch.go @@ -0,0 +1,66 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +func (c *client) Watch(ctx context.Context, prefix string, opOpts ...OpOption) error { + opt := opOption(opOpts) + if opt.response == nil { + return fmt.Errorf("response is required") + } + + path, single, err := getPrefix(prefix, opt.gr, opt.name, opt.namespace) + if err != nil { + return err + } + + opts := make([]clientv3.OpOption, 0, 3) + + if !single { + opts = append(opts, clientv3.WithPrefix()) + } + + if opt.revision != 0 { + opts = append(opts, clientv3.WithRev(opt.revision)) + } + + opts = append(opts, clientv3.WithPrevKV()) + + watchChan := c.client.Watch(ctx, path, opts...) + for watchResp := range watchChan { + for _, event := range watchResp.Events { + r := &KeyValue{ + Key: event.Kv.Key, + Value: event.Kv.Value, + } + if event.PrevKv != nil { + r.PrevValue = event.PrevKv.Value + } + err := opt.response(r) + if err != nil { + return err + } + } + } + return nil +}