From c74bff4009f1a5e2f9fd4788e78befa8152b8f56 Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Thu, 20 Jun 2024 23:52:22 -0700 Subject: [PATCH] =?UTF-8?q?Added=20tests=20for=20MQTT=20retained=20message?= =?UTF-8?q?s=20in=20various=20cluster/domain=20conf=E2=80=A6=20(#5082)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a Test- and CI-only PR. No server changes. See also https://github.com/ConnectEverything/mqtt-test/pull/2. --- .github/workflows/MQTT_test.yaml | 67 -- .github/workflows/mqtt-test.yaml | 48 ++ server/mqtt.go | 6 + ...{mqtt_ex_test.go => mqtt_ex_bench_test.go} | 136 +--- server/mqtt_ex_test_test.go | 599 ++++++++++++++++++ 5 files changed, 676 insertions(+), 180 deletions(-) delete mode 100644 .github/workflows/MQTT_test.yaml create mode 100644 .github/workflows/mqtt-test.yaml rename server/{mqtt_ex_test.go => mqtt_ex_bench_test.go} (70%) create mode 100644 server/mqtt_ex_test_test.go diff --git a/.github/workflows/MQTT_test.yaml b/.github/workflows/MQTT_test.yaml deleted file mode 100644 index 2bfd0bed2bb..00000000000 --- a/.github/workflows/MQTT_test.yaml +++ /dev/null @@ -1,67 +0,0 @@ -name: MQTTEx -on: [push, pull_request] - -permissions: - pull-requests: write # to comment on PRs - contents: write # to comment on commits (to upload artifacts?) - -jobs: - test: - env: - GOPATH: /home/runner/work/nats-server - GO111MODULE: "on" - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - path: src/github.com/nats-io/nats-server - - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version-file: src/github.com/nats-io/nats-server/go.mod - cache-dependency-path: src/github.com/nats-io/nats-server/go.sum - - - name: Set up testing tools and environment - shell: bash --noprofile --norc -eo pipefail {0} - id: setup - run: | - wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb - sudo apt install ./mqtt-cli-4.20.0.deb - go install github.com/ConnectEverything/mqtt-test@4dd571c31318dcfebe5443242f53f262403ceafb - - # - name: Download benchmark result for ${{ github.base_ref || github.ref_name }} - # uses: dawidd6/action-download-artifact@v2 - # continue-on-error: true - # with: - # path: src/github.com/nats-io/nats-server/bench - # name: bench-output-${{ runner.os }} - # branch: ${{ github.base_ref || github.ref }} - - - name: Run tests and benchmarks - shell: bash --noprofile --norc -eo pipefail {0} - run: | - cd src/github.com/nats-io/nats-server - go test -v --run='MQTTEx' ./server - # go test --run='-' --count=10 --bench 'MQTT_' ./server | tee output.txt - # go test --run='-' --count=10 --bench 'MQTTEx' --timeout=20m --benchtime=100x ./server | tee -a output.txt - go test --run='-' --count=3 --bench 'MQTTEx' --benchtime=100x ./server - - # - name: Compare benchmarks - # uses: benchmark-action/github-action-benchmark@v1 - # with: - # tool: go - # output-file-path: src/github.com/nats-io/nats-server/output.txt - # github-token: ${{ secrets.GITHUB_TOKEN }} - # alert-threshold: 140% - # comment-on-alert: true - # # fail-on-alert: true - # external-data-json-path: src/github.com/nats-io/nats-server/bench/benchmark-data.json - - # - name: Store benchmark result for ${{ github.ref_name }} - # if: always() - # uses: actions/upload-artifact@v3 - # with: - # path: src/github.com/nats-io/nats-server/bench - # name: bench-output-${{ runner.os }} diff --git a/.github/workflows/mqtt-test.yaml b/.github/workflows/mqtt-test.yaml new file mode 100644 index 00000000000..893837e6b60 --- /dev/null +++ b/.github/workflows/mqtt-test.yaml @@ -0,0 +1,48 @@ +name: MQTT external test +on: [pull_request] + +jobs: + test: + env: + GOPATH: /home/runner/work/nats-server + GO111MODULE: "on" + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + path: src/github.com/nats-io/nats-server + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: src/github.com/nats-io/nats-server/go.mod + cache-dependency-path: src/github.com/nats-io/nats-server/go.sum + + - name: Set up testing tools and environment + shell: bash --noprofile --norc -eo pipefail {0} + id: setup + run: | + wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb + sudo apt install ./mqtt-cli-4.20.0.deb + go install github.com/ConnectEverything/mqtt-test@v0.1.0 + + - name: Run tests (3 times to detect flappers) + shell: bash --noprofile --norc -eo pipefail {0} + run: | + cd src/github.com/nats-io/nats-server + go test -v --count=3 --run='TestXMQTT' ./server + + - name: Run tests with --race + shell: bash --noprofile --norc -eo pipefail {0} + run: | + cd src/github.com/nats-io/nats-server + go test -v --race --failfast --run='TestXMQTT' ./server + + - name: Run benchmarks + shell: bash --noprofile --norc -eo pipefail {0} + run: | + cd src/github.com/nats-io/nats-server + go test --run='-' --count=3 --bench 'BenchmarkXMQTT' --benchtime=100x ./server + + # TODO: compare benchmarks diff --git a/server/mqtt.go b/server/mqtt.go index 7ca49081914..95868115dc1 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1950,9 +1950,13 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie } // If lastSeq is 0 (nothing to recover, or done doing it) and this is // from our own server, ignore. + as.mu.RLock() if as.rrmLastSeq == 0 && rm.Origin == as.jsa.id { + as.mu.RUnlock() return } + as.mu.RUnlock() + // At this point we either recover from our own server, or process a remote retained message. seq, _, _ := ackReplyInfo(reply) @@ -1960,11 +1964,13 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false) // If we were recovering (lastSeq > 0), then check if we are done. + as.mu.Lock() if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq { as.rrmLastSeq = 0 close(as.rrmDoneCh) as.rrmDoneCh = nil } + as.mu.Unlock() } func (as *mqttAccountSessionManager) processRetainedMsgDel(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { diff --git a/server/mqtt_ex_test.go b/server/mqtt_ex_bench_test.go similarity index 70% rename from server/mqtt_ex_test.go rename to server/mqtt_ex_bench_test.go index 44025bef806..efe2056f473 100644 --- a/server/mqtt_ex_test.go +++ b/server/mqtt_ex_bench_test.go @@ -17,51 +17,12 @@ package server import ( - "bytes" - "encoding/json" "fmt" - "os" - "os/exec" "strconv" "testing" "time" ) -func TestMQTTExCompliance(t *testing.T) { - mqttPath := os.Getenv("MQTT_CLI") - if mqttPath == "" { - if p, err := exec.LookPath("mqtt"); err == nil { - mqttPath = p - } - } - if mqttPath == "" { - t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`) - } - - conf := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - server_name: mqtt - jetstream { - store_dir = %q - } - mqtt { - listen: 127.0.0.1:-1 - } - `, t.TempDir()))) - s, o := RunServerWithConfig(conf) - defer testMQTTShutdownServer(s) - - cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port)) - - output, err := cmd.CombinedOutput() - t.Log(string(output)) - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - t.Fatalf("mqtt cli exited with error: %v", exitError) - } - } -} - const ( KB = 1024 ) @@ -83,9 +44,6 @@ type mqttBenchContext struct { Host string Port int - - // full path to mqtt-test command - testCmdPath string } var mqttBenchDefaultMatrix = mqttBenchMatrix{ @@ -102,8 +60,12 @@ type MQTTBenchmarkResult struct { Bytes int64 `json:"bytes"` } -func BenchmarkMQTTEx(b *testing.B) { - bc := mqttNewBenchEx(b) +func BenchmarkXMQTT(b *testing.B) { + if mqttTestCommandPath == "" { + b.Skip(`"mqtt-test" command is not found in $PATH.`) + } + + bc := mqttBenchContext{} b.Run("Server", func(b *testing.B) { b.Cleanup(bc.startServer(b, false)) bc.runAll(b) @@ -142,11 +104,11 @@ func (bc mqttBenchContext) benchmarkPub(b *testing.B) { b.Run("PUB", func(b *testing.B) { m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) { - bc.runCommand(b, "pub", + bc.runAndReport(b, "pub", "--qos", strconv.Itoa(bc.QOS), - "--n", strconv.Itoa(b.N), + "--messages", strconv.Itoa(b.N), "--size", strconv.Itoa(bc.MessageSize), - "--num-publishers", strconv.Itoa(bc.Publishers), + "--publishers", strconv.Itoa(bc.Publishers), ) }) }) @@ -165,11 +127,11 @@ func (bc mqttBenchContext) benchmarkPubRetained(b *testing.B) { b.Run("PUBRET", func(b *testing.B) { m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) { - bc.runCommand(b, "pub", "--retain", + bc.runAndReport(b, "pub", "--retain", "--qos", strconv.Itoa(bc.QOS), - "--n", strconv.Itoa(b.N), + "--messages", strconv.Itoa(b.N), "--size", strconv.Itoa(bc.MessageSize), - "--num-publishers", strconv.Itoa(bc.Publishers), + "--publishers", strconv.Itoa(bc.Publishers), ) }) }) @@ -185,11 +147,11 @@ func (bc mqttBenchContext) benchmarkPubSub(b *testing.B) { b.Run("PUBSUB", func(b *testing.B) { m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) { - bc.runCommand(b, "pubsub", + bc.runAndReport(b, "pubsub", "--qos", strconv.Itoa(bc.QOS), - "--n", strconv.Itoa(b.N), + "--messages", strconv.Itoa(b.N), "--size", strconv.Itoa(bc.MessageSize), - "--num-subscribers", strconv.Itoa(bc.Subscribers), + "--subscribers", strconv.Itoa(bc.Subscribers), ) }) }) @@ -206,67 +168,23 @@ func (bc mqttBenchContext) benchmarkSubRet(b *testing.B) { b.Run("SUBRET", func(b *testing.B) { m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) { - bc.runCommand(b, "subret", + bc.runAndReport(b, "subret", "--qos", strconv.Itoa(bc.QOS), - "--n", strconv.Itoa(b.N), // number of subscribe requests - "--num-subscribers", strconv.Itoa(bc.Subscribers), - "--num-topics", strconv.Itoa(bc.Topics), + "--topics", strconv.Itoa(bc.Topics), // number of retained messages + "--subscribers", strconv.Itoa(bc.Subscribers), "--size", strconv.Itoa(bc.MessageSize), + "--repeat", strconv.Itoa(b.N), // number of subscribe requests ) }) }) } -func mqttBenchLookupCommand(b *testing.B, name string) string { +func (bc mqttBenchContext) runAndReport(b *testing.B, name string, extraArgs ...string) { b.Helper() - cmd, err := exec.LookPath(name) - if err != nil { - b.Skipf("%q command is not found in $PATH. Please `go install github.com/nats-io/meta-nats/apps/go/mqtt/...@latest` and try again.", name) - } - return cmd -} - -func (bc mqttBenchContext) runCommand(b *testing.B, name string, extraArgs ...string) { - b.Helper() - - args := append([]string{ - name, - "-q", - "--servers", fmt.Sprintf("%s:%d", bc.Host, bc.Port), - }, extraArgs...) - - cmd := exec.Command(bc.testCmdPath, args...) - stdout, err := cmd.StdoutPipe() - if err != nil { - b.Fatalf("Error executing %q: %v", cmd.String(), err) - } - defer stdout.Close() - errbuf := bytes.Buffer{} - cmd.Stderr = &errbuf - if err = cmd.Start(); err != nil { - b.Fatalf("Error executing %q: %v", cmd.String(), err) - } - r := &MQTTBenchmarkResult{} - if err = json.NewDecoder(stdout).Decode(r); err != nil { - b.Fatalf("failed to decode output of %q: %v\n\n%s", cmd.String(), err, errbuf.String()) - } - if err = cmd.Wait(); err != nil { - b.Fatalf("Error executing %q: %v\n\n%s", cmd.String(), err, errbuf.String()) - } - + r := mqttRunExCommandTest(b, name, mqttNewDial("", "", bc.Host, bc.Port, ""), extraArgs...) r.report(b) } -func (bc mqttBenchContext) initServer(b *testing.B) { - b.Helper() - bc.runCommand(b, "pubsub", - "--id", "__init__", - "--qos", "0", - "--n", "1", - "--size", "100", - "--num-subscribers", "1") -} - func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func() { b.Helper() b.StopTimer() @@ -278,7 +196,7 @@ func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func o = s.getOpts() bc.Host = o.MQTT.Host bc.Port = o.MQTT.Port - bc.initServer(b) + mqttInitTestServer(b, mqttNewDial("", "", bc.Host, bc.Port, "")) return func() { testMQTTShutdownServer(s) testDisableRMSCache = prevDisableRMSCache @@ -314,7 +232,7 @@ func (bc *mqttBenchContext) startCluster(b *testing.B, disableRMSCache bool) fun o := cl.randomNonLeader().getOpts() bc.Host = o.MQTT.Host bc.Port = o.MQTT.Port - bc.initServer(b) + mqttInitTestServer(b, mqttNewDial("", "", bc.Host, bc.Port, "")) return func() { cl.shutdown() testDisableRMSCache = prevDisableRMSCache @@ -410,15 +328,7 @@ func (r MQTTBenchmarkResult) report(b *testing.B) { nsOp := float64(ns) / float64(r.Ops) b.ReportMetric(nsOp/1000000, unit+"_ms/op") } - // Diable ReportAllocs() since it confuses the github benchmarking action // with the noise. // b.ReportAllocs() } - -func mqttNewBenchEx(b *testing.B) *mqttBenchContext { - cmd := mqttBenchLookupCommand(b, "mqtt-test") - return &mqttBenchContext{ - testCmdPath: cmd, - } -} diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go new file mode 100644 index 00000000000..5ae007ea3c0 --- /dev/null +++ b/server/mqtt_ex_test_test.go @@ -0,0 +1,599 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_mqtt_tests +// +build !skip_mqtt_tests + +package server + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "testing" + + "github.com/nats-io/nuid" +) + +type mqttDial string + +type mqttTarget struct { + singleServers []*Server + clusters []*cluster + configs []mqttTestConfig + all []mqttDial +} + +type mqttTestConfig struct { + name string + pub []mqttDial + sub []mqttDial +} + +func TestXMQTTCompliance(t *testing.T) { + if mqttCLICommandPath == _EMPTY_ { + t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`) + } + + o := testMQTTDefaultOptions() + s := testMQTTRunServer(t, o) + o = s.getOpts() + defer testMQTTShutdownServer(s) + + cmd := exec.Command(mqttCLICommandPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port)) + + output, err := cmd.CombinedOutput() + t.Log(string(output)) + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + t.Fatalf("mqtt cli exited with error: %v", exitError) + } + } +} + +func TestXMQTTRetainedMessages(t *testing.T) { + if mqttTestCommandPath == _EMPTY_ { + t.Skip(`"mqtt-test" command is not found in $PATH.`) + } + + for _, topo := range []struct { + name string + makef func(testing.TB) *mqttTarget + }{ + { + name: "single server", + makef: mqttMakeTestServer, + }, + { + name: "server with leafnode", + makef: mqttMakeTestServerWithLeafnode("HUBD", "LEAFD", true), + }, + { + name: "server with leafnode no domains", + makef: mqttMakeTestServerWithLeafnode("", "", true), + }, + { + name: "server with leafnode no system account", + makef: mqttMakeTestServerWithLeafnode("HUBD", "LEAFD", false), + }, + { + name: "cluster", + makef: mqttMakeTestCluster(4, ""), + }, + { + name: "cluster with leafnode cluster", + makef: mqttMakeTestClusterWithLeafnodeCluster("HUBD", "LEAFD", true), + }, + { + name: "cluster with leafnode cluster no system account", + makef: mqttMakeTestClusterWithLeafnodeCluster("HUBD", "LEAFD", false), + }, + } { + t.Run(topo.name, func(t *testing.T) { + target := topo.makef(t) + t.Cleanup(target.Shutdown) + + // initialize the MQTT assets by "touching" all nodes in the + // cluster, but then reload to start with fresh server state. + for _, dial := range target.all { + mqttInitTestServer(t, dial) + } + + numRMS := 100 + strNumRMS := strconv.Itoa(numRMS) + topics := make([]string, len(target.configs)) + + for i, tc := range target.configs { + // Publish numRMS retained messages one at a time, + // round-robin across pub nodes. Remember the topic for each + // test config to check the subs after reload. + topic := "subret_" + nuid.Next() + topics[i] = topic + iNode := 0 + for i := 0; i < numRMS; i++ { + pubTopic := fmt.Sprintf("%s/%d", topic, i) + dial := tc.pub[iNode%len(tc.pub)] + mqttRunExCommandTest(t, "pub", dial, + "--retain", + "--topic", pubTopic, + "--qos", "0", + "--size", "128", // message size 128 bytes + ) + iNode++ + } + } + + // Check all sub nodes for retained messages + for i, tc := range target.configs { + for _, dial := range tc.sub { + mqttRunExCommandTest(t, "sub", dial, + "--retained", strNumRMS, + "--qos", "0", + "--topic", topics[i], + ) + } + } + + // Reload the target + target.Reload(t) + + // Now check again + for i, tc := range target.configs { + for _, dial := range tc.sub { + mqttRunExCommandTestRetry(t, 1, "sub", dial, + "--retained", strNumRMS, + "--qos", "0", + "--topic", topics[i], + ) + } + } + }) + } +} + +func mqttInitTestServer(tb testing.TB, dial mqttDial) { + tb.Helper() + mqttRunExCommandTestRetry(tb, 5, "pub", dial) +} + +func mqttNewDialForServer(s *Server, username, password string) mqttDial { + o := s.getOpts().MQTT + return mqttNewDial(username, password, o.Host, o.Port, s.Name()) +} + +func mqttNewDial(username, password, host string, port int, comment string) mqttDial { + d := "" + switch { + case username != "" && password != "": + d = fmt.Sprintf("%s:%s@%s:%d", username, password, host, port) + case username != "": + d = fmt.Sprintf("%s@%s:%d", username, host, port) + default: + d = fmt.Sprintf("%s:%d", host, port) + } + if comment != "" { + d += "#" + comment + } + return mqttDial(d) +} + +func (d mqttDial) Get() (u, p, s, c string) { + if d == "" { + return "", "", "127.0.0.1:1883", "" + } + in := string(d) + if i := strings.LastIndex(in, "#"); i != -1 { + c = in[i+1:] + in = in[:i] + } + if i := strings.LastIndex(in, "@"); i != -1 { + up := in[:i] + in = in[i+1:] + u = up + if i := strings.Index(up, ":"); i != -1 { + u = up[:i] + p = up[i+1:] + } + } + s = in + return u, p, s, c +} + +func (d mqttDial) Name() string { + _, _, _, c := d.Get() + return c +} + +func (t *mqttTarget) Reload(tb testing.TB) { + tb.Helper() + + for _, c := range t.clusters { + c.stopAll() + c.restartAllSamePorts() + } + + for i, s := range t.singleServers { + o := s.getOpts() + s.Shutdown() + t.singleServers[i] = testMQTTRunServer(tb, o) + } + + for _, dial := range t.all { + mqttInitTestServer(tb, dial) + } +} + +func (t *mqttTarget) Shutdown() { + for _, c := range t.clusters { + c.shutdown() + } + for _, s := range t.singleServers { + testMQTTShutdownServer(s) + } +} + +func mqttMakeTestServer(tb testing.TB) *mqttTarget { + tb.Helper() + o := testMQTTDefaultOptions() + s := testMQTTRunServer(tb, o) + all := []mqttDial{mqttNewDialForServer(s, "", "")} + return &mqttTarget{ + singleServers: []*Server{s}, + all: all, + configs: []mqttTestConfig{ + { + name: "single server", + pub: all, + sub: all, + }, + }, + } +} + +func mqttMakeTestServerWithLeafnode(hubd, leafd string, connectSystemAccount bool) func(tb testing.TB) *mqttTarget { + return func(tb testing.TB) *mqttTarget { + tb.Helper() + + if hubd != "" { + hubd = "domain: " + hubd + ", " + } + sconf := ` +listen: 127.0.0.1:-1 + +server_name: HUB +jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + hubd + `store_dir: '` + tb.TempDir() + `'} + +leafnodes { + listen: 127.0.0.1:-1 +} + +accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } +} + +mqtt { + listen: 127.0.0.1:-1 +} +` + hubConf := createConfFile(tb, []byte(sconf)) + hubServer, o := RunServerWithConfig(hubConf) + leafRemoteAddr := fmt.Sprintf("%s:%d", o.LeafNode.Host, o.LeafNode.Port) + tb.Cleanup(func() { + os.Remove(hubConf) + }) + + sysRemote := "" + if connectSystemAccount { + sysRemote = `{ url: "nats://admin:s3cr3t!@` + leafRemoteAddr + `", account: "$SYS" },` + "\n\t\t" + } + if leafd != "" { + leafd = "domain: " + leafd + ", " + } + leafconf := ` +listen: 127.0.0.1:-1 + +server_name: SPOKE +jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + leafd + `store_dir: '` + tb.TempDir() + `'} + +leafnodes { + remotes = [ + ` + sysRemote + `{ url: "nats://one:p@` + leafRemoteAddr + `", account: "ONE" }, + ] +} + +accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } +} + +mqtt { + listen: 127.0.0.1:-1 +} +` + leafConf := createConfFile(tb, []byte(leafconf)) + leafServer, _ := RunServerWithConfig(leafConf) + tb.Cleanup(func() { + os.Remove(leafConf) + }) + + both := []mqttDial{ + mqttNewDialForServer(hubServer, "one", "p"), + mqttNewDialForServer(leafServer, "one", "p"), + } + return &mqttTarget{ + singleServers: []*Server{hubServer, leafServer}, + all: both, + configs: []mqttTestConfig{ + {name: "pub to all", pub: both, sub: both}, + {name: "pub to SPOKE", pub: both[1:], sub: both}, + {name: "pub to HUB", pub: both[:1], sub: both}, + }, + } + } +} + +func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarget { + return func(tb testing.TB) *mqttTarget { + tb.Helper() + if size < 3 { + tb.Fatal("cluster size must be at least 3") + } + + if domain != "" { + domain = "domain: " + domain + ", " + } + clusterConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + domain + `store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + mqtt { + listen: 127.0.0.1:-1 + stream_replicas: 3 + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + cl := createJetStreamClusterWithTemplate(tb, clusterConf, "MQTT", size) + cl.waitOnLeader() + + all := []mqttDial{} + for _, s := range cl.servers { + all = append(all, mqttNewDialForServer(s, "one", "p")) + } + + return &mqttTarget{ + clusters: []*cluster{cl}, + all: all, + configs: []mqttTestConfig{ + { + name: "publish to one", + pub: []mqttDial{ + mqttNewDialForServer(cl.randomServer(), "one", "p"), + }, + sub: all, + }, + { + name: "publish to all", + pub: all, + sub: all, + }, + }, + } + } +} + +func mqttMakeTestClusterWithLeafnodeCluster(hubd, leafd string, connectSystemAccount bool) func(tb testing.TB) *mqttTarget { + return func(tb testing.TB) *mqttTarget { + tb.Helper() + + // Create HUB cluster. + if hubd != "" { + hubd = "domain: " + hubd + ", " + } + hubConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + hubd + `store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + mqtt { + listen: 127.0.0.1:-1 + stream_replicas: 3 + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + hub := createJetStreamClusterWithTemplate(tb, hubConf, "HUB", 3) + hub.waitOnLeader() + + // Pick a host to connect leafnodes to + lno := hub.randomNonLeader().getOpts().LeafNode + leafRemoteAddr := fmt.Sprintf("%s:%d", lno.Host, lno.Port) + hubRandom := mqttNewDialForServer(hub.randomNonLeader(), "one", "p") + hubAll := []mqttDial{} + for _, s := range hub.servers { + hubAll = append(hubAll, mqttNewDialForServer(s, "one", "p")) + } + + // Create SPOKE (leafnode) cluster. + sysRemote := "" + if connectSystemAccount { + sysRemote = `{ url: "nats://admin:s3cr3t!@` + leafRemoteAddr + `", account: "$SYS" },` + "\n\t\t\t" + } + if leafd != "" { + leafd = "domain: " + leafd + ", " + } + leafConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + leafd + `store_dir: '%s'} + + leafnodes { + remotes = [ + ` + sysRemote + `{ url: "nats://one:p@` + leafRemoteAddr + `", account: "ONE" }, + ] + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + mqtt { + listen: 127.0.0.1:-1 + stream_replicas: 3 + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + spoke := createJetStreamCluster(tb, leafConf, "SPOKE", "SPOKE-", 3, 22111, false) + expectedConnections := 2 + if !connectSystemAccount { + expectedConnections = 1 + } + for _, s := range spoke.servers { + checkLeafNodeConnectedCount(tb, s, expectedConnections) + } + spoke.waitOnPeerCount(3) + spokeRandom := mqttNewDialForServer(spoke.randomNonLeader(), "one", "p") + spokeAll := []mqttDial{} + for _, s := range spoke.servers { + spokeAll = append(spokeAll, mqttNewDialForServer(s, "one", "p")) + } + + all := append(hubAll, spokeAll...) + + return &mqttTarget{ + clusters: []*cluster{hub, spoke}, + all: all, + configs: []mqttTestConfig{ + {name: "publish to all", pub: all, sub: all}, + {name: "publish to all hub", pub: hubAll, sub: all}, + {name: "publish to random in hub", pub: []mqttDial{hubRandom}, sub: all}, + {name: "publish to all spoke", pub: spokeAll, sub: all}, + {name: "publish to random in spoke", pub: []mqttDial{spokeRandom}, sub: all}, + }, + } + } +} + +var mqttCLICommandPath = func() string { + p := os.Getenv("MQTT_CLI") + if p == "" { + p, _ = exec.LookPath("mqtt") + } + return p +}() + +var mqttTestCommandPath = func() string { + p, _ := exec.LookPath("mqtt-test") + return p +}() + +func mqttRunExCommandTest(tb testing.TB, subCommand string, dial mqttDial, extraArgs ...string) *MQTTBenchmarkResult { + tb.Helper() + return mqttRunExCommandTestRetry(tb, 1, subCommand, dial, extraArgs...) +} + +func mqttRunExCommandTestRetry(tb testing.TB, n int, subCommand string, dial mqttDial, extraArgs ...string) (r *MQTTBenchmarkResult) { + tb.Helper() + var err error + for i := 0; i < n; i++ { + if r, err = mqttTryExCommandTest(tb, subCommand, dial, extraArgs...); err == nil { + return r + } + + if i < (n - 1) { + tb.Logf("failed to %q %s to %q on attempt %v, will retry.", subCommand, extraArgs, dial.Name(), i) + } else { + tb.Fatal(err) + } + } + return nil +} + +func mqttTryExCommandTest(tb testing.TB, subCommand string, dial mqttDial, extraArgs ...string) (r *MQTTBenchmarkResult, err error) { + tb.Helper() + if mqttTestCommandPath == "" { + tb.Skip(`"mqtt-test" command is not found in $PATH.`) + } + + args := []string{subCommand, // "-q", + "-s", string(dial), + } + args = append(args, extraArgs...) + cmd := exec.Command(mqttTestCommandPath, args...) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error executing %q: %v", cmd.String(), err) + } + defer stdout.Close() + errbuf := bytes.Buffer{} + cmd.Stderr = &errbuf + if err = cmd.Start(); err != nil { + return nil, fmt.Errorf("error executing %q: %v", cmd.String(), err) + } + out, err := io.ReadAll(stdout) + if err != nil { + return nil, fmt.Errorf("error executing %q: failed to read output: %v", cmd.String(), err) + } + if err = cmd.Wait(); err != nil { + return nil, fmt.Errorf("error executing %q: %v\n\n%s\n\n%s", cmd.String(), err, string(out), errbuf.String()) + } + + r = &MQTTBenchmarkResult{} + if err := json.Unmarshal(out, r); err != nil { + tb.Fatalf("error executing %q: failed to decode output: %v\n\n%s\n\n%s", cmd.String(), err, string(out), errbuf.String()) + } + return r, nil +}