Skip to content

Commit 1cd4693

Browse files
committed
Move NRG traffic into asset account
This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg <[email protected]>
1 parent a07bde9 commit 1cd4693

12 files changed

+465
-29
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/google/go-tpm v0.9.0
77
github.com/klauspost/compress v1.17.9
88
github.com/minio/highwayhash v1.0.3
9-
github.com/nats-io/jwt/v2 v2.5.8
9+
github.com/nats-io/jwt/v2 v2.6.0
1010
github.com/nats-io/nats.go v1.36.0
1111
github.com/nats-io/nkeys v0.4.7
1212
github.com/nats-io/nuid v1.0.1

go.sum

+2-6
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
66
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
77
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
88
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
9-
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
10-
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
9+
github.com/nats-io/jwt/v2 v2.6.0 h1:yXoBTdEotZw3NujMT+Nnu1UPNlFWdKQ3d0JJF/+pJag=
10+
github.com/nats-io/jwt/v2 v2.6.0/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
1111
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
1212
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
1313
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -22,13 +22,9 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
2222
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2323
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
2424
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
25-
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
26-
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
2725
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
2826
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
2927
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
30-
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
31-
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
3228
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
3329
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
3430
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=

server/accounts.go

+23
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Account struct {
6262
sqmu sync.Mutex
6363
sl *Sublist
6464
ic *client
65+
sq *sendq
6566
isid uint64
6667
etmr *time.Timer
6768
ctmr *time.Timer
@@ -3679,6 +3680,28 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
36793680
a.enableAllJetStreamServiceImportsAndMappings()
36803681
}
36813682

3683+
if a.js != nil {
3684+
// Check whether the account NRG status changed. If it has then we need to notify the
3685+
// Raft groups running on the system so that they can move their subs if needed.
3686+
a.mu.Lock()
3687+
previous := a.js.nrgAccount
3688+
switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] {
3689+
case "system":
3690+
a.js.nrgAccount = _EMPTY_
3691+
case "owner":
3692+
a.js.nrgAccount = a.Name
3693+
case "account":
3694+
a.js.nrgAccount = tokens[1]
3695+
default:
3696+
s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.ClusterTraffic)
3697+
}
3698+
changed := a.js.nrgAccount != previous
3699+
a.mu.Unlock()
3700+
if changed {
3701+
s.updateNRGAccountStatus()
3702+
}
3703+
}
3704+
36823705
for i, c := range clients {
36833706
a.mu.RLock()
36843707
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)

server/events.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ type ServerInfo struct {
264264
const (
265265
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
266266
BinaryStreamSnapshot // New stream snapshot capability.
267+
AccountNRG // Move NRG traffic out of system account.
267268
)
268269

269270
// Set JetStream capability.
@@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
289290
return si.Flags&BinaryStreamSnapshot != 0
290291
}
291292

293+
// Set account NRG capability.
294+
func (si *ServerInfo) SetAccountNRG() {
295+
si.Flags |= AccountNRG
296+
}
297+
298+
// AccountNRG indicates whether or not we support moving the NRG traffic out of the
299+
// system account and into the asset account.
300+
func (si *ServerInfo) AccountNRG() bool {
301+
return si.Flags&AccountNRG != 0
302+
}
303+
292304
// ClientInfo is detailed information about the client forming a connection.
293305
type ClientInfo struct {
294306
Start *time.Time `json:"start,omitempty"`
@@ -475,10 +487,14 @@ RESET:
475487
si.Version = VERSION
476488
si.Time = time.Now().UTC()
477489
si.Tags = tags
490+
si.Flags = 0
478491
if js {
479492
// New capability based flags.
480493
si.SetJetStreamEnabled()
481494
si.SetBinaryStreamSnapshot()
495+
if s.accountNRGAllowed.Load() {
496+
si.SetAccountNRG()
497+
}
482498
}
483499
}
484500
var b []byte
@@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16161632
}
16171633

16181634
node := getHash(si.Name)
1619-
s.nodeToInfo.Store(node, nodeInfo{
1635+
accountNRG := si.AccountNRG()
1636+
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
16201637
si.Name,
16211638
si.Version,
16221639
si.Cluster,
@@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16281645
false,
16291646
si.JetStreamEnabled(),
16301647
si.BinaryStreamSnapshot(),
1648+
accountNRG,
16311649
})
1650+
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
1651+
// One of the servers we received statsz from changed its mind about
1652+
// whether or not it supports in-account NRG, so update the groups
1653+
// with this information.
1654+
s.updateNRGAccountStatus()
1655+
}
16321656
}
16331657

16341658
// updateRemoteServer is called when we have an update from a remote server.
@@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) {
16751699
false,
16761700
si.JetStreamEnabled(),
16771701
si.BinaryStreamSnapshot(),
1702+
si.AccountNRG(),
16781703
})
16791704
}
16801705
}
1706+
go s.updateNRGAccountStatus()
16811707
// Announce ourselves..
16821708
// Do this in a separate Go routine.
16831709
go s.sendStatszUpdate()
16841710
}
16851711

1712+
// Works out whether all nodes support moving the NRG traffic into
1713+
// the account and moves it appropriately.
1714+
// Server lock MUST NOT be held on entry.
1715+
func (s *Server) updateNRGAccountStatus() {
1716+
s.rnMu.Lock()
1717+
raftNodes := make([]RaftNode, 0, len(s.raftNodes))
1718+
for _, n := range s.raftNodes {
1719+
raftNodes = append(raftNodes, n)
1720+
}
1721+
s.rnMu.Unlock()
1722+
for _, n := range raftNodes {
1723+
// In the event that the node is happy that all nodes that
1724+
// it cares about haven't changed, this will be a no-op.
1725+
if err := n.RecreateInternalSubs(); err != nil {
1726+
n.Stop()
1727+
}
1728+
}
1729+
}
1730+
16861731
// If GW is enabled on this server and there are any leaf node connections,
16871732
// this function will send a LeafNode connect system event to the super cluster
16881733
// to ensure that the GWs are in interest-only mode for this account.

server/jetstream.go

+3
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ type jsAccount struct {
174174
updatesSub *subscription
175175
lupdate time.Time
176176
utimer *time.Timer
177+
178+
// Which account to send NRG traffic into. Empty string is system account.
179+
nrgAccount string
177180
}
178181

179182
// Track general usage for this account.

server/jetstream_cluster_4_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
25052505
}
25062506
}
25072507

2508+
func TestJetStreamClusterAccountNRG(t *testing.T) {
2509+
c := createJetStreamClusterExplicit(t, "R3S", 3)
2510+
defer c.shutdown()
2511+
2512+
nc, js := jsClientConnect(t, c.randomServer())
2513+
defer nc.Close()
2514+
2515+
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
2516+
defer snc.Close()
2517+
2518+
_, err := js.AddStream(&nats.StreamConfig{
2519+
Name: "TEST",
2520+
Subjects: []string{"foo"},
2521+
Storage: nats.MemoryStorage,
2522+
Retention: nats.WorkQueuePolicy,
2523+
Replicas: 3,
2524+
})
2525+
require_NoError(t, err)
2526+
2527+
leader := c.streamLeader(globalAccountName, "TEST")
2528+
stream, err := leader.gacc.lookupStream("TEST")
2529+
require_NoError(t, err)
2530+
rg := stream.node.(*raft)
2531+
2532+
t.Run("Disabled", func(t *testing.T) {
2533+
// Switch off account NRG on all servers in the cluster.
2534+
for _, s := range c.servers {
2535+
s.accountNRGAllowed.Store(false)
2536+
s.sendStatszUpdate()
2537+
}
2538+
time.Sleep(time.Millisecond * 100)
2539+
for _, s := range c.servers {
2540+
s.GlobalAccount().js.nrgAccount = ""
2541+
s.updateNRGAccountStatus()
2542+
}
2543+
2544+
// Check account interest for the AppendEntry subject.
2545+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2546+
for _, s := range c.servers {
2547+
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
2548+
return fmt.Errorf("system account should have interest")
2549+
}
2550+
if s.gacc.sl.hasInterest(rg.asubj, true) {
2551+
return fmt.Errorf("global account shouldn't have interest")
2552+
}
2553+
}
2554+
return nil
2555+
})
2556+
2557+
// Check that the Raft traffic is in the system account, as we
2558+
// haven't moved it elsewhere yet.
2559+
{
2560+
sub, err := snc.SubscribeSync(rg.asubj)
2561+
require_NoError(t, err)
2562+
require_NoError(t, sub.AutoUnsubscribe(1))
2563+
2564+
msg, err := sub.NextMsg(time.Second * 3)
2565+
require_NoError(t, err)
2566+
require_True(t, msg != nil)
2567+
}
2568+
})
2569+
2570+
t.Run("Mixed", func(t *testing.T) {
2571+
// Switch on account NRG on a single server in the cluster and
2572+
// leave it off on the rest.
2573+
for i, s := range c.servers {
2574+
s.accountNRGAllowed.Store(i == 0)
2575+
s.sendStatszUpdate()
2576+
}
2577+
time.Sleep(time.Millisecond * 100)
2578+
for i, s := range c.servers {
2579+
if i == 0 {
2580+
s.GlobalAccount().js.nrgAccount = globalAccountName
2581+
} else {
2582+
s.GlobalAccount().js.nrgAccount = ""
2583+
}
2584+
s.updateNRGAccountStatus()
2585+
}
2586+
2587+
// Check account interest for the AppendEntry subject.
2588+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2589+
for _, s := range c.servers {
2590+
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
2591+
return fmt.Errorf("system account should have interest")
2592+
}
2593+
if s.gacc.sl.hasInterest(rg.asubj, true) {
2594+
return fmt.Errorf("global account shouldn't have interest")
2595+
}
2596+
}
2597+
return nil
2598+
})
2599+
2600+
// Check that the Raft traffic is in the system account, as we
2601+
// don't claim support for account NRG on all nodes in the group.
2602+
{
2603+
sub, err := snc.SubscribeSync(rg.asubj)
2604+
require_NoError(t, err)
2605+
require_NoError(t, sub.AutoUnsubscribe(1))
2606+
2607+
msg, err := sub.NextMsg(time.Second * 3)
2608+
require_NoError(t, err)
2609+
require_True(t, msg != nil)
2610+
}
2611+
})
2612+
2613+
t.Run("Enabled", func(t *testing.T) {
2614+
// Switch on account NRG on all servers in the cluster.
2615+
for _, s := range c.servers {
2616+
s.accountNRGAllowed.Store(true)
2617+
s.sendStatszUpdate()
2618+
}
2619+
time.Sleep(time.Millisecond * 100)
2620+
for _, s := range c.servers {
2621+
s.GlobalAccount().js.nrgAccount = globalAccountName
2622+
s.updateNRGAccountStatus()
2623+
}
2624+
2625+
// Check account interest for the AppendEntry subject.
2626+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2627+
for _, s := range c.servers {
2628+
if s.sys.account.sl.hasInterest(rg.asubj, true) {
2629+
return fmt.Errorf("system account shouldn't have interest")
2630+
}
2631+
if !s.gacc.sl.hasInterest(rg.asubj, true) {
2632+
return fmt.Errorf("global account should have interest")
2633+
}
2634+
}
2635+
return nil
2636+
})
2637+
2638+
// Check that the traffic moved into the global account as
2639+
// expected.
2640+
{
2641+
sub, err := nc.SubscribeSync(rg.asubj)
2642+
require_NoError(t, err)
2643+
require_NoError(t, sub.AutoUnsubscribe(1))
2644+
2645+
msg, err := sub.NextMsg(time.Second * 3)
2646+
require_NoError(t, err)
2647+
require_True(t, msg != nil)
2648+
}
2649+
})
2650+
}
2651+
25082652
func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
25092653
c := createJetStreamClusterExplicit(t, "R3S", 3)
25102654
defer c.shutdown()

0 commit comments

Comments
 (0)