diff --git a/scripts/runTestsOnTravis.sh b/scripts/runTestsOnTravis.sh index 7561bd72dda..f60a9528313 100755 --- a/scripts/runTestsOnTravis.sh +++ b/scripts/runTestsOnTravis.sh @@ -107,7 +107,8 @@ elif [ "$1" = "srv_pkg_non_js_tests" ]; then # by using `skip_js_tests`, MQTT tests by using `skip_mqtt_tests` and # message tracing tests by using `skip_msgtrace_tests`. - go test -race -v -p=1 ./server/... -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast + # Also including the ldflag with the version since this includes the `TestVersionMatchesTag`. + go test -race -v -p=1 ./server/... -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast elif [ "$1" = "non_srv_pkg_tests" ]; then diff --git a/server/client.go b/server/client.go index 99134bd0c5b..e68d70fa947 100644 --- a/server/client.go +++ b/server/client.go @@ -5436,7 +5436,9 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if !ok { if c.kind == ROUTER && len(c.route.accName) > 0 { - acc = c.acc + if acc = c.acc; acc == nil { + return nil, nil + } } else { // Match correct account and sublist. if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { diff --git a/server/consumer.go b/server/consumer.go index f7970732757..6eb0b4745ba 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2898,6 +2898,28 @@ func (o *consumer) isFiltered() bool { return false } +// Check if we would have matched and needed an ack for this store seq. +// This is called for interest based retention streams to remove messages. +func (o *consumer) matchAck(sseq uint64) bool { + o.mu.RLock() + defer o.mu.RUnlock() + + // Check if we are filtered, and if so check if this is even applicable to us. + if o.isFiltered() { + if o.mset == nil { + return false + } + var svp StoreMsg + if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + return false + } + if !o.isFilteredMatch(svp.subj) { + return false + } + } + return true +} + // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -5499,7 +5521,9 @@ func (o *consumer) checkStateForInterestStream() error { } for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - mset.ackMsg(o, seq) + if o.matchAck(seq) { + mset.ackMsg(o, seq) + } } o.mu.RLock() diff --git a/server/filestore.go b/server/filestore.go index e059d28bec4..975e312ed89 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2610,6 +2610,11 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { // This is used to see if we can selectively jump start blocks based on filter subject and a floor block index. // Will return -1 if no matches at all. func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) { + if filter == _EMPTY_ { + filter = fwcs + wc = true + } + start, stop := uint32(math.MaxUint32), uint32(0) if wc { fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { @@ -4029,7 +4034,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // All other more thorough cleanup will happen in syncBlocks logic. // Note that we do not have to store empty records for the deleted, so don't use to calculate. // TODO(dlc) - This should not be inline, should kick the sync routine. 
- if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock { + if !isLastBlock && mb.shouldCompactInline() { mb.compact() } } @@ -4091,6 +4096,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( return true, nil } +// Tests whether we should try to compact this block while inline removing msgs. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Lock should be held. +func (mb *msgBlock) shouldCompactInline() bool { + return mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes +} + +// Tests whether we should try to compact this block while running periodic sync. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Ignores 2MB minimum. +// Lock should be held. +func (mb *msgBlock) shouldCompactSync() bool { + return mb.bytes*2 < mb.rbytes +} + // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. // This should not be called on the lmb since we will prune tail deleted messages which could cause issues with // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. @@ -4984,6 +5004,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } // Write index mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) + } else { + // Make sure to account for tombstones in rbytes. + mb.rbytes += rl } fch, werr := mb.fch, mb.werr @@ -5327,7 +5350,7 @@ func (fs *fileStore) syncBlocks() { // Check if we should compact here as well. // Do not compact last mb. var needsCompact bool - if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes { + if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() { needsCompact = true markDirty = true } @@ -6424,7 +6447,10 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Nothing found in this block. We missed, if first block (bi) check psim. // Similar to above if start <= first seq. // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. - if i == bi { + // We should not do this at all if we are already on the last block. + // Also if we are a wildcard do not check if large subject space. + const wcMaxSizeToCheck = 64 * 1024 + if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) { nbi, lbi := fs.checkSkipFirstBlock(filter, wc) // Nothing available. 
if nbi < 0 || lbi <= bi {
diff --git a/server/filestore_test.go b/server/filestore_test.go
index e3f40a89c98..a03d63e00ab 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -7248,6 +7248,94 @@ func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) {
	require_NoError(t, err)
}

+// https://github.com/nats-io/nats-server/issues/5705
+func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
+	sd := t.TempDir()
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: sd, BlockSize: 128},
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	msg := []byte("hello")
+	// Create 4 blocks, each block holds 2 msgs
+	for i := 0; i < 4; i++ {
+		fs.StoreMsg("foo.22.bar", nil, msg)
+		fs.StoreMsg("foo.22.baz", nil, msg)
+	}
+	require_Equal(t, fs.numMsgBlocks(), 4)
+
+	nbi, lbi := fs.checkSkipFirstBlock(_EMPTY_, false)
+	require_Equal(t, nbi, 0)
+	require_Equal(t, lbi, 3)
+}
+
+// https://github.com/nats-io/nats-server/issues/5702
+func TestFileStoreTombstoneRbytes(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// Block can hold 24 msgs.
+	// So will fill one block and half of the other
+	msg := []byte("hello")
+	for i := 0; i < 34; i++ {
+		fs.StoreMsg("foo.22", nil, msg)
+	}
+	require_True(t, fs.numMsgBlocks() > 1)
+	// Now delete second half of first block which will place tombstones in second blk.
+	for seq := 11; seq <= 24; seq++ {
+		fs.RemoveMsg(uint64(seq))
+	}
+	// Now check that rbytes has been properly accounted for in second block.
+	fs.mu.RLock()
+	blk := fs.blks[1]
+	fs.mu.RUnlock()
+
+	blk.mu.RLock()
+	bytes, rbytes := blk.bytes, blk.rbytes
+	blk.mu.RUnlock()
+	require_True(t, rbytes > bytes)
+}
+
+func TestFileStoreMsgBlockShouldCompact(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir()},
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// 127 msgs fit into a block.
+	msg := bytes.Repeat([]byte("Z"), 64*1024)
+	for i := 0; i < 190; i++ {
+		fs.StoreMsg("foo.22", nil, msg)
+	}
+	require_True(t, fs.numMsgBlocks() > 1)
+	// Now delete second half of first block which will place tombstones in second blk.
+	for seq := 64; seq <= 127; seq++ {
+		fs.RemoveMsg(uint64(seq))
+	}
+	fs.mu.RLock()
+	fblk := fs.blks[0]
+	sblk := fs.blks[1]
+	fs.mu.RUnlock()
+
+	fblk.mu.RLock()
+	bytes, rbytes := fblk.bytes, fblk.rbytes
+	shouldCompact := fblk.shouldCompactInline()
+	fblk.mu.RUnlock()
+	// Should have tripped compaction already.
+	require_Equal(t, bytes, rbytes)
+	require_False(t, shouldCompact)
+
+	sblk.mu.RLock()
+	shouldCompact = sblk.shouldCompactInline()
+	sblk.mu.RUnlock()
+	require_False(t, shouldCompact)
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Benchmarks
 ///////////////////////////////////////////////////////////////////////////
@@ -7506,6 +7594,37 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
 	}
 }
 
+func Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock(b *testing.B) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: b.TempDir()},
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
+	require_NoError(b, err)
+	defer fs.Stop()
+
+	// Small on purpose.
+ msg := []byte("ok") + + // Make first msg one that would match as well. + fs.StoreMsg("foo.1.baz", nil, msg) + // Add in a bunch of msgs. + // We need to make sure we have a range of subjects that could kick in a linear scan. + for i := 0; i < 1_000_000; i++ { + subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2) + fs.StoreMsg(subj, nil, msg) + } + // Make last msg one that would match as well. + fs.StoreMsg("foo.1.baz", nil, msg) + + b.ResetTimer() + + var smv StoreMsg + for i := 0; i < b.N; i++ { + // Make sure not first seq. + _, _, err := fs.LoadNextMsg("foo.*.baz", true, 999_990, &smv) + require_NoError(b, err) + } +} + func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) { fs, err := newFileStore( FileStoreConfig{StoreDir: b.TempDir()}, diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8b4abd03e80..92c88bf12c9 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -1839,3 +1840,166 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { } } } + +// https://github.com/nats-io/nats-server/pull/5600 +func TestJetStreamClusterConsumerLeak(t *testing.T) { + N := 2000 // runs in under 10s, but significant enough to see the difference. + NConcurrent := 100 + + clusterConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + + cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) + defer cl.shutdown() + cl.waitOnLeader() + + s := cl.randomNonLeader() + + // Create the test stream. + streamName := "LEAK_TEST_STREAM" + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p")) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{"$SOMETHING.>"}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + concurrent := make(chan struct{}, NConcurrent) + for i := 0; i < NConcurrent; i++ { + concurrent <- struct{}{} + } + errors := make(chan error, N) + + wg := sync.WaitGroup{} + wg.Add(N) + + // Gather the stats for comparison. 
+	before := &runtime.MemStats{}
+	runtime.GC()
+	runtime.ReadMemStats(before)
+
+	for i := 0; i < N; {
+		// wait for a slot to open up
+		<-concurrent
+		i++
+		go func() {
+			defer func() {
+				concurrent <- struct{}{}
+				wg.Done()
+			}()
+
+			nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
+			defer nc.Close()
+
+			consumerName := "sessid_" + nuid.Next()
+			_, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
+				DeliverSubject: "inbox",
+				Durable:        consumerName,
+				AckPolicy:      nats.AckExplicitPolicy,
+				DeliverPolicy:  nats.DeliverNewPolicy,
+				FilterSubject:  "$SOMETHING.ELSE.subject",
+				AckWait:        30 * time.Second,
+				MaxAckPending:  1024,
+			})
+			if err != nil {
+				errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err)
+				return
+			}
+
+			err = js.DeleteConsumer(streamName, consumerName)
+			if err != nil {
+				errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err)
+			}
+		}()
+	}
+
+	wg.Wait()
+	if len(errors) > 0 {
+		for err := range errors {
+			t.Fatalf("%v", err)
+		}
+	}
+
+	after := &runtime.MemStats{}
+	runtime.GC()
+	runtime.ReadMemStats(after)
+
+	// Before https://github.com/nats-io/nats-server/pull/5600 this test was
+	// adding 180Mb+ to HeapInuse. Now it's under 40Mb (ran locally on a Mac)
+	limit := before.HeapInuse + 100*1024*1024 // 100MB
+	if after.HeapInuse > limit {
+		t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse)
+	}
+}
+
+func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "wq_stream",
+		Subjects:  []string{"something.>"},
+		Storage:   nats.FileStorage,
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 100; i++ {
+		n := (i % 5) + 1
+		_, err := js.Publish(fmt.Sprintf("something.%d", n), nil)
+		require_NoError(t, err)
+	}
+
+	sub, err := js.PullSubscribe(
+		"something.5",
+		"wq_consumer_5",
+		nats.BindStream("wq_stream"),
+		nats.ConsumerReplicas(3),
+	)
+	require_NoError(t, err)
+
+	for {
+		msgs, _ := sub.Fetch(5)
+		if len(msgs) == 0 {
+			break
+		}
+		for _, msg := range msgs {
+			require_NoError(t, msg.AckSync())
+		}
+	}
+
+	si, err := js.StreamInfo("wq_stream")
+	require_NoError(t, err)
+	require_Equal(t, si.State.Msgs, 80)
+	require_Equal(t, si.State.NumDeleted, 20)
+	require_Equal(t, si.State.NumSubjects, 4)
+}
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index e8ef300f8ee..6ff7ca09237 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -4258,6 +4258,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
 	state = mset.state()
 	mset.delete()
 
+	req, _ = json.Marshal(rreq)
 	rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second)
 	if err != nil {
 		t.Fatalf("Unexpected error on snapshot request: %v", err)
diff --git a/server/monitor.go b/server/monitor.go
index b72ee09d57f..e3d020e0f46 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -2724,15 +2724,16 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
 
 // JSzOptions are options passed to Jsz
 type JSzOptions struct {
-	Account    string `json:"account,omitempty"`
-	Accounts   bool   `json:"accounts,omitempty"`
-	Streams    bool   `json:"streams,omitempty"`
-	Consumer   bool   `json:"consumer,omitempty"`
-	Config     bool   `json:"config,omitempty"`
-	LeaderOnly bool   `json:"leader_only,omitempty"`
-	Offset     int
`json:"offset,omitempty"` - Limit int `json:"limit,omitempty"` - RaftGroups bool `json:"raft,omitempty"` + Account string `json:"account,omitempty"` + Accounts bool `json:"accounts,omitempty"` + Streams bool `json:"streams,omitempty"` + Consumer bool `json:"consumer,omitempty"` + Config bool `json:"config,omitempty"` + LeaderOnly bool `json:"leader_only,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` + RaftGroups bool `json:"raft,omitempty"` + StreamLeaderOnly bool `json:"stream_leader_only,omitempty"` } // HealthzOptions are options passed to Healthz @@ -2806,7 +2807,7 @@ type JSInfo struct { AccountDetails []*AccountDetail `json:"account_details,omitempty"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -2852,6 +2853,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, c := stream.config() cfg = &c } + // Skip if we are only looking for stream leaders. + if optStreamLeader && ci != nil && ci.Leader != s.Name() { + continue + } sdet := StreamDetail{ Name: stream.name(), Created: stream.createdTime(), @@ -2907,7 +2912,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil } // helper to get cluster info from node via dummy group @@ -3034,7 +3039,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } // if wanted, obtain accounts/streams/consumer for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly) jsi.AccountDetails = append(jsi.AccountDetails, detail) } return jsi, nil @@ -3078,16 +3083,22 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { return } + sleader, err := decodeBool(w, r, "stream-leader-only") + if err != nil { + return + } + l, err := s.Jsz(&JSzOptions{ - r.URL.Query().Get("acc"), - accounts, - streams, - consumers, - config, - leader, - offset, - limit, - rgroups, + Account: r.URL.Query().Get("acc"), + Accounts: accounts, + Streams: streams, + Consumer: consumers, + Config: config, + LeaderOnly: leader, + Offset: offset, + Limit: limit, + RaftGroups: rgroups, + StreamLeaderOnly: sleader, }) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -3787,8 +3798,14 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { gfilter := r.URL.Query().Get("group") afilter := r.URL.Query().Get("acc") - if afilter == "" { - afilter = s.SystemAccount().Name + if afilter == _EMPTY_ { + if sys := s.SystemAccount(); sys != nil { + afilter = sys.Name + } else { + w.WriteHeader(404) + w.Write([]byte("System account not found, the server may be shutting down")) + return + } } groups := map[string]RaftNode{} diff --git a/server/monitor_test.go b/server/monitor_test.go index 79c82adacbc..eec4d3229cf 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4414,6 +4414,25 @@ func TestMonitorJsz(t *testing.T) { } } }) + 
t.Run("stream-leader-only", func(t *testing.T) { + // First server + info := readJsInfo(monUrl1 + "?streams=true&stream-leader-only=1") + for _, a := range info.AccountDetails { + for _, s := range a.Streams { + if s.Cluster.Leader != srvs[0].serverName() { + t.Fatalf("expected stream leader to be %s but got %s", srvs[0].serverName(), s.Cluster.Leader) + } + } + } + info = readJsInfo(monUrl2 + "?streams=true&stream-leader-only=1") + for _, a := range info.AccountDetails { + for _, s := range a.Streams { + if s.Cluster.Leader != srvs[1].serverName() { + t.Fatalf("expected stream leader to be %s but got %s", srvs[0].serverName(), s.Cluster.Leader) + } + } + } + }) t.Run("consumers", func(t *testing.T) { for _, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "?acc=ACC&consumers=true") diff --git a/server/raft.go b/server/raft.go index 347d788eb39..165cbbe91fb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -499,7 +499,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.createInternalSubs(); err != nil { - n.shutdown(true) + n.shutdown(false) return nil, err } diff --git a/server/raft_test.go b/server/raft_test.go index beb15d63480..02ba3bb84ca 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -382,7 +382,7 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { leaderOriginal := leader.etlr followerOriginal := follower.etlr vr := &voteRequest{ - term: follower.term, + term: follower.term - 1, lastTerm: follower.term - 1, lastIndex: 0, candidate: follower.id, diff --git a/server/route.go b/server/route.go index 0341f79868e..d200d26264d 100644 --- a/server/route.go +++ b/server/route.go @@ -98,6 +98,9 @@ type route struct { // Selected compression mode, which may be different from the // server configured mode. compression string + // Transient value used to set the Info.NoGossip when initiating + // an implicit route and sending to the remote. + noGossip bool } type connectInfo struct { @@ -700,12 +703,15 @@ func (c *client) processRouteInfo(info *Info) { return } + var sendDelayedInfo bool + // First INFO, check if this server is configured for compression because // if that is the case, we need to negotiate it with the remote server. if needsCompression(opts.Cluster.Compression.Mode) { accName := bytesToString(c.route.accName) // If we did not yet negotiate... - if !c.flags.isSet(compressionNegotiated) { + compNeg := c.flags.isSet(compressionNegotiated) + if !compNeg { // Prevent from getting back here. c.flags.set(compressionNegotiated) // Release client lock since following function will need server lock. @@ -722,24 +728,21 @@ func (c *client) processRouteInfo(info *Info) { } // No compression because one side does not want/can't, so proceed. c.mu.Lock() - } else if didSolicit { - // The other side has switched to compression, so we can now set - // the first ping timer and send the delayed INFO for situations - // where it was not already sent. - c.setFirstPingTimer() - if !routeShouldDelayInfo(accName, opts) { - cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) - // Need to release and then reacquire... + // Check that the connection did not close if the lock was released. 
+ if c.isClosed() { c.mu.Unlock() - s.sendDelayedRouteInfo(c, accName, cm) - c.mu.Lock() + return } } - // Check that the connection did not close if the lock was released. - if c.isClosed() { - c.mu.Unlock() - return + // We can set the ping timer after we just negotiated compression above, + // or for solicited routes if we already negotiated. + if !compNeg || didSolicit { + c.setFirstPingTimer() } + // When compression is configured, we delay the initial INFO for any + // solicited route. So we need to send the delayed INFO simply based + // on the didSolicit boolean. + sendDelayedInfo = didSolicit } else { // Coming from an old server, the Compression field would be the empty // string. For servers that are configured with CompressionNotSupported, @@ -749,6 +752,10 @@ func (c *client) processRouteInfo(info *Info) { } else { c.route.compression = CompressionOff } + // When compression is not configured, we delay the initial INFO only + // for solicited pooled routes, so use the same check that we did when + // we decided to delay in createRoute(). + sendDelayedInfo = didSolicit && routeShouldDelayInfo(bytesToString(c.route.accName), opts) } // Mark that the INFO protocol has been received, so we can detect updates. @@ -825,11 +832,15 @@ func (c *client) processRouteInfo(info *Info) { } accName := string(c.route.accName) + // Capture the noGossip value and reset it here. + noGossip := c.route.noGossip + c.route.noGossip = false + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. c.mu.Unlock() - if added := s.addRoute(c, didSolicit, info, accName); added { + if added := s.addRoute(c, didSolicit, sendDelayedInfo, noGossip, info, accName); added { if accName != _EMPTY_ { c.Debugf("Registering remote route %q for account %q", info.ID, accName) } else { @@ -863,7 +874,7 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, if needsCompression(cm) { // Generate an INFO with the chosen compression mode. s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, false) s.mu.Unlock() // If we solicited, then send this INFO protocol BEFORE switching @@ -892,29 +903,9 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, c.mu.Unlock() return true, nil } - // We are not using compression, set the ping timer. - c.mu.Lock() - c.setFirstPingTimer() - c.mu.Unlock() - // If this is a solicited route, we need to send the INFO if it was not - // done during createRoute() and will not be done in addRoute(). - if didSolicit && !routeShouldDelayInfo(accName, opts) { - cm = compressionModeForInfoProtocol(&opts.Cluster.Compression, cm) - s.sendDelayedRouteInfo(c, accName, cm) - } return false, nil } -func (s *Server) sendDelayedRouteInfo(c *client, accName, cm string) { - s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) - s.mu.Unlock() - - c.mu.Lock() - c.enqueueProto(infoProto) - c.mu.Unlock() -} - // Possibly sends local subscriptions interest to this route // based on changes in the remote's Export permissions. 
func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { @@ -1050,7 +1041,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } - s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) }) + s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.NoGossip, info.RouteAccount) }) // If we are processing an implicit route from a route that does not // support pooling/pinned-accounts, we won't receive an INFO for each of // the pinned-accounts that we would normally receive. In that case, just @@ -1060,7 +1051,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { rURL := r for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.NoGossip, accName) }) } } } @@ -1112,11 +1103,39 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { b, _ := json.Marshal(info) infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + // If this is for a pinned account, we will try to send the gossip + // through our pinned account routes, but fall back to the other + // routes in case we don't have one for a given remote. + accRemotes := map[string]struct{}{} + if info.RouteAccount != _EMPTY_ { + if remotes, ok := s.accRoutes[info.RouteAccount]; ok { + for remoteID, r := range remotes { + if r == nil { + continue + } + accRemotes[remoteID] = struct{}{} + r.mu.Lock() + // Do not send to a remote that does not support pooling/pinned-accounts. + if remoteID != info.ID && !r.route.noPool { + r.enqueueProto(infoJSON) + } + r.mu.Unlock() + } + } + } + s.forEachRemote(func(r *client) { r.mu.Lock() + remoteID := r.route.remoteID + if info.RouteAccount != _EMPTY_ { + if _, processed := accRemotes[remoteID]; processed { + r.mu.Unlock() + return + } + } // If this is a new route for a given account, do not send to a server // that does not support pooling/pinned-accounts. - if r.route.remoteID != info.ID && + if remoteID != info.ID && (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { r.enqueueProto(infoJSON) } @@ -1695,17 +1714,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra c.enqueueProto(buf) } -func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *client { +func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGossip bool, accName string) *client { // Snapshot server options. opts := s.getOpts() didSolicit := rURL != nil - r := &route{didSolicit: didSolicit, poolIdx: -1} - for _, route := range opts.Routes { - if rURL != nil && (strings.EqualFold(rURL.Host, route.Host)) { - r.routeType = Explicit - } - } + r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, noGossip: noGossip} c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()} @@ -1722,7 +1736,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie // the incoming INFO from the remote. Also delay if configured for compression. 
delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts)) if !delayInfo { - infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0) + infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, noGossip) } authRequired := s.routeInfo.AuthRequired tlsRequired := s.routeInfo.TLSRequired @@ -1855,7 +1869,7 @@ func routeShouldDelayInfo(accName string, opts *Options) bool { // To be used only when a route is created (to send the initial INFO protocol). // // Server lock held on entry. -func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int) []byte { +func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, noGossip bool) []byte { // New proto wants a nonce (although not used in routes, that is, not signed in CONNECT) var raw [nonceLen]byte nonce := raw[:] @@ -1865,11 +1879,11 @@ func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolI if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto { compression = CompressionS2Auto } - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = string(nonce), accName, poolIdx, compression + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = string(nonce), accName, poolIdx, compression, noGossip infoJSON := generateInfoJSON(&s.routeInfo) // Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send. // Same for some other fields. - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = _EMPTY_, _EMPTY_, 0, _EMPTY_ + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = _EMPTY_, _EMPTY_, 0, _EMPTY_, false return infoJSON } @@ -1878,7 +1892,7 @@ const ( _EMPTY_ = "" ) -func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string) bool { +func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, info *Info, accName string) bool { id := info.ID var acc *Account @@ -1964,12 +1978,19 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string c.mu.Lock() idHash := c.route.idHash cid := c.cid + if sendDelayedInfo { + cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) + c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, noGossip)) + } if c.last.IsZero() { c.last = time.Now() } if acc != nil { c.acc = acc } + // This will be true if this is a route that was initiated from the + // gossip protocol (basically invoked from processImplicitRoute). + fromGossip := didSolicit && c.route.routeType == Implicit c.mu.Unlock() // Store this route with key being the route id hash + account name @@ -1978,8 +1999,20 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // Now that we have registered the route, we can remove from the temp map. s.removeFromTempClients(cid) - // Notify other routes about this new route - s.forwardNewRouteInfoToKnownServers(info) + // We will not gossip if we are an implicit route created due to + // gossip, or if the remote instructed us not to gossip. + if !fromGossip && !info.NoGossip { + if !didSolicit { + // If the connection was accepted, instruct the neighbors to + // set Info.NoGossip to true also when sending their own INFO + // protocol. In normal situations, any implicit route would + // set their Info.NoGossip to true, but we do this to solve + // a very specific situation. 
For some background, see test + // TestRouteImplicitJoinsSeparateGroups. + info.NoGossip = true + } + s.forwardNewRouteInfoToKnownServers(info) + } // Send subscription interest s.sendSubsToRoute(c, -1, accName) @@ -2060,9 +2093,9 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string rHash := c.route.hash rn := c.route.remoteName url := c.route.url - // For solicited routes, we need now to send the INFO protocol. - if didSolicit { - c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, c.route.compression, idx)) + if sendDelayedInfo { + cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) + c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, noGossip)) } if c.last.IsZero() { c.last = time.Now() @@ -2100,8 +2133,13 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string } // we don't need to send if the only route is the one we just accepted. - if len(s.routes) > 1 { - // Now let the known servers know about this new route + // For other checks, see other call to forwardNewRouteInfoToKnownServers + // in the handling of pinned account above. + fromGossip := didSolicit && rtype == Implicit + if len(s.routes) > 1 && !fromGossip && !info.NoGossip { + if !didSolicit { + info.NoGossip = true + } s.forwardNewRouteInfoToKnownServers(info) } @@ -2137,7 +2175,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string s.grWG.Done() return } - s.connectToRoute(url, rtype == Explicit, true, _EMPTY_) + s.connectToRoute(url, rtype, true, noGossip, _EMPTY_) }) } } @@ -2550,7 +2588,7 @@ func (s *Server) startRouteAcceptLoop() { } // Start the accept loop in a different go routine. - go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, _EMPTY_) }, nil) + go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, false, _EMPTY_) }, nil) // Solicit Routes if applicable. This will not block. s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts) @@ -2592,14 +2630,13 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) { } func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string) { - tryForEver := rtype == Explicit // If A connects to B, and B to A (regardless if explicit or // implicit - due to auto-discovery), and if each server first // registers the route on the opposite TCP connection, the // two connections will end-up being closed. // Add some random delay to reduce risk of repeated failures. delay := time.Duration(rand.Intn(100)) * time.Millisecond - if tryForEver { + if rtype == Explicit { delay += DEFAULT_ROUTE_RECONNECT } select { @@ -2608,7 +2645,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string s.grWG.Done() return } - s.connectToRoute(rURL, tryForEver, false, accName) + s.connectToRoute(rURL, rtype, false, false, accName) } // Checks to make sure the route is still valid. @@ -2621,21 +2658,26 @@ func (s *Server) routeStillValid(rURL *url.URL) bool { return false } -func (s *Server) connectToRoute(rURL *url.URL, tryForEver, firstConnect bool, accName string) { +func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, noGossip bool, accName string) { + defer s.grWG.Done() + if rURL == nil { + return + } + // For explicit routes, we will try to connect until we succeed. For implicit + // we will try only based on the number of ConnectRetries optin. + tryForEver := rtype == Explicit + // Snapshot server options. 
opts := s.getOpts() - defer s.grWG.Done() - const connErrFmt = "Error trying to connect to route (attempt %v): %v" - s.mu.Lock() + s.mu.RLock() resolver := s.routeResolver excludedAddresses := s.routesToSelf - s.mu.Unlock() + s.mu.RUnlock() - attempts := 0 - for s.isRunning() && rURL != nil { + for attempts := 0; s.isRunning(); { if tryForEver { if !s.routeStillValid(rURL) { return @@ -2689,7 +2731,7 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver, firstConnect bool, ac // We have a route connection here. // Go ahead and create it and exit this func. - s.createRoute(conn, rURL, accName) + s.createRoute(conn, rURL, rtype, noGossip, accName) return } } @@ -2718,13 +2760,13 @@ func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) { s.saveRouteTLSName(routes) for _, r := range routes { route := r - s.startGoRoutine(func() { s.connectToRoute(route, true, true, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, _EMPTY_) }) } // Now go over possible per-account routes and create them. for _, an := range accounts { for _, r := range routes { route, accName := r, an - s.startGoRoutine(func() { s.connectToRoute(route, true, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, accName) }) } } } @@ -2846,7 +2888,7 @@ func (s *Server) removeRoute(c *client) { opts = s.getOpts() rURL *url.URL noPool bool - didSolicit bool + rtype RouteType ) c.mu.Lock() cid := c.cid @@ -2865,7 +2907,7 @@ func (s *Server) removeRoute(c *client) { connectURLs = r.connectURLs wsConnectURLs = r.wsConnURLs rURL = r.url - didSolicit = r.didSolicit + rtype = r.routeType } c.mu.Unlock() if accName != _EMPTY_ { @@ -2928,12 +2970,12 @@ func (s *Server) removeRoute(c *client) { // this remote was a "no pool" route, attempt to reconnect. 
if noPool { if s.routesPoolSize > 1 { - s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, _EMPTY_) }) } if len(opts.Cluster.PinnedAccounts) > 0 { for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, accName) }) } } } diff --git a/server/routes_test.go b/server/routes_test.go index acb626cd469..97c790d9d81 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -738,15 +738,10 @@ func TestClientConnectToRoutePort(t *testing.T) { } type checkDuplicateRouteLogger struct { - sync.Mutex + DummyLogger gotDuplicate bool } -func (l *checkDuplicateRouteLogger) Noticef(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Errorf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Warnf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Fatalf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Tracef(format string, v ...any) {} func (l *checkDuplicateRouteLogger) Debugf(format string, v ...any) { l.Lock() defer l.Unlock() @@ -3357,7 +3352,8 @@ func TestRoutePoolAndPerAccountWithOlderServer(t *testing.T) { type testDuplicateRouteLogger struct { DummyLogger - ch chan struct{} + ch chan struct{} + count int } func (l *testDuplicateRouteLogger) Noticef(format string, args ...any) { @@ -3369,6 +3365,9 @@ func (l *testDuplicateRouteLogger) Noticef(format string, args ...any) { case l.ch <- struct{}{}: default: } + l.Mutex.Lock() + l.count++ + l.Mutex.Unlock() } // This test will make sure that a server with pooling does not @@ -4228,3 +4227,176 @@ func TestRouteNoRaceOnClusterNameNegotiation(t *testing.T) { s1.Shutdown() } } + +func TestRouteImplicitNotTooManyDuplicates(t *testing.T) { + for _, test := range []struct { + name string + pooling bool + compression bool + }{ + {"no pooling-no compression", false, false}, + {"no pooling-compression", false, true}, + {"pooling-no compression", true, false}, + {"pooling-compression", true, true}, + } { + t.Run(test.name, func(t *testing.T) { + o := DefaultOptions() + o.ServerName = "SEED" + if !test.pooling { + o.Cluster.PoolSize = -1 + } + if !test.compression { + o.Cluster.Compression.Mode = CompressionOff + } + seed := RunServer(o) + defer seed.Shutdown() + + dl := &testDuplicateRouteLogger{} + + servers := make([]*Server, 0, 10) + for i := 0; i < cap(servers); i++ { + io := DefaultOptions() + io.ServerName = fmt.Sprintf("IMPLICIT_%d", i+1) + io.NoLog = false + io.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o.Cluster.Port)) + if !test.pooling { + io.Cluster.PoolSize = -1 + } + if !test.compression { + io.Cluster.Compression.Mode = CompressionOff + } + is, err := NewServer(io) + require_NoError(t, err) + // Will do defer of shutdown later. + is.SetLogger(dl, true, false) + is.Start() + servers = append(servers, is) + } + + allServers := make([]*Server, 0, len(servers)+1) + allServers = append(allServers, seed) + allServers = append(allServers, servers...) + + // Let's make sure that we wait for each server to be ready. + for _, s := range allServers { + if !s.ReadyForConnections(2 * time.Second) { + t.Fatalf("Server %q is not ready for connections", s) + } + } + + // Do the defer of shutdown of all servers this way instead of individual + // defers when starting them. It takes less time for the servers to shutdown + // this way. 
+			defer func() {
+				for _, s := range allServers {
+					s.Shutdown()
+				}
+			}()
+			checkClusterFormed(t, allServers...)
+
+			dl.Mutex.Lock()
+			count := dl.count
+			dl.Mutex.Unlock()
+			// Getting duplicates should not be considered fatal, it is an optimization
+			// to reduce the occurrences of those. But to make sure we don't have a
+			// regression, we will fail the test if we get, say, more than 20 or so
+			// (without the code change, we would get more than 500 duplicates).
+			if count > 20 {
+				t.Fatalf("Got more duplicates than anticipated: %v", count)
+			}
+		})
+	}
+}
+
+func TestRouteImplicitJoinsSeparateGroups(t *testing.T) {
+	// The test TestRouteImplicitNotTooManyDuplicates makes sure that we do
+	// not have too many duplicate route cases when processing implicit routes.
+	// This test is to ensure that the code changes to reduce the number
+	// of duplicate routes do not prevent the formation of the cluster
+	// with the given setup (which is admittedly not good since a disconnect
+	// between some routes would not result in a reconnect leading to a full mesh).
+	// Still, the original code was able to create the full mesh, so we want
+	// to make sure that this is still possible.
+	for _, test := range []struct {
+		name        string
+		pooling     bool
+		compression bool
+	}{
+		{"no pooling-no compression", false, false},
+		{"no pooling-compression", false, true},
+		{"pooling-no compression", true, false},
+		{"pooling-compression", true, true},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			setOpts := func(o *Options) {
+				if !test.pooling {
+					o.Cluster.PoolSize = -1
+				}
+				if !test.compression {
+					o.Cluster.Compression.Mode = CompressionOff
+				}
+			}
+
+			// Create a cluster s1/s2/s3
+			o1 := DefaultOptions()
+			o1.ServerName = "S1"
+			setOpts(o1)
+			s1 := RunServer(o1)
+			defer s1.Shutdown()
+
+			o2 := DefaultOptions()
+			o2.ServerName = "S2"
+			setOpts(o2)
+			o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
+			s2 := RunServer(o2)
+			defer s2.Shutdown()
+
+			tmpl := `
+			server_name: "S3"
+			listen: "127.0.0.1:-1"
+			cluster {
+				name: "abc"
+				listen: "127.0.0.1:-1"
+				%s
+				%s
+				routes: ["nats://127.0.0.1:%d"%s]
+			}
+			`
+			var poolCfg string
+			var compressionCfg string
+			if !test.pooling {
+				poolCfg = "pool_size: -1"
+			}
+			if !test.compression {
+				compressionCfg = "compression: off"
+			}
+			conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, poolCfg, compressionCfg, o1.Cluster.Port, _EMPTY_)))
+			s3, _ := RunServerWithConfig(conf)
+			defer s3.Shutdown()
+
+			checkClusterFormed(t, s1, s2, s3)
+
+			// Now s4 and s5 connected to each other, but not linked to s1/s2/s3
+			o4 := DefaultOptions()
+			o4.ServerName = "S4"
+			setOpts(o4)
+			s4 := RunServer(o4)
+			defer s4.Shutdown()
+
+			o5 := DefaultOptions()
+			o5.ServerName = "S5"
+			setOpts(o5)
+			o5.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o4.Cluster.Port))
+			s5 := RunServer(o5)
+			defer s5.Shutdown()
+
+			checkClusterFormed(t, s4, s5)
+
+			// Now add a route from s3 to s4 and make sure that we have a full mesh.
+ routeToS4 := fmt.Sprintf(`, "nats://127.0.0.1:%d"`, o4.Cluster.Port) + reloadUpdateConfig(t, s3, conf, fmt.Sprintf(tmpl, poolCfg, compressionCfg, o1.Cluster.Port, routeToS4)) + + checkClusterFormed(t, s1, s2, s3, s4, s5) + }) + } +} diff --git a/server/server.go b/server/server.go index cc3130ebe5a..dd9a12f23e6 100644 --- a/server/server.go +++ b/server/server.go @@ -98,6 +98,7 @@ type Info struct { RoutePoolIdx int `json:"route_pool_idx,omitempty"` RouteAccount string `json:"route_account,omitempty"` RouteAccReqID string `json:"route_acc_add_reqid,omitempty"` + NoGossip bool `json:"no_gossip,omitempty"` // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) diff --git a/server/stream.go b/server/stream.go index a09afdbf323..9137254607a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5272,92 +5272,9 @@ func (mset *stream) checkInterestState() { return } - var zeroAcks []*consumer - var lowAckFloor uint64 = math.MaxUint64 - for _, o := range mset.getConsumers() { o.checkStateForInterestStream() - - o.mu.Lock() - if o.isLeader() { - // We need to account for consumers with ack floor of zero. - // We will collect them and see if we need to check pending below. - if o.asflr == 0 { - zeroAcks = append(zeroAcks, o) - } else if o.asflr < lowAckFloor { - lowAckFloor = o.asflr - } - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - // On error we will not have enough information to process correctly so bail. - o.mu.Unlock() - return - } - // We need to account for consumers with ack floor of zero. - if state.AckFloor.Stream == 0 { - zeroAcks = append(zeroAcks, o) - } else if state.AckFloor.Stream < lowAckFloor { - lowAckFloor = state.AckFloor.Stream - } - // We are a follower here but if we detect a drift from when we were previous leader correct here. - if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 { - o.applyState(state) - } - } - o.mu.Unlock() } - - // If nothing was set we can bail. - if lowAckFloor == math.MaxUint64 { - return - } - - // Capture our current state. - // ok to do so without lock. - var state StreamState - mset.store.FastState(&state) - - if lowAckFloor <= state.FirstSeq { - return - } - - // Do not want to hold stream lock if calculating numPending. - // Check if we had any zeroAcks, we will need to check them. - for _, o := range zeroAcks { - var np uint64 - o.mu.RLock() - if o.isLeader() { - np = uint64(o.numPending()) - } else { - np, _ = o.calculateNumPending() - } - o.mu.RUnlock() - // This means we have pending and can not remove anything at this time. - if np > 0 { - return - } - } - - mset.mu.Lock() - defer mset.mu.Unlock() - - // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq || state.Msgs == 0 { - // Purge the stream to lowest ack floor + 1 - mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) - } else { - // Here we have a low ack floor higher then our last seq. - // So we will just do normal purge. - mset.store.Purge() - } - - // Make sure to reset our local lseq. - mset.store.FastState(&state) - mset.lseq = state.LastSeq - // Also make sure we clear any pending acks. - mset.clearAllPreAcksBelowFloor(state.FirstSeq) } func (mset *stream) isInterestRetention() bool {
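
A small usage sketch (not part of the diff) for the new stream-leader-only monitoring option introduced above. The query parameter name and the JSzOptions.StreamLeaderOnly field come from this change; the monitoring address and port are assumptions for illustration.

// jszleaders.go: query /jsz so each server reports only the streams it leads.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// stream-leader-only=1 is decoded by HandleJsz and maps to
	// JSzOptions.StreamLeaderOnly; streams=true includes stream details.
	// The address below assumes a server started with monitoring on -m 8222.
	resp, err := http.Get("http://127.0.0.1:8222/jsz?streams=true&stream-leader-only=1")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}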