Skip to content

Commit 81bbac3

Browse files
committed
Move NRG traffic into asset account
This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg <[email protected]>
1 parent 7f92c34 commit 81bbac3

12 files changed

+424
-24
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2
22

33
go 1.22.0
44

5+
replace github.com/nats-io/jwt/v2 => github.com/nats-io/jwt/v2 v2.5.9-0.20240801130136-270cc45c44ee
6+
57
require (
68
github.com/google/go-tpm v0.9.0
79
github.com/klauspost/compress v1.17.9

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
66
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
77
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
88
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
9-
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
10-
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
9+
github.com/nats-io/jwt/v2 v2.5.9-0.20240801130136-270cc45c44ee h1:kiTo11kMbk4UMNjdgXAFFSq8+p1RLI3XPcAukFaiw+g=
10+
github.com/nats-io/jwt/v2 v2.5.9-0.20240801130136-270cc45c44ee/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
1111
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
1212
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
1313
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=

server/accounts.go

+21
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Account struct {
6262
sqmu sync.Mutex
6363
sl *Sublist
6464
ic *client
65+
sq *sendq
6566
isid uint64
6667
etmr *time.Timer
6768
ctmr *time.Timer
@@ -3679,6 +3680,26 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
36793680
a.enableAllJetStreamServiceImportsAndMappings()
36803681
}
36813682

3683+
if a.js != nil {
3684+
// Check whether the account NRG status changed. If it has then we need to notify the
3685+
// Raft groups running on the system so that they can move their subs if needed.
3686+
a.mu.Lock()
3687+
previous := a.js.nrgAccount
3688+
switch strings.ToLower(ac.NRGAccount) {
3689+
case "account":
3690+
a.js.nrgAccount = a.Name
3691+
case "system":
3692+
a.js.nrgAccount = ""
3693+
default:
3694+
s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.NRGAccount)
3695+
}
3696+
changed := a.js.nrgAccount != previous
3697+
a.mu.Unlock()
3698+
if changed {
3699+
s.updateNRGAccountStatus()
3700+
}
3701+
}
3702+
36823703
for i, c := range clients {
36833704
a.mu.RLock()
36843705
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)

server/events.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ type ServerInfo struct {
264264
const (
265265
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
266266
BinaryStreamSnapshot // New stream snapshot capability.
267+
AccountNRG // Move NRG traffic out of system account.
267268
)
268269

269270
// Set JetStream capability.
@@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
289290
return si.Flags&BinaryStreamSnapshot != 0
290291
}
291292

293+
// Set account NRG capability.
294+
func (si *ServerInfo) SetAccountNRG() {
295+
si.Flags |= AccountNRG
296+
}
297+
298+
// AccountNRG indicates whether or not we support moving the NRG traffic out of the
299+
// system account and into the asset account.
300+
func (si *ServerInfo) AccountNRG() bool {
301+
return si.Flags&AccountNRG != 0
302+
}
303+
292304
// ClientInfo is detailed information about the client forming a connection.
293305
type ClientInfo struct {
294306
Start *time.Time `json:"start,omitempty"`
@@ -475,10 +487,14 @@ RESET:
475487
si.Version = VERSION
476488
si.Time = time.Now().UTC()
477489
si.Tags = tags
490+
si.Flags = 0
478491
if js {
479492
// New capability based flags.
480493
si.SetJetStreamEnabled()
481494
si.SetBinaryStreamSnapshot()
495+
if s.accountNRGAllowed.Load() {
496+
si.SetAccountNRG()
497+
}
482498
}
483499
}
484500
var b []byte
@@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16161632
}
16171633

16181634
node := getHash(si.Name)
1619-
s.nodeToInfo.Store(node, nodeInfo{
1635+
accountNRG := si.AccountNRG()
1636+
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
16201637
si.Name,
16211638
si.Version,
16221639
si.Cluster,
@@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16281645
false,
16291646
si.JetStreamEnabled(),
16301647
si.BinaryStreamSnapshot(),
1648+
accountNRG,
16311649
})
1650+
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
1651+
// One of the servers we received statsz from changed its mind about
1652+
// whether or not it supports in-account NRG, so update the groups
1653+
// with this information.
1654+
s.updateNRGAccountStatus()
1655+
}
16321656
}
16331657

16341658
// updateRemoteServer is called when we have an update from a remote server.
@@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) {
16751699
false,
16761700
si.JetStreamEnabled(),
16771701
si.BinaryStreamSnapshot(),
1702+
si.AccountNRG(),
16781703
})
16791704
}
16801705
}
1706+
go s.updateNRGAccountStatus()
16811707
// Announce ourselves..
16821708
// Do this in a separate Go routine.
16831709
go s.sendStatszUpdate()
16841710
}
16851711

1712+
// Works out whether all nodes support moving the NRG traffic into
1713+
// the account and moves it appropriately.
1714+
// Server lock MUST NOT be held on entry.
1715+
func (s *Server) updateNRGAccountStatus() {
1716+
s.rnMu.Lock()
1717+
raftNodes := make([]RaftNode, 0, len(s.raftNodes))
1718+
for _, n := range s.raftNodes {
1719+
raftNodes = append(raftNodes, n)
1720+
}
1721+
s.rnMu.Unlock()
1722+
for _, n := range raftNodes {
1723+
// In the event that the node is happy that all nodes that
1724+
// it cares about haven't changed, this will be a no-op.
1725+
if err := n.RecreateInternalSubs(); err != nil {
1726+
n.Stop()
1727+
}
1728+
}
1729+
}
1730+
16861731
// If GW is enabled on this server and there are any leaf node connections,
16871732
// this function will send a LeafNode connect system event to the super cluster
16881733
// to ensure that the GWs are in interest-only mode for this account.

server/jetstream.go

+3
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ type jsAccount struct {
174174
updatesSub *subscription
175175
lupdate time.Time
176176
utimer *time.Timer
177+
178+
// Which account to send NRG traffic into. Empty string is system account.
179+
nrgAccount string
177180
}
178181

179182
// Track general usage for this account.

server/jetstream_cluster_4_test.go

+127
Original file line numberDiff line numberDiff line change
@@ -2505,6 +2505,133 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
25052505
}
25062506
}
25072507

2508+
func TestJetStreamClusterAccountNRG(t *testing.T) {
2509+
c := createJetStreamClusterExplicit(t, "R3S", 3)
2510+
defer c.shutdown()
2511+
2512+
nc, js := jsClientConnect(t, c.randomServer())
2513+
defer nc.Close()
2514+
2515+
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
2516+
defer snc.Close()
2517+
2518+
_, err := js.AddStream(&nats.StreamConfig{
2519+
Name: "TEST",
2520+
Subjects: []string{"foo"},
2521+
Storage: nats.MemoryStorage,
2522+
Retention: nats.WorkQueuePolicy,
2523+
Replicas: 3,
2524+
})
2525+
require_NoError(t, err)
2526+
2527+
leader := c.streamLeader(globalAccountName, "TEST")
2528+
stream, err := leader.gacc.lookupStream("TEST")
2529+
require_NoError(t, err)
2530+
rg := stream.node.(*raft)
2531+
2532+
t.Run("Disabled", func(t *testing.T) {
2533+
// Switch off account NRG on all servers in the cluster.
2534+
for _, s := range c.servers {
2535+
s.accountNRGAllowed.Store(false)
2536+
s.sendStatszUpdate()
2537+
}
2538+
time.Sleep(time.Millisecond * 100)
2539+
for _, s := range c.servers {
2540+
s.GlobalAccount().js.nrgAccount = ""
2541+
s.updateNRGAccountStatus()
2542+
}
2543+
2544+
// System account should have interest, but the global account
2545+
// shouldn't.
2546+
for _, s := range c.servers {
2547+
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2548+
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
2549+
}
2550+
2551+
// First of all check that the Raft traffic is in the system
2552+
// account, as we haven't moved it elsewhere yet.
2553+
{
2554+
sub, err := snc.SubscribeSync(rg.asubj)
2555+
require_NoError(t, err)
2556+
require_NoError(t, sub.AutoUnsubscribe(1))
2557+
2558+
msg, err := sub.NextMsg(time.Second * 3)
2559+
require_NoError(t, err)
2560+
require_True(t, msg != nil)
2561+
}
2562+
})
2563+
2564+
t.Run("Mixed", func(t *testing.T) {
2565+
// Switch on account NRG on a single server in the cluster and
2566+
// leave it off on the rest.
2567+
for i, s := range c.servers {
2568+
s.accountNRGAllowed.Store(i == 0)
2569+
s.sendStatszUpdate()
2570+
}
2571+
time.Sleep(time.Millisecond * 100)
2572+
for i, s := range c.servers {
2573+
if i == 0 {
2574+
s.GlobalAccount().js.nrgAccount = globalAccountName
2575+
} else {
2576+
s.GlobalAccount().js.nrgAccount = ""
2577+
}
2578+
s.updateNRGAccountStatus()
2579+
}
2580+
2581+
// System account should have interest, but the global account
2582+
// shouldn't.
2583+
for _, s := range c.servers {
2584+
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2585+
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
2586+
}
2587+
2588+
// First of all check that the Raft traffic is in the system
2589+
// account, as we haven't moved it elsewhere yet.
2590+
{
2591+
sub, err := snc.SubscribeSync(rg.asubj)
2592+
require_NoError(t, err)
2593+
require_NoError(t, sub.AutoUnsubscribe(1))
2594+
2595+
msg, err := sub.NextMsg(time.Second * 3)
2596+
require_NoError(t, err)
2597+
require_True(t, msg != nil)
2598+
}
2599+
})
2600+
2601+
t.Run("Enabled", func(t *testing.T) {
2602+
// Switch on account NRG on all servers in the cluster.
2603+
for _, s := range c.servers {
2604+
s.accountNRGAllowed.Store(true)
2605+
s.sendStatszUpdate()
2606+
}
2607+
time.Sleep(time.Millisecond * 100)
2608+
for _, s := range c.servers {
2609+
s.GlobalAccount().js.nrgAccount = globalAccountName
2610+
s.updateNRGAccountStatus()
2611+
}
2612+
2613+
// Now check that the traffic has moved into the asset acc.
2614+
// In this case the system account should no longer have
2615+
// subscriptions for those subjects.
2616+
{
2617+
sub, err := nc.SubscribeSync(rg.asubj)
2618+
require_NoError(t, err)
2619+
require_NoError(t, sub.AutoUnsubscribe(1))
2620+
2621+
msg, err := sub.NextMsg(time.Second * 3)
2622+
require_NoError(t, err)
2623+
require_True(t, msg != nil)
2624+
}
2625+
2626+
// The global account should now have interest and the
2627+
// system account shouldn't.
2628+
for _, s := range c.servers {
2629+
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
2630+
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
2631+
}
2632+
})
2633+
}
2634+
25082635
func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
25092636
c := createJetStreamClusterExplicit(t, "R3S", 3)
25102637
defer c.shutdown()

server/jetstream_jwt_test.go

+90
Original file line numberDiff line numberDiff line change
@@ -1532,3 +1532,93 @@ func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testi
15321532
require_Equal(t, r3.Streams, 1)
15331533
require_Equal(t, r3.Consumers, 0)
15341534
}
1535+
1536+
func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
1537+
_, syspub := createKey(t)
1538+
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
1539+
1540+
_, aExpPub := createKey(t)
1541+
accClaim := jwt.NewAccountClaims(aExpPub)
1542+
accClaim.Name = "acc"
1543+
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1}
1544+
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1}
1545+
accJwt := encodeClaim(t, accClaim, aExpPub)
1546+
1547+
tmlp := `
1548+
listen: 127.0.0.1:-1
1549+
server_name: %s
1550+
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
1551+
leaf {
1552+
listen: 127.0.0.1:-1
1553+
}
1554+
cluster {
1555+
name: %s
1556+
listen: 127.0.0.1:%d
1557+
routes = [%s]
1558+
}
1559+
` + fmt.Sprintf(`
1560+
operator: %s
1561+
system_account: %s
1562+
resolver = MEMORY
1563+
resolver_preload = {
1564+
%s : %s
1565+
%s : %s
1566+
}
1567+
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
1568+
1569+
c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
1570+
defer c.shutdown()
1571+
1572+
// We'll try flipping the state a few times and then do some sanity
1573+
// checks to check that it took effect.
1574+
for _, state := range []string{"account", "system", "account"} {
1575+
accClaim.NRGAccount = state
1576+
accJwt = encodeClaim(t, accClaim, aExpPub)
1577+
1578+
for _, s := range c.servers {
1579+
// Find the account.
1580+
acc, err := s.lookupAccount(aExpPub)
1581+
require_NoError(t, err)
1582+
1583+
// Submit a claim update using the new JWT.
1584+
require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt))
1585+
1586+
// Check that everything looks like it should.
1587+
require_True(t, acc != nil)
1588+
require_True(t, acc.js != nil)
1589+
switch state {
1590+
case "account":
1591+
require_Equal(t, acc.js.nrgAccount, aExpPub)
1592+
case "system":
1593+
require_Equal(t, acc.js.nrgAccount, _EMPTY_)
1594+
}
1595+
1596+
// Now get a list of all of the Raft nodes that should
1597+
// have been updated by now.
1598+
s.rnMu.Lock()
1599+
raftNodes := make([]*raft, 0, len(s.raftNodes))
1600+
for _, n := range s.raftNodes {
1601+
rg := n.(*raft)
1602+
if rg.accName != acc.Name {
1603+
continue
1604+
}
1605+
raftNodes = append(raftNodes, rg)
1606+
}
1607+
s.rnMu.Unlock()
1608+
1609+
// Check whether each of the Raft nodes reports being
1610+
// in-account or not.
1611+
for _, rg := range raftNodes {
1612+
rg.Lock()
1613+
rgAcc := rg.acc
1614+
rg.Unlock()
1615+
switch state {
1616+
case "account":
1617+
require_Equal(t, rgAcc.Name, aExpPub)
1618+
case "system":
1619+
require_Equal(t, rgAcc.Name, syspub)
1620+
}
1621+
}
1622+
}
1623+
}
1624+
}

0 commit comments

Comments
 (0)