From af6541d9223522871518e70ec5c27989dda87588 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Wed, 15 Jan 2025 17:54:09 -0700
Subject: [PATCH] [FIXED] LeafNodes: Queue interest may be lost in super cluster

If a cluster has leafnode connections and each has the same queue group,
the loss of a leafnode connection could cause the server in the "hub"
cluster to drop interest across a gateway for this queue group.

The issue is fixed by properly accounting for all queue subs and unsubs
in the server's gateway interest map.

Signed-off-by: Ivan Kozlovic
---
 server/leafnode.go      |  20 ++-
 server/leafnode_test.go | 343 ++++++++++++++++++++++++++++++++++++++++
 server/route.go         |  20 ++-
 3 files changed, 361 insertions(+), 22 deletions(-)
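Reviewer note (illustrative only, not part of the change): the sketch below is
a minimal, simplified picture of the per-account, per-"subject queue" weight
accounting that this fix relies on. It loosely mirrors what the new test
inspects through s.gateway.pasi.m (account -> "subject queue" -> weight entry),
but the names used here (queueInterestMap, UpdateQueueInterest) are
hypothetical; the real code paths are srv.gatewayUpdateSubInterest and the
processLeafSub/processLeafUnsub/processRemoteUnsub changes in this patch.

	package main

	import "fmt"

	// queueInterestMap is a hypothetical, simplified stand-in for the gateway
	// interest map: account -> "subject queue" -> accumulated queue weight.
	type queueInterestMap struct {
		m map[string]map[string]int32
	}

	func newQueueInterestMap() *queueInterestMap {
		return &queueInterestMap{m: make(map[string]map[string]int32)}
	}

	// UpdateQueueInterest applies a signed weight delta for a queue subscription.
	// Every sub must add its delta and every unsub must subtract it, regardless
	// of which kind of connection (client, route or leafnode) it came from;
	// missing the unsub side is what let the weight drift in the reported issue.
	func (qm *queueInterestMap) UpdateQueueInterest(account, subject, queue string, delta int32) {
		key := subject + " " + queue
		st, ok := qm.m[account]
		if !ok {
			st = make(map[string]int32)
			qm.m[account] = st
		}
		if n := st[key] + delta; n > 0 {
			st[key] = n
		} else {
			// No interest left for this queue group: drop the entry so that a
			// "no interest" update can be sent to the remote gateway.
			delete(st, key)
		}
	}

	func main() {
		qm := newQueueInterestMap()
		qm.UpdateQueueInterest("USER", "foo", "bar", 3)  // three queue subs over a leafnode
		qm.UpdateQueueInterest("USER", "foo", "bar", -3) // the leafnode connection closes
		fmt.Println(qm.m["USER"]) // map[]: interest correctly removed
	}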
diff --git a/server/leafnode.go b/server/leafnode.go
index 0219f4fecff..e927363f03c 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -2525,7 +2525,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
 	}
 	key := bytesToString(sub.sid)
 	osub := c.subs[key]
-	updateGWs := false
 	if osub == nil {
 		c.subs[key] = sub
 		// Now place into the account sl.
@@ -2536,7 +2535,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
 			c.sendErr("Invalid Subscription")
 			return nil
 		}
-		updateGWs = srv.gateway.enabled
 	} else if sub.queue != nil {
 		// For a queue we need to update the weight.
 		delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -2559,7 +2557,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
 	if !spoke {
 		// If we are routing add to the route map for the associated account.
 		srv.updateRouteSubscriptionMap(acc, sub, delta)
-		if updateGWs {
+		if srv.gateway.enabled {
 			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
 		}
 	}
@@ -2601,27 +2599,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
 		return nil
 	}
 
-	updateGWs := false
 	spoke := c.isSpokeLeafNode()
 	// We store local subs by account and subject and optionally queue name.
 	// LS- will have the arg exactly as the key.
 	sub, ok := c.subs[string(arg)]
+	if !ok {
+		// If not found, don't try to update routes/gws/leaf nodes.
+		c.mu.Unlock()
+		return nil
+	}
 	delta := int32(1)
-	if ok && len(sub.queue) > 0 {
+	if len(sub.queue) > 0 {
 		delta = sub.qw
 	}
 	c.mu.Unlock()
 
-	if ok {
-		c.unsubscribe(acc, sub, true, true)
-		updateGWs = srv.gateway.enabled
-	}
-
+	c.unsubscribe(acc, sub, true, true)
 	if !spoke {
 		// If we are routing subtract from the route map for the associated account.
 		srv.updateRouteSubscriptionMap(acc, sub, -delta)
 		// Gateways
-		if updateGWs {
+		if srv.gateway.enabled {
 			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
 		}
 	}
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index b5f8266a912..f292a498280 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -4554,6 +4554,349 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) {
 	}
 }
 
+func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) {
+	SetGatewaysSolicitDelay(0)
+	defer ResetGatewaysSolicitDelay()
+
+	//
+	//                     D
+	//                     |
+	//                    Leaf
+	//                     |
+	//                     v
+	//                     C
+	//                    ^ ^
+	//                   /   \
+	//                 GW     GW
+	//                 /       \
+	//                v         \
+	//  B1 <--- route ---> B2 <----*----------*
+	//   ^ <---*                   |          |
+	//   |     |                  Leaf       Leaf
+	//  Leaf   *-- Leaf ---*       |          |
+	//   |                 |       |          |
+	//  A1 <--- route ---> A2   OTHER1     OTHER2
+	//
+
+	accs := `
+		accounts {
+			SYS: {users: [{user:sys, password: pwd}]}
+			USER: {users: [{user:user, password: pwd}]}
+		}
+		system_account: SYS
+	`
+	bConf := `
+		%s
+		server_name: %s
+		listen: "127.0.0.1:-1"
+		cluster {
+			name: "B"
+			listen: "127.0.0.1:-1"
+			no_advertise: true
+			%s
+		}
+		gateway {
+			name: "B"
+			listen: "127.0.0.1:-1"
+		}
+		leafnodes {
+			listen: "127.0.0.1:-1"
+		}
+	`
+	sb1Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B1", _EMPTY_)))
+	sb1, sb1o := RunServerWithConfig(sb1Conf)
+	defer sb1.Shutdown()
+
+	sb2Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B2",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sb1o.Cluster.Port))))
+	sb2, sb2o := RunServerWithConfig(sb2Conf)
+	defer sb2.Shutdown()
+
+	checkClusterFormed(t, sb1, sb2)
+
+	cConf := `
+		%s
+		server_name: C
+		listen: "127.0.0.1:-1"
+		cluster {
+			name: "C"
+			listen: "127.0.0.1:-1"
+		}
+		gateway {
+			name: "C"
+			listen: "127.0.0.1:-1"
+			gateways [
+				{
+					name: B
+					url: "nats://127.0.0.1:%d"
+				}
+			]
+		}
+		leafnodes {
+			listen: "127.0.0.1:-1"
+		}
+	`
+	scConf := createConfFile(t, []byte(fmt.Sprintf(cConf, accs, sb1o.Gateway.Port)))
+	sc, sco := RunServerWithConfig(scConf)
+	defer sc.Shutdown()
+
+	waitForOutboundGateways(t, sc, 1, 2*time.Second)
+	waitForOutboundGateways(t, sb1, 1, 2*time.Second)
+	waitForOutboundGateways(t, sb2, 1, 2*time.Second)
+	waitForInboundGateways(t, sc, 2, 2*time.Second)
+	waitForInboundGateways(t, sb1, 1, 2*time.Second)
+
+	dConf := `
+		%s
+		server_name: D
+		listen: "127.0.0.1:-1"
+		cluster {
+			name: "D"
+			listen: "127.0.0.1:-1"
+		}
+		leafnodes {
+			remotes [
+				{
+					url: "nats://user:pwd@127.0.0.1:%d"
+					account: USER
+				}
+			]
+		}
+	`
+	sdConf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, sco.LeafNode.Port)))
+	sd, _ := RunServerWithConfig(sdConf)
+	defer sd.Shutdown()
+
+	checkLeafNodeConnected(t, sc)
+	checkLeafNodeConnected(t, sd)
+
+	aConf := `
+		%s
+		server_name: %s
+		listen: "127.0.0.1:-1"
+		cluster {
+			name: A
+			listen: "127.0.0.1:-1"
+			no_advertise: true
+			%s
+		}
+		leafnodes {
+			remotes [
+				{
+					url: "nats://user:pwd@127.0.0.1:%d"
+					account: USER
+				}
+			]
+		}
+	`
+	a1Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A1", _EMPTY_, sb1o.LeafNode.Port)))
+	sa1, sa1o := RunServerWithConfig(a1Conf)
+	defer sa1.Shutdown()
+
+	checkLeafNodeConnected(t, sa1)
+	checkLeafNodeConnected(t, sb1)
+
+	a2Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A2",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sa1o.Cluster.Port), sb1o.LeafNode.Port)))
+	sa2, _ := RunServerWithConfig(a2Conf)
+	defer sa2.Shutdown()
+
+	checkClusterFormed(t, sa1, sa2)
+	checkLeafNodeConnected(t, sa2)
+	checkLeafNodeConnectedCount(t, sb1, 2)
+
+	otherLeafsConf := `
+		%s
+		server_name: %s
+		listen: "127.0.0.1:-1"
+		leafnodes {
+			remotes [
+				{
+					url: "nats://user:pwd@127.0.0.1:%d"
+					account: USER
+				}
+			]
+		}
+	`
+	o1Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF1", sb2o.LeafNode.Port)))
+	so1, _ := RunServerWithConfig(o1Conf)
+	defer so1.Shutdown()
+	checkLeafNodeConnected(t, so1)
+	checkLeafNodeConnectedCount(t, sb2, 1)
+
+	o2Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF2", sb2o.LeafNode.Port)))
+	so2, _ := RunServerWithConfig(o2Conf)
+	defer so2.Shutdown()
+	checkLeafNodeConnected(t, so2)
+	checkLeafNodeConnectedCount(t, sb2, 2)
+
+	// Helper to check that the interest is propagated to all servers.
+	checkInterest := func(t *testing.T, expected []int, expectedGW int32) {
+		t.Helper()
+		subj := "foo"
+		for i, s := range []*Server{sa1, sa2, so1, so2, sb1, sb2, sc, sd} {
+			if s == sc || !s.isRunning() {
+				continue
+			}
+			acc, err := s.LookupAccount("USER")
+			require_NoError(t, err)
+			checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
+				n := acc.Interest(subj)
+				if n == expected[i] {
+					return nil
+				}
+				return fmt.Errorf("Expected interest count for server %q to be %v, got %v", s, expected[i], n)
+			})
+		}
+		// For server C, need to check in gateway's account.
+		checkForRegisteredQSubInterest(t, sc, "B", "USER", "foo", expected[6], time.Second)
+
+		// For servers B1 and B2, check that we have the proper counts in the map.
+		for _, s := range []*Server{sb1, sb2} {
+			if !s.isRunning() {
+				continue
+			}
+			checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
+				s.gateway.pasi.Lock()
+				accMap := s.gateway.pasi.m
+				st := accMap["USER"]
+				var n int32
+				entry, ok := st["foo bar"]
+				if ok {
+					n = entry.n
+				}
+				s.gateway.pasi.Unlock()
+				if n == expectedGW {
+					return nil
+				}
+				return fmt.Errorf("Expected GW interest count for server %q to be %v, got %v", s, expectedGW, n)
+			})
+		}
+	}
+
+	ncA1 := natsConnect(t, sa1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncA1.Close()
+	for i := 0; i < 3; i++ {
+		natsQueueSubSync(t, ncA1, "foo", "bar")
+	}
+	natsFlush(t, ncA1)
+	// With 3 queue subs on A1, we should have for servers (in order checked in checkInterest)
+	// for A1: 3 locals, and for all others, 1 for the remote sub from A1.
+	// B1 and B2 GW map will be 3 (1 for each sub).
+	checkInterest(t, []int{3, 1, 1, 1, 1, 1, 1, 1}, 3)
+
+	ncA2 := natsConnect(t, sa2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncA2.Close()
+	ncA2qsub1 := natsQueueSubSync(t, ncA2, "foo", "bar")
+	ncA2qsub2 := natsQueueSubSync(t, ncA2, "foo", "bar")
+	natsFlush(t, ncA2)
+	// A1 will have 1 more for the remote sub, same for A2 (2 locals + 1 remote).
+	// B1 will have 2 interests (1 per leaf connection).
+	// B1 and B2 GW map goes to 5.
+	checkInterest(t, []int{4, 3, 1, 1, 2, 1, 1, 1}, 5)
+
+	ncOther1 := natsConnect(t, so1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncOther1.Close()
+	natsQueueSubSync(t, ncOther1, "foo", "bar")
+	natsQueueSubSync(t, ncOther1, "foo", "bar")
+	natsFlush(t, ncOther1)
+	// A1, A2 will have one more because of routed interest.
+	// O1 will have 3 (2 locals + 1 for remote interest).
+	// O2 still has 1 for remote interest.
+	// B1 has 1 more because of new leaf interest and B2 because of routed interest.
+	// B1 and B2 GW map goes to 7.
+	checkInterest(t, []int{5, 4, 3, 1, 3, 2, 1, 1}, 7)
+
+	ncOther2 := natsConnect(t, so2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncOther2.Close()
+	natsQueueSubSync(t, ncOther2, "foo", "bar")
+	natsFlush(t, ncOther2)
+	// O2 has 1 more for local interest.
+	// B2 has 1 more for the new leaf interest.
+	// B1 and B2 GW map goes to 8.
+	checkInterest(t, []int{5, 4, 3, 2, 3, 3, 1, 1}, 8)
+
+	// Stop the server so1.
+	so1.Shutdown()
+	so1.WaitForShutdown()
+	checkLeafNodeConnectedCount(t, sb2, 1)
+	// Now check interest is still valid, but wait a little bit to make sure that
+	// even with the bug where we would send an RS- through the gateway, there
+	// would be enough time for it to propagate before we check for interest.
+	time.Sleep(250 * time.Millisecond)
+	// O1 is stopped, so expect 0.
+	// B2 has 1 less because a leaf connection went away.
+	// B1 and B2 GW map goes down to 6.
+	checkInterest(t, []int{5, 4, 0, 2, 3, 2, 1, 1}, 6)
+
+	// Stop the server sa1.
+	sa1.Shutdown()
+	sa1.WaitForShutdown()
+	checkLeafNodeConnectedCount(t, sb1, 1)
+	time.Sleep(250 * time.Millisecond)
+	// A1 and O1 are gone, so 0.
+	// A2 has 1 less due to the loss of routed interest.
+	// B1 has 1 less because 1 leaf connection went away.
+	// B1 and B2 GW map goes down to 3.
+	checkInterest(t, []int{0, 3, 0, 2, 2, 2, 1, 1}, 3)
+
+	// Now remove the queue subs from A2.
+	ncA2qsub1.Unsubscribe()
+	natsFlush(t, ncA2)
+	// A2 has 1 less.
+	checkInterest(t, []int{0, 2, 0, 2, 2, 2, 1, 1}, 2)
+
+	ncA2qsub2.Unsubscribe()
+	natsFlush(t, ncA2)
+	// A2 has 1 (no more locals but still interest from O2).
+	// O2 has 1 (no more remote interest, only local).
+	// B1 and B2 have 1 less since there is no more interest from any of their leaf connections.
+	checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1)
+
+	// Removing (by closing the connection) the sub on O2 will remove
+	// interest globally.
+	ncOther2.Close()
+	checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+
+	// Resubscribe now, and again, interest should be propagated.
+	natsQueueSubSync(t, ncA2, "foo", "bar")
+	natsFlush(t, ncA2)
+	checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1)
+
+	natsQueueSubSync(t, ncA2, "foo", "bar")
+	natsFlush(t, ncA2)
+	checkInterest(t, []int{0, 2, 0, 1, 1, 1, 1, 1}, 2)
+
+	// Close the client connection that has the 2 queue subs.
+	ncA2.Close()
+	checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+
+	// Now we will test that when a route is lost on a server that has gateways
+	// enabled, we update counts properly.
+	ncB2 := natsConnect(t, sb2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncB2.Close()
+	natsQueueSubSync(t, ncB2, "foo", "bar")
+	natsQueueSubSync(t, ncB2, "foo", "bar")
+	natsQueueSubSync(t, ncB2, "foo", "bar")
+	natsFlush(t, ncB2)
+	checkInterest(t, []int{0, 1, 0, 1, 1, 3, 1, 1}, 3)
+
+	ncB1 := natsConnect(t, sb1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncB1.Close()
+	natsQueueSubSync(t, ncB1, "foo", "bar")
+	natsQueueSubSync(t, ncB1, "foo", "bar")
+	checkInterest(t, []int{0, 1, 0, 1, 3, 4, 1, 1}, 5)
+
+	// Now shutdown B2.
+	sb2.Shutdown()
+	sb2.WaitForShutdown()
+	time.Sleep(250 * time.Millisecond)
+	checkInterest(t, []int{0, 1, 0, 0, 2, 0, 1, 1}, 2)
+
+	ncB1.Close()
+	checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+}
+
 func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) {
 	// Note that this is not what a normal configuration should be. Users should
diff --git a/server/route.go b/server/route.go
index 3ecedf2f11c..be63abaae6a 100644
--- a/server/route.go
+++ b/server/route.go
@@ -1394,8 +1394,6 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
 		return nil
 	}
 
-	updateGWs := false
-
 	_keya := [128]byte{}
 	_key := _keya[:0]
 
@@ -1419,19 +1417,21 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
 	if ok {
 		delete(c.subs, key)
 		acc.sl.Remove(sub)
-		updateGWs = srv.gateway.enabled
 		if len(sub.queue) > 0 {
 			delta = sub.qw
 		}
 	}
 	c.mu.Unlock()
 
-	if updateGWs {
-		srv.gatewayUpdateSubInterest(accountName, sub, -delta)
-	}
+	// Update gateways and leaf nodes only if the subscription was found.
+	if ok {
+		if srv.gateway.enabled {
+			srv.gatewayUpdateSubInterest(accountName, sub, -delta)
+		}
 
-	// Now check on leafnode updates.
-	acc.updateLeafNodes(sub, -delta)
+		// Now check on leafnode updates.
+		acc.updateLeafNodes(sub, -delta)
+	}
 
 	if c.opts.Verbose {
 		c.sendOK()
@@ -1646,7 +1646,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
 	// We use the sub.sid for the key of the c.subs map.
 	key := bytesToString(sub.sid)
 	osub := c.subs[key]
-	updateGWs := false
 	if osub == nil {
 		c.subs[key] = sub
 		// Now place into the account sl.
@@ -1657,7 +1656,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
 			c.sendErr("Invalid Subscription")
 			return nil
 		}
-		updateGWs = srv.gateway.enabled
 	} else if sub.queue != nil {
 		// For a queue we need to update the weight.
 		delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -1666,7 +1664,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
 	}
 	c.mu.Unlock()
 
-	if updateGWs {
+	if srv.gateway.enabled {
 		srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
 	}