diff --git a/.github/actions/nightly-release/action.yaml b/.github/actions/nightly-release/action.yaml index 48cdff56e18..3d1ffbaaf11 100644 --- a/.github/actions/nightly-release/action.yaml +++ b/.github/actions/nightly-release/action.yaml @@ -2,14 +2,6 @@ name: Nightly Docker Releaser description: Builds nightly docker images inputs: - go: - description: The version of go to build with - required: true - - label: - description: The label to use for built images - required: true - hub_username: description: Docker hub username required: true @@ -25,35 +17,20 @@ inputs: runs: using: composite steps: + - name: Log in to Docker Hub + shell: bash + run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}" + - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "${{ inputs.go }}" + go-version: "stable" - - name: goreleaser + - name: Build and push Docker images # Use commit hash here to avoid a re-tagging attack, as this is a third-party action # Commit 5742e2a039330cbb23ebf35f046f814d4c6ff811 = tag v5 uses: goreleaser/goreleaser-action@5742e2a039330cbb23ebf35f046f814d4c6ff811 with: workdir: "${{ inputs.workdir }}" - version: latest - args: release --snapshot --config .goreleaser-nightly.yml - - - name: images - shell: bash - run: docker images - - - name: docker_login - shell: bash - run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}" - - - name: docker_push - shell: bash - run: | - NDATE=$(date +%Y%m%d) - - docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}-${NDATE} - docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }} - - docker push synadia/nats-server:${{ inputs.label }}-${NDATE} - docker push synadia/nats-server:${{ inputs.label }} + version: ~> v2 + args: release --skip=announce,validate --config .goreleaser-nightly.yml diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index 24f32493225..62b6be0eec8 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -3,7 +3,7 @@ on: workflow_dispatch: inputs: target: - description: "Override image branch (optional)" + description: "Override source branch (optional)" type: string required: false @@ -19,11 +19,11 @@ jobs: with: path: src/github.com/nats-io/nats-server ref: ${{ inputs.target || 'main' }} + fetch-depth: 0 + fetch-tags: true - uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release with: - go: "1.21" workdir: src/github.com/nats-io/nats-server - label: nightly hub_username: "${{ secrets.DOCKER_USERNAME }}" hub_password: "${{ secrets.DOCKER_PASSWORD }}" diff --git a/.goreleaser-nightly.yml b/.goreleaser-nightly.yml index 6365b1204cd..1f42c71385e 100644 --- a/.goreleaser-nightly.yml +++ b/.goreleaser-nightly.yml @@ -14,16 +14,19 @@ builds: goarch: - amd64 +release: + disable: true + dockers: - goos: linux goarch: amd64 - skip_push: true dockerfile: docker/Dockerfile.nightly + skip_push: false build_flag_templates: - - '--build-arg=VERSION={{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' + - '--build-arg=VERSION={{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' image_templates: - - synadia/nats-server:{{.Version}} - - synadia/nats-server:{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace 
.Branch "/" "-" }}{{ else }}nightly{{ end }} + - synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }} + - synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }} extra_files: - docker/nats-server.conf @@ -32,4 +35,4 @@ checksum: algorithm: sha256 snapshot: - name_template: '{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' + version_template: '{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' diff --git a/.travis.yml b/.travis.yml index f1337e3b75d..eb03f2663b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,8 @@ language: go go: # This should be quoted or use .x, but should not be unquoted. # Remember that a YAML bare float drops trailing zeroes. - - "1.23.5" - - "1.22.11" + - "1.23.6" + - "1.22.12" go_import_path: github.com/nats-io/nats-server diff --git a/docker/Dockerfile.nightly b/docker/Dockerfile.nightly index dbb2c093270..3c9d22f4b50 100644 --- a/docker/Dockerfile.nightly +++ b/docker/Dockerfile.nightly @@ -1,4 +1,4 @@ -FROM golang:1.21-alpine AS builder +FROM golang:alpine AS builder ARG VERSION="nightly" diff --git a/server/client.go b/server/client.go index d7a3ccb1252..a7b40914d4a 100644 --- a/server/client.go +++ b/server/client.go @@ -4129,9 +4129,20 @@ func (c *client) setHeader(key, value string, msg []byte) []byte { return bb.Bytes() } -// Will return the value for the header denoted by key or nil if it does not exists. -// This function ignores errors and tries to achieve speed and no additional allocations. +// Will return a copy of the value for the header denoted by key or nil if it does not exist. +// If you know that it is safe to refer to the underlying hdr slice for the period that the +// return value is used, then sliceHeader() will be faster. func getHeader(key string, hdr []byte) []byte { + v := sliceHeader(key, hdr) + if v == nil { + return nil + } + return append(make([]byte, 0, len(v)), v...) +} + +// Will return the sliced value for the header denoted by key or nil if it does not exists. +// This function ignores errors and tries to achieve speed and no additional allocations. +func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } @@ -4156,15 +4167,14 @@ func getHeader(key string, hdr []byte) []byte { index++ } // Collect together the rest of the value until we hit a CRLF. - var value []byte + start := index for index < hdrLen { if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' { break } - value = append(value, hdr[index]) index++ } - return value + return hdr[start:index:index] } // For bytes.HasPrefix below. 
@@ -4278,7 +4288,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt var ci *ClientInfo if hadPrevSi && c.pa.hdr >= 0 { var cis ClientInfo - if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { + if err := json.Unmarshal(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { ci = &cis ci.Service = acc.Name // Check if we are moving into a share details account from a non-shared @@ -4287,7 +4297,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt c.addServerAndClusterInfo(ci) } } - } else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 { + } else if c.kind != LEAF || c.pa.hdr < 0 || len(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 { ci = c.getClientInfo(share) // If we did not share but the imports destination is the system account add in the server and cluster info. if !share && isSysImport { @@ -4846,7 +4856,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool if c.pa.hdr < 0 || len(msg) < c.pa.hdr { return msg, false } - cir := getHeader(ClientInfoHdr, msg[:c.pa.hdr]) + cir := sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]) if len(cir) == 0 { return msg, false } diff --git a/server/client_test.go b/server/client_test.go index aa5daf2a71f..551bc48aa7e 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2964,6 +2964,29 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) { } } +func TestSliceHeader(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + hdr = genHeader(hdr, "a", "1") + hdr = genHeader(hdr, JSExpectedStream, "my-stream") + hdr = genHeader(hdr, JSExpectedLastSeq, "22") + hdr = genHeader(hdr, "b", "2") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + hdr = genHeader(hdr, JSExpectedLastMsgId, "1") + hdr = genHeader(hdr, "c", "3") + + sliced := sliceHeader(JSExpectedLastSubjSeq, hdr) + copied := getHeader(JSExpectedLastSubjSeq, hdr) + + require_NotNil(t, sliced) + require_Equal(t, cap(sliced), 2) + + require_NotNil(t, copied) + require_Equal(t, cap(copied), len(copied)) + + require_True(t, bytes.Equal(sliced, copied)) +} + func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { opts := DefaultOptions() opts.MaxPending = 1024 * 1024 * 140 // 140MB diff --git a/server/consumer.go b/server/consumer.go index 4d66b23b97c..67210b0a565 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -60,7 +60,6 @@ type ConsumerInfo struct { } type ConsumerConfig struct { - // Durable is deprecated. All consumers should have names, picked by clients. Durable string `json:"durable_name,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` @@ -1583,6 +1582,12 @@ var ( // deleteNotActive must only be called from time.AfterFunc or in its own // goroutine, as it can block on clean-up. func (o *consumer) deleteNotActive() { + // Take a copy of these when the goroutine starts, mostly it avoids a + // race condition with tests that modify these consts, such as + // TestJetStreamClusterGhostEphemeralsAfterRestart. + cnaMax := consumerNotActiveMaxInterval + cnaStart := consumerNotActiveStartInterval + o.mu.Lock() if o.mset == nil { o.mu.Unlock() @@ -1626,10 +1631,10 @@ func (o *consumer) deleteNotActive() { if o.srv != nil { qch = o.srv.quitCh } - if o.js != nil { - cqch = o.js.clusterQuitC() - } o.mu.Unlock() + if js != nil { + cqch = js.clusterQuitC() + } // Useful for pprof. 
setGoRoutineLabels(pprofLabels{ @@ -1663,8 +1668,8 @@ func (o *consumer) deleteNotActive() { if ca != nil && cc != nil { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. - jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) - interval := consumerNotActiveStartInterval + jitter + jitter := time.Duration(rand.Int63n(int64(cnaStart))) + interval := cnaStart + jitter ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -1686,7 +1691,7 @@ func (o *consumer) deleteNotActive() { if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) - if interval < consumerNotActiveMaxInterval { + if interval < cnaMax { interval *= 2 ticker.Reset(interval) } @@ -1859,9 +1864,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { if cfg.FlowControl != ncfg.FlowControl { return errors.New("flow control can not be updated") } - if cfg.MaxWaiting != ncfg.MaxWaiting { - return errors.New("max waiting can not be updated") - } // Deliver Subject is conditional on if its bound. if cfg.DeliverSubject != ncfg.DeliverSubject { @@ -1876,6 +1878,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { } } + if cfg.MaxWaiting != ncfg.MaxWaiting { + return errors.New("max waiting can not be updated") + } + // Check if BackOff is defined, MaxDeliver is within range. if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() diff --git a/server/events.go b/server/events.go index 7cb9feb6a77..a09e6f1ec41 100644 --- a/server/events.go +++ b/server/events.go @@ -1215,6 +1215,14 @@ func (s *Server) initEventTracking() { optz := &ExpvarzEventOptions{} s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil }) }, + "IPQUEUESZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { + optz := &IpqueueszEventOptions{} + s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Ipqueuesz(&optz.IpqueueszOptions), nil }) + }, + "RAFTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { + optz := &RaftzEventOptions{} + s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Raftz(&optz.RaftzOptions), nil }) + }, } profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) { hdr, msg := c.msgParts(rmsg) @@ -1921,6 +1929,18 @@ type ExpvarzEventOptions struct { EventFilterOptions } +// In the context of system events, IpqueueszEventOptions are options passed to Ipqueuesz +type IpqueueszEventOptions struct { + EventFilterOptions + IpqueueszOptions +} + +// In the context of system events, RaftzEventOptions are options passed to Raftz +type RaftzEventOptions struct { + EventFilterOptions + RaftzOptions +} + // returns true if the request does NOT apply to this server and can be ignored. 
// DO NOT hold the server lock when func (s *Server) filterRequest(fOpts *EventFilterOptions) bool { @@ -2043,6 +2063,20 @@ type ServerAPIExpvarzResponse struct { Error *ApiError `json:"error,omitempty"` } +// ServerAPIpqueueszResponse is the response type for ipqueuesz +type ServerAPIpqueueszResponse struct { + Server *ServerInfo `json:"server"` + Data *IpqueueszStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIRaftzResponse is the response type for raftz +type ServerAPIRaftzResponse struct { + Server *ServerInfo `json:"server"` + Data *RaftzStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + // statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.EventsEnabled() { diff --git a/server/events_test.go b/server/events_test.go index 6545db66753..469e2df5454 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1675,7 +1675,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new initial subscription for the eventing system. - checkExpectedSubs(t, 58, sa) + checkExpectedSubs(t, 62, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/filestore.go b/server/filestore.go index 4e0c28ca4ab..248b63e01ce 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -33,6 +33,7 @@ import ( "net" "os" "path/filepath" + "runtime" "slices" "sort" "strings" @@ -2004,7 +2005,7 @@ func (fs *fileStore) expireMsgsOnRecover() error { } // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool { subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) @@ -2207,12 +2208,15 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { // Find the first matching message against a sublist. func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) { mb.mu.Lock() - defer mb.mu.Unlock() - - // Will just do linear walk for now. - // TODO(dlc) - Be better at skipping blocks that will not match us regardless. - var didLoad bool + var updateLLTS bool + defer func() { + if updateLLTS { + mb.llts = time.Now().UnixNano() + } + mb.mu.Unlock() + }() + // Need messages loaded from here on out. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -2231,20 +2235,88 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) sm = new(StoreMsg) } - for seq := start; seq <= lseq; seq++ { - llseq := mb.llseq - fsm, err := mb.cacheLookup(seq, sm) - if err != nil { - continue + // If the FSS state has fewer entries than sequences in the linear scan, + // then use intersection instead as likely going to be cheaper. This will + // often be the case with high numbers of deletes, as well as a smaller + // number of subjects in the block. + if uint64(mb.fss.Size()) < lseq-start { + // If there are no subject matches then this is effectively no-op. + hseq := uint64(math.MaxUint64) + IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) { + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + // mb is already loaded into the cache so should be fast-ish. 
+ mb.recalculateForSubj(bytesToString(subj), ss) + } + first := ss.First + if start > first { + first = start + } + if first > ss.Last || first >= hseq { + // The start cutoff is after the last sequence for this subject, + // or we think we already know of a subject with an earlier msg + // than our first seq for this subject. + return + } + if first == ss.First { + // If the start floor is below where this subject starts then we can + // short-circuit, avoiding needing to scan for the next message. + if fsm, err := mb.cacheLookup(ss.First, sm); err == nil { + sm = fsm + hseq = ss.First + } + return + } + for seq := first; seq <= ss.Last; seq++ { + // Otherwise we have a start floor that intersects where this subject + // has messages in the block, so we need to walk up until we find a + // message matching the subject. + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + // Instead we will update it only once in a defer. + updateLLTS = true + continue + } + llseq := mb.llseq + fsm, err := mb.cacheLookup(seq, sm) + if err != nil { + continue + } + updateLLTS = false // cacheLookup already updated it. + if sl.HasInterest(fsm.subj) { + hseq = seq + sm = fsm + break + } + // If we are here we did not match, so put the llseq back. + mb.llseq = llseq + } + }) + if hseq < uint64(math.MaxUint64) && sm != nil { + return sm, didLoad, nil } - expireOk := seq == lseq && mb.llseq == seq - - if sl.HasInterest(fsm.subj) { - return fsm, expireOk, nil + } else { + for seq := start; seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + // Instead we will update it only once in a defer. + updateLLTS = true + continue + } + llseq := mb.llseq + fsm, err := mb.cacheLookup(seq, sm) + if err != nil { + continue + } + expireOk := seq == lseq && mb.llseq == seq + updateLLTS = false // cacheLookup already updated it. + if sl.HasInterest(fsm.subj) { + return fsm, expireOk, nil + } + // If we are here we did not match, so put the llseq back. + mb.llseq = llseq } - // If we are here we did not match, so put the llseq back. - mb.llseq = llseq } + return nil, didLoad, ErrStoreMsgNotFound } @@ -2252,7 +2324,13 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) // fs lock should be held. func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) { mb.mu.Lock() - defer mb.mu.Unlock() + var updateLLTS bool + defer func() { + if updateLLTS { + mb.llts = time.Now().UnixNano() + } + mb.mu.Unlock() + }() fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter} @@ -2364,6 +2442,12 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } for seq := fseq; seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + // Instead we will update it only once in a defer. + updateLLTS = true + continue + } llseq := mb.llseq fsm, err := mb.cacheLookup(seq, sm) if err != nil { @@ -2372,6 +2456,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } continue } + updateLLTS = false // cacheLookup already updated it. expireOk := seq == lseq && mb.llseq == seq if isAll { return fsm, expireOk, nil @@ -2876,6 +2961,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // Now check if we need to inspect the seqStart block. // Grab write lock in case we need to load in msgs. 
mb.mu.Lock() + var updateLLTS bool var shouldExpire bool // We need to walk this block to correct accounting from above. if sseq > mb.first.seq { @@ -2889,10 +2975,16 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } var smv StoreMsg for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + updateLLTS = true + continue + } sm, _ := mb.cacheLookup(seq, &smv) if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { continue } + updateLLTS = false // cacheLookup already updated it. if isMatch(sm.subj) { // If less than sseq adjust off of total as long as this subject matched the last block. if seq < sseq { @@ -2913,6 +3005,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) if shouldExpire { mb.tryForceExpireCacheLocked() } + if updateLLTS { + mb.llts = time.Now().UnixNano() + } mb.mu.Unlock() return total, validThrough } @@ -3023,6 +3118,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } // We need to scan this block. var shouldExpire bool + var updateLLTS bool mb.mu.Lock() // Check if we should include all of this block in adjusting. If so work with metadata. if sseq > atomic.LoadUint64(&mb.last.seq) { @@ -3055,10 +3151,16 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // We need to walk all messages in this block var smv StoreMsg for seq := atomic.LoadUint64(&mb.first.seq); seq < last; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + updateLLTS = true + continue + } sm, _ := mb.cacheLookup(seq, &smv) if sm == nil || sm.subj == _EMPTY_ { continue } + updateLLTS = false // cacheLookup already updated it. // Check if it matches our filter. if sm.seq < sseq && isMatch(sm.subj) { adjust++ @@ -3069,6 +3171,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) if shouldExpire { mb.tryForceExpireCacheLocked() } + if updateLLTS { + mb.llts = time.Now().UnixNano() + } mb.mu.Unlock() } // Make final adjustment. @@ -3109,7 +3214,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // See if filter was provided but its the only subject. if !isAll && fs.psim.Size() == 1 { - fs.psim.Iter(func(subject []byte, _ *psi) bool { + fs.psim.IterFast(func(subject []byte, _ *psi) bool { isAll = sl.HasInterest(bytesToString(subject)) return true }) @@ -3166,6 +3271,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // Grab write lock in case we need to load in msgs. mb.mu.Lock() var shouldExpire bool + var updateLLTS bool // We need to walk this block to correct accounting from above. if sseq > mb.first.seq { // Track the ones we add back in case more than one. @@ -3178,10 +3284,16 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo } var smv StoreMsg for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + updateLLTS = true + continue + } sm, _ := mb.cacheLookup(seq, &smv) if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { continue } + updateLLTS = false // cacheLookup already updated it. if isMatch(sm.subj) { // If less than sseq adjust off of total as long as this subject matched the last block. 
if seq < sseq { @@ -3202,6 +3314,9 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo if shouldExpire { mb.tryForceExpireCacheLocked() } + if updateLLTS { + mb.llts = time.Now().UnixNano() + } mb.mu.Unlock() return total, validThrough } @@ -3229,6 +3344,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo var t uint64 var havePartial bool + var updateLLTS bool IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) { subj := bytesToString(bsubj) if havePartial { @@ -3261,8 +3377,14 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo } var smv StoreMsg for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + updateLLTS = true + continue + } if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { t++ + updateLLTS = false // cacheLookup already updated it. } } } @@ -3270,6 +3392,9 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo if shouldExpire { mb.tryForceExpireCacheLocked() } + if updateLLTS { + mb.llts = time.Now().UnixNano() + } mb.mu.Unlock() total += t } @@ -3314,6 +3439,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo } // We need to scan this block. var shouldExpire bool + var updateLLTS bool mb.mu.Lock() // Check if we should include all of this block in adjusting. If so work with metadata. if sseq > atomic.LoadUint64(&mb.last.seq) { @@ -3345,10 +3471,16 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // We need to walk all messages in this block var smv StoreMsg for seq := atomic.LoadUint64(&mb.first.seq); seq < last; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + updateLLTS = true + continue + } sm, _ := mb.cacheLookup(seq, &smv) if sm == nil || sm.subj == _EMPTY_ { continue } + updateLLTS = false // cacheLookup already updated it. // Check if it matches our filter. if sm.seq < sseq && isMatch(sm.subj) { adjust++ @@ -3359,6 +3491,9 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo if shouldExpire { mb.tryForceExpireCacheLocked() } + if updateLLTS { + mb.llts = time.Now().UnixNano() + } mb.mu.Unlock() } // Make final adjustment. @@ -3992,7 +4127,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // collect all that are not correct. needAttention := make(map[string]*psi) - fs.psim.Iter(func(subj []byte, psi *psi) bool { + fs.psim.IterFast(func(subj []byte, psi *psi) bool { numMsgs += psi.total if psi.total > maxMsgsPer { needAttention[string(subj)] = psi @@ -4017,7 +4152,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { fs.rebuildStateLocked(nil) // Need to redo blocks that need attention. needAttention = make(map[string]*psi) - fs.psim.Iter(func(subj []byte, psi *psi) bool { + fs.psim.IterFast(func(subj []byte, psi *psi) bool { if psi.total > maxMsgsPer { needAttention[string(subj)] = psi } @@ -6763,6 +6898,57 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store return nil, fs.state.LastSeq, ErrStoreEOF } +// Will load the next non-deleted msg starting at the start sequence and walking backwards. 
+func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + if fs.closed { + return nil, ErrStoreClosed + } + if fs.state.Msgs == 0 || start < fs.state.FirstSeq { + return nil, ErrStoreEOF + } + + if start > fs.state.LastSeq { + start = fs.state.LastSeq + } + if smp == nil { + smp = new(StoreMsg) + } + + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { + for i := bi; i >= 0; i-- { + mb := fs.blks[i] + mb.mu.Lock() + // Need messages loaded from here on out. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + mb.mu.Unlock() + return nil, err + } + } + + lseq, fseq := atomic.LoadUint64(&mb.last.seq), atomic.LoadUint64(&mb.first.seq) + if start > lseq { + start = lseq + } + for seq := start; seq >= fseq; seq-- { + if mb.dmap.Exists(seq) { + continue + } + if sm, err := mb.cacheLookup(seq, smp); err == nil { + mb.mu.Unlock() + return sm, nil + } + } + mb.mu.Unlock() + } + } + + return nil, ErrStoreEOF +} + // Type returns the type of the underlying store. func (fs *fileStore) Type() StorageType { return FileStorage @@ -7353,7 +7539,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { bytes += mb.bytes // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool { subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) @@ -8007,6 +8193,11 @@ func (mb *msgBlock) generatePerSubjectInfo() error { var smv StoreMsg fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) for seq := fseq; seq <= lseq; seq++ { + if mb.dmap.Exists(seq) { + // Optimisation to avoid calling cacheLookup which hits time.Now(). + // It gets set later on if the fss is non-empty anyway. + continue + } sm, err := mb.cacheLookup(seq, &smv) if err != nil { // Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state. @@ -8066,7 +8257,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { } // Now populate psim. - mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool { if len(bsubj) > 0 { if info, ok := fs.psim.Find(bsubj); ok { info.total += ss.Msgs @@ -9482,8 +9673,15 @@ var dios chan struct{} // Used to setup our simplistic counting semaphore using buffered channels. // golang.org's semaphore seemed a bit heavy. func init() { - // Limit ourselves to a max of 4 blocking IO calls. - const nIO = 4 + // Limit ourselves to a sensible number of blocking I/O calls. Range between + // 4-16 concurrent disk I/Os based on CPU cores, or 50% of cores if greater + // than 32 cores. + mp := runtime.GOMAXPROCS(-1) + nIO := min(16, max(4, mp)) + if mp > 32 { + // If the system has more than 32 cores then limit dios to 50% of cores. + nIO = max(16, min(mp, mp/2)) + } dios = make(chan struct{}, nIO) // Fill it up to start. 
for i := 0; i < nIO; i++ { diff --git a/server/jetstream.go b/server/jetstream.go index c1e709a19ef..d52d4b2d238 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1501,12 +1501,14 @@ func (a *Account) filteredStreams(filter string) []*stream { var msets []*stream for _, mset := range jsa.streams { if filter != _EMPTY_ { + mset.cfgMu.RLock() for _, subj := range mset.cfg.Subjects { if SubjectsCollide(filter, subj) { msets = append(msets, mset) break } } + mset.cfgMu.RUnlock() } else { msets = append(msets, mset) } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index e7fb21c1a17..3feeacf04de 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -765,7 +765,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub s, rr := js.srv, js.apiSubs.Match(subject) hdr, msg := c.msgParts(rmsg) - if len(getHeader(ClientInfoHdr, hdr)) == 0 { + if len(sliceHeader(ClientInfoHdr, hdr)) == 0 { // Check if this is the system account. We will let these through for the account info only. sacc := s.SystemAccount() if sacc != acc { @@ -1008,7 +1008,7 @@ func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Ac var ci ClientInfo if len(hdr) > 0 { - if err := json.Unmarshal(getHeader(ClientInfoHdr, hdr), &ci); err != nil { + if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil { return nil, nil, nil, nil, err } } @@ -4271,7 +4271,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, // Since these could wait on the Raft group lock, don't do so under the JS lock. ourID := meta.ID() - groupLeader := meta.GroupLeader() + groupLeaderless := meta.Leaderless() groupCreated := meta.Created() js.mu.RLock() @@ -4289,7 +4289,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, // Also capture if we think there is no meta leader. var isLeaderLess bool if !isLeader { - isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault + isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault } js.mu.RUnlock() @@ -4376,7 +4376,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, return } // If we are a member and we have a group leader or we had a previous leader consider bailing out. - if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() { + if !node.Leaderless() || node.HadPreviousLeader() { if leaderNotPartOfGroup { resp.Error = NewJSConsumerOfflineError() s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil) diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index faabb8b5ff7..e894dfdb150 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -424,6 +424,8 @@ func BenchmarkJetStreamConsumeWithFilters(b *testing.B) { }{ {1, 1, nats.MemoryStorage}, {3, 3, nats.MemoryStorage}, + {1, 1, nats.FileStorage}, + {3, 3, nats.FileStorage}, } benchmarksCases := []struct { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 57cec3873c8..c7cbe57d021 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -142,6 +142,7 @@ type streamAssignment struct { responded bool recovering bool reassigning bool // i.e. due to placement issues, lack of resources, etc. + resetting bool // i.e. 
there was an error, and we're stopping and starting the stream err error } @@ -444,108 +445,113 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. -func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { +func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error { js.mu.RLock() s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode js.mu.RUnlock() - return true + return nil } - - // Pull the group out. - rg := sa.Group - if rg == nil { + if sa == nil || sa.Group == nil { js.mu.RUnlock() - return false + return errors.New("stream assignment or group missing") } - streamName := sa.Config.Name - node := rg.node + node := sa.Group.node js.mu.RUnlock() // First lookup stream and make sure its there. mset, err := acc.lookupStream(streamName) if err != nil { - return false + return errors.New("stream not found") } - // If R1 we are good. - if node == nil { - return true - } + switch { + case mset.cfg.Replicas <= 1: + return nil // No further checks for R=1 streams - // Here we are a replicated stream. - // First make sure our monitor routine is running. - if !mset.isMonitorRunning() { - return false - } + case node == nil: + return errors.New("group node missing") - if node.Healthy() { - // Check if we are processing a snapshot and are catching up. - if !mset.isCatchingUp() { - return true - } - } else { // node != nil - if node != mset.raftNode() { - s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName) - node.Delete() - mset.resetClusteredState(nil) - } + case node != mset.raftNode(): + s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName) + node.Delete() + mset.resetClusteredState(nil) + return errors.New("cluster node skew detected") + + case !mset.isMonitorRunning(): + return errors.New("monitor goroutine not running") + + case !node.Healthy(): + return errors.New("group node unhealthy") + + case mset.isCatchingUp(): + return errors.New("stream catching up") + + default: + return nil } - return false } // isConsumerHealthy will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. -func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool { +func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) error { if mset == nil { - return false + return errors.New("stream missing") } - js.mu.RLock() - cc := js.cluster + s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode js.mu.RUnlock() - return true + return nil } - // These are required. if ca == nil || ca.Group == nil { js.mu.RUnlock() - return false + return errors.New("consumer assignment or group missing") } - s := js.srv - // Capture RAFT node from assignment. node := ca.Group.node js.mu.RUnlock() // Check if not running at all. o := mset.lookupConsumer(consumer) if o == nil { - return false + return errors.New("consumer not found") } - // Check RAFT node state. 
- if node == nil || node.Healthy() { - return true - } else if node != nil { - if node != o.raftNode() { - mset.mu.RLock() - accName, streamName := mset.acc.GetName(), mset.cfg.Name - mset.mu.RUnlock() - s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer) - node.Delete() - o.deleteWithoutAdvisory() + rc, _ := o.replica() + switch { + case rc <= 1: + return nil // No further checks for R=1 consumers - // When we try to restart we nil out the node and reprocess the consumer assignment. - js.mu.Lock() - ca.Group.node = nil - js.mu.Unlock() - js.processConsumerAssignment(ca) - } + case node == nil: + return errors.New("group node missing") + + case node != o.raftNode(): + mset.mu.RLock() + accName, streamName := mset.acc.GetName(), mset.cfg.Name + mset.mu.RUnlock() + s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer) + node.Delete() + o.deleteWithoutAdvisory() + + // When we try to restart we nil out the node and reprocess the consumer assignment. + js.mu.Lock() + ca.Group.node = nil + js.mu.Unlock() + js.processConsumerAssignment(ca) + return errors.New("cluster node skew detected") + + case !o.isMonitorRunning(): + return errors.New("monitor goroutine not running") + + case !node.Healthy(): + return errors.New("group node unhealthy") + + default: + return nil } - return false } // subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap @@ -819,7 +825,7 @@ func (js *jetStream) isLeaderless() bool { // If we don't have a leader. // Make sure we have been running for enough time. - if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault { + if meta.Leaderless() && time.Since(meta.Created()) > lostQuorumIntervalDefault { return true } return false @@ -851,7 +857,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { node := rg.node js.mu.RUnlock() // If we don't have a leader. - if node.GroupLeader() == _EMPTY_ { + if node.Leaderless() { // Threshold for jetstream startup. const startupThreshold = 10 * time.Second @@ -1067,7 +1073,7 @@ func (js *jetStream) checkForOrphans() { // We only want to cleanup any orphans if we know we are current with the meta-leader. meta := cc.meta - if meta == nil || meta.GroupLeader() == _EMPTY_ { + if meta == nil || meta.Leaderless() { js.mu.Unlock() s.Debugf("JetStream cluster skipping check for orphans, no meta-leader") return @@ -1366,7 +1372,7 @@ func (js *jetStream) monitorCluster() { // If we have a current leader or had one in the past we can cancel this here since the metaleader // will be in charge of all peer state changes. // For cold boot only. - if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() { + if !n.Leaderless() || n.HadPreviousLeader() { lt.Stop() continue } @@ -2503,7 +2509,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ce.ReturnToPool() } else { // Our stream was closed out from underneath of us, simply return here. - if err == errStreamClosed { + if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning { + aq.recycle(&ces) return } s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err) @@ -2549,7 +2556,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Always cancel if this was running. 
stopDirectMonitoring() - } else if n.GroupLeader() != noLeader { + } else if !n.Leaderless() { js.setStreamAssignmentRecovering(sa) } @@ -2913,6 +2920,9 @@ func (mset *stream) resetClusteredState(err error) bool { } s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) + // Mark stream assignment as resetting, so we don't double-account reserved resources. + // But only if we're not also releasing the resources as part of the delete. + sa.resetting = !shouldDelete // Now wipe groups from assignments. sa.Group.node = nil var consumers []*consumerAssignment @@ -3146,7 +3156,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } else if e.Type == EntrySnapshot { if mset == nil { - return nil + continue } // Everything operates on new replicated state. Will convert legacy snapshots to this for processing. @@ -3216,7 +3226,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco mset.stop(true, false) } } - return nil } } return nil @@ -4046,7 +4055,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, js.mu.RLock() s := js.srv node := sa.Group.node - hadLeader := node == nil || node.GroupLeader() != noLeader + hadLeader := node == nil || !node.Leaderless() offline := s.allPeersOffline(sa.Group) var isMetaLeader bool if cc := js.cluster; cc != nil { @@ -5034,7 +5043,6 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea o.stopWithFlags(true, false, false, false) } } - return nil } else if e.Type == EntryAddPeer { // Ignore for now. } else { @@ -8316,13 +8324,13 @@ RETRY: // the semaphore. releaseSyncOutSem() - if n.GroupLeader() == _EMPTY_ { + if n.Leaderless() { // Prevent us from spinning if we've installed a snapshot from a leader but there's no leader online. // We wait a bit to check if a leader has come online in the meantime, if so we can continue. var canContinue bool if numRetries == 0 { time.Sleep(startInterval) - canContinue = n.GroupLeader() != _EMPTY_ + canContinue = !n.Leaderless() } if !canContinue { return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name()) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index bc62a7eca54..0f70db0c0d3 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" crand "crypto/rand" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -7175,9 +7176,10 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) { } _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, // Replicated consumers by default }) require_NoError(t, err) _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) @@ -7235,6 +7237,96 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) { checkNodeIsClosed(ca) } +func TestJetStreamClusterPeerRemoveStreamConsumerDesync(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Must have at least one message in the stream. 
+ _, err = js.Publish("foo", nil) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + peer := rs.Name() + rn := mset.raftNode() + mset.mu.Lock() + esm := encodeStreamMsgAllowCompress("foo", _EMPTY_, nil, nil, mset.clseq, time.Now().UnixNano(), false) + mset.clseq++ + mset.mu.Unlock() + // Propose both remove peer and a normal entry within the same append entry. + err = rn.ProposeMulti([]*Entry{ + newEntry(EntryRemovePeer, []byte(peer)), + newEntry(EntryNormal, esm), + }) + require_NoError(t, err) + + // If the previous normal entry was skipped, we'd get a seq mismatch error here. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + // Now check the same but for a consumer. + _, err = js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + + cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + acc, err = cl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err = acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + updateDeliveredBuffer := func() []byte { + var b [4*binary.MaxVarintLen64 + 1]byte + b[0] = byte(updateDeliveredOp) + n := 1 + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 1) + n += binary.PutVarint(b[n:], time.Now().UnixNano()) + return b[:n] + } + + rs = c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") + peer = rs.Name() + rn = o.raftNode() + // Propose both remove peer and a normal entry within the same append entry. + err = rn.ProposeMulti([]*Entry{ + newEntry(EntryRemovePeer, []byte(peer)), + newEntry(EntryNormal, updateDeliveredBuffer()), + }) + require_NoError(t, err) + + // Check the normal entry was applied. + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + o.mu.Lock() + defer o.mu.Unlock() + cs, err := o.store.State() + if err != nil { + return err + } + if cs.Delivered.Consumer != 100 || cs.Delivered.Stream != 100 { + return fmt.Errorf("expected sequence 100, got: %v", cs.Delivered) + } + return nil + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 3b8fd75110b..1c962c25eec 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3583,6 +3583,94 @@ func TestJetStreamConsumerReplicasAfterScale(t *testing.T) { require_Equal(t, len(ci.Cluster.Replicas), 2) } +func TestJetStreamClusterDesyncAfterQuitDuringCatchup(t *testing.T) { + for title, test := range map[string]func(s *Server, rn RaftNode){ + "RAFT": func(s *Server, rn RaftNode) { + rn.Stop() + rn.WaitForStop() + }, + "server": func(s *Server, rn RaftNode) { + s.running.Store(false) + }, + } { + t.Run(title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Wait for all servers to have applied everything up to this point. 
+ checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + _, _, applied := mset.raftNode().Progress() + if applied != 1 { + return fmt.Errorf("expected applied to be %d, got %d", 1, applied) + } + } + return nil + }) + + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := rs.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + rn := mset.raftNode() + snap, err := json.Marshal(streamSnapshot{Msgs: 1, Bytes: 1, FirstSeq: 100, LastSeq: 100, Failed: 0, Deleted: nil}) + require_NoError(t, err) + esm := encodeStreamMsgAllowCompress("foo", _EMPTY_, nil, nil, 0, 0, false) + + // Lock stream so that we can go into processSnapshot but must wait for this to unlock. + mset.mu.Lock() + var unlocked bool + defer func() { + if !unlocked { + mset.mu.Unlock() + } + }() + + rn.ApplyQ().push(newCommittedEntry(100, []*Entry{newEntry(EntrySnapshot, snap)})) + rn.ApplyQ().push(newCommittedEntry(101, []*Entry{newEntry(EntryNormal, esm)})) + + // Waiting for the apply queue entry to be captured in monitorStream first. + time.Sleep(time.Second) + + // Set commit to a very high number, just so that we allow upping Applied() + n := rn.(*raft) + n.Lock() + n.commit = 1000 + n.Unlock() + + // Now stop the underlying RAFT node/server so processSnapshot must exit because of it. + test(rs, rn) + mset.mu.Unlock() + unlocked = true + + // Allow some time for the applied number to be updated, in which case it's an error. + time.Sleep(time.Second) + _, _, applied := mset.raftNode().Progress() + require_Equal(t, applied, 1) + }) + } +} + func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { tests := []struct { title string @@ -4192,6 +4280,64 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) } } +func TestJetStreamClusterReservedResourcesAccountingAfterClusterReset(t *testing.T) { + for _, clusterResetErr := range []error{errLastSeqMismatch, errFirstSequenceMismatch} { + t.Run(clusterResetErr.Error(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + maxBytes := int64(1024 * 1024 * 1024) + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + MaxBytes: maxBytes, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + + mem, store, err := sl.JetStreamReservedResources() + require_NoError(t, err) + require_Equal(t, mem, 0) + require_Equal(t, store, maxBytes) + + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + sjs := sl.getJetStream() + rn := mset.raftNode() + sa := mset.streamAssignment() + sjs.mu.RLock() + saGroupNode := sa.Group.node + sjs.mu.RUnlock() + require_NotNil(t, sa) + require_Equal(t, rn, saGroupNode) + + require_True(t, mset.resetClusteredState(clusterResetErr)) + + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + sjs.mu.RLock() + defer sjs.mu.RUnlock() + if sa.Group.node == nil || sa.Group.node == saGroupNode { + return errors.New("waiting for reset to complete") + } + return nil + }) + + mem, store, err = sl.JetStreamReservedResources() + require_NoError(t, err) + 
require_Equal(t, mem, 0) + require_Equal(t, store, maxBytes) + }) + } +} + func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index f5a5624741d..f05f4f99cdd 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -9275,6 +9275,47 @@ func TestJetStreamPullConsumerMaxWaiting(t *testing.T) { } } +func TestJetStreamChangeConsumerType(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"test.*"}}) + require_NoError(t, err) + + // create pull consumer + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "pull", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // cannot update pull -> push + _, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{ + Name: "pull", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: "foo", + }) + require_Contains(t, err.Error(), "can not update pull consumer to push based") + + // create push consumer + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "push", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: "foo", + }) + require_NoError(t, err) + + // cannot change push -> pull + _, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{ + Name: "push", + AckPolicy: nats.AckExplicitPolicy, + }) + require_Contains(t, err.Error(), "can not update push consumer to pull based") +} + //////////////////////////////////////// // Benchmark placeholders // TODO(dlc) - move diff --git a/server/memstore.go b/server/memstore.go index 350cfa388e9..0209d00c06f 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -92,7 +92,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { // If the value is smaller, or was unset before, we need to enforce that. if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) { lm := uint64(ms.maxp) - ms.fss.Iter(func(subj []byte, ss *SimpleState) bool { + ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool { if ss.Msgs > lm { ms.enforcePerSubjectLimit(bytesToString(subj), ss) } @@ -1299,6 +1299,33 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store return nil, ms.state.LastSeq, ErrStoreEOF } +// Will load the next non-deleted msg starting at the start sequence and walking backwards. +func (ms *memStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + + if ms.msgs == nil { + return nil, ErrStoreClosed + } + if ms.state.Msgs == 0 || start < ms.state.FirstSeq { + return nil, ErrStoreEOF + } + if start > ms.state.LastSeq { + start = ms.state.LastSeq + } + + for seq := start; seq >= ms.state.FirstSeq; seq-- { + if sm, ok := ms.msgs[seq]; ok { + if smp == nil { + smp = new(StoreMsg) + } + sm.copy(smp) + return smp, nil + } + } + return nil, ErrStoreEOF +} + // RemoveMsg will remove the message from this store. // Will return the number of bytes removed. 
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) { diff --git a/server/monitor.go b/server/monitor.go index 77a6c1fe71a..43e3d4061b7 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1122,20 +1122,16 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, buf[:n]) } -type monitorIPQueue struct { +type IpqueueszStatusIPQ struct { Pending int `json:"pending"` InProgress int `json:"in_progress,omitempty"` } -func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { - all, err := decodeBool(w, r, "all") - if err != nil { - return - } - qfilter := r.URL.Query().Get("queues") - - queues := map[string]monitorIPQueue{} +type IpqueueszStatus map[string]IpqueueszStatusIPQ +func (s *Server) Ipqueuesz(opts *IpqueueszOptions) *IpqueueszStatus { + all, qfilter := opts.All, opts.Filter + queues := IpqueueszStatus{} s.ipQueues.Range(func(k, v any) bool { var pending, inProgress int name := k.(string) @@ -1152,9 +1148,23 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { return true } - queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress} + queues[name] = IpqueueszStatusIPQ{Pending: pending, InProgress: inProgress} return true }) + return &queues +} + +func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { + all, err := decodeBool(w, r, "all") + if err != nil { + return + } + qfilter := r.URL.Query().Get("queues") + + queues := s.Ipqueuesz(&IpqueueszOptions{ + All: all, + Filter: qfilter, + }) b, _ := json.MarshalIndent(queues, "", " ") ResponseHandler(w, r, b) @@ -2760,6 +2770,18 @@ type ProfilezOptions struct { Duration time.Duration `json:"duration,omitempty"` } +// IpqueueszOptions are options passed to Ipqueuesz +type IpqueueszOptions struct { + All bool `json:"all"` + Filter string `json:"filter"` +} + +// RaftzOptions are options passed to Raftz +type RaftzOptions struct { + AccountFilter string `json:"account"` + GroupFilter string `json:"group"` +} + // StreamDetail shows information about the stream state and its consumers. type StreamDetail struct { Name string `json:"name"` @@ -3676,27 +3698,27 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { for stream, sa := range asa { // Make sure we can look up - if !js.isStreamHealthy(acc, sa) { + if err := js.isStreamHealthy(acc, sa); err != nil { if !details { health.Status = na - health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream) + health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err) return health } health.Errors = append(health.Errors, HealthzError{ Type: HealthzErrorStream, Account: accName, Stream: stream, - Error: fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream), + Error: fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err), }) continue } mset, _ := acc.lookupStream(stream) // Now check consumers. 
diff --git a/server/monitor_test.go b/server/monitor_test.go
index 7003de12316..fa9ffa7ee68 100644
--- a/server/monitor_test.go
+++ b/server/monitor_test.go
@@ -4044,7 +4044,7 @@ func TestMonitorAccountz(t *testing.T) {
 
 	body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath)))
 	require_Contains(t, body, `"account_detail": {`)
 	require_Contains(t, body, `"account_name": "$SYS",`)
-	require_Contains(t, body, `"subscriptions": 52,`)
+	require_Contains(t, body, `"subscriptions": 56,`)
 	require_Contains(t, body, `"is_system": true,`)
 	require_Contains(t, body, `"system_account": "$SYS"`)
@@ -5384,10 +5384,11 @@ func TestIpqzWithGenerics(t *testing.T) {
 	body := readBody(t, url)
 	require_True(t, len(body) > 0)
 
-	queues := map[string]*monitorIPQueue{}
+	queues := IpqueueszStatus{}
 	require_NoError(t, json.Unmarshal(body, &queues))
 	require_True(t, len(queues) >= 4)
-	require_True(t, queues["SendQ"] != nil)
+	_, ok := queues["SendQ"]
+	require_True(t, ok)
 }
 
 func TestVarzSyncInterval(t *testing.T) {
diff --git a/server/norace_test.go b/server/norace_test.go
index cb6bd56a011..27f0c667078 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -9004,6 +9004,7 @@ func TestNoRaceStoreStreamEncoderDecoder(t *testing.T) {
 	}
 	ms, err := newMemStore(cfg)
 	require_NoError(t, err)
+	defer ms.Stop()
 
 	fs, err := newFileStore(
 		FileStoreConfig{StoreDir: t.TempDir()},
@@ -11288,3 +11289,70 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
 	require_NoError(t, n.InstallSnapshot(snap))
 	t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
 }
+
+func TestNoRaceStoreReverseWalkWithDeletesPerf(t *testing.T) {
+	cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}
+
+	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	cfg.Storage = MemoryStorage
+	ms, err := newMemStore(&cfg)
+	require_NoError(t, err)
+	defer ms.Stop()
+
+	msg := []byte("Hello")
+
+	for _, store := range []StreamStore{fs, ms} {
+		store.StoreMsg("foo.A", nil, msg)
+		for i := 0; i < 1_000_000; i++ {
+			store.StoreMsg("foo.B", nil, msg)
+		}
+		store.StoreMsg("foo.C", nil, msg)
+
+		var ss StreamState
+		store.FastState(&ss)
+		require_Equal(t, ss.Msgs, 1_000_002)
+
+		// Create a bunch of interior deletes.
+		p, err := store.PurgeEx("foo.B", 1, 0)
+		require_NoError(t, err)
+		require_Equal(t, p, 1_000_000)
+
+		// Now simulate a walk backwards as we currently do when searching for starting sequence numbers in sourced streams.
+		start := time.Now()
+		var smv StoreMsg
+		for seq := ss.LastSeq; seq > 0; seq-- {
+			_, err := store.LoadMsg(seq, &smv)
+			if err == errDeletedMsg || err == ErrStoreMsgNotFound {
+				continue
+			}
+			require_NoError(t, err)
+		}
+		elapsed := time.Since(start)
+
+		// Now use the optimized load prev.
+		seq, seen := ss.LastSeq, 0
+		start = time.Now()
+		for {
+			sm, err := store.LoadPrevMsg(seq, &smv)
+			if err == ErrStoreEOF {
+				break
+			}
+			require_NoError(t, err)
+			seq = sm.seq - 1
+			seen++
+		}
+		elapsedNew := time.Since(start)
+		require_Equal(t, seen, 2)
+
+		switch store.(type) {
+		case *memStore:
+			require_True(t, elapsedNew < elapsed)
+		case *fileStore:
+			// Bigger gains for filestore, 10x
+			require_True(t, elapsedNew*10 < elapsed)
+		}
+	}
+}
diff --git a/server/raft.go b/server/raft.go
index 427e8ce6773..306c9790871 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -52,6 +52,7 @@ type RaftNode interface {
 	Current() bool
 	Healthy() bool
 	Term() uint64
+	Leaderless() bool
 	GroupLeader() string
 	HadPreviousLeader() bool
 	StepDown(preferred ...string) error
@@ -174,9 +175,10 @@ type raft struct {
 	c  *client    // Internal client for subscriptions
 	js *jetStream // JetStream, if running, to see if we are out of resources
 
-	dflag    bool // Debug flag
-	pleader  bool // Has the group ever had a leader?
-	observer bool // The node is observing, i.e. not participating in voting
+	dflag     bool        // Debug flag
+	hasleader atomic.Bool // Is there a group leader right now?
+	pleader   atomic.Bool // Has the group ever had a leader?
+	observer  bool        // The node is observing, i.e. not participating in voting
 
 	extSt extensionState // Extension state
 
@@ -830,7 +832,7 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 	n.Lock()
 	defer n.Unlock()
 
-	if n.leader != noLeader || n.pleader {
+	if n.leader != noLeader || n.pleader.Load() {
 		return errAdjustBootCluster
 	}
 	// Same floor as bootstrap.
@@ -1386,9 +1388,7 @@ func (n *raft) Healthy() bool {
 
 // HadPreviousLeader indicates if this group ever had a leader.
 func (n *raft) HadPreviousLeader() bool {
-	n.RLock()
-	defer n.RUnlock()
-	return n.pleader
+	return n.pleader.Load()
 }
 
 // GroupLeader returns the current leader of the group.
@@ -1401,6 +1401,17 @@ func (n *raft) GroupLeader() string {
 	return n.leader
 }
 
+// Leaderless is a lockless way of finding out if the group has a
+// leader or not. Use instead of GroupLeader in hot paths.
+func (n *raft) Leaderless() bool {
+	if n == nil {
+		return true
+	}
+	// Negated because we want the default state of hasLeader to be
+	// false until the first setLeader() call.
+	return !n.hasleader.Load()
+}
+
 // Guess the best next leader. Stepdown will check more thoroughly.
 // Lock should be held.
 func (n *raft) selectNextLeader() string {
@@ -3146,8 +3157,9 @@ func (n *raft) resetWAL() {
 
 // Lock should be held
 func (n *raft) updateLeader(newLeader string) {
 	n.leader = newLeader
-	if !n.pleader && newLeader != noLeader {
-		n.pleader = true
+	n.hasleader.Store(newLeader != _EMPTY_)
+	if !n.pleader.Load() && newLeader != noLeader {
+		n.pleader.Store(true)
 	}
 }
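Leaderless gives hot paths a lock-free (and, on a *raft value, nil-safe) way to ask whether the group currently has no leader, instead of taking the read lock via GroupLeader. The following is only an illustrative sketch of the intended call pattern inside the server package; the helper name leaderName is made up.

// leaderName is a hypothetical helper: check leadership cheaply first, and
// only take the read lock when the caller actually needs the leader's ID.
func leaderName(n RaftNode) string {
	if n.Leaderless() { // atomic load on hasleader, no lock
		return _EMPTY_
	}
	return n.GroupLeader() // acquires the read lock, returns the leader's peer ID
}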
diff --git a/server/store.go b/server/store.go
index 2d72f694740..bfb5aec18f2 100644
--- a/server/store.go
+++ b/server/store.go
@@ -91,6 +91,7 @@ type StreamStore interface {
 	LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
 	LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
 	LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error)
+	LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error)
 	RemoveMsg(seq uint64) (bool, error)
 	EraseMsg(seq uint64) (bool, error)
 	Purge() (uint64, error)
diff --git a/server/stream.go b/server/stream.go
index a2883631d48..836c90b8a1f 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -660,15 +660,25 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	}
 
 	// Set our stream assignment if in clustered mode.
+	reserveResources := true
 	if sa != nil {
 		mset.setStreamAssignment(sa)
+
+		// If the stream is resetting we must not double-account resources, they were already accounted for.
+		js.mu.Lock()
+		if sa.resetting {
+			reserveResources, sa.resetting = false, false
+		}
+		js.mu.Unlock()
 	}
 
 	// Setup our internal send go routine.
 	mset.setupSendCapabilities()
 
 	// Reserve resources if MaxBytes present.
-	mset.js.reserveStreamResources(&mset.cfg)
+	if reserveResources {
+		mset.js.reserveStreamResources(&mset.cfg)
+	}
 
 	// Call directly to set leader if not in clustered mode.
 	// This can be called though before we actually setup clustering, so check both.
@@ -3488,16 +3498,21 @@ func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) {
 	}
 
 	var smv StoreMsg
-	for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
-		sm, err := mset.store.LoadMsg(seq, &smv)
-		if err != nil || len(sm.hdr) == 0 {
+	for seq := state.LastSeq; seq >= state.FirstSeq; {
+		sm, err := mset.store.LoadPrevMsg(seq, &smv)
+		if err == ErrStoreEOF || err != nil {
+			break
+		}
+		seq = sm.seq - 1
+		if len(sm.hdr) == 0 {
 			continue
 		}
+
 		ss := getHeader(JSStreamSource, sm.hdr)
 		if len(ss) == 0 {
 			continue
 		}
-		streamName, indexName, sseq := streamAndSeq(string(ss))
+		streamName, indexName, sseq := streamAndSeq(bytesToString(ss))
 
 		if _, ok := iNames[indexName]; ok {
 			si := mset.sources[indexName]
@@ -3603,9 +3618,13 @@ func (mset *stream) startingSequenceForSources() {
 	}
 
 	var smv StoreMsg
-	for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
-		sm, err := mset.store.LoadMsg(seq, &smv)
-		if err != nil || sm == nil || len(sm.hdr) == 0 {
+	for seq := state.LastSeq; ; {
+		sm, err := mset.store.LoadPrevMsg(seq, &smv)
+		if err == ErrStoreEOF || err != nil {
+			break
+		}
+		seq = sm.seq - 1
+		if len(sm.hdr) == 0 {
 			continue
 		}
 		ss := getHeader(JSStreamSource, sm.hdr)
@@ -3613,7 +3632,7 @@ func (mset *stream) startingSequenceForSources() {
 			continue
 		}
 
-		streamName, iName, sseq := streamAndSeq(string(ss))
+		streamName, iName, sseq := streamAndSeq(bytesToString(ss))
 		if iName == _EMPTY_ { // Pre-2.10 message header means it's a match for any source using that stream name
 			for _, ssi := range mset.cfg.Sources {
 				if streamName == ssi.Name || (ssi.External != nil && streamName == ssi.Name+":"+getHash(ssi.External.ApiPrefix)) {
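LoadPrevMsg, added to the StreamStore interface above, lets callers walk a stream from newest to oldest while the store itself skips interior deletes, which is what the reworked sourcing loops in stream.go rely on. Below is a hedged sketch of that idiom, mirroring those loops and the perf test earlier in this diff; the helper name walkBackwards is made up, and it would have to live inside the server package since it touches unexported StoreMsg fields.

// walkBackwards visits stored messages from the newest down to the oldest,
// letting the store jump over deleted or purged sequences.
func walkBackwards(store StreamStore, lastSeq uint64, visit func(sm *StoreMsg) bool) error {
	var smv StoreMsg
	for seq := lastSeq; ; {
		sm, err := store.LoadPrevMsg(seq, &smv)
		if err == ErrStoreEOF {
			return nil // walked past the first stored message
		}
		if err != nil {
			return err
		}
		if !visit(sm) {
			return nil
		}
		// Continue strictly below the sequence we just received.
		seq = sm.seq - 1
	}
}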
diff --git a/server/stree/stree.go b/server/stree/stree.go
index 828631888f9..25ab06221fe 100644
--- a/server/stree/stree.go
+++ b/server/stree/stree.go
@@ -124,13 +124,22 @@ func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
 	t.match(t.root, parts, _pre[:0], cb)
 }
 
-// Iter will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
-func (t *SubjectTree[T]) Iter(cb func(subject []byte, val *T) bool) {
+// IterOrdered will walk all entries in the SubjectTree lexicographically. The callback can return false to terminate the walk.
+func (t *SubjectTree[T]) IterOrdered(cb func(subject []byte, val *T) bool) {
 	if t == nil || t.root == nil {
 		return
 	}
 	var _pre [256]byte
-	t.iter(t.root, _pre[:0], cb)
+	t.iter(t.root, _pre[:0], true, cb)
+}
+
+// IterFast will walk all entries in the SubjectTree with no guarantees of ordering. The callback can return false to terminate the walk.
+func (t *SubjectTree[T]) IterFast(cb func(subject []byte, val *T) bool) {
+	if t == nil || t.root == nil {
+		return
+	}
+	var _pre [256]byte
+	t.iter(t.root, _pre[:0], false, cb)
 }
 
 // Internal methods
@@ -369,7 +378,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
 }
 // Interal iter function to walk nodes in lexigraphical order.
-func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T) bool) bool {
+func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject []byte, val *T) bool) bool {
 	if n.isLeaf() {
 		ln := n.(*leaf[T])
 		return cb(append(pre, ln.suffix...), &ln.value)
 	}
@@ -378,6 +387,19 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
 	bn := n.base()
 	// Note that this append may reallocate, but it doesn't modify "pre" at the "iter" callsite.
 	pre = append(pre, bn.prefix...)
+	// Not everything requires lexicographical sorting, so support a fast path for iterating in
+	// whatever order the stree has things stored instead.
+	if !ordered {
+		for _, cn := range n.children() {
+			if cn == nil {
+				continue
+			}
+			if !t.iter(cn, pre, false, cb) {
+				return false
+			}
+		}
+		return true
+	}
 	// Collect nodes since unsorted.
 	var _nodes [256]node
 	nodes := _nodes[:0]
@@ -390,7 +412,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
 	slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) })
 	// Now walk the nodes in order and call into next iter.
 	for i := range nodes {
-		if !t.iter(nodes[i], pre, cb) {
+		if !t.iter(nodes[i], pre, true, cb) {
 			return false
 		}
 	}
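Splitting Iter into IterOrdered and IterFast keeps the sorted walk for callers that need lexicographic order while letting order-insensitive callers skip the per-node sort. A small illustrative sketch of the difference follows; the subjects, values and package name are made up, while NewSubjectTree, Insert and the callback shape are as used in the tests below.

package streeexample

import (
	"fmt"

	"github.com/nats-io/nats-server/v2/server/stree"
)

func sumAndPrint() {
	st := stree.NewSubjectTree[int]()
	st.Insert([]byte("foo.bar"), 1)
	st.Insert([]byte("foo.baz"), 2)

	// IterFast makes no ordering guarantee, so only do order-insensitive work here.
	var total int
	st.IterFast(func(_ []byte, val *int) bool {
		total += *val
		return true // keep walking
	})

	// IterOrdered walks lexicographically, e.g. for stable, human-readable output.
	st.IterOrdered(func(subject []byte, val *int) bool {
		fmt.Printf("%s = %d (running total %d)\n", subject, *val, total)
		return true
	})
}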
diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go
index 8bf24181d2a..4e62607c249 100644
--- a/server/stree/stree_test.go
+++ b/server/stree/stree_test.go
@@ -465,7 +465,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
 		seen++
 	})
 	// Now check via walk to make sure we are right.
-	st.Iter(func(subject []byte, v *int) bool {
+	st.IterOrdered(func(subject []byte, v *int) bool {
 		tokens := strings.Split(string(subject), ".")
 		require_Equal(t, len(tokens), 3)
 		if tokens[1] == "2" {
@@ -479,7 +479,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
 	st.Match(b("*.*.222"), func(_ []byte, _ *int) {
 		seen++
 	})
-	st.Iter(func(subject []byte, v *int) bool {
+	st.IterOrdered(func(subject []byte, v *int) bool {
 		tokens := strings.Split(string(subject), ".")
 		require_Equal(t, len(tokens), 3)
 		if tokens[2] == "222" {
@@ -490,7 +490,7 @@
 	require_Equal(t, seen, verified)
 }
 
-func TestSubjectTreeIter(t *testing.T) {
+func TestSubjectTreeIterOrdered(t *testing.T) {
 	st := NewSubjectTree[int]()
 	st.Insert(b("foo.bar.A"), 1)
 	st.Insert(b("foo.bar.B"), 2)
@@ -531,12 +531,53 @@ func TestSubjectTreeIter(t *testing.T) {
 		return true
 	}
 	// Kick in the iter.
-	st.Iter(walk)
+	st.IterOrdered(walk)
 	require_Equal(t, received, len(checkOrder))
 
 	// Make sure we can terminate properly.
 	received = 0
-	st.Iter(func(subject []byte, v *int) bool {
+	st.IterOrdered(func(subject []byte, v *int) bool {
+		received++
+		return received != 4
+	})
+	require_Equal(t, received, 4)
+}
+
+func TestSubjectTreeIterFast(t *testing.T) {
+	st := NewSubjectTree[int]()
+	st.Insert(b("foo.bar.A"), 1)
+	st.Insert(b("foo.bar.B"), 2)
+	st.Insert(b("foo.bar.C"), 3)
+	st.Insert(b("foo.baz.A"), 11)
+	st.Insert(b("foo.baz.B"), 22)
+	st.Insert(b("foo.baz.C"), 33)
+	st.Insert(b("foo.bar"), 42)
+
+	checkValMap := map[string]int{
+		"foo.bar.A": 1,
+		"foo.bar.B": 2,
+		"foo.bar.C": 3,
+		"foo.baz.A": 11,
+		"foo.baz.B": 22,
+		"foo.baz.C": 33,
+		"foo.bar":   42,
+	}
+	var received int
+	walk := func(subject []byte, v *int) bool {
+		received++
+		require_True(t, v != nil)
+		if expected := checkValMap[string(subject)]; expected != *v {
+			t.Fatalf("Expected %q to have value of %d, but got %d", subject, expected, *v)
+		}
+		return true
+	}
+	// Kick in the iter.
+	st.IterFast(walk)
+	require_Equal(t, received, len(checkValMap))
+
+	// Make sure we can terminate properly.
+	received = 0
+	st.IterFast(func(subject []byte, v *int) bool {
 		received++
 		return received != 4
 	})
@@ -710,7 +751,7 @@ func TestSubjectTreeIterPerf(t *testing.T) {
 
 	start := time.Now()
 	count := 0
-	st.Iter(func(_ []byte, _ *int) bool {
+	st.IterOrdered(func(_ []byte, _ *int) bool {
 		count++
 		return true
 	})
diff --git a/server/stree/util.go b/server/stree/util.go
index 108f78fda92..1581f03b49c 100644
--- a/server/stree/util.go
+++ b/server/stree/util.go
@@ -55,11 +55,3 @@ func pivot[N position](subject []byte, pos N) byte {
 	}
 	return subject[pos]
 }
-
-// TODO(dlc) - Can be removed with Go 1.21 once server is on Go 1.22.
-func min(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
-}
diff --git a/server/sublist.go b/server/sublist.go
index b7650ede6f2..a22ec051ea9 100644
--- a/server/sublist.go
+++ b/server/sublist.go
@@ -1744,7 +1744,13 @@ func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj [
 
 func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) {
 	if r.numNodes() == 0 {
-		st.Match(subj, cb)
+		// For wildcards we can't avoid Match, but if it's a literal subject at
+		// this point, using Find is considerably cheaper.
+		if subjectHasWildcard(bytesToString(subj)) {
+			st.Match(subj, cb)
+		} else if e, ok := st.Find(subj); ok {
+			cb(subj, e)
+		}
 		return
 	}
 	nsubj := subj
diff --git a/server/websocket.go b/server/websocket.go
index 69e6e1a9a70..9e714bcf0b4 100644
--- a/server/websocket.go
+++ b/server/websocket.go
@@ -1311,6 +1311,9 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
 		}
 		if usz <= wsCompressThreshold {
 			compress = false
+			if cp := c.ws.compressor; cp != nil {
+				cp.Reset(nil)
+			}
 		}
 	}
 	if compress && len(nb) > 0 {
@@ -1331,13 +1334,11 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
 			for len(b) > 0 {
 				n, err := cp.Write(b)
 				if err != nil {
-					if err == io.EOF {
-						break
-					}
-					c.Errorf("Error during compression: %v", err)
-					c.markConnAsClosed(WriteError)
-					nbPoolPut(b)
-					return nil, 0
+					// Whatever this error is, it'll be handled by the cp.Flush()
+					// call below, as the same error will be returned there.
+					// Let the outer loop return all the buffers back to the pool
+					// and fall through naturally.
+					break
 				}
 				b = b[n:]
 			}
@@ -1346,6 +1347,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
 		if err := cp.Flush(); err != nil {
 			c.Errorf("Error during compression: %v", err)
 			c.markConnAsClosed(WriteError)
+			cp.Reset(nil)
 			return nil, 0
 		}
 		b := buf.Bytes()
@@ -1461,6 +1463,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
 		bufs = append(bufs, c.ws.closeMsg)
 		c.ws.fs += int64(len(c.ws.closeMsg))
 		c.ws.closeMsg = nil
+		c.ws.compressor = nil
 	}
 	c.ws.frames = nil
 	return bufs, c.ws.fs
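On the server/sublist.go change above: once the remaining subject contains no wildcard tokens, a point lookup is sufficient, so the literal branch switches from Match to Find. The snippet below is only an illustrative sketch of that difference using the public stree API; the subjects and values are invented, and it assumes the same stree import as the earlier sketch.

// Match applies wildcard semantics across the tree; Find is a direct lookup
// that only ever returns the exact literal subject.
st := stree.NewSubjectTree[string]()
st.Insert([]byte("events.orders.created"), "handler-A")

// Literal subject: Find avoids the full matching machinery.
if v, ok := st.Find([]byte("events.orders.created")); ok {
	fmt.Println("found:", *v)
}

// Wildcard filter: Match is still required.
st.Match([]byte("events.*.created"), func(subject []byte, v *string) {
	fmt.Printf("matched %s -> %s\n", subject, *v)
})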