Skip to content

Commit

Permalink
Add 'Chain of Blocks' tests for Raft
Browse files Browse the repository at this point in the history
Chain of Blocks is a simple state machine on top of Raft/NRG.
Values delivered through Raft are hashed, allowing easy detection
of replica divergence.

 - Add one happy-path test
 - Add one test for recovery/catchup
 - Add a 'Long' randomized test
  • Loading branch information
mprimi committed Jan 2, 2025
1 parent afb7e0b commit fc3553a
Show file tree
Hide file tree
Showing 3 changed files with 914 additions and 0 deletions.
300 changes: 300 additions & 0 deletions server/jetstream_cluster_long_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package server
import (
"fmt"
"math/rand"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -183,3 +184,302 @@ func TestLongKVPutWithServerRestarts(t *testing.T) {
)
}
}

// TestLongNRGChainOfBlocks is a RaftChainOfBlocks test that randomly starts and stops nodes to exercise
// recovery and snapshots.
//
// The test runs for a fixed Duration. In each iteration one operation is drawn at random from a weighted
// list (stop/restart nodes, snapshot, propose a block, pause, or check convergence). The "Check" operation
// restarts every stopped node and then waits (up to ConvergenceTimeout) until all replicas report the same
// number of blocks and the same hash, and that this count is not lower than the highest count seen so far.
func TestLongNRGChainOfBlocks(t *testing.T) {
	const (
		ClusterSize        = 3
		GroupSize          = 3
		ConvergenceTimeout = 30 * time.Second
		Duration           = 10 * time.Minute
		PrintStateInterval = 3 * time.Second
	)

	// Create cluster
	c := createJetStreamClusterExplicit(t, "Test", ClusterSize)
	defer c.shutdown()

	rg := c.createRaftGroup("ChainOfBlocks", GroupSize, newRaftChainStateMachine)
	rg.waitOnLeader()

	// Available operations.
	// Every constant is explicitly typed: a constant with its own initializer does not inherit the type
	// of the previous line, so leaving the type off would make it an untyped string constant.
	type TestOperation string
	const (
		StopOne       TestOperation = "Stop one active node"
		StopAll       TestOperation = "Stop all active nodes"
		RestartOne    TestOperation = "Restart one stopped node"
		RestartAll    TestOperation = "Restart all stopped nodes"
		Snapshot      TestOperation = "Snapshot one active node"
		Propose       TestOperation = "Propose a value via one active node"
		ProposeLeader TestOperation = "Propose a value via leader"
		Pause         TestOperation = "Let things run undisturbed for a while"
		Check         TestOperation = "Wait for nodes to converge"
	)

	// Weighted distribution of operations, one is randomly chosen from this vector in each iteration.
	// An operation's weight is the number of times it appears in the list.
	opsWeighted := []TestOperation{
		StopOne,
		StopAll,
		RestartOne,
		RestartOne,
		RestartAll,
		RestartAll,
		RestartAll,
		Snapshot,
		Snapshot,
		Propose,
		Propose,
		Propose,
		Propose,
		Propose,
		Propose,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		Pause,
		Pause,
		Pause,
		Pause,
		Pause,
		Pause,
		Check,
		Check,
		Check,
		Check,
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Choose a node from the list (and remove it).
	// Returns the shortened list and the chosen node, or the unchanged list and nil if the list is empty.
	// Element order is not preserved: the last element is moved into the slot of the removed one.
	pickRandomNode := func(nodes []stateMachine) ([]stateMachine, stateMachine) {
		if len(nodes) == 0 {
			// Input list is empty
			return nodes, nil
		}
		// Pick random node
		i := rng.Intn(len(nodes))
		node := nodes[i]
		// Move last element in its place
		nodes[i] = nodes[len(nodes)-1]
		// Return slice excluding last element
		return nodes[:len(nodes)-1], node
	}

	// Create summary status string for all replicas
	chainStatusString := func() string {
		var b strings.Builder
		for _, sm := range rg {
			csm := sm.(*RCOBStateMachine)
			running, blocksCount, blockHash := csm.getCurrentHash()
			if running {
				fmt.Fprintf(
					&b,
					" [%s (%s): %d blocks, hash=%s],",
					csm.server().Name(),
					csm.node().ID(),
					blocksCount,
					blockHash,
				)
			} else {
				fmt.Fprintf(
					&b,
					" [%s (%s): STOPPED],",
					csm.server().Name(),
					csm.node().ID(),
				)
			}
		}
		return b.String()
	}

	// Track the highest number of blocks applied by any of the replicas
	highestBlocksCount := uint64(0)

	// Track active and stopped nodes
	activeNodes := make([]stateMachine, 0, GroupSize)
	stoppedNodes := make([]stateMachine, 0, GroupSize)

	// Initially all nodes are active
	activeNodes = append(activeNodes, rg...)

	// Restart every stopped node and move it back to the active list.
	// Shared by the RestartAll and Check operations.
	restartAllStopped := func() {
		for _, node := range stoppedNodes {
			node.restart()
		}
		activeNodes = append(activeNodes, stoppedNodes...)
		stoppedNodes = make([]stateMachine, 0, GroupSize)
	}

	defer func() {
		t.Logf("Final state: %s", chainStatusString())
	}()

	// Release ticker and timer resources regardless of how the test exits.
	printStateTicker := time.NewTicker(PrintStateInterval)
	defer printStateTicker.Stop()
	testTimer := time.NewTimer(Duration)
	defer testTimer.Stop()

	start := time.Now()
	iteration := 0

	for {

		iteration++

		// Non-blocking poll: print state if the ticker fired, stop if the test duration elapsed,
		// otherwise fall through to the next operation.
		select {
		case <-printStateTicker.C:
			t.Logf(
				"[%s] State: %s",
				time.Since(start).Round(time.Second),
				chainStatusString(),
			)
		case <-testTimer.C:
			// Test completed
			return
		default:
			// Continue
		}

		// Choose a random operation to perform in this iteration
		nextOperation := opsWeighted[rng.Intn(len(opsWeighted))]
		if RCOBOptions.verbose {
			t.Logf("Iteration %d: %s", iteration, nextOperation)
		}

		switch nextOperation {

		case StopOne:
			// Stop an active node (if any are left active)
			var n stateMachine
			activeNodes, n = pickRandomNode(activeNodes)
			if n != nil {
				n.stop()
				stoppedNodes = append(stoppedNodes, n)
			}

		case StopAll:
			// Stop all active nodes (if any are active)
			for _, node := range activeNodes {
				node.stop()
			}
			stoppedNodes = append(stoppedNodes, activeNodes...)
			activeNodes = make([]stateMachine, 0, GroupSize)

		case RestartOne:
			// Restart a stopped node (if any are stopped)
			var n stateMachine
			stoppedNodes, n = pickRandomNode(stoppedNodes)
			if n != nil {
				n.restart()
				activeNodes = append(activeNodes, n)
			}

		case RestartAll:
			// Restart all stopped nodes (if any)
			restartAllStopped()

		case Snapshot:
			// Choose a random active node and tell it to create a snapshot
			if len(activeNodes) > 0 {
				n := activeNodes[rng.Intn(len(activeNodes))]
				n.(*RCOBStateMachine).createSnapshot()
			}

		case Propose:
			// Make an active node propose the next block (if any nodes are active)
			if len(activeNodes) > 0 {
				n := activeNodes[rng.Intn(len(activeNodes))]
				n.(*RCOBStateMachine).proposeBlock()
			}

		case ProposeLeader:
			// Make the leader propose the next block (if a leader is active)
			leader := rg.leader()
			if leader != nil {
				leader.(*RCOBStateMachine).proposeBlock()
			}

		case Pause:
			// Noop, let things run undisturbed for a little bit (random pause below 250ms)
			time.Sleep(time.Duration(rng.Intn(250)) * time.Millisecond)

		case Check:
			// Restart any stopped node
			restartAllStopped()

			// Ensure all nodes (eventually) converge
			checkFor(t, ConvergenceTimeout, 250*time.Millisecond,
				func() error {
					referenceNode := rg[0]
					// Save block count and hash of first node as reference
					_, referenceBlocksCount, referenceHash := referenceNode.(*RCOBStateMachine).getCurrentHash()

					// Compare each node against reference
					for _, n := range rg {
						sm := n.(*RCOBStateMachine)
						running, blocksCount, blockHash := sm.getCurrentHash()
						if !running {
							return fmt.Errorf(
								"node not running: %s (%s)",
								sm.server().Name(),
								sm.node().ID(),
							)
						}

						// Track the highest block delivered by any node
						if blocksCount > highestBlocksCount {
							if RCOBOptions.verbose {
								t.Logf(
									"New highest blocks count: %d (%s (%s))",
									blocksCount,
									sm.server().Name(),
									sm.node().ID(),
								)
							}
							highestBlocksCount = blocksCount
						}

						// Each replica must match the reference node
						if blocksCount != referenceBlocksCount {
							return fmt.Errorf(
								"different number of blocks %d (%s (%s)) vs. %d (%s (%s))",
								blocksCount,
								sm.server().Name(),
								sm.node().ID(),
								referenceBlocksCount,
								referenceNode.server().Name(),
								referenceNode.node().ID(),
							)
						} else if blockHash != referenceHash {
							return fmt.Errorf(
								"different hash after %d blocks %s (%s (%s)) vs. %s (%s (%s))",
								blocksCount,
								blockHash,
								sm.server().Name(),
								sm.node().ID(),
								referenceHash,
								referenceNode.server().Name(),
								referenceNode.node().ID(),
							)
						}
					}

					// Verify consistency check was against the highest block known
					if referenceBlocksCount < highestBlocksCount {
						return fmt.Errorf(
							"nodes converged below highest known block count: %d: %s",
							highestBlocksCount,
							chainStatusString(),
						)
					}

					// All nodes reached the same state, check passed
					return nil
				},
			)
		}
	}
}
Loading

0 comments on commit fc3553a

Please sign in to comment.