Skip to content

Commit

Permalink
Add TestJetStreamClusterRoutedAPIRecoverPerformance
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored and wallyqs committed Jan 9, 2025
1 parent f8168e9 commit 82d47e2
Showing 1 changed file with 66 additions and 0 deletions.
66 changes: 66 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4457,3 +4457,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
// it should succeed.
require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
}

func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

// We only run 16 JetStream API workers.
mp := runtime.GOMAXPROCS(0)
if mp > 16 {
mp = 16
}

leader := c.leader()
ljs := leader.js.Load()

// Take the JS lock, which allows the JS API queue to build up.
ljs.mu.Lock()
defer ljs.mu.Unlock()

count := JSDefaultRequestQueueLimit - 1
ch := make(chan *nats.Msg, count)

inbox := nc.NewRespInbox()
_, err := nc.ChanSubscribe(inbox, ch)
require_NoError(t, err)

// To ensure a fair starting line, we need to submit as many tasks as
// there are JS workers whilst holding the JS lock. This will ensure that
// each JS API worker is properly wedged.
msg := &nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
Reply: "no_one_here",
}
for i := 0; i < mp; i++ {
require_NoError(t, nc.PublishMsg(msg))
}

// Then we want to submit a fixed number of tasks, big enough to fill
// the queue, so that we can measure them.
msg = &nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
Reply: inbox,
}
for i := 0; i < count; i++ {
require_NoError(t, nc.PublishMsg(msg))
}
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
if queued := leader.jsAPIRoutedReqs.len(); queued != count {
return fmt.Errorf("expected %d queued requests, got %d", count, queued)
}
return nil
})

// Now we're going to release the lock and start timing. The workers
// will now race to clear the queues and we'll wait to see how long
// it takes for them all to respond.
start := time.Now()
ljs.mu.Unlock()
for i := 0; i < count; i++ {
<-ch
}
ljs.mu.Lock()
t.Logf("Took %s to clear %d items", time.Since(start), count)
}

0 comments on commit 82d47e2

Please sign in to comment.