Skip to content

Commit

Permalink
Added tests for MQTT retained messages in various cluster/domain conf… (
Browse files Browse the repository at this point in the history
#5082)

This is a Test- and CI-only PR. No server changes.

See also ConnectEverything/mqtt-test#2.
  • Loading branch information
levb authored and neilalexander committed Jun 21, 2024
1 parent 6718784 commit c74bff4
Show file tree
Hide file tree
Showing 5 changed files with 676 additions and 180 deletions.
67 changes: 0 additions & 67 deletions .github/workflows/MQTT_test.yaml

This file was deleted.

48 changes: 48 additions & 0 deletions .github/workflows/mqtt-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: MQTT external test
on: [pull_request]

jobs:
test:
env:
GOPATH: /home/runner/work/nats-server
GO111MODULE: "on"
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
path: src/github.com/nats-io/nats-server

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: src/github.com/nats-io/nats-server/go.mod
cache-dependency-path: src/github.com/nats-io/nats-server/go.sum

- name: Set up testing tools and environment
shell: bash --noprofile --norc -eo pipefail {0}
id: setup
run: |
wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb
sudo apt install ./mqtt-cli-4.20.0.deb
go install github.com/ConnectEverything/[email protected]
- name: Run tests (3 times to detect flappers)
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test -v --count=3 --run='TestXMQTT' ./server
- name: Run tests with --race
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test -v --race --failfast --run='TestXMQTT' ./server
- name: Run benchmarks
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test --run='-' --count=3 --bench 'BenchmarkXMQTT' --benchtime=100x ./server
# TODO: compare benchmarks
6 changes: 6 additions & 0 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,21 +1950,27 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
}
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
// from our own server, ignore.
as.mu.RLock()
if as.rrmLastSeq == 0 && rm.Origin == as.jsa.id {
as.mu.RUnlock()
return
}
as.mu.RUnlock()

// At this point we either recover from our own server, or process a remote retained message.
seq, _, _ := ackReplyInfo(reply)

// Handle this retained message, no need to copy the bytes.
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false)

// If we were recovering (lastSeq > 0), then check if we are done.
as.mu.Lock()
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
as.rrmLastSeq = 0
close(as.rrmDoneCh)
as.rrmDoneCh = nil
}
as.mu.Unlock()
}

func (as *mqttAccountSessionManager) processRetainedMsgDel(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down
136 changes: 23 additions & 113 deletions server/mqtt_ex_test.go → server/mqtt_ex_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,12 @@
package server

import (
"bytes"
"encoding/json"
"fmt"
"os"
"os/exec"
"strconv"
"testing"
"time"
)

func TestMQTTExCompliance(t *testing.T) {
mqttPath := os.Getenv("MQTT_CLI")
if mqttPath == "" {
if p, err := exec.LookPath("mqtt"); err == nil {
mqttPath = p
}
}
if mqttPath == "" {
t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`)
}

conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}
mqtt {
listen: 127.0.0.1:-1
}
`, t.TempDir())))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port))

output, err := cmd.CombinedOutput()
t.Log(string(output))
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
t.Fatalf("mqtt cli exited with error: %v", exitError)
}
}
}

const (
KB = 1024
)
Expand All @@ -83,9 +44,6 @@ type mqttBenchContext struct {

Host string
Port int

// full path to mqtt-test command
testCmdPath string
}

var mqttBenchDefaultMatrix = mqttBenchMatrix{
Expand All @@ -102,8 +60,12 @@ type MQTTBenchmarkResult struct {
Bytes int64 `json:"bytes"`
}

func BenchmarkMQTTEx(b *testing.B) {
bc := mqttNewBenchEx(b)
func BenchmarkXMQTT(b *testing.B) {
if mqttTestCommandPath == "" {
b.Skip(`"mqtt-test" command is not found in $PATH.`)
}

bc := mqttBenchContext{}
b.Run("Server", func(b *testing.B) {
b.Cleanup(bc.startServer(b, false))
bc.runAll(b)
Expand Down Expand Up @@ -142,11 +104,11 @@ func (bc mqttBenchContext) benchmarkPub(b *testing.B) {

b.Run("PUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub",
bc.runAndReport(b, "pub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -165,11 +127,11 @@ func (bc mqttBenchContext) benchmarkPubRetained(b *testing.B) {

b.Run("PUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub", "--retain",
bc.runAndReport(b, "pub", "--retain",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -185,11 +147,11 @@ func (bc mqttBenchContext) benchmarkPubSub(b *testing.B) {

b.Run("PUBSUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pubsub",
bc.runAndReport(b, "pubsub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--subscribers", strconv.Itoa(bc.Subscribers),
)
})
})
Expand All @@ -206,67 +168,23 @@ func (bc mqttBenchContext) benchmarkSubRet(b *testing.B) {

b.Run("SUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "subret",
bc.runAndReport(b, "subret",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N), // number of subscribe requests
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--num-topics", strconv.Itoa(bc.Topics),
"--topics", strconv.Itoa(bc.Topics), // number of retained messages
"--subscribers", strconv.Itoa(bc.Subscribers),
"--size", strconv.Itoa(bc.MessageSize),
"--repeat", strconv.Itoa(b.N), // number of subscribe requests
)
})
})
}

func mqttBenchLookupCommand(b *testing.B, name string) string {
func (bc mqttBenchContext) runAndReport(b *testing.B, name string, extraArgs ...string) {
b.Helper()
cmd, err := exec.LookPath(name)
if err != nil {
b.Skipf("%q command is not found in $PATH. Please `go install github.com/nats-io/meta-nats/apps/go/mqtt/...@latest` and try again.", name)
}
return cmd
}

func (bc mqttBenchContext) runCommand(b *testing.B, name string, extraArgs ...string) {
b.Helper()

args := append([]string{
name,
"-q",
"--servers", fmt.Sprintf("%s:%d", bc.Host, bc.Port),
}, extraArgs...)

cmd := exec.Command(bc.testCmdPath, args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
defer stdout.Close()
errbuf := bytes.Buffer{}
cmd.Stderr = &errbuf
if err = cmd.Start(); err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
r := &MQTTBenchmarkResult{}
if err = json.NewDecoder(stdout).Decode(r); err != nil {
b.Fatalf("failed to decode output of %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}
if err = cmd.Wait(); err != nil {
b.Fatalf("Error executing %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}

r := mqttRunExCommandTest(b, name, mqttNewDial("", "", bc.Host, bc.Port, ""), extraArgs...)
r.report(b)
}

func (bc mqttBenchContext) initServer(b *testing.B) {
b.Helper()
bc.runCommand(b, "pubsub",
"--id", "__init__",
"--qos", "0",
"--n", "1",
"--size", "100",
"--num-subscribers", "1")
}

func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func() {
b.Helper()
b.StopTimer()
Expand All @@ -278,7 +196,7 @@ func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func
o = s.getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttInitTestServer(b, mqttNewDial("", "", bc.Host, bc.Port, ""))
return func() {
testMQTTShutdownServer(s)
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -314,7 +232,7 @@ func (bc *mqttBenchContext) startCluster(b *testing.B, disableRMSCache bool) fun
o := cl.randomNonLeader().getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttInitTestServer(b, mqttNewDial("", "", bc.Host, bc.Port, ""))
return func() {
cl.shutdown()
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -410,15 +328,7 @@ func (r MQTTBenchmarkResult) report(b *testing.B) {
nsOp := float64(ns) / float64(r.Ops)
b.ReportMetric(nsOp/1000000, unit+"_ms/op")
}

// Diable ReportAllocs() since it confuses the github benchmarking action
// with the noise.
// b.ReportAllocs()
}

func mqttNewBenchEx(b *testing.B) *mqttBenchContext {
cmd := mqttBenchLookupCommand(b, "mqtt-test")
return &mqttBenchContext{
testCmdPath: cmd,
}
}
Loading

0 comments on commit c74bff4

Please sign in to comment.