Skip to content

Commit 2db9011

Browse files
authored
Merge pull request #983 from ripienaar/multi_sub
Support subscribing to multiple subjects
2 parents a1075a5 + cf39494 commit 2db9011

File tree

3 files changed

+79
-31
lines changed

3 files changed

+79
-31
lines changed

cli/sub_command.go

+68-29
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"path/filepath"
2323
"runtime"
2424
"sort"
25+
"strings"
2526
"sync"
2627
"time"
2728

@@ -33,7 +34,7 @@ import (
3334
)
3435

3536
type subCmd struct {
36-
subject string
37+
subjects []string
3738
queue string
3839
durable string
3940
raw bool
@@ -63,7 +64,7 @@ func configureSubCommand(app commandHost) {
6364
act := app.Command("subscribe", "Generic subscription client").Alias("sub").Action(c.subscribe)
6465
addCheat("sub", act)
6566

66-
act.Arg("subject", "Subject to subscribe to").StringVar(&c.subject)
67+
act.Arg("subjects", "Subjects to subscribe to").StringsVar(&c.subjects)
6768
act.Flag("queue", "Subscribe to a named queue group").StringVar(&c.queue)
6869
act.Flag("durable", "Use a durable consumer (requires JetStream)").StringVar(&c.durable)
6970
act.Flag("raw", "Show the raw data received").Short('r').UnNegatableBoolVar(&c.raw)
@@ -159,15 +160,29 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
159160
}
160161
defer nc.Close()
161162

162-
if c.subject == "" && c.inbox {
163-
c.subject = nc.NewRespInbox()
164-
} else if c.subject == "" && c.stream == "" {
163+
c.jetStream = c.sseq > 0 || len(c.durable) > 0 || c.deliverAll || c.deliverNew || c.deliverLast || c.deliverSince != "" || c.deliverLastPerSubject || c.stream != ""
164+
165+
switch {
166+
case len(c.subjects) == 0 && c.inbox:
167+
c.subjects = []string{nc.NewRespInbox()}
168+
case len(c.subjects) == 0 && c.stream == "":
165169
return fmt.Errorf("subject is required")
170+
case len(c.subjects) > 1 && c.jetStream:
171+
return fmt.Errorf("streams subscribe support only 1 subject")
166172
}
167173

174+
if c.inbox && c.jetStream {
175+
return fmt.Errorf("generating inboxes is not compatible with JetStream subscriptions")
176+
}
177+
if c.queue != "" && c.jetStream {
178+
return fmt.Errorf("queue group subscriptions are not supported with JetStream")
179+
}
168180
if c.dump == "-" && c.inbox {
169181
return fmt.Errorf("generating inboxes is not compatible with dumping to stdout using null terminated strings")
170182
}
183+
if c.reportSubjects && c.reportSubjectsCount == 0 {
184+
return fmt.Errorf("subject count must be at least one")
185+
}
171186

172187
if c.dump != "" && c.dump != "-" {
173188
err = os.MkdirAll(c.dump, 0700)
@@ -176,21 +191,8 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
176191
}
177192
}
178193

179-
c.jetStream = c.sseq > 0 || len(c.durable) > 0 || c.deliverAll || c.deliverNew || c.deliverLast || c.deliverSince != "" || c.deliverLastPerSubject || c.stream != ""
180-
181-
if c.inbox && c.jetStream {
182-
return fmt.Errorf("generating inboxes is not compatible with JetStream subscriptions")
183-
}
184-
if c.queue != "" && c.jetStream {
185-
return fmt.Errorf("queue group subscriptions are not supported with JetStream")
186-
}
187-
188-
if c.reportSubjects && c.reportSubjectsCount == 0 {
189-
return fmt.Errorf("subject count must be at least one")
190-
}
191-
192194
var (
193-
sub *nats.Subscription
195+
subs []*nats.Subscription
194196
mu = sync.Mutex{}
195197
subjMu = sync.Mutex{}
196198
dump = c.dump != ""
@@ -269,7 +271,10 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
269271
}
270272

271273
if ctr == c.limit {
272-
sub.Unsubscribe()
274+
for _, sub := range subs {
275+
sub.Unsubscribe()
276+
}
277+
273278
// if no reply matching, or if didn't yet get all replies
274279
if !c.match || len(matchMap) == 0 {
275280
cancel()
@@ -335,15 +340,22 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
335340
case c.jetStream:
336341
// logs later depending on settings
337342
case c.jsAck:
338-
log.Printf("Subscribing on %s with acknowledgement of JetStream messages %s", c.subject, ignoredSubjInfo)
343+
log.Printf("Subscribing on %s with acknowledgement of JetStream messages %s", c.firstSubject(), ignoredSubjInfo)
339344
default:
340-
log.Printf("Subscribing on %s %s", c.subject, ignoredSubjInfo)
345+
log.Printf("Subscribing on %s %s", strings.Join(c.subjects, ", "), ignoredSubjInfo)
341346
}
342347
}
343348

344349
switch {
345350
case c.reportSubjects:
346-
sub, err = nc.Subscribe(c.subject, handler)
351+
for _, subj := range c.subjects {
352+
sub, err := nc.Subscribe(subj, handler)
353+
if err != nil {
354+
return err
355+
}
356+
subs = append(subs, sub)
357+
}
358+
347359
startSubjectReporting(ctx, &subjMu, subjectReportMap, subjectBytesReportMap, c.reportSubjectsCount)
348360

349361
case c.jetStream:
@@ -378,9 +390,9 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
378390
}
379391
}
380392

381-
subMsg := c.subject
393+
subMsg := c.firstSubject()
382394
if c.stream != "" {
383-
if c.subject == "" {
395+
if len(c.subjects) == 0 {
384396
str, err := js.StreamInfo(c.stream)
385397
if err != nil {
386398
return err
@@ -422,17 +434,36 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
422434
}
423435

424436
if bindDurable {
425-
sub, err = js.Subscribe("", handler, nats.Bind(c.stream, c.durable))
437+
sub, err := js.Subscribe("", handler, nats.Bind(c.stream, c.durable))
438+
if err != nil {
439+
return err
440+
}
441+
subs = append(subs, sub)
426442
} else {
427443
c.jsAck = false
428-
sub, err = js.Subscribe(c.subject, handler, opts...)
444+
sub, err := js.Subscribe(c.firstSubject(), handler, opts...)
445+
if err != nil {
446+
return err
447+
}
448+
subs = append(subs, sub)
429449
}
430450

431451
case c.queue != "":
432-
sub, err = nc.QueueSubscribe(c.subject, c.queue, handler)
452+
sub, err := nc.QueueSubscribe(c.firstSubject(), c.queue, handler)
453+
if err != nil {
454+
return err
455+
}
456+
subs = append(subs, sub)
433457

434458
default:
435-
sub, err = nc.Subscribe(c.subject, handler)
459+
for _, subj := range c.subjects {
460+
sub, err := nc.Subscribe(subj, handler)
461+
if err != nil {
462+
return err
463+
}
464+
subs = append(subs, sub)
465+
}
466+
436467
}
437468
if err != nil {
438469
return err
@@ -450,6 +481,14 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
450481
return nil
451482
}
452483

484+
func (c *subCmd) firstSubject() string {
485+
if len(c.subjects) == 0 {
486+
return ""
487+
}
488+
489+
return c.subjects[0]
490+
}
491+
453492
func (c *subCmd) printMsg(msg *nats.Msg, reply *nats.Msg, ctr uint) {
454493
var info *jsm.MsgInfo
455494
if msg.Reply != "" {

go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ require (
1717
github.com/guptarohit/asciigraph v0.5.6
1818
github.com/jedib0t/go-pretty/v6 v6.5.3
1919
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
20-
github.com/klauspost/compress v1.17.4
20+
github.com/klauspost/compress v1.17.5
2121
github.com/mattn/go-isatty v0.0.20
2222
github.com/nats-io/jsm.go v0.1.1-0.20240118115416-fcaa77de81f6
2323
github.com/nats-io/jwt/v2 v2.5.3
24-
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240111031832-67d41da49bb2
24+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240203223349-0e3dc05fe637
2525
github.com/nats-io/nats.go v1.32.0
2626
github.com/nats-io/nkeys v0.4.7
2727
github.com/nats-io/nuid v1.0.1
@@ -51,6 +51,7 @@ require (
5151
github.com/prometheus/client_model v0.5.0 // indirect
5252
github.com/prometheus/procfs v0.12.0 // indirect
5353
github.com/rivo/uniseg v0.4.4 // indirect
54+
go.uber.org/automaxprocs v1.5.3 // indirect
5455
golang.org/x/net v0.20.0 // indirect
5556
golang.org/x/sys v0.16.0 // indirect
5657
golang.org/x/text v0.14.0 // indirect

go.sum

+8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU
5353
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
5454
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
5555
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
56+
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
57+
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
5658
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
5759
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
5860
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -79,8 +81,12 @@ github.com/nats-io/jsm.go v0.1.1-0.20240118115416-fcaa77de81f6 h1:Ux6MzBV3LOvoGH
7981
github.com/nats-io/jsm.go v0.1.1-0.20240118115416-fcaa77de81f6/go.mod h1:ZJ2U65E3DwQDlTd8CmqnnRnN/wfo51vO0LGRB5UQ2UM=
8082
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
8183
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
84+
github.com/nats-io/nats-server/v2 v2.10.10 h1:g1Wd64J5SGsoqWSx1qoNu9/At7a2x+jE7Qtf2XpEx/I=
85+
github.com/nats-io/nats-server/v2 v2.10.10/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
8286
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240111031832-67d41da49bb2 h1:3Pb4Jp6AiJl9htNdLO4A92Gj6j88S87OWZModvkzDrE=
8387
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240111031832-67d41da49bb2/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
88+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240203223349-0e3dc05fe637 h1:DaQ/zAHfz/Wi/RNfIPDlwR8gh5SDh9jvhqRG+h5iUkE=
89+
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240203223349-0e3dc05fe637/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
8490
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
8591
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
8692
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -117,6 +123,8 @@ github.com/synadia-io/jwt-auth-builder.go v0.0.0-20240124210942-2c5fcc303e3b/go.
117123
github.com/tylertreat/hdrhistogram-writer v0.0.0-20210816161836-2e440612a39f h1:SGznmvCovewbaSgBsHgdThtWsLj5aCLX/3ZXMLd1UD0=
118124
github.com/tylertreat/hdrhistogram-writer v0.0.0-20210816161836-2e440612a39f/go.mod h1:IY84XkhrEJTdHYLNy/zObs8mXuUAp9I65VyarbPSCCY=
119125
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
126+
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
127+
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
120128
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
121129
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
122130
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=

0 commit comments

Comments
 (0)