Skip to content

Commit 1d8f32c

Browse files
NRG (2.11): Add ability to move cluster traffic into asset accounts (#5466)
This PR adds the ability to move NRG traffic out of the system account and into the asset accounts. Particularly in heavily multi-tenanted systems, this can help cases where head-of-line blocking may happen in the system account due to large amounts of replication traffic. This needs to be enabled on a per-account basis on each participating server. Servers advertise their capability to support account NRG to each other. If a Raft group detects a peer coming up without support, they will revert back to using the system account automatically. Requires nats-io/jwt#223 and then `go.mod` updating. Signed-off-by: Neil Twigg <[email protected]>
2 parents babdb82 + b38b039 commit 1d8f32c

12 files changed

+471
-29
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/google/go-tpm v0.9.0
77
github.com/klauspost/compress v1.17.9
88
github.com/minio/highwayhash v1.0.3
9-
github.com/nats-io/jwt/v2 v2.5.8
9+
github.com/nats-io/jwt/v2 v2.6.0
1010
github.com/nats-io/nats.go v1.36.0
1111
github.com/nats-io/nkeys v0.4.7
1212
github.com/nats-io/nuid v1.0.1

go.sum

+2-6
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
66
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
77
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
88
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
9-
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
10-
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
9+
github.com/nats-io/jwt/v2 v2.6.0 h1:yXoBTdEotZw3NujMT+Nnu1UPNlFWdKQ3d0JJF/+pJag=
10+
github.com/nats-io/jwt/v2 v2.6.0/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
1111
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
1212
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
1313
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -22,13 +22,9 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
2222
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2323
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
2424
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
25-
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
26-
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
2725
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
2826
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
2927
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
30-
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
31-
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
3228
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
3329
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
3430
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=

server/accounts.go

+22
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Account struct {
6262
sqmu sync.Mutex
6363
sl *Sublist
6464
ic *client
65+
sq *sendq
6566
isid uint64
6667
etmr *time.Timer
6768
ctmr *time.Timer
@@ -3655,6 +3656,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
36553656

36563657
a.updated = time.Now()
36573658
clients := a.getClientsLocked()
3659+
ajs := a.js
36583660
a.mu.Unlock()
36593661

36603662
// Sort if we are over the limit.
@@ -3679,6 +3681,26 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
36793681
a.enableAllJetStreamServiceImportsAndMappings()
36803682
}
36813683

3684+
if ajs != nil {
3685+
// Check whether the account NRG status changed. If it has then we need to notify the
3686+
// Raft groups running on the system so that they can move their subs if needed.
3687+
a.mu.Lock()
3688+
previous := ajs.nrgAccount
3689+
switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] {
3690+
case "system":
3691+
a.js.nrgAccount = _EMPTY_
3692+
case "owner":
3693+
a.js.nrgAccount = a.Name
3694+
default:
3695+
s.Errorf("Account claim for %q has invalid value %q for cluster traffic account", a.Name, ac.ClusterTraffic)
3696+
}
3697+
changed := ajs.nrgAccount != previous
3698+
a.mu.Unlock()
3699+
if changed {
3700+
s.updateNRGAccountStatus()
3701+
}
3702+
}
3703+
36823704
for i, c := range clients {
36833705
a.mu.RLock()
36843706
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)

server/events.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ type ServerInfo struct {
264264
const (
265265
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
266266
BinaryStreamSnapshot // New stream snapshot capability.
267+
AccountNRG // Move NRG traffic out of system account.
267268
)
268269

269270
// Set JetStream capability.
@@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
289290
return si.Flags&BinaryStreamSnapshot != 0
290291
}
291292

293+
// Set account NRG capability.
294+
func (si *ServerInfo) SetAccountNRG() {
295+
si.Flags |= AccountNRG
296+
}
297+
298+
// AccountNRG indicates whether or not we support moving the NRG traffic out of the
299+
// system account and into the asset account.
300+
func (si *ServerInfo) AccountNRG() bool {
301+
return si.Flags&AccountNRG != 0
302+
}
303+
292304
// ClientInfo is detailed information about the client forming a connection.
293305
type ClientInfo struct {
294306
Start *time.Time `json:"start,omitempty"`
@@ -475,10 +487,14 @@ RESET:
475487
si.Version = VERSION
476488
si.Time = time.Now().UTC()
477489
si.Tags = tags
490+
si.Flags = 0
478491
if js {
479492
// New capability based flags.
480493
si.SetJetStreamEnabled()
481494
si.SetBinaryStreamSnapshot()
495+
if s.accountNRGAllowed.Load() {
496+
si.SetAccountNRG()
497+
}
482498
}
483499
}
484500
var b []byte
@@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16161632
}
16171633

16181634
node := getHash(si.Name)
1619-
s.nodeToInfo.Store(node, nodeInfo{
1635+
accountNRG := si.AccountNRG()
1636+
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
16201637
si.Name,
16211638
si.Version,
16221639
si.Cluster,
@@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
16281645
false,
16291646
si.JetStreamEnabled(),
16301647
si.BinaryStreamSnapshot(),
1648+
accountNRG,
16311649
})
1650+
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
1651+
// One of the servers we received statsz from changed its mind about
1652+
// whether or not it supports in-account NRG, so update the groups
1653+
// with this information.
1654+
s.updateNRGAccountStatus()
1655+
}
16321656
}
16331657

16341658
// updateRemoteServer is called when we have an update from a remote server.
@@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) {
16751699
false,
16761700
si.JetStreamEnabled(),
16771701
si.BinaryStreamSnapshot(),
1702+
si.AccountNRG(),
16781703
})
16791704
}
16801705
}
1706+
go s.updateNRGAccountStatus()
16811707
// Announce ourselves..
16821708
// Do this in a separate Go routine.
16831709
go s.sendStatszUpdate()
16841710
}
16851711

1712+
// Works out whether all nodes support moving the NRG traffic into
1713+
// the account and moves it appropriately.
1714+
// Server lock MUST NOT be held on entry.
1715+
func (s *Server) updateNRGAccountStatus() {
1716+
s.rnMu.RLock()
1717+
raftNodes := make([]RaftNode, 0, len(s.raftNodes))
1718+
for _, n := range s.raftNodes {
1719+
raftNodes = append(raftNodes, n)
1720+
}
1721+
s.rnMu.RUnlock()
1722+
for _, n := range raftNodes {
1723+
// In the event that the node is happy that all nodes that
1724+
// it cares about haven't changed, this will be a no-op.
1725+
if err := n.RecreateInternalSubs(); err != nil {
1726+
n.Stop()
1727+
}
1728+
}
1729+
}
1730+
16861731
// If GW is enabled on this server and there are any leaf node connections,
16871732
// this function will send a LeafNode connect system event to the super cluster
16881733
// to ensure that the GWs are in interest-only mode for this account.

server/jetstream.go

+3
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ type jsAccount struct {
174174
updatesSub *subscription
175175
lupdate time.Time
176176
utimer *time.Timer
177+
178+
// Which account to send NRG traffic into. Empty string is system account.
179+
nrgAccount string
177180
}
178181

179182
// Track general usage for this account.

server/jetstream_cluster_4_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
25052505
}
25062506
}
25072507

2508+
func TestJetStreamClusterAccountNRG(t *testing.T) {
2509+
c := createJetStreamClusterExplicit(t, "R3S", 3)
2510+
defer c.shutdown()
2511+
2512+
nc, js := jsClientConnect(t, c.randomServer())
2513+
defer nc.Close()
2514+
2515+
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
2516+
defer snc.Close()
2517+
2518+
_, err := js.AddStream(&nats.StreamConfig{
2519+
Name: "TEST",
2520+
Subjects: []string{"foo"},
2521+
Storage: nats.MemoryStorage,
2522+
Retention: nats.WorkQueuePolicy,
2523+
Replicas: 3,
2524+
})
2525+
require_NoError(t, err)
2526+
2527+
leader := c.streamLeader(globalAccountName, "TEST")
2528+
stream, err := leader.gacc.lookupStream("TEST")
2529+
require_NoError(t, err)
2530+
rg := stream.node.(*raft)
2531+
2532+
t.Run("Disabled", func(t *testing.T) {
2533+
// Switch off account NRG on all servers in the cluster.
2534+
for _, s := range c.servers {
2535+
s.accountNRGAllowed.Store(false)
2536+
s.sendStatszUpdate()
2537+
}
2538+
time.Sleep(time.Millisecond * 100)
2539+
for _, s := range c.servers {
2540+
s.GlobalAccount().js.nrgAccount = ""
2541+
s.updateNRGAccountStatus()
2542+
}
2543+
2544+
// Check account interest for the AppendEntry subject.
2545+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2546+
for _, s := range c.servers {
2547+
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
2548+
return fmt.Errorf("system account should have interest")
2549+
}
2550+
if s.gacc.sl.hasInterest(rg.asubj, true) {
2551+
return fmt.Errorf("global account shouldn't have interest")
2552+
}
2553+
}
2554+
return nil
2555+
})
2556+
2557+
// Check that the Raft traffic is in the system account, as we
2558+
// haven't moved it elsewhere yet.
2559+
{
2560+
sub, err := snc.SubscribeSync(rg.asubj)
2561+
require_NoError(t, err)
2562+
require_NoError(t, sub.AutoUnsubscribe(1))
2563+
2564+
msg, err := sub.NextMsg(time.Second * 3)
2565+
require_NoError(t, err)
2566+
require_True(t, msg != nil)
2567+
}
2568+
})
2569+
2570+
t.Run("Mixed", func(t *testing.T) {
2571+
// Switch on account NRG on a single server in the cluster and
2572+
// leave it off on the rest.
2573+
for i, s := range c.servers {
2574+
s.accountNRGAllowed.Store(i == 0)
2575+
s.sendStatszUpdate()
2576+
}
2577+
time.Sleep(time.Millisecond * 100)
2578+
for i, s := range c.servers {
2579+
if i == 0 {
2580+
s.GlobalAccount().js.nrgAccount = globalAccountName
2581+
} else {
2582+
s.GlobalAccount().js.nrgAccount = ""
2583+
}
2584+
s.updateNRGAccountStatus()
2585+
}
2586+
2587+
// Check account interest for the AppendEntry subject.
2588+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2589+
for _, s := range c.servers {
2590+
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
2591+
return fmt.Errorf("system account should have interest")
2592+
}
2593+
if s.gacc.sl.hasInterest(rg.asubj, true) {
2594+
return fmt.Errorf("global account shouldn't have interest")
2595+
}
2596+
}
2597+
return nil
2598+
})
2599+
2600+
// Check that the Raft traffic is in the system account, as we
2601+
// don't claim support for account NRG on all nodes in the group.
2602+
{
2603+
sub, err := snc.SubscribeSync(rg.asubj)
2604+
require_NoError(t, err)
2605+
require_NoError(t, sub.AutoUnsubscribe(1))
2606+
2607+
msg, err := sub.NextMsg(time.Second * 3)
2608+
require_NoError(t, err)
2609+
require_True(t, msg != nil)
2610+
}
2611+
})
2612+
2613+
t.Run("Enabled", func(t *testing.T) {
2614+
// Switch on account NRG on all servers in the cluster.
2615+
for _, s := range c.servers {
2616+
s.accountNRGAllowed.Store(true)
2617+
s.sendStatszUpdate()
2618+
}
2619+
time.Sleep(time.Millisecond * 100)
2620+
for _, s := range c.servers {
2621+
s.GlobalAccount().js.nrgAccount = globalAccountName
2622+
s.updateNRGAccountStatus()
2623+
}
2624+
2625+
// Check account interest for the AppendEntry subject.
2626+
checkFor(t, time.Second, time.Millisecond*25, func() error {
2627+
for _, s := range c.servers {
2628+
if s.sys.account.sl.hasInterest(rg.asubj, true) {
2629+
return fmt.Errorf("system account shouldn't have interest")
2630+
}
2631+
if !s.gacc.sl.hasInterest(rg.asubj, true) {
2632+
return fmt.Errorf("global account should have interest")
2633+
}
2634+
}
2635+
return nil
2636+
})
2637+
2638+
// Check that the traffic moved into the global account as
2639+
// expected.
2640+
{
2641+
sub, err := nc.SubscribeSync(rg.asubj)
2642+
require_NoError(t, err)
2643+
require_NoError(t, sub.AutoUnsubscribe(1))
2644+
2645+
msg, err := sub.NextMsg(time.Second * 3)
2646+
require_NoError(t, err)
2647+
require_True(t, msg != nil)
2648+
}
2649+
})
2650+
}
2651+
25082652
func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
25092653
c := createJetStreamClusterExplicit(t, "R3S", 3)
25102654
defer c.shutdown()

0 commit comments

Comments
 (0)