Skip to content

Commit

Permalink
Leverage Antithesis SDK for server tests
Browse files Browse the repository at this point in the history
Overview:

1. Introduce dependency on antithesis-sdk-go (default NOOP variant)
2. Introduce assertions utilities suitable for use in tests
3. Instrument frequently used test utilities with the new assertions

N.B. unless the `enable_antithesis_sdk` tag is specified at build time, both the SDK and the assertions wrappers are NOOP, and should have no effect.

This will also facilitate other one-off Antithesis experiments, since developers no longer have to manually add the SDK to their branch.
  • Loading branch information
mprimi committed Dec 12, 2024
1 parent cc0b181 commit 5855303
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
toolchain go1.22.8

require (
github.com/antithesishq/antithesis-sdk-go v0.4.2-default-no-op
github.com/google/go-tpm v0.9.0
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.4.2-default-no-op h1:fpjv35LjUu5j4MC1aaMlmCMNECdl4hj0yK2CWdPhNjA=
github.com/antithesishq/antithesis-sdk-go v0.4.2-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-tpm v0.9.0 h1:sQF6YqWMi+SCXpsmS3fd21oPy/vSddwZry4JnmltHVk=
Expand Down
27 changes: 27 additions & 0 deletions internal/antithesis/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file is used iff the `enable_antithesis_sdk` build tag is not present
//go:build !enable_antithesis_sdk

package antithesis

import (
"testing"
)

// AssertUnreachable is the NOOP variant of the assertion, compiled in when
// the `enable_antithesis_sdk` build tag is absent. Every argument is ignored.
func AssertUnreachable(testing.TB, string, map[string]any) {
	// Intentionally empty.
}

// Assert is the NOOP variant of the assertion, compiled in when the
// `enable_antithesis_sdk` build tag is absent. The condition is not inspected
// and every other argument is ignored.
func Assert(testing.TB, bool, string, map[string]any) {
	// Intentionally empty.
}
111 changes: 111 additions & 0 deletions internal/antithesis/test_assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file is used iff the `enable_antithesis_sdk` build tag is present
//go:build enable_antithesis_sdk

package antithesis

import (
"encoding/json"
"fmt"
"runtime/debug"
"testing"

"github.com/antithesishq/antithesis-sdk-go/assert"
)

// This file provides assertions utility functions suitable for use in tests.
// It is a thin wrapper around the Antithesis SDK.
//
// Notice that, unlike other assertion libraries, a violation does not halt execution or fail the test.
// The effects of violating an assertion are:
// 1. Print the violation message, prefixed by the test name in which it happened
// 2. Print the stack for the calling goroutine so each failed test has a clear trace of the failure
// 3. Invoke the underlying Antithesis assertion
//
// N.B. Enabling this module for tests running outside of Antithesis will enable 1 and 2 above, but not 3.
// Therefore it can be useful to output additional test failure details when running tests locally or in CI.

// AssertUnreachable is used to flag code branches that should not get invoked ever.
// It prints the violation (message, details, stack trace) and then fires the
// underlying Antithesis assertion. It does NOT fail the test or stop execution;
// the caller is expected to do that itself.
//
// Example:
//
//	pubAck, err := js.Publish(...)
//
//	if err != nil {
//		antithesis.AssertUnreachable(t, "Publish failed", map[string]any{"error": err.Error()})
//		t.Fatalf("Publish failed with error: %s", err)
//	}
func AssertUnreachable(t testing.TB, message string, details map[string]any) {
	printViolation(t, "Assert Unreachable violation", message, details)

	// Fire assertion violation event (if Antithesis is enabled)
	assert.Unreachable(violationIdentifier(t, message), details)
}

// Assert is used to check that some given condition is always true.
// When condition is true this is a no-op. When it is false, the violation is
// printed (message, details, stack trace) and the underlying Antithesis
// assertion is fired. It does NOT fail the test or stop execution.
//
// Example:
//
//	antithesis.Assert(t, sequence > lastSequence, "Non-monotonic stream sequence number", map[string]any{
//		"stream":        streamName,
//		"connection_id": nc.Id(),
//		"sequence":      sequence,
//		"lastSequence":  lastSequence,
//	})
func Assert(t testing.TB, condition bool, message string, details map[string]any) {
	// Condition is true, nothing to do
	if condition {
		return
	}

	printViolation(t, "Assert violation", message, details)

	// Fire assertion violation event (if Antithesis is enabled)
	assert.AlwaysOrUnreachable(false, violationIdentifier(t, message), details)
}

// violationIdentifier builds the message handed to the Antithesis SDK.
//
// N.B. as of today, message (as-is) is the unique identifier of the event.
// Prefixing it with the test name keeps the same assertion failing in 2
// different tests apart, but the same message used at 2 different lines of
// the same test still maps to a single identifier.
func violationIdentifier(t testing.TB, message string) string {
	return fmt.Sprintf("[%s] %s", t.Name(), message)
}

// printViolation prints the violation message prefixed by the test name, the
// details (if any) and the stack of the calling goroutine, all on stdout.
// kind is the human readable violation label (e.g. "Assert violation").
func printViolation(t testing.TB, kind, message string, details map[string]any) {
	// Always print a message
	fmt.Printf("{*} [%s] %s: %s\n", t.Name(), kind, message)

	// len of a nil map is 0, no separate nil check required
	if len(details) > 0 {
		fmt.Printf("{*} Details:\n")
		jsonDetails, err := json.MarshalIndent(details, "", " ")
		if err != nil {
			// Details may hold values JSON cannot encode (channels, funcs, NaN, ...).
			// Reporting a violation must never crash the test and hide the
			// actual failure, so fall back to Go formatting.
			fmt.Printf("%+v (details are not JSON-encodable: %s)\n", details, err)
		} else {
			fmt.Println(string(jsonDetails))
		}
	}

	// Always print the stack trace. debug.PrintStack writes to stderr, which
	// may separate the trace from the lines above; keep everything on stdout.
	fmt.Printf("{*} Stack trace:\n")
	fmt.Printf("%s", debug.Stack())
}
62 changes: 62 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (

"github.com/nats-io/nats.go"
"golang.org/x/time/rate"

"github.com/nats-io/nats-server/v2/internal/antithesis"
)

// Support functions
Expand Down Expand Up @@ -104,6 +106,10 @@ func (sc *supercluster) waitOnStreamLeader(account, stream string) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(sc.t, "Timeout in supercluster.waitOnStreamLeader", map[string]any{
"account": account,
"stream": stream,
})
sc.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream)
}

Expand Down Expand Up @@ -553,6 +559,7 @@ func (sc *supercluster) waitOnLeader() {
}
time.Sleep(25 * time.Millisecond)
}
// The message is the assertion identifier: name this function, not waitOnStreamLeader.
antithesis.AssertUnreachable(sc.t, "Timeout in supercluster.waitOnLeader", nil)
sc.t.Fatalf("Expected a cluster leader, got none")
}

Expand Down Expand Up @@ -590,6 +597,7 @@ func (sc *supercluster) waitOnPeerCount(n int) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(sc.t, "Timeout in supercluster.waitOnPeerCount", nil)
sc.t.Fatalf("Expected a super cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
}

Expand Down Expand Up @@ -1248,6 +1256,9 @@ func fetchMsgs(t *testing.T, sub *nats.Subscription, numExpected int, totalWait
for start, count, wait := time.Now(), numExpected, totalWait; len(result) != numExpected; {
msgs, err := sub.Fetch(count, nats.MaxWait(wait))
if err != nil {
antithesis.AssertUnreachable(t, "Fetch error", map[string]any{
"error": err.Error(),
})
t.Fatal(err)
}
result = append(result, msgs...)
Expand All @@ -1257,6 +1268,10 @@ func fetchMsgs(t *testing.T, sub *nats.Subscription, numExpected int, totalWait
}
}
if len(result) != numExpected {
antithesis.AssertUnreachable(t, "Unexpected fetch messages count", map[string]any{
"expected": numExpected,
"actual": len(result),
})
t.Fatalf("Unexpected msg count, got %d, want %d", len(result), numExpected)
}
return result
Expand Down Expand Up @@ -1307,6 +1322,9 @@ func (c *cluster) waitOnPeerCount(n int) {
leader = c.leader()
}
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnPeerCount", map[string]any{
"cluster": c.name,
})
c.t.Fatalf("Expected a cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
}

Expand All @@ -1320,6 +1338,12 @@ func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnConsumerLeader", map[string]any{
"cluster": c.name,
"account": account,
"stream": stream,
"consumer": consumer,
})
c.t.Fatalf("Expected a consumer leader for %q %q %q, got none", account, stream, consumer)
}

Expand Down Expand Up @@ -1353,6 +1377,11 @@ func (c *cluster) waitOnStreamLeader(account, stream string) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnStreamLeader", map[string]any{
"cluster": c.name,
"account": account,
"stream": stream,
})
c.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream)
}

Expand Down Expand Up @@ -1386,6 +1415,11 @@ func (c *cluster) waitOnStreamCurrent(s *Server, account, stream string) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnStreamCurrent", map[string]any{
"cluster": c.name,
"account": account,
"stream": stream,
})
c.t.Fatalf("Expected server %q to eventually be current for stream %q", s, stream)
}

Expand All @@ -1399,6 +1433,10 @@ func (c *cluster) waitOnServerHealthz(s *Server) {
}
time.Sleep(100 * time.Millisecond)
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnServerHealthz", map[string]any{
"cluster": c.name,
"server": s.Name(),
})
c.t.Fatalf("Expected server %q to eventually return healthz 'ok', but got %q", s, s.healthz(nil).Error)
}

Expand All @@ -1411,6 +1449,10 @@ func (c *cluster) waitOnServerCurrent(s *Server) {
return
}
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnServerCurrent", map[string]any{
"cluster": c.name,
"server": s.Name(),
})
c.t.Fatalf("Expected server %q to eventually be current", s)
}

Expand Down Expand Up @@ -1458,6 +1500,9 @@ func (c *cluster) expectNoLeader() {
}
time.Sleep(20 * time.Millisecond)
}
antithesis.AssertUnreachable(c.t, "Timeout in cluster.expectNoLeader", map[string]any{
"cluster": c.name,
})
c.t.Fatalf("Expected no leader but have one")
}

Expand All @@ -1472,6 +1517,9 @@ func (c *cluster) waitOnLeader() {
time.Sleep(10 * time.Millisecond)
}

antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnLeader", map[string]any{
"cluster": c.name,
})
c.t.Fatalf("Expected a cluster leader, got none")
}

Expand All @@ -1491,6 +1539,9 @@ func (c *cluster) waitOnAccount(account string) {
continue
}

antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnAccount", map[string]any{
"account": account,
})
c.t.Fatalf("Expected account %q to exist but didn't", account)
}

Expand Down Expand Up @@ -1522,15 +1573,26 @@ func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) {

if leader == nil {
c.shutdown()
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnClusterReadyWithNumPeers (1)", map[string]any{
"cluster": c.name,
})
c.t.Fatalf("Failed to elect a meta-leader")
}

peersSeen := len(leader.JetStreamClusterPeers())
c.shutdown()

if leader == nil {
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnClusterReadyWithNumPeers (2)", map[string]any{
"cluster": c.name,
})
c.t.Fatalf("Expected a cluster leader and fully formed cluster, no leader")
} else {
antithesis.AssertUnreachable(c.t, "Timeout in cluster.waitOnClusterReadyWithNumPeers (3)", map[string]any{
"cluster": c.name,
"peers_expected": numPeersExpected,
"peers_seen": peersSeen,
})
c.t.Fatalf("Expected a fully formed cluster, only %d of %d peers seen", peersSeen, numPeersExpected)
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/nats-io/nats.go"

"github.com/nats-io/nats-server/v2/internal/antithesis"
srvlog "github.com/nats-io/nats-server/v2/logger"
)

Expand All @@ -57,6 +58,7 @@ func checkFor(t testing.TB, totalWait, sleepDur time.Duration, f func() error) {
t.Helper()
err := checkForErr(totalWait, sleepDur, f)
if err != nil {
antithesis.AssertUnreachable(t, "Timeout in checkFor", nil)
t.Fatal(err.Error())
}
}
Expand Down
Loading

0 comments on commit 5855303

Please sign in to comment.