Skip to content

Commit 9571e6e

Browse files
authored
Merge pull request #992 from ripienaar/consumer_pause
Support consumer pause and resume
2 parents e1e3009 + fa09be3 commit 9571e6e

File tree

3 files changed

+153
-35
lines changed

3 files changed

+153
-35
lines changed

cli/consumer_command.go

+138-18
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type consumerCmd struct {
9090
fcSet bool
9191
metadataIsSet bool
9292
metadata map[string]string
93+
pauseUntil string
9394

9495
dryRun bool
9596
mgr *jsm.Manager
@@ -144,30 +145,15 @@ func configureConsumerCommand(app commandHost) {
144145
}
145146
f.Flag("replicas", "Sets a custom replica count rather than inherit from the stream").IntVar(&c.replicas)
146147
f.Flag("metadata", "Adds metadata to the stream").PlaceHolder("META").IsSetByUser(&c.metadataIsSet).StringMapVar(&c.metadata)
147-
148+
if !edit {
149+
f.Flag("pause", fmt.Sprintf("Pause the consumer for a duration after start or until a specific timestamp (eg %s)", time.Now().Format(time.DateTime))).StringVar(&c.pauseUntil)
150+
}
148151
}
149152

150153
cons := app.Command("consumer", "JetStream Consumer management").Alias("con").Alias("obs").Alias("c")
151154
addCheat("consumer", cons)
152155
cons.Flag("all", "Operate on all streams including system ones").Short('a').UnNegatableBoolVar(&c.showAll)
153156

154-
consLs := cons.Command("ls", "List known Consumers").Alias("list").Action(c.lsAction)
155-
consLs.Arg("stream", "Stream name").StringVar(&c.stream)
156-
consLs.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json)
157-
consLs.Flag("names", "Show just the consumer names").Short('n').UnNegatableBoolVar(&c.listNames)
158-
consLs.Flag("no-select", "Do not select consumers from a list").Default("false").UnNegatableBoolVar(&c.force)
159-
160-
conReport := cons.Command("report", "Reports on Consumer statistics").Action(c.reportAction)
161-
conReport.Arg("stream", "Stream name").StringVar(&c.stream)
162-
conReport.Flag("raw", "Show un-formatted numbers").Short('r').UnNegatableBoolVar(&c.raw)
163-
conReport.Flag("leaders", "Show details about the leaders").Short('l').UnNegatableBoolVar(&c.reportLeaderDistrib)
164-
165-
consInfo := cons.Command("info", "Consumer information").Alias("nfo").Action(c.infoAction)
166-
consInfo.Arg("stream", "Stream name").StringVar(&c.stream)
167-
consInfo.Arg("consumer", "Consumer name").StringVar(&c.consumer)
168-
consInfo.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json)
169-
consInfo.Flag("no-select", "Do not select consumers from a list").Default("false").UnNegatableBoolVar(&c.force)
170-
171157
consAdd := cons.Command("add", "Creates a new Consumer").Alias("create").Alias("new").Action(c.createAction)
172158
consAdd.Arg("stream", "Stream name").StringVar(&c.stream)
173159
consAdd.Arg("consumer", "Consumer name").StringVar(&c.consumer)
@@ -185,6 +171,18 @@ func configureConsumerCommand(app commandHost) {
185171
edit.Flag("dry-run", "Only shows differences, do not edit the stream").UnNegatableBoolVar(&c.dryRun)
186172
addCreateFlags(edit, true)
187173

174+
consLs := cons.Command("ls", "List known Consumers").Alias("list").Action(c.lsAction)
175+
consLs.Arg("stream", "Stream name").StringVar(&c.stream)
176+
consLs.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json)
177+
consLs.Flag("names", "Show just the consumer names").Short('n').UnNegatableBoolVar(&c.listNames)
178+
consLs.Flag("no-select", "Do not select consumers from a list").Default("false").UnNegatableBoolVar(&c.force)
179+
180+
consInfo := cons.Command("info", "Consumer information").Alias("nfo").Action(c.infoAction)
181+
consInfo.Arg("stream", "Stream name").StringVar(&c.stream)
182+
consInfo.Arg("consumer", "Consumer name").StringVar(&c.consumer)
183+
consInfo.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json)
184+
consInfo.Flag("no-select", "Do not select consumers from a list").Default("false").UnNegatableBoolVar(&c.force)
185+
188186
consRm := cons.Command("rm", "Removes a Consumer").Alias("delete").Alias("del").Action(c.rmAction)
189187
consRm.Arg("stream", "Stream name").StringVar(&c.stream)
190188
consRm.Arg("consumer", "Consumer name").StringVar(&c.consumer)
@@ -213,6 +211,22 @@ func configureConsumerCommand(app commandHost) {
213211
consSub.Flag("raw", "Show only the message").Short('r').UnNegatableBoolVar(&c.raw)
214212
consSub.Flag("deliver-group", "Deliver group of the consumer").StringVar(&c.deliveryGroup)
215213

214+
conPause := cons.Command("pause", "Pause a consumer until a later time").Action(c.pauseAction)
215+
conPause.Arg("stream", "Stream name").StringVar(&c.stream)
216+
conPause.Arg("consumer", "Consumer name").StringVar(&c.consumer)
217+
conPause.Arg("until", fmt.Sprintf("Pause until a specific time (eg %s)", time.Now().UTC().Format(time.DateTime))).PlaceHolder("TIME").StringVar(&c.pauseUntil)
218+
conPause.Flag("force", "Force pause without prompting").Short('f').UnNegatableBoolVar(&c.force)
219+
220+
conResume := cons.Command("resume", "Resume a paused consumer").Action(c.resumeAction)
221+
conResume.Arg("stream", "Stream name").StringVar(&c.stream)
222+
conResume.Arg("consumer", "Consumer name").StringVar(&c.consumer)
223+
conResume.Flag("force", "Force resume without prompting").Short('f').UnNegatableBoolVar(&c.force)
224+
225+
conReport := cons.Command("report", "Reports on Consumer statistics").Action(c.reportAction)
226+
conReport.Arg("stream", "Stream name").StringVar(&c.stream)
227+
conReport.Flag("raw", "Show un-formatted numbers").Short('r').UnNegatableBoolVar(&c.raw)
228+
conReport.Flag("leaders", "Show details about the leaders").Short('l').UnNegatableBoolVar(&c.reportLeaderDistrib)
229+
216230
conCluster := cons.Command("cluster", "Manages a clustered Consumer").Alias("c")
217231
conClusterDown := conCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown)
218232
conClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
@@ -623,6 +637,11 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
623637
cols.AddRowIf("Backoff", c.renderBackoff(config.BackOff), len(config.BackOff) > 0)
624638
cols.AddRowIf("Replicas", config.Replicas, config.Replicas > 0)
625639
cols.AddRowIf("Memory Storage", true, config.MemoryStorage)
640+
if state.Paused {
641+
cols.AddRowf("Paused Until Deadline", "%s (%s remaining)", f(config.PauseUntil), state.PauseRemaining.Round(time.Second))
642+
} else {
643+
cols.AddRowIf("Paused Until Deadline", fmt.Sprintf("%s (passed)", f(config.PauseUntil)), !config.PauseUntil.IsZero())
644+
}
626645

627646
if len(config.Metadata) > 0 {
628647
cols.AddSectionTitle("Metadata")
@@ -696,6 +715,9 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
696715
cols.AddRow("Active Interest", "No interest")
697716
}
698717
}
718+
if state.Paused {
719+
cols.AddRowf("Paused Until", "%s (%s remaining)", f(state.TimeStamp.Add(state.PauseRemaining)), state.PauseRemaining.Round(time.Second))
720+
}
699721

700722
cols.Frender(os.Stdout)
701723
}
@@ -1266,9 +1288,107 @@ func (c *consumerCmd) prepareConfig(pc *fisk.ParseContext) (cfg *api.ConsumerCon
12661288
cfg.Metadata = c.metadata
12671289
}
12681290

1291+
if c.pauseUntil != "" {
1292+
cfg.PauseUntil, err = c.parsePauseUntil(c.pauseUntil)
1293+
if err != nil {
1294+
return nil, err
1295+
}
1296+
}
1297+
12691298
return cfg, nil
12701299
}
12711300

1301+
func (c *consumerCmd) parsePauseUntil(until string) (time.Time, error) {
1302+
if until == "" {
1303+
return time.Time{}, fmt.Errorf("time not given")
1304+
}
1305+
1306+
var ts time.Time
1307+
var err error
1308+
1309+
ts, err = time.Parse(time.DateTime, until)
1310+
if err != nil {
1311+
dur, err := parseDurationString(until)
1312+
if err != nil {
1313+
return ts, fmt.Errorf("could not parse the pause time as either timestamp or duration")
1314+
}
1315+
ts = time.Now().Add(dur)
1316+
}
1317+
1318+
return ts, nil
1319+
}
1320+
1321+
func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
1322+
c.connectAndSetup(true, true)
1323+
1324+
state, err := c.selectedConsumer.LatestState()
1325+
if err != nil {
1326+
return err
1327+
}
1328+
if !state.Paused {
1329+
return fmt.Errorf("consumer is not paused")
1330+
}
1331+
1332+
if !c.force {
1333+
ok, err := askConfirmation(fmt.Sprintf("Really resume Consumer %s > %s", c.stream, c.consumer), false)
1334+
fisk.FatalIfError(err, "could not obtain confirmation")
1335+
1336+
if !ok {
1337+
return nil
1338+
}
1339+
}
1340+
1341+
err = c.selectedConsumer.Resume()
1342+
if err != nil {
1343+
return err
1344+
}
1345+
1346+
fmt.Printf("Consumer %s > %s was resumed while previously paused until %s\n", c.stream, c.consumer, f(state.TimeStamp.Add(state.PauseRemaining)))
1347+
return nil
1348+
}
1349+
1350+
func (c *consumerCmd) pauseAction(_ *fisk.ParseContext) error {
1351+
c.connectAndSetup(true, true)
1352+
1353+
if c.pauseUntil == "" {
1354+
dflt := time.Now().Add(time.Hour).Format(time.DateTime)
1355+
err := askOne(&survey.Input{
1356+
Message: "Pause until (time or duration)",
1357+
Default: dflt,
1358+
Help: fmt.Sprintf("Sets the time in either a duration like 1h30m or a timestamp like '%s'", dflt),
1359+
}, &c.pauseUntil, survey.WithValidator(survey.Required))
1360+
if err != nil {
1361+
return err
1362+
}
1363+
}
1364+
1365+
ts, err := c.parsePauseUntil(c.pauseUntil)
1366+
if err != nil {
1367+
return err
1368+
}
1369+
1370+
if !c.force {
1371+
ok, err := askConfirmation(fmt.Sprintf("Really pause Consumer %s > %s until %s", c.stream, c.consumer, f(ts)), false)
1372+
fisk.FatalIfError(err, "could not obtain confirmation")
1373+
1374+
if !ok {
1375+
return nil
1376+
}
1377+
}
1378+
1379+
resp, err := c.selectedConsumer.Pause(ts)
1380+
if err != nil {
1381+
return err
1382+
}
1383+
1384+
if !resp.Paused {
1385+
return fmt.Errorf("consumer failed to pause, perhaps a time in the past was given")
1386+
}
1387+
1388+
fmt.Printf("Paused %s > %s until %s (%s)\n", c.selectedConsumer.StreamName(), c.selectedConsumer.Name(), f(resp.PauseUntil), resp.PauseRemaining.Round(time.Second))
1389+
return nil
1390+
}
1391+
12721392
func (c *consumerCmd) askBackoffPolicy() error {
12731393
ok, err := askConfirmation("Add a Retry Backoff Policy", false)
12741394
if err != nil {

go.mod

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/choria-io/fisk v0.6.2
99
github.com/dustin/go-humanize v1.0.1
1010
github.com/emicklei/dot v1.6.1
11-
github.com/expr-lang/expr v1.16.0
11+
github.com/expr-lang/expr v1.16.1
1212
github.com/fatih/color v1.16.0
1313
github.com/ghodss/yaml v1.0.0
1414
github.com/google/go-cmp v0.6.0
@@ -19,10 +19,10 @@ require (
1919
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
2020
github.com/klauspost/compress v1.17.6
2121
github.com/mattn/go-isatty v0.0.20
22-
github.com/nats-io/jsm.go v0.1.1-0.20240208101040-8a115db14d6c
23-
github.com/nats-io/jwt/v2 v2.5.3
24-
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88
25-
github.com/nats-io/nats.go v1.32.1-0.20240208125829-1c24aa7e50c2
22+
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f
23+
github.com/nats-io/jwt/v2 v2.5.4
24+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba
25+
github.com/nats-io/nats.go v1.33.1
2626
github.com/nats-io/nkeys v0.4.7
2727
github.com/nats-io/nuid v1.0.1
2828
github.com/prometheus/client_golang v1.18.0

go.sum

+10-12
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
2424
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
2525
github.com/emicklei/dot v1.6.1 h1:ujpDlBkkwgWUY+qPId5IwapRW/xEoligRSYjioR6DFI=
2626
github.com/emicklei/dot v1.6.1/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s=
27-
github.com/expr-lang/expr v1.16.0 h1:BQabx+PbjsL2PEQwkJ4GIn3CcuUh8flduHhJ0lHjWwE=
28-
github.com/expr-lang/expr v1.16.0/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
27+
github.com/expr-lang/expr v1.16.1 h1:Na8CUcMdyGbnNpShY7kzcHCU7WqxuL+hnxgHZ4vaz/A=
28+
github.com/expr-lang/expr v1.16.1/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
2929
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
3030
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
3131
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
@@ -76,16 +76,14 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
7676
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
7777
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
7878
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
79-
github.com/nats-io/jsm.go v0.1.1-0.20240208101040-8a115db14d6c h1:GF753FvW8qXCeeIl2+K95ZcpWqFo+FnPaqJ73bblRcA=
80-
github.com/nats-io/jsm.go v0.1.1-0.20240208101040-8a115db14d6c/go.mod h1:aEFfEUniqQ/vmTcbc8bvOkpuikTJnR4Y4WBQGTsqeYg=
81-
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
82-
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
83-
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88 h1:mQUXBh1zwlTogpLmb3F8wJC/OrJlgQ2j76LD1BVHp64=
84-
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
85-
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
86-
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
87-
github.com/nats-io/nats.go v1.32.1-0.20240208125829-1c24aa7e50c2 h1:u4P2uRG4k5ENRAI6Xni3ySlduMWSRiLSCkkws6iK47s=
88-
github.com/nats-io/nats.go v1.32.1-0.20240208125829-1c24aa7e50c2/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
79+
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f h1:m82V3oI3pt6oJzBWbO0I2xgxM1U/irPS0GLAID5tqPw=
80+
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f/go.mod h1:eZEYD+Sbv7Nd8Xr4kOt/reSwVRtNASzaFxU+oV9zp40=
81+
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
82+
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
83+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba h1:idIfFiRzXv2wHFpxHH4nSoPtg7UVr4vQPB1NraU0D4c=
84+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba/go.mod h1:Co2t9J1pk4WXyMZiNFkLjFiD7hKE/jjsXtDWCyLfcgw=
85+
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
86+
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
8987
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
9088
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
9189
github.com/nats-io/nsc/v2 v2.8.6-0.20231220104935-3f89317df670 h1:NQzs7g/+Z4kC4XsYsKCQlwRcM4Hk0VyKuz7F4zUgjvQ=

0 commit comments

Comments
 (0)