@@ -64,6 +64,9 @@ type kvCommand struct {
64
64
mirrorDomain string
65
65
sources []string
66
66
compression bool
67
+ includeHistory bool
68
+ includeDeletes bool
69
+ updatesOnly bool
67
70
}
68
71
69
72
func configureKVCommand (app commandHost ) {
@@ -158,6 +161,10 @@ for an indefinite period or a per-bucket configured TTL.
158
161
watch := kv .Command ("watch" , "Watch the bucket or a specific key for updated" ).Action (c .watchAction )
159
162
watch .Arg ("bucket" , "The bucket to act on" ).Required ().StringVar (& c .bucket )
160
163
watch .Arg ("key" , "The key to act on" ).Default (">" ).StringVar (& c .key )
164
+ watch .Flag ("history" , "Includes historic values" ).UnNegatableBoolVar (& c .includeHistory )
165
+ watch .Flag ("deletes" , "Includes deletes in watched values" ).Default ("true" ).BoolVar (& c .includeDeletes )
166
+ watch .Flag ("updates" , "Only show new values written" ).UnNegatableBoolVar (& c .updatesOnly )
167
+ watch .Flag ("revision" , "Starts from a certain revision" ).Uint64Var (& c .revision )
161
168
162
169
ls := kv .Command ("ls" , "List available buckets or the keys in a bucket" ).Alias ("list" ).Action (c .lsAction )
163
170
ls .Arg ("bucket" , "The bucket to list the keys" ).StringVar (& c .bucket )
@@ -804,7 +811,21 @@ func (c *kvCommand) watchAction(_ *fisk.ParseContext) error {
804
811
805
812
ctx := context .Background ()
806
813
807
- watch , err := store .Watch (ctx , c .key )
814
+ var opts []jetstream.WatchOpt
815
+ if ! c .includeDeletes {
816
+ opts = append (opts , jetstream .IgnoreDeletes ())
817
+ }
818
+ if c .includeHistory {
819
+ opts = append (opts , jetstream .IncludeHistory ())
820
+ }
821
+ if c .updatesOnly {
822
+ opts = append (opts , jetstream .UpdatesOnly ())
823
+ }
824
+ if c .revision > 0 {
825
+ opts = append (opts , jetstream .ResumeFromRevision (c .revision ))
826
+ }
827
+
828
+ watch , err := store .Watch (ctx , c .key , opts ... )
808
829
if err != nil {
809
830
return err
810
831
}
0 commit comments