Skip to content

Commit b806546

Browse files
committed
Move NRG traffic into asset account
This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg <[email protected]>
1 parent 7672774 commit b806546

8 files changed

+209
-21
lines changed

server/accounts.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Account struct {
6161
sqmu sync.Mutex
6262
sl *Sublist
6363
ic *client
64+
sq *sendq
6465
isid uint64
6566
etmr *time.Timer
6667
ctmr *time.Timer

server/events.go

+57
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ type ServerInfo struct {
256256
const (
257257
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
258258
BinaryStreamSnapshot // New stream snapshot capability.
259+
AccountNRG // Move NRG traffic out of system account.
259260
)
260261

261262
// Set JetStream capability.
@@ -281,6 +282,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
281282
return si.Flags&BinaryStreamSnapshot != 0
282283
}
283284

285+
// Set account NRG capability.
286+
func (si *ServerInfo) SetAccountNRG() {
287+
si.Flags |= AccountNRG
288+
}
289+
290+
// AccountNRG indicates whether or not we support moving the NRG traffic out of the
291+
// system account and into the asset account.
292+
func (si *ServerInfo) AccountNRG() bool {
293+
return si.Flags&AccountNRG != 0
294+
}
295+
284296
// ClientInfo is detailed information about the client forming a connection.
285297
type ClientInfo struct {
286298
Start *time.Time `json:"start,omitempty"`
@@ -479,6 +491,7 @@ RESET:
479491
// New capability based flags.
480492
si.SetJetStreamEnabled()
481493
si.SetBinaryStreamSnapshot()
494+
si.SetAccountNRG()
482495
}
483496
}
484497
var b []byte
@@ -1543,7 +1556,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
15431556
false,
15441557
si.JetStreamEnabled(),
15451558
si.BinaryStreamSnapshot(),
1559+
si.AccountNRG(),
15461560
})
1561+
s.updateNRGAccountStatus()
15471562
}
15481563

15491564
// updateRemoteServer is called when we have an update from a remote server.
@@ -1590,14 +1605,56 @@ func (s *Server) processNewServer(si *ServerInfo) {
15901605
false,
15911606
si.JetStreamEnabled(),
15921607
si.BinaryStreamSnapshot(),
1608+
si.AccountNRG(),
15931609
})
15941610
}
15951611
}
1612+
go s.updateNRGAccountStatus()
15961613
// Announce ourselves..
15971614
// Do this in a separate Go routine.
15981615
go s.sendStatszUpdate()
15991616
}
16001617

1618+
// Works out whether all nodes support moving the NRG traffic into
1619+
// the account and moves it appropriately.
1620+
// Server lock MUST NOT be held on entry.
1621+
func (s *Server) updateNRGAccountStatus() {
1622+
var raftNodes []RaftNode
1623+
s.optsMu.RLock()
1624+
supported := s.opts.JetStreamAccountNRG
1625+
s.optsMu.RUnlock()
1626+
if supported {
1627+
s.rnMu.Lock()
1628+
raftNodes = make([]RaftNode, 0, len(s.raftNodes))
1629+
for _, n := range s.raftNodes {
1630+
raftNodes = append(raftNodes, n)
1631+
}
1632+
s.rnMu.Unlock()
1633+
s.mu.Lock()
1634+
s.nodeToInfo.Range(func(key, value any) bool {
1635+
si := value.(nodeInfo)
1636+
if !s.sameDomain(si.domain) {
1637+
return true
1638+
}
1639+
if supported = supported && si.accountNRG; !supported {
1640+
return false
1641+
}
1642+
return true
1643+
})
1644+
s.mu.Unlock()
1645+
}
1646+
if s.accountNRG.CompareAndSwap(!supported, supported) {
1647+
if supported {
1648+
s.Noticef("Moving NRG traffic into asset accounts")
1649+
} else {
1650+
s.Warnf("Moving NRG traffic back into system account due to old nodes coming online")
1651+
}
1652+
for _, n := range raftNodes {
1653+
n.RecreateInternalSubs(supported)
1654+
}
1655+
}
1656+
}
1657+
16011658
// If GW is enabled on this server and there are any leaf node connections,
16021659
// this function will send a LeafNode connect system event to the super cluster
16031660
// to ensure that the GWs are in interest-only mode for this account.

server/jetstream_cluster_4_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -2184,3 +2184,77 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
21842184
}
21852185
})
21862186
}
2187+
2188+
func TestJetStreamClusterAccountNRG(t *testing.T) {
2189+
c := createJetStreamClusterExplicit(t, "R3S", 3)
2190+
defer c.shutdown()
2191+
2192+
nc, js := jsClientConnect(t, c.randomServer())
2193+
defer nc.Close()
2194+
2195+
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
2196+
defer snc.Close()
2197+
2198+
_, err := js.AddStream(&nats.StreamConfig{
2199+
Name: "TEST",
2200+
Subjects: []string{"foo"},
2201+
Storage: nats.MemoryStorage,
2202+
Retention: nats.WorkQueuePolicy,
2203+
Replicas: 3,
2204+
})
2205+
require_NoError(t, err)
2206+
2207+
leader := c.streamLeader(globalAccountName, "TEST")
2208+
stream, err := leader.gacc.lookupStream("TEST")
2209+
require_NoError(t, err)
2210+
rg := stream.node.(*raft)
2211+
2212+
// System account should have interest, but the global account
2213+
// shouldn't.
2214+
for _, s := range c.servers {
2215+
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2216+
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
2217+
}
2218+
2219+
// First of all check that the Raft traffic is in the system
2220+
// account, as we haven't moved it elsewhere yet.
2221+
{
2222+
sub, err := snc.SubscribeSync(rg.asubj)
2223+
require_NoError(t, err)
2224+
require_NoError(t, sub.AutoUnsubscribe(1))
2225+
2226+
msg, err := sub.NextMsg(time.Second * 3)
2227+
require_NoError(t, err)
2228+
require_True(t, msg != nil)
2229+
}
2230+
2231+
// Switch on account NRG on all servers in the cluster. Then
2232+
// we wait, as we will need statsz to be sent for all servers
2233+
// in the cluster.
2234+
for _, s := range c.servers {
2235+
s.optsMu.Lock()
2236+
s.opts.JetStreamAccountNRG = true
2237+
s.optsMu.Unlock()
2238+
s.updateNRGAccountStatus()
2239+
}
2240+
2241+
// Now check that the traffic has moved into the asset acc.
2242+
// In this case the system account should no longer have
2243+
// subscriptions for those subjects.
2244+
{
2245+
sub, err := nc.SubscribeSync(rg.asubj)
2246+
require_NoError(t, err)
2247+
require_NoError(t, sub.AutoUnsubscribe(1))
2248+
2249+
msg, err := sub.NextMsg(time.Second * 3)
2250+
require_NoError(t, err)
2251+
require_True(t, msg != nil)
2252+
}
2253+
2254+
// The global account should now have interest and the
2255+
// system account shouldn't.
2256+
for _, s := range c.servers {
2257+
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2258+
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
2259+
}
2260+
}

server/opts.go

+3
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ type Options struct {
317317
JetStreamLimits JSLimitOpts
318318
JetStreamTpm JSTpmOpts
319319
JetStreamMaxCatchup int64
320+
JetStreamAccountNRG bool
320321
StoreDir string `json:"-"`
321322
SyncInterval time.Duration `json:"-"`
322323
SyncAlways bool `json:"-"`
@@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
23102311
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
23112312
}
23122313
opts.JetStreamMaxCatchup = s
2314+
case "account_nrg":
2315+
opts.JetStreamAccountNRG = mv.(bool)
23132316
default:
23142317
if !tk.IsUsedVariable() {
23152318
err := &unknownConfigFieldErr{

server/raft.go

+62-15
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type RaftNode interface {
7676
Stop()
7777
Delete()
7878
Wipe()
79+
RecreateInternalSubs(acc bool) error
7980
}
8081

8182
type WAL interface {
@@ -128,6 +129,7 @@ type raft struct {
128129

129130
created time.Time // Time that the group was created
130131
accName string // Account name of the asset this raft group is for
132+
acc *Account // Account that NRG traffic will be sent/received in
131133
group string // Raft group
132134
sd string // Store directory
133135
id string // Node ID
@@ -346,8 +348,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
346348
s.mu.RUnlock()
347349
return nil, ErrNoSysAccount
348350
}
349-
sq := s.sys.sq
350-
sacc := s.sys.account
351351
hash := s.sys.shash
352352
s.mu.RUnlock()
353353

@@ -375,9 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
375375
acks: make(map[uint64]map[string]struct{}),
376376
pae: make(map[uint64]*appendEntry),
377377
s: s,
378-
c: s.createInternalSystemClient(),
379378
js: s.getJetStream(),
380-
sq: sq,
381379
quit: make(chan struct{}),
382380
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
383381
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
@@ -391,7 +389,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
391389
observer: cfg.Observer,
392390
extSt: ps.domainExt,
393391
}
394-
n.c.registerWithAccount(sacc)
392+
393+
// Setup our internal subscriptions for proposals, votes and append entries.
394+
// If we fail to do this for some reason then this is fatal — we cannot
395+
// continue setting up or the Raft node may be partially/totally isolated.
396+
if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil {
397+
n.shutdown(true)
398+
return nil, err
399+
}
395400

396401
if atomic.LoadInt32(&s.logging.debug) > 0 {
397402
n.dflag = true
@@ -488,14 +493,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
488493
}
489494
}
490495

491-
// Setup our internal subscriptions for proposals, votes and append entries.
492-
// If we fail to do this for some reason then this is fatal — we cannot
493-
// continue setting up or the Raft node may be partially/totally isolated.
494-
if err := n.createInternalSubs(); err != nil {
495-
n.shutdown(true)
496-
return nil, err
497-
}
498-
499496
n.debug("Started")
500497

501498
// Check if we need to start in observer mode due to lame duck status.
@@ -524,6 +521,57 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
524521
return n, nil
525522
}
526523

524+
func (n *raft) RecreateInternalSubs(acc bool) error {
525+
n.Lock()
526+
defer n.Unlock()
527+
528+
// Need to cancel any in-progress catch-ups, otherwise the
529+
// inboxes are about to be pulled out from underneath it in
530+
// the next step...
531+
n.cancelCatchup()
532+
533+
// If we have an existing client then tear down any existing
534+
// subscriptions and close the internal client.
535+
if c := n.c; c != nil {
536+
var subs []*subscription
537+
c.mu.Lock()
538+
for _, sub := range c.subs {
539+
subs = append(subs, sub)
540+
}
541+
c.mu.Unlock()
542+
for _, sub := range subs {
543+
n.unsubscribe(sub)
544+
}
545+
c.closeConnection(InternalClient)
546+
}
547+
548+
// Look up which account we think we should be participating
549+
// on. This will either be the system account (default) or it
550+
// will be the account that the asset is resident in.
551+
var nrgAcc *Account
552+
if n.s.sys != nil {
553+
nrgAcc = n.s.sys.account
554+
}
555+
if acc { // Should we setup in the asset account?
556+
var err error
557+
if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil {
558+
return err
559+
}
560+
}
561+
c := n.s.createInternalSystemClient()
562+
c.registerWithAccount(nrgAcc)
563+
if nrgAcc.sq == nil {
564+
nrgAcc.sq = n.s.newSendQ(nrgAcc)
565+
}
566+
n.c = c
567+
n.sq = nrgAcc.sq
568+
n.acc = nrgAcc
569+
570+
// Recreate any internal subscriptions for voting, append
571+
// entries etc in the new account.
572+
return n.createInternalSubs()
573+
}
574+
527575
// outOfResources checks to see if we are out of resources.
528576
func (n *raft) outOfResources() bool {
529577
js := n.js
@@ -1734,9 +1782,8 @@ func (n *raft) unsubscribe(sub *subscription) {
17341782
}
17351783
}
17361784

1785+
// Lock should be held.
17371786
func (n *raft) createInternalSubs() error {
1738-
n.Lock()
1739-
defer n.Unlock()
17401787
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox()
17411788
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox()
17421789
n.psubj = fmt.Sprintf(raftPropSubj, n.group)

server/route.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
20652065
// check to be consistent and future proof. but will be same domain
20662066
if s.sameDomain(info.Domain) {
20672067
s.nodeToInfo.Store(rHash,
2068-
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false})
2068+
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false})
20692069
}
20702070
}
20712071

server/sendq.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ type sendq struct {
2929
mu sync.Mutex
3030
q *ipQueue[*outMsg]
3131
s *Server
32+
a *Account
3233
}
3334

34-
func (s *Server) newSendQ() *sendq {
35-
sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")}
35+
func (s *Server) newSendQ(acc *Account) *sendq {
36+
sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc}
3637
s.startGoRoutine(sq.internalLoop)
3738
return sq
3839
}
@@ -44,8 +45,9 @@ func (sq *sendq) internalLoop() {
4445

4546
defer s.grWG.Done()
4647

48+
//c := s.createInternalAccountClient()
4749
c := s.createInternalSystemClient()
48-
c.registerWithAccount(s.SystemAccount())
50+
c.registerWithAccount(sq.a)
4951
c.noIcb = true
5052

5153
defer c.closeConnection(ClientClosed)

0 commit comments

Comments
 (0)