diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e5ef00eae1..da7147b1e2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2721,8 +2721,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // keep stream assignment current sa = mset.streamAssignment() - // keep peer list up to date with config - js.checkPeers(mset.raftGroup()) // We get this when we have a new stream assignment caused by an update. // We want to know if we are migrating. if migrating := mset.isMigrating(); migrating { @@ -2810,7 +2808,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Check if we have a quorom. if current >= neededCurrent { s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader) - n.UpdateKnownPeers(newPeers) + n.ProposeKnownPeers(newPeers) n.StepDown(newLeaderPeer) } } @@ -3345,22 +3343,6 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { return replicas } -// Will check our node peers and see if we should remove a peer. -func (js *jetStream) checkPeers(rg *raftGroup) { - js.mu.Lock() - defer js.mu.Unlock() - - // FIXME(dlc) - Single replicas? - if rg == nil || rg.node == nil { - return - } - for _, peer := range rg.node.Peers() { - if !rg.isMember(peer.ID) { - rg.node.ProposeRemovePeer(peer.ID) - } - } -} - // Process a leader change for the clustered stream. func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { if mset == nil { @@ -3393,8 +3375,6 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { if isLeader { s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName) s.sendStreamLeaderElectAdvisory(mset) - // Check for peer removal and process here if needed. - js.checkPeers(sa.Group) mset.checkAllowMsgCompress(peers) } else { // We are stepping down. 
@@ -3611,7 +3591,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.processClusterCreateStream(acc, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. - s.removeStream(ourID, mset, sa) + s.removeStream(mset, sa) } // If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected. @@ -3699,13 +3679,13 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { js.processClusterUpdateStream(acc, osa, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. - s.removeStream(ourID, mset, sa) + s.removeStream(mset, sa) } } -// Common function to remove ourself from this server. +// Common function to remove ourselves from this server. // This can happen on re-assignment, move, etc -func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) { +func (s *Server) removeStream(mset *stream, nsa *streamAssignment) { if mset == nil { return } @@ -3715,7 +3695,6 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) if node.Leader() { node.StepDown(nsa.Group.Preferred) } - node.ProposeRemovePeer(ourID) // shutdown monitor by shutting down raft. node.Delete() } @@ -5051,8 +5030,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // We get this when we have a new consumer assignment caused by an update. // We want to know if we are migrating. rg := o.raftGroup() - // keep peer list up to date with config - js.checkPeers(rg) // If we are migrating, monitor for the new peers to be caught up. 
replicas, err := o.replica() if err != nil { @@ -5369,8 +5346,6 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err if isLeader { s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.serviceAccount(), streamName, consumerName) s.sendConsumerLeaderElectAdvisory(o) - // Check for peer removal and process here if needed. - js.checkPeers(ca.Group) } else { // We are stepping down. // Make sure if we are doing so because we have lost quorum that we send the appropriate advisories. diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index cd1c15642e..8253d109e7 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6945,6 +6945,60 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) { require_NoError(t, err) } +func TestJetStreamClusterStreamUpscalePeersAfterDownscale(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R5S", 5) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + checkPeerSet := func() { + t.Helper() + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + peers := mset.raftNode().Peers() + if len(peers) != 5 { + return fmt.Errorf("expected 5 peers, got %d", len(peers)) + } + } + return nil + }) + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 5, + }) + require_NoError(t, err) + + checkPeerSet() + + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 5, + }) + require_NoError(t, err) + + checkPeerSet() +} + // // DO NOT ADD 
NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value. diff --git a/server/raft.go b/server/raft.go index 72a66f3a21..1cdcd66c16 100644 --- a/server/raft.go +++ b/server/raft.go @@ -61,6 +61,7 @@ type RaftNode interface { ID() string Group() string Peers() []*Peer + ProposeKnownPeers(knownPeers []string) UpdateKnownPeers(knownPeers []string) ProposeAddPeer(peer string) error ProposeRemovePeer(peer string) error @@ -1699,19 +1700,23 @@ func (n *raft) Peers() []*Peer { return peers } +// Update and propose our known set of peers. +func (n *raft) ProposeKnownPeers(knownPeers []string) { + // If we are the leader update and send this update out. + if n.State() != Leader { + return + } + n.UpdateKnownPeers(knownPeers) + n.sendPeerState() +} + // Update our known set of peers. func (n *raft) UpdateKnownPeers(knownPeers []string) { n.Lock() // Process like peer state update. ps := &peerState{knownPeers, len(knownPeers), n.extSt} n.processPeerState(ps) - isLeader := n.State() == Leader n.Unlock() - - // If we are the leader send this update out as well. - if isLeader { - n.sendPeerState() - } } // ApplyQ returns the apply queue that new commits will be sent to for the