From 70aed7c37697b24b844ba2501794e9d45ba4084a Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 20 Sep 2024 17:56:20 +0100 Subject: [PATCH 1/3] De-flake `TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer` (again) Signed-off-by: Neil Twigg --- server/raft_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/server/raft_test.go b/server/raft_test.go index 02ba3bb84ca..545561eec97 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -355,6 +355,15 @@ func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) { } func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { + // This test relies on nodes not hitting their election timer too often, + // otherwise the step later where we capture the election time before and + // after the failed vote request will flake. + origMinTimeout, origMaxTimeout, origHBInterval := minElectionTimeout, maxElectionTimeout, hbInterval + minElectionTimeout, maxElectionTimeout, hbInterval = time.Second*5, time.Second*10, time.Second*10 + defer func() { + minElectionTimeout, maxElectionTimeout, hbInterval = origMinTimeout, origMaxTimeout, origHBInterval + }() + c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() c.waitOnLeader() @@ -379,6 +388,8 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { // time so that it guarantees that both the leader and the follower aren't // operating at the time we take the etlr snapshots. rg.lockAll() + leader.resetElect(maxElectionTimeout) + follower.resetElect(maxElectionTimeout) leaderOriginal := leader.etlr followerOriginal := follower.etlr vr := &voteRequest{ @@ -404,9 +415,11 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { // Neither the leader nor our chosen follower should have updated their // election timer as a result of this. rg.lockAll() - defer rg.unlockAll() - require_True(t, leaderOriginal.Equal(leader.etlr)) - require_True(t, followerOriginal.Equal(follower.etlr)) + leaderEqual := leaderOriginal.Equal(leader.etlr) + followerEqual := followerOriginal.Equal(follower.etlr) + rg.unlockAll() + require_True(t, leaderEqual) + require_True(t, followerEqual) } func TestNRGInvalidTAVDoesntPanic(t *testing.T) { From ccc51ae8b681d58a2d6c8c4afde1e6857edc2c01 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 23 Sep 2024 09:11:14 -0700 Subject: [PATCH 2/3] [IMPROVED] Limit stepdown to system account. (#5914) Signed-off-by: Derek Collison --------- Signed-off-by: Derek Collison --- server/jetstream_api.go | 6 +++++ server/jetstream_cluster.go | 5 ++-- server/jetstream_cluster_1_test.go | 37 ++++++++++++++++++++++++++ server/jetstream_cluster_3_test.go | 11 ++++++-- server/jetstream_super_cluster_test.go | 4 +-- server/jetstream_test.go | 3 ++- server/norace_test.go | 11 ++++++-- server/stream.go | 4 +-- 8 files changed, 69 insertions(+), 12 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6e7d82f9fc0..7bcc7d37ebf 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2764,6 +2764,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun return } + // This should only be coming from the System Account. + if acc != s.SystemAccount() { + s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User) + return + } + js, cc := s.getJetStreamCluster() if js == nil || cc == nil || cc.meta == nil { return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 14af031ea4e..06c3c11ca72 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum return false } s := js.srv - js.mu.RUnlock() - // Capture RAFT node from assignment. node := ca.Group.node + js.mu.RUnlock() // When we try to restart we nil out the node if applicable // and reprocess the consumer assignment. @@ -1039,7 +1038,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { - if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() { + if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) { return true } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b759e557078..f5f345e484f 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6403,6 +6403,43 @@ func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) { require_Equal(t, si.State.Msgs, 0) } +// Make sure to not allow non-system accounts to move meta leader. +func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomServer() + + // Client based API + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + ml := c.leader() + + _, err := nc.Request(JSApiLeaderStepDown, nil, time.Second) + require_Error(t, err, nats.ErrTimeout) + + // Make sure we did not move. + c.waitOnLeader() + require_Equal(t, ml, c.leader()) + + // System user can move it. + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + resp, err := snc.Request(JSApiLeaderStepDown, nil, time.Second) + require_NoError(t, err) + + var sdr JSApiLeaderStepDownResponse + require_NoError(t, json.Unmarshal(resp.Data, &sdr)) + require_True(t, sdr.Success) + require_Equal(t, sdr.Error, nil) + + // Make sure we did move this time. + c.waitOnLeader() + require_NotEqual(t, ml, c.leader()) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 82c928440e7..bedcccb0044 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4900,8 +4900,9 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { } ` - _, syspub := createKey(t) + sysKp, syspub := createKey(t) sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) accKp, aExpPub := createKey(t) accClaim := jwt.NewAccountClaims(aExpPub) @@ -4994,6 +4995,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { // Move our R3 stream leader and make sure acounting is correct. _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second) require_NoError(t, err) + c.waitOnStreamLeader(aExpPub, "TEST1") checkAccount(sir1.State.Bytes, sir3.State.Bytes) @@ -5025,6 +5027,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { Replicas: 3, }) require_NoError(t, err) + c.waitOnStreamLeader(aExpPub, "TEST1") checkAccount(sir1.State.Bytes, sir3.State.Bytes) @@ -5042,11 +5045,15 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { checkAccount(sir1.State.Bytes, sir3.State.Bytes) + // Need system user here to move the leader. + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(sysCreds)) + defer snc.Close() + requestLeaderStepDown := func() { ml := c.leader() checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { if cml := c.leader(); cml == ml { - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc.Request(JSApiLeaderStepDown, nil, time.Second) return fmt.Errorf("Metaleader has not moved yet") } return nil diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 7323af98b7a..0e095bd30d4 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -3361,13 +3361,13 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) { defer sCluster.shutdown() requestLeaderStepDown := func(clientURL string) error { - nc, err := nats.Connect(clientURL) + nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!")) if err != nil { return err } defer nc.Close() - ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second) + ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second) if err != nil { return err } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1ad01d8f4dd..aff9dcd9213 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -6542,6 +6542,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { listen: 127.0.0.1:%d routes = [%s] } + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` storeCnf := func(serverName, clusterName, storeDir, conf string) string { switch serverName { @@ -6566,7 +6567,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { defer cluster.shutdown() requestLeaderStepDown := func(clientURL string) error { - nc, err := nats.Connect(clientURL) + nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!")) if err != nil { return err } diff --git a/server/norace_test.go b/server/norace_test.go index f1a8fa51688..c2ddf6aa669 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1882,7 +1882,10 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) { doneCh := make(chan bool) if sl == sc.leader() { - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second) + require_NoError(t, err) sc.waitOnLeader() } @@ -10315,7 +10318,11 @@ func TestNoRaceWQAndMultiSubjectFiltersRace(t *testing.T) { return nil } // Move meta-leader since stream can be R1. - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + if _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second); err != nil { + return err + } return fmt.Errorf("stream leader on meta-leader") }) diff --git a/server/stream.go b/server/stream.go index fefbc1f11dd..428891cf9d8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -650,9 +650,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt mset.store.FastState(&state) // Possible race with consumer.setLeader during recovery. - mset.mu.RLock() + mset.mu.Lock() mset.lseq = state.LastSeq - mset.mu.RUnlock() + mset.mu.Unlock() // If no msgs (new stream), set dedupe state loaded to true. if state.Msgs == 0 { From e9585598132cf3f6fc90e195cdd7e1ba48b8df0a Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 23 Sep 2024 15:00:41 +0200 Subject: [PATCH 3/3] Log only durable orphaned consumers But log the rest at debug for dev purposes Signed-off-by: R.I.Pienaar --- server/jetstream_cluster.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 06c3c11ca72..cc301e9bf84 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1200,7 +1200,12 @@ func (js *jetStream) checkForOrphans() { stream = mset.cfg.Name mset.mu.RUnlock() } - s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + if o.isDurable() { + s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + } else { + s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + } + if err := o.delete(); err != nil { s.Warnf("Deleting consumer encountered an error: %v", err) }