Skip to content

Commit acb2ee8

Browse files
authored
Cherry-picks for 2.10.17 (#5598)
Includes: * #5593 * #5597 * #5600 Signed-off-by: Neil Twigg <[email protected]>
2 parents cc0ec6a + b8df1ea commit acb2ee8

File tree

4 files changed

+52
-13
lines changed

4 files changed

+52
-13
lines changed

server/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ func (c *client) applyAccountLimits() {
847847
c.msubs = jwt.NoLimit
848848
if c.opts.JWT != _EMPTY_ { // user jwt implies account
849849
if uc, _ := jwt.DecodeUserClaims(c.opts.JWT); uc != nil {
850-
c.mpay = int32(uc.Limits.Payload)
850+
atomic.StoreInt32(&c.mpay, int32(uc.Limits.Payload))
851851
c.msubs = int32(uc.Limits.Subs)
852852
if uc.IssuerAccount != _EMPTY_ && uc.IssuerAccount != uc.Issuer {
853853
if scope, ok := c.acc.signingKeys[uc.Issuer]; ok {
@@ -5427,7 +5427,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
54275427

54285428
if genid := atomic.LoadUint64(&sl.genid); genid != pac.genid {
54295429
ok = false
5430-
delete(c.in.pacache, bytesToString(c.pa.pacache))
5430+
c.in.pacache = make(map[string]*perAccountCache)
54315431
} else {
54325432
acc = pac.acc
54335433
r = pac.results

server/filestore.go

+29-7
Original file line numberDiff line numberDiff line change
@@ -7635,10 +7635,23 @@ func (fs *fileStore) Delete() error {
76357635
os.RemoveAll(pdir)
76367636
}
76377637

7638-
// Do Purge() since if we have lots of blocks uses a mv/rename.
7639-
fs.Purge()
7638+
// Quickly close all blocks and simulate a purge w/o overhead an new write block.
7639+
fs.mu.Lock()
7640+
for _, mb := range fs.blks {
7641+
mb.dirtyClose()
7642+
}
7643+
dmsgs := fs.state.Msgs
7644+
dbytes := int64(fs.state.Bytes)
7645+
fs.state.Msgs, fs.state.Bytes = 0, 0
7646+
fs.blks = nil
7647+
cb := fs.scb
7648+
fs.mu.Unlock()
7649+
7650+
if cb != nil {
7651+
cb(-int64(dmsgs), -dbytes, 0, _EMPTY_)
7652+
}
76407653

7641-
if err := fs.stop(false); err != nil {
7654+
if err := fs.stop(true, false); err != nil {
76427655
return err
76437656
}
76447657

@@ -7654,14 +7667,19 @@ func (fs *fileStore) Delete() error {
76547667
// Do this in separate Go routine in case lots of blocks.
76557668
// Purge above protects us as does the removal of meta artifacts above.
76567669
go func() {
7670+
<-dios
76577671
err := os.RemoveAll(ndir)
7672+
dios <- struct{}{}
76587673
if err == nil {
76597674
return
76607675
}
76617676
ttl := time.Now().Add(time.Second)
76627677
for time.Now().Before(ttl) {
76637678
time.Sleep(10 * time.Millisecond)
7664-
if err = os.RemoveAll(ndir); err == nil {
7679+
<-dios
7680+
err = os.RemoveAll(ndir)
7681+
dios <- struct{}{}
7682+
if err == nil {
76657683
return
76667684
}
76677685
}
@@ -7927,11 +7945,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
79277945

79287946
// Stop the current filestore.
79297947
func (fs *fileStore) Stop() error {
7930-
return fs.stop(true)
7948+
return fs.stop(false, true)
79317949
}
79327950

79337951
// Stop the current filestore.
7934-
func (fs *fileStore) stop(writeState bool) error {
7952+
func (fs *fileStore) stop(delete, writeState bool) error {
79357953
fs.mu.Lock()
79367954
if fs.closed || fs.closing {
79377955
fs.mu.Unlock()
@@ -7982,7 +8000,11 @@ func (fs *fileStore) stop(writeState bool) error {
79828000
fs.cmu.Unlock()
79838001

79848002
for _, o := range cfs {
7985-
o.Stop()
8003+
if delete {
8004+
o.StreamDelete()
8005+
} else {
8006+
o.Stop()
8007+
}
79868008
}
79878009

79888010
if bytes > 0 && cb != nil {

server/jetstream_cluster.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4109,7 +4109,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
41094109
sa.consumers = make(map[string]*consumerAssignment)
41104110
} else if oca := sa.consumers[ca.Name]; oca != nil {
41114111
wasExisting = true
4112-
// Copy over private existing state from former SA.
4112+
// Copy over private existing state from former CA.
41134113
if ca.Group != nil {
41144114
ca.Group.node = oca.Group.node
41154115
}
@@ -4231,6 +4231,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
42314231
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
42324232
needDelete = true
42334233
oca.deleted = true
4234+
oca.Group.node = nil
42344235
delete(sa.consumers, ca.Name)
42354236
}
42364237
}
@@ -7760,7 +7761,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
77607761
if err == nil {
77617762
err = NewJSAccountResourcesExceededError()
77627763
}
7763-
s.RateLimitWarnf(err.Error())
7764+
s.RateLimitWarnf("JetStream account limits exceeded for '%s': %s", jsa.acc().GetName(), err.Error())
77647765
if canRespond {
77657766
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
77667767
resp.Error = err

server/raft.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ var (
275275
errLeaderLen = fmt.Errorf("raft: leader should be exactly %d bytes", idLen)
276276
errTooManyEntries = errors.New("raft: append entry can contain a max of 64k entries")
277277
errBadAppendEntry = errors.New("raft: append entry corrupt")
278+
errNoInternalClient = errors.New("raft: no internal client")
278279
)
279280

280281
// This will bootstrap a raftNode by writing its config into the store directory.
@@ -1625,6 +1626,13 @@ func (n *raft) shutdown(shouldDelete bool) {
16251626
// allowing shutdown() to be called again. If that happens then the below
16261627
// close(n.quit) will panic from trying to close an already-closed channel.
16271628
if n.state.Swap(int32(Closed)) == int32(Closed) {
1629+
// If we get called again with shouldDelete, in case we were called first with Stop() cleanup
1630+
if shouldDelete {
1631+
if wal := n.wal; wal != nil {
1632+
wal.Delete()
1633+
}
1634+
os.RemoveAll(n.sd)
1635+
}
16281636
n.Unlock()
16291637
return
16301638
}
@@ -1641,17 +1649,22 @@ func (n *raft) shutdown(shouldDelete bool) {
16411649
n.unsubscribe(sub)
16421650
}
16431651
c.closeConnection(InternalClient)
1652+
n.c = nil
16441653
}
1654+
16451655
s, g, wal := n.s, n.group, n.wal
16461656

16471657
// Unregistering ipQueues do not prevent them from push/pop
16481658
// just will remove them from the central monitoring map
16491659
queues := []interface {
16501660
unregister()
1661+
drain()
16511662
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
16521663
for _, q := range queues {
1664+
q.drain()
16531665
q.unregister()
16541666
}
1667+
sd := n.sd
16551668
n.Unlock()
16561669

16571670
s.unregisterRaftNode(g)
@@ -1666,7 +1679,7 @@ func (n *raft) shutdown(shouldDelete bool) {
16661679

16671680
if shouldDelete {
16681681
// Delete all our peer state and vote state and any snapshots.
1669-
os.RemoveAll(n.sd)
1682+
os.RemoveAll(sd)
16701683
n.debug("Deleted")
16711684
} else {
16721685
n.debug("Shutdown")
@@ -1721,12 +1734,15 @@ func (n *raft) newInbox() string {
17211734
// Our internal subscribe.
17221735
// Lock should be held.
17231736
func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
1737+
if n.c == nil {
1738+
return nil, errNoInternalClient
1739+
}
17241740
return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
17251741
}
17261742

17271743
// Lock should be held.
17281744
func (n *raft) unsubscribe(sub *subscription) {
1729-
if sub != nil {
1745+
if n.c != nil && sub != nil {
17301746
n.c.processUnsub(sub.sid)
17311747
}
17321748
}

0 commit comments

Comments
 (0)