Skip to content

Commit f6b2099

Browse files
committed
Move NRG traffic into asset account
This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg <[email protected]>
1 parent 1761eac commit f6b2099

10 files changed

+291
-29
lines changed

server/accounts.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Account struct {
6161
sqmu sync.Mutex
6262
sl *Sublist
6363
ic *client
64+
sq *sendq
6465
isid uint64
6566
etmr *time.Timer
6667
ctmr *time.Timer

server/events.go

+58
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ type ServerInfo struct {
263263
const (
264264
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
265265
BinaryStreamSnapshot // New stream snapshot capability.
266+
AccountNRG // Move NRG traffic out of system account.
266267
)
267268

268269
// Set JetStream capability.
@@ -288,6 +289,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
288289
return si.Flags&BinaryStreamSnapshot != 0
289290
}
290291

292+
// Set account NRG capability.
293+
func (si *ServerInfo) SetAccountNRG() {
294+
si.Flags |= AccountNRG
295+
}
296+
297+
// AccountNRG indicates whether or not we support moving the NRG traffic out of the
298+
// system account and into the asset account.
299+
func (si *ServerInfo) AccountNRG() bool {
300+
return si.Flags&AccountNRG != 0
301+
}
302+
291303
// ClientInfo is detailed information about the client forming a connection.
292304
type ClientInfo struct {
293305
Start *time.Time `json:"start,omitempty"`
@@ -486,6 +498,7 @@ RESET:
486498
// New capability based flags.
487499
si.SetJetStreamEnabled()
488500
si.SetBinaryStreamSnapshot()
501+
si.SetAccountNRG()
489502
}
490503
}
491504
var b []byte
@@ -912,6 +925,7 @@ func (s *Server) sendStatsz(subj string) {
912925
// JetStream
913926
if js := s.js.Load(); js != nil {
914927
jStat := &JetStreamVarz{}
928+
jStat.AccountNRGActive = s.accountNRG.Load()
915929
s.mu.RUnlock()
916930
js.mu.RLock()
917931
c := js.config
@@ -1589,7 +1603,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
15891603
false,
15901604
si.JetStreamEnabled(),
15911605
si.BinaryStreamSnapshot(),
1606+
si.AccountNRG(),
15921607
})
1608+
s.updateNRGAccountStatus()
15931609
}
15941610

15951611
// updateRemoteServer is called when we have an update from a remote server.
@@ -1636,14 +1652,56 @@ func (s *Server) processNewServer(si *ServerInfo) {
16361652
false,
16371653
si.JetStreamEnabled(),
16381654
si.BinaryStreamSnapshot(),
1655+
si.AccountNRG(),
16391656
})
16401657
}
16411658
}
1659+
go s.updateNRGAccountStatus()
16421660
// Announce ourselves..
16431661
// Do this in a separate Go routine.
16441662
go s.sendStatszUpdate()
16451663
}
16461664

1665+
// Works out whether all nodes support moving the NRG traffic into
1666+
// the account and moves it appropriately.
1667+
// Server lock MUST NOT be held on entry.
1668+
func (s *Server) updateNRGAccountStatus() {
1669+
var raftNodes []RaftNode
1670+
s.optsMu.RLock()
1671+
supported := s.opts.JetStreamAccountNRG
1672+
s.optsMu.RUnlock()
1673+
if supported {
1674+
s.rnMu.Lock()
1675+
raftNodes = make([]RaftNode, 0, len(s.raftNodes))
1676+
for _, n := range s.raftNodes {
1677+
raftNodes = append(raftNodes, n)
1678+
}
1679+
s.rnMu.Unlock()
1680+
s.mu.Lock()
1681+
s.nodeToInfo.Range(func(key, value any) bool {
1682+
si := value.(nodeInfo)
1683+
if !s.sameDomain(si.domain) {
1684+
return true
1685+
}
1686+
if supported = supported && si.accountNRG; !supported {
1687+
return false
1688+
}
1689+
return true
1690+
})
1691+
s.mu.Unlock()
1692+
}
1693+
if s.accountNRG.CompareAndSwap(!supported, supported) {
1694+
if supported {
1695+
s.Noticef("Moving NRG traffic into asset accounts")
1696+
} else {
1697+
s.Warnf("Moving NRG traffic back into system account due to old nodes coming online")
1698+
}
1699+
for _, n := range raftNodes {
1700+
n.RecreateInternalSubs(supported)
1701+
}
1702+
}
1703+
}
1704+
16471705
// If GW is enabled on this server and there are any leaf node connections,
16481706
// this function will send a LeafNode connect system event to the super cluster
16491707
// to ensure that the GWs are in interest-only mode for this account.

server/jetstream.go

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type JetStreamConfig struct {
4848
Domain string `json:"domain,omitempty"`
4949
CompressOK bool `json:"compress_ok,omitempty"`
5050
UniqueTag string `json:"unique_tag,omitempty"`
51+
AccountNRG bool `json:"account_nrg_enabled,omitempty"`
5152
}
5253

5354
// Statistics about JetStream for this server.
@@ -544,6 +545,7 @@ func (s *Server) restartJetStream() error {
544545
MaxMemory: opts.JetStreamMaxMemory,
545546
MaxStore: opts.JetStreamMaxStore,
546547
Domain: opts.JetStreamDomain,
548+
AccountNRG: opts.JetStreamAccountNRG,
547549
}
548550
s.Noticef("Restarting JetStream")
549551
err := s.EnableJetStream(&cfg)
@@ -2513,6 +2515,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je
25132515
}
25142516

25152517
opts := s.getOpts()
2518+
jsc.AccountNRG = opts.JetStreamAccountNRG
25162519

25172520
// Sync options.
25182521
jsc.SyncInterval = opts.SyncInterval

server/jetstream_cluster_4_test.go

+112
Original file line numberDiff line numberDiff line change
@@ -2329,3 +2329,115 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) {
23292329
}
23302330
}
23312331
}
2332+
2333+
func TestJetStreamClusterAccountNRG(t *testing.T) {
2334+
c := createJetStreamClusterExplicit(t, "R3S", 3)
2335+
defer c.shutdown()
2336+
2337+
nc, js := jsClientConnect(t, c.randomServer())
2338+
defer nc.Close()
2339+
2340+
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
2341+
defer snc.Close()
2342+
2343+
_, err := js.AddStream(&nats.StreamConfig{
2344+
Name: "TEST",
2345+
Subjects: []string{"foo"},
2346+
Storage: nats.MemoryStorage,
2347+
Retention: nats.WorkQueuePolicy,
2348+
Replicas: 3,
2349+
})
2350+
require_NoError(t, err)
2351+
2352+
leader := c.streamLeader(globalAccountName, "TEST")
2353+
stream, err := leader.gacc.lookupStream("TEST")
2354+
require_NoError(t, err)
2355+
rg := stream.node.(*raft)
2356+
2357+
// System account should have interest, but the global account
2358+
// shouldn't.
2359+
for _, s := range c.servers {
2360+
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2361+
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
2362+
}
2363+
2364+
// Check that varz tells us what we expect.
2365+
// set a header to make sure request parsing knows to ignore them
2366+
ri := snc.NewRespInbox()
2367+
replies, err := snc.SubscribeSync(ri)
2368+
require_NoError(t, err)
2369+
require_NoError(t, snc.PublishMsg(&nats.Msg{
2370+
Subject: "$SYS.REQ.SERVER.PING.VARZ",
2371+
Reply: ri,
2372+
}))
2373+
for i := 0; i < len(c.servers); i++ {
2374+
msg, err := replies.NextMsg(time.Second * 5)
2375+
require_NoError(t, err)
2376+
var v Varz
2377+
require_NoError(t, json.Unmarshal(msg.Data, &v))
2378+
//require_Equal(t, v.JetStream.Config.AccountNRG, false)
2379+
require_Equal(t, v.JetStream.AccountNRGActive, false)
2380+
}
2381+
p, _, _ := replies.Pending()
2382+
require_Equal(t, p, 0)
2383+
2384+
// First of all check that the Raft traffic is in the system
2385+
// account, as we haven't moved it elsewhere yet.
2386+
{
2387+
sub, err := snc.SubscribeSync(rg.asubj)
2388+
require_NoError(t, err)
2389+
require_NoError(t, sub.AutoUnsubscribe(1))
2390+
2391+
msg, err := sub.NextMsg(time.Second * 3)
2392+
require_NoError(t, err)
2393+
require_True(t, msg != nil)
2394+
}
2395+
2396+
// Switch on account NRG on all servers in the cluster. Then
2397+
// we wait, as we will need statsz to be sent for all servers
2398+
// in the cluster.
2399+
for _, s := range c.servers {
2400+
s.optsMu.Lock()
2401+
s.opts.JetStreamAccountNRG = true
2402+
s.optsMu.Unlock()
2403+
s.updateNRGAccountStatus()
2404+
}
2405+
2406+
// Now check that the traffic has moved into the asset acc.
2407+
// In this case the system account should no longer have
2408+
// subscriptions for those subjects.
2409+
{
2410+
sub, err := nc.SubscribeSync(rg.asubj)
2411+
require_NoError(t, err)
2412+
require_NoError(t, sub.AutoUnsubscribe(1))
2413+
2414+
msg, err := sub.NextMsg(time.Second * 3)
2415+
require_NoError(t, err)
2416+
require_True(t, msg != nil)
2417+
}
2418+
2419+
// The global account should now have interest and the
2420+
// system account shouldn't.
2421+
for _, s := range c.servers {
2422+
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2423+
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
2424+
}
2425+
2426+
// Check varz again.
2427+
require_NoError(t, snc.PublishMsg(&nats.Msg{
2428+
Subject: "$SYS.REQ.SERVER.PING.VARZ",
2429+
Reply: ri,
2430+
}))
2431+
for i := 0; i < len(c.servers); i++ {
2432+
msg, err := replies.NextMsg(time.Second * 5)
2433+
require_NoError(t, err)
2434+
var v Varz
2435+
//Server Varz `json:"server"`
2436+
//}
2437+
require_NoError(t, json.Unmarshal(msg.Data, &v))
2438+
t.Logf("JSON: %s", msg.Data)
2439+
t.Logf("Varz: %+v", v)
2440+
//require_Equal(t, v.JetStream.Config.AccountNRG, true)
2441+
//require_Equal(t, v.JetStream.AccountNRGActive, true)
2442+
}
2443+
}

server/monitor.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -1226,9 +1226,10 @@ type Varz struct {
12261226

12271227
// JetStreamVarz contains basic runtime information about jetstream
12281228
type JetStreamVarz struct {
1229-
Config *JetStreamConfig `json:"config,omitempty"`
1230-
Stats *JetStreamStats `json:"stats,omitempty"`
1231-
Meta *MetaClusterInfo `json:"meta,omitempty"`
1229+
Config *JetStreamConfig `json:"config,omitempty"`
1230+
Stats *JetStreamStats `json:"stats,omitempty"`
1231+
Meta *MetaClusterInfo `json:"meta,omitempty"`
1232+
AccountNRGActive bool `json:"account_nrg_active,omitempty"`
12321233
}
12331234

12341235
// ClusterOptsVarz contains monitoring cluster information
@@ -1443,6 +1444,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
14431444
}
14441445

14451446
func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
1447+
v.AccountNRGActive = s.accountNRG.Load()
14461448
if doConfig {
14471449
js.mu.RLock()
14481450
// We want to snapshot the config since it will then be available outside
@@ -1821,6 +1823,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
18211823
// Now update server's varz
18221824
s.mu.RLock()
18231825
sv := &s.varz.JetStream
1826+
sv.AccountNRGActive = s.accountNRG.Load()
18241827
if created {
18251828
sv.Config = v.Config
18261829
}
@@ -2794,11 +2797,12 @@ type JSInfo struct {
27942797
Disabled bool `json:"disabled,omitempty"`
27952798
Config JetStreamConfig `json:"config,omitempty"`
27962799
JetStreamStats
2797-
Streams int `json:"streams"`
2798-
Consumers int `json:"consumers"`
2799-
Messages uint64 `json:"messages"`
2800-
Bytes uint64 `json:"bytes"`
2801-
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
2800+
Streams int `json:"streams"`
2801+
Consumers int `json:"consumers"`
2802+
Messages uint64 `json:"messages"`
2803+
Bytes uint64 `json:"bytes"`
2804+
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
2805+
AccountNRGActive bool `json:"account_nrg_active,omitempty"`
28022806

28032807
// aggregate raft info
28042808
AccountDetails []*AccountDetail `json:"account_details,omitempty"`

server/opts.go

+3
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ type Options struct {
317317
JetStreamLimits JSLimitOpts
318318
JetStreamTpm JSTpmOpts
319319
JetStreamMaxCatchup int64
320+
JetStreamAccountNRG bool
320321
StoreDir string `json:"-"`
321322
SyncInterval time.Duration `json:"-"`
322323
SyncAlways bool `json:"-"`
@@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
23102311
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
23112312
}
23122313
opts.JetStreamMaxCatchup = s
2314+
case "account_nrg":
2315+
opts.JetStreamAccountNRG = mv.(bool)
23132316
default:
23142317
if !tk.IsUsedVariable() {
23152318
err := &unknownConfigFieldErr{

0 commit comments

Comments
 (0)