Skip to content

Commit 6b72b8e

Browse files
authored
Merge pull request #1295 from jnmoyne/jnm/bench-payload-headers
Add payload and header flags for bench command
2 parents f3f5732 + 39373b3 commit 6b72b8e

File tree

1 file changed

+79
-35
lines changed

1 file changed

+79
-35
lines changed

cli/bench_command.go

+79-35
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ type benchCmd struct {
6666
deDuplicationWindow time.Duration
6767
ack bool
6868
randomizeGets int
69+
payloadFilename string
70+
hdrs []string
6971
}
7072

7173
const (
@@ -107,6 +109,8 @@ func configureBenchCommand(app commandHost) {
107109
addPubFlags := func(f *fisk.CmdClause) {
108110
f.Flag("multisubject", "Multi-subject mode, each message is published on a subject that includes the publisher's message sequence number as a token").UnNegatableBoolVar(&c.multiSubject)
109111
f.Flag("multisubjectmax", "The maximum number of subjects to use in multi-subject mode (0 means no max)").Default("100000").IntVar(&c.multiSubjectMax)
112+
f.Flag("payload", "File containing a message payload to send").ExistingFileVar(&c.payloadFilename)
113+
f.Flag("header", "Adds headers to the message using K:V format").Short('H').StringsVar(&c.hdrs)
110114
}
111115

112116
addJSCommonFlags := func(f *fisk.CmdClause) {
@@ -163,6 +167,8 @@ func configureBenchCommand(app commandHost) {
163167
request := microService.Command("request", "Send a request and wait for its reply").Action(c.requestAction)
164168
request.Help("Send a request and wait for a reply")
165169
request.Arg("subject", "Subject to use for the benchmark").Required().StringVar(&c.subject)
170+
request.Flag("payload", "File containing the payload to send").ExistingFileVar(&c.payloadFilename)
171+
request.Flag("header", "Adds headers to the message using K:V format").Short('H').StringsVar(&c.hdrs)
166172
// TODO: support randomized payload data
167173

168174
reply := microService.Command("serve", "Service requests").Action(c.serveAction)
@@ -1534,9 +1540,36 @@ func (c *benchCmd) oldjsPullAction(_ *fisk.ParseContext) error {
15341540
return nil
15351541
}
15361542

1537-
func (c *benchCmd) coreNATSPublisher(nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) error {
1543+
func (c benchCmd) getPayload(msgSize int) ([]byte, error) {
1544+
if len(c.payloadFilename) > 0 {
1545+
1546+
buffer, err := os.ReadFile(c.payloadFilename)
1547+
if err != nil {
1548+
return nil, fmt.Errorf("reading the payload file: %w", err)
1549+
}
1550+
1551+
return buffer, nil
1552+
}
1553+
1554+
buffer := make([]byte, msgSize)
1555+
return buffer, nil
1556+
}
1557+
1558+
func (c *benchCmd) coreNATSPublisher(nc *nats.Conn, progress *uiprogress.Bar, payloadSize int, numMsg int, offset int) error {
15381559
state := "Publishing"
15391560

1561+
payload, err := c.getPayload(payloadSize)
1562+
if err != nil {
1563+
return err
1564+
}
1565+
1566+
headers, err := iu.ParseStringsToHeader(c.hdrs, 0)
1567+
if err != nil {
1568+
return err
1569+
}
1570+
1571+
message := nats.Msg{Data: payload, Header: headers}
1572+
15401573
if progress != nil {
15411574
progress.PrependFunc(func(b *uiprogress.Bar) string {
15421575
return state
@@ -1550,7 +1583,8 @@ func (c *benchCmd) coreNATSPublisher(nc *nats.Conn, progress *uiprogress.Bar, ms
15501583
progress.Incr()
15511584
}
15521585

1553-
err := nc.Publish(c.getPublishSubject(i+offset), msg)
1586+
message.Subject = c.getPublishSubject(i + offset)
1587+
err := nc.PublishMsg(&message)
15541588
if err != nil {
15551589
return fmt.Errorf("publishing: %w", err)
15561590
}
@@ -1562,11 +1596,22 @@ func (c *benchCmd) coreNATSPublisher(nc *nats.Conn, progress *uiprogress.Bar, ms
15621596
return nil
15631597
}
15641598

1565-
func (c *benchCmd) coreNATSRequester(nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) error {
1599+
func (c *benchCmd) coreNATSRequester(nc *nats.Conn, progress *uiprogress.Bar, payloadSize int, numMsg int, offset int) error {
15661600
errBytes := []byte("error")
15671601
minusByte := byte('-')
15681602

15691603
state := "Requesting"
1604+
payload, err := c.getPayload(payloadSize)
1605+
if err != nil {
1606+
return err
1607+
}
1608+
1609+
headers, err := iu.ParseStringsToHeader(c.hdrs, 0)
1610+
if err != nil {
1611+
return err
1612+
}
1613+
1614+
message := nats.Msg{Data: payload, Header: headers}
15701615

15711616
if progress != nil {
15721617
progress.PrependFunc(func(b *uiprogress.Bar) string {
@@ -1581,7 +1626,9 @@ func (c *benchCmd) coreNATSRequester(nc *nats.Conn, progress *uiprogress.Bar, ms
15811626
progress.Incr()
15821627
}
15831628

1584-
m, err := nc.Request(c.getPublishSubject(i+offset), msg, opts().Timeout)
1629+
message.Subject = c.getPublishSubject(i + offset)
1630+
1631+
m, err := nc.RequestMsg(&message, opts().Timeout)
15851632
if err != nil {
15861633
return fmt.Errorf("requesting: %w", err)
15871634
}
@@ -1597,13 +1644,24 @@ func (c *benchCmd) coreNATSRequester(nc *nats.Conn, progress *uiprogress.Bar, ms
15971644
return nil
15981645
}
15991646

1600-
func (c *benchCmd) jsPublisher(nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, idPrefix string, pubNumber string, offset int) error {
1647+
func (c *benchCmd) jsPublisher(nc *nats.Conn, progress *uiprogress.Bar, payloadSize int, numMsg int, idPrefix string, pubNumber string, offset int) error {
16011648
js, err := c.getJS(nc)
16021649
if err != nil {
16031650
return err
16041651
}
16051652

16061653
var state string
1654+
payload, err := c.getPayload(payloadSize)
1655+
if err != nil {
1656+
return err
1657+
}
1658+
1659+
headers, err := iu.ParseStringsToHeader(c.hdrs, 0)
1660+
if err != nil {
1661+
return err
1662+
}
1663+
1664+
message := nats.Msg{Data: payload, Header: headers}
16071665

16081666
if progress != nil {
16091667
progress.PrependFunc(func(b *uiprogress.Bar) string {
@@ -1619,19 +1677,20 @@ func (c *benchCmd) jsPublisher(nc *nats.Conn, progress *uiprogress.Bar, msg []by
16191677
futures := make([]jetstream.PubAckFuture, min(c.batchSize, numMsg-i))
16201678
for j := 0; j < c.batchSize && (i+j) < numMsg; j++ {
16211679
if c.deDuplication {
1622-
header := nats.Header{}
1623-
header.Set(nats.MsgIdHdr, idPrefix+"-"+pubNumber+"-"+strconv.Itoa(i+j+offset))
1624-
message := nats.Msg{Data: msg, Header: header, Subject: c.getPublishSubject(i + j + offset)}
1625-
futures[j], err = js.PublishMsgAsync(&message)
1626-
} else {
1627-
futures[j], err = js.PublishAsync(c.getPublishSubject(i+j+offset), msg)
1680+
message.Header.Set(nats.MsgIdHdr, idPrefix+"-"+pubNumber+"-"+strconv.Itoa(i+j+offset))
16281681
}
1682+
1683+
message.Subject = c.getPublishSubject(i + j + offset)
1684+
1685+
futures[j], err = js.PublishMsgAsync(&message)
16291686
if err != nil {
16301687
return fmt.Errorf("publishing asynchronously: %w", err)
16311688
}
1689+
16321690
if progress != nil {
16331691
progress.Incr()
16341692
}
1693+
16351694
time.Sleep(c.sleep)
16361695
}
16371696

@@ -1659,14 +1718,14 @@ func (c *benchCmd) jsPublisher(nc *nats.Conn, progress *uiprogress.Bar, msg []by
16591718
if progress != nil {
16601719
progress.Incr()
16611720
}
1721+
16621722
if c.deDuplication {
1663-
header := nats.Header{}
1664-
header.Set(nats.MsgIdHdr, idPrefix+"-"+pubNumber+"-"+strconv.Itoa(i+offset))
1665-
message := nats.Msg{Data: msg, Header: header, Subject: c.getPublishSubject(i + offset)}
1666-
_, err = js.PublishMsg(ctx, &message)
1667-
} else {
1668-
_, err = js.Publish(ctx, c.getPublishSubject(i+offset), msg)
1723+
message.Header.Set(nats.MsgIdHdr, idPrefix+"-"+pubNumber+"-"+strconv.Itoa(i+offset))
16691724
}
1725+
1726+
message.Subject = c.getPublishSubject(i + offset)
1727+
1728+
_, err = js.PublishMsg(ctx, &message)
16701729
if err != nil {
16711730
return fmt.Errorf("publishing synchronously: %w", err)
16721731
}
@@ -1742,11 +1801,6 @@ func (c *benchCmd) runCorePublisher(bm *bench.Benchmark, errChan chan error, nc
17421801
return
17431802
}
17441803

1745-
var msg []byte
1746-
if c.msgSize > 0 {
1747-
msg = make([]byte, c.msgSize)
1748-
}
1749-
17501804
<-trigger
17511805

17521806
// introduces some jitter between the publishers if sleep is set and more than one publisher
@@ -1756,7 +1810,7 @@ func (c *benchCmd) runCorePublisher(bm *bench.Benchmark, errChan chan error, nc
17561810
}
17571811

17581812
start := time.Now()
1759-
err := c.coreNATSPublisher(nc, progress, msg, numMsg, offset)
1813+
err := c.coreNATSPublisher(nc, progress, c.msgSize, numMsg, offset)
17601814
if err != nil {
17611815
errChan <- fmt.Errorf("publishing: %w", err)
17621816
donewg.Done()
@@ -1862,11 +1916,6 @@ func (c *benchCmd) runCoreRequester(bm *bench.Benchmark, errChan chan error, nc
18621916
progress.Width = iu.ProgressWidth()
18631917
}
18641918

1865-
var msg []byte
1866-
if c.msgSize > 0 {
1867-
msg = make([]byte, c.msgSize)
1868-
}
1869-
18701919
<-trigger
18711920

18721921
// introduces some jitter between the publishers if sleep is set and more than one publisher
@@ -1876,7 +1925,7 @@ func (c *benchCmd) runCoreRequester(bm *bench.Benchmark, errChan chan error, nc
18761925
}
18771926

18781927
start := time.Now()
1879-
err := c.coreNATSRequester(nc, progress, msg, numMsg, offset)
1928+
err := c.coreNATSRequester(nc, progress, c.msgSize, numMsg, offset)
18801929
if err != nil {
18811930
errChan <- fmt.Errorf("requesting: %w", err)
18821931
donewg.Done()
@@ -1955,11 +2004,6 @@ func (c *benchCmd) runJSPublisher(bm *bench.Benchmark, errChan chan error, nc *n
19552004
progress.Width = iu.ProgressWidth()
19562005
}
19572006

1958-
var msg []byte
1959-
if c.msgSize > 0 {
1960-
msg = make([]byte, c.msgSize)
1961-
}
1962-
19632007
<-trigger
19642008

19652009
// introduces some jitter between the publishers if sleep is set and more than one publisher
@@ -1969,7 +2013,7 @@ func (c *benchCmd) runJSPublisher(bm *bench.Benchmark, errChan chan error, nc *n
19692013
}
19702014

19712015
start := time.Now()
1972-
err := c.jsPublisher(nc, progress, msg, numMsg, idPrefix, pubNumber, offset)
2016+
err := c.jsPublisher(nc, progress, c.msgSize, numMsg, idPrefix, pubNumber, offset)
19732017
if err != nil {
19742018
errChan <- fmt.Errorf("publishing: %w", err)
19752019
donewg.Done()

0 commit comments

Comments
 (0)