Skip to content

Commit

Permalink
Cherry-picks for 2.10.22-RC.1 (#5979)
Browse files Browse the repository at this point in the history
Includes the following:

- #5944
- #5945
- #5939
- #5935
- #5960
- #5970
- #5971
- #5963
- #5973
- #5978

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Oct 9, 2024
2 parents d3a8868 + 660036d commit 7f381e0
Show file tree
Hide file tree
Showing 17 changed files with 696 additions and 160 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module github.com/nats-io/nats-server/v2
go 1.21.0

require (
github.com/klauspost/compress v1.17.9
github.com/klauspost/compress v1.17.10
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.27.0
golang.org/x/sys v0.25.0
golang.org/x/time v0.6.0
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.28.0
golang.org/x/sys v0.26.0
golang.org/x/time v0.7.0
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
Expand All @@ -18,14 +18,14 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Server Options:
-c, --config <file> Configuration file
-t Test configuration and exit
-sl,--signal <signal>[=<pid>] Send signal to nats-server process (ldm, stop, quit, term, reopen, reload)
pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid)
<pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid)
--client_advertise <string> Client URL to advertise to other servers
--ports_file_dir <dir> Creates a ports file in the specified directory (<executable_name>_<pid>.ports).
Expand Down
71 changes: 39 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ type consumer struct {
dseq uint64 // delivered consumer sequence
adflr uint64 // ack delivery floor
asflr uint64 // ack store floor
chkflr uint64 // our check floor, interest streams only.
npc int64 // Num Pending Count
npf uint64 // Num Pending Floor Sequence
dsubj string
Expand Down Expand Up @@ -2918,28 +2919,6 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5512,16 +5491,24 @@ func (o *consumer) isMonitorRunning() bool {
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() error {
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState || o.store == nil {
if o.closed || !shouldProcessState || o.store == nil || ss == nil {
o.mu.RUnlock()
return nil
}
store := mset.store
state, err := o.store.State()

filters, subjf, filter := o.filters, o.subjf, _EMPTY_
var wc bool
if filters == nil && subjf != nil {
filter, wc = subjf[0].subject, subjf[0].hasWildcard
}
chkfloor := o.chkflr
o.mu.RUnlock()

if err != nil {
Expand All @@ -5534,26 +5521,46 @@ func (o *consumer) checkStateForInterestStream() error {
return nil
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
if o.matchAck(seq) {
var smv StoreMsg
var seq, nseq uint64
// Start at first stream seq or a previous check floor, whichever is higher.
// Note this will really help for interest retention, with WQ the loadNextMsg
// gets us a long way already since it will skip deleted msgs not for our filter.
fseq := ss.FirstSeq
if chkfloor > fseq {
fseq = chkfloor
}

for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, seq, &smv)
}
// if we advanced sequence update our seq. This can be on no error and EOF.
if nseq > seq {
seq = nseq
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
o.mu.RUnlock()
o.mu.Unlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 {
Expand Down
Loading

0 comments on commit 7f381e0

Please sign in to comment.