From 353d543c166c3e9c1ad5c9b0c99980c7cf0d6092 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 10 Jul 2023 21:03:47 -0700
Subject: [PATCH] When a queue subscriber was updated multiple times over a
 leafnode connection, we added in more shadow subscriptions, which could
 become zombies when the connection went away.

In a case where a leafnode server had multiple queue subscribers on the
same queue group, the hub server would add in multiple shadow subs. These
subs would not be properly cleaned up and could leave stale connections
associated with them.

Signed-off-by: Derek Collison
---
 server/leafnode.go      |   7 +-
 server/leafnode_test.go | 182 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 186 insertions(+), 3 deletions(-)

diff --git a/server/leafnode.go b/server/leafnode.go
index 05d94db2..39170e3c 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -2101,8 +2101,11 @@ func (c *client) processLeafSub(argo []byte) (err error) {
 	spoke := c.isSpokeLeafNode()
 	c.mu.Unlock()
 
-	if err := c.addShadowSubscriptions(acc, sub); err != nil {
-		c.Errorf(err.Error())
+	// Only add in shadow subs if a new sub or qsub.
+	if osub == nil {
+		if err := c.addShadowSubscriptions(acc, sub); err != nil {
+			c.Errorf(err.Error())
+		}
 	}
 
 	// If we are not solicited, treat leaf node subscriptions similar to a
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index ade5ce85..10caf8cb 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -5111,7 +5111,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
 	sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2)
 	defer sc.shutdown()
 
-	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
+	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC.
 	var lnTmpl = `
 	listen: 127.0.0.1:-1
 	server_name: %s
@@ -5381,3 +5381,183 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
 
 	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
 }
+
+func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *testing.T) {
+	var tmpl = `
+	listen: 127.0.0.1:-1
+
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	leaf { listen: 127.0.0.1:-1 }
+
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	accounts {
+		EFG {
+			users = [ { user: "efg", pass: "p" } ]
+			jetstream: enabled
+			exports [ { stream: "RESPONSE" } ]
+		}
+		STL {
+			users = [ { user: "stl", pass: "p" } ]
+			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
+		}
+		KSC {
+			users = [ { user: "ksc", pass: "p" } ]
+			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
+		}
+		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+	}`
+
+	c := createJetStreamClusterWithTemplate(t, tmpl, "US-CENTRAL", 3)
+	defer c.shutdown()
+
+	// Now create a leafnode cluster whose servers all connect into the hub cluster on account KSC.
+	var lnTmpl = `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	{{leaf}}
+
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
+	`
+
+	var leafFrag = `
+	leaf {
+		listen: 127.0.0.1:-1
+		remotes [ { urls: [ %s ] } ]
+	}`
+
+	genLeafTmpl := func(tmpl string) string {
+		t.Helper()
+
+		var ln []string
+		for _, s := range c.servers {
+			lno := s.getOpts().LeafNode
+			ln = append(ln, fmt.Sprintf("nats://ksc:p@%s:%d", lno.Host, lno.Port))
+		}
+		return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln, ", ")), 1)
+	}
+
+	tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
+	tmpl = genLeafTmpl(tmpl)
+
+	ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
+	ln.waitOnClusterReady()
+	defer ln.shutdown()
+
+	for _, s := range ln.servers {
+		checkLeafNodeConnectedCount(t, s, 1)
+	}
+
+	// Create 4 queue subscribers.
+	var rsubs []*nats.Subscription
+
+	closeSubs := func(subs []*nats.Subscription) {
+		for _, sub := range subs {
+			sub.Unsubscribe()
+		}
+	}
+
+	checkAllRespReceived := func() error {
+		t.Helper()
+		var total int
+		for _, sub := range rsubs {
+			n, _, err := sub.Pending()
+			require_NoError(t, err)
+			total += n
+		}
+		if total == 100 {
+			return nil
+		}
+		return fmt.Errorf("Not all responses received: %d vs %d", total, 100)
+	}
+
+	s := ln.randomServer()
+	for i := 0; i < 4; i++ {
+		nc, _ := jsClientConnect(t, s)
+		defer nc.Close()
+		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
+		require_NoError(t, err)
+		nc.Flush()
+		rsubs = append(rsubs, sub)
+	}
+
+	// Now connect and send responses from EFG in cloud.
+	nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p"))
+	for i := 0; i < 100; i++ {
+		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
+	}
+	nc.Flush()
+
+	// Make sure all received.
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
+
+	checkAccountInterest := func(s *Server, accName string) *SublistResult {
+		t.Helper()
+		acc, err := s.LookupAccount(accName)
+		require_NoError(t, err)
+		acc.mu.RLock()
+		r := acc.sl.Match("RESPONSE")
+		acc.mu.RUnlock()
+		return r
+	}
+
+	checkInterest := func() error {
+		t.Helper()
+		for _, s := range c.servers {
+			if r := checkAccountInterest(s, "KSC"); len(r.psubs)+len(r.qsubs) > 0 {
+				return fmt.Errorf("Subs still present for %q: %+v", "KSC", r)
+			}
+			if r := checkAccountInterest(s, "EFG"); len(r.psubs)+len(r.qsubs) > 0 {
+				return fmt.Errorf("Subs still present for %q: %+v", "EFG", r)
+			}
+		}
+		return nil
+	}
+
+	// Now unsub them and create new ones on a different server.
+	closeSubs(rsubs)
+	rsubs = rsubs[:0]
+
+	// Also restart the server that we had all the rsubs on.
+	s.Shutdown()
+	s.WaitForShutdown()
+	s = ln.restartServer(s)
+	ln.waitOnClusterReady()
+	ln.waitOnServerCurrent(s)
+
+	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)
+
+	for i := 0; i < 4; i++ {
+		nc, _ := jsClientConnect(t, s)
+		defer nc.Close()
+		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
+		require_NoError(t, err)
+		nc.Flush()
+		rsubs = append(rsubs, sub)
+	}
+
+	for i := 0; i < 100; i++ {
+		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
+	}
+	nc.Flush()
+
+	// Make sure all received.
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
+
+	closeSubs(rsubs)
+	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)
+}
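
The core of the fix is the new "osub == nil" guard in processLeafSub: shadow subscriptions are created only the first time the hub sees a given leaf subscription, while later LS+ protos for the same queue subscription (weight updates) just update the existing sub in place. Below is a minimal, self-contained sketch of that invariant, not the actual nats-server code; the hub and qsub types and these processLeafSub/addShadowSubscriptions helpers are simplified stand-ins.

// Sketch only: simplified stand-ins for the server's leaf sub handling.
package main

import "fmt"

type qsub struct {
	subject, queue string
	weight         int32
}

type hub struct {
	// Keyed by subject plus queue, similar to how the server keys leaf subs.
	subs map[string]*qsub
}

func (h *hub) processLeafSub(subject, queue string, weight int32) {
	key := subject + " " + queue
	if osub := h.subs[key]; osub != nil {
		// Existing qsub: only the weight changes. Re-adding shadow subs here
		// is what produced the zombie subscriptions before this patch.
		osub.weight = weight
		return
	}
	sub := &qsub{subject: subject, queue: queue, weight: weight}
	h.subs[key] = sub
	h.addShadowSubscriptions(sub) // only for a new sub or qsub
}

func (h *hub) addShadowSubscriptions(sub *qsub) {
	fmt.Printf("shadow subs created for %q, queue %q\n", sub.subject, sub.queue)
}

func main() {
	h := &hub{subs: map[string]*qsub{}}
	h.processLeafSub("RESPONSE", "SA", 1) // first LS+ for the qsub: shadow subs created once
	h.processLeafSub("RESPONSE", "SA", 2) // weight update: no additional shadow subs
	h.processLeafSub("RESPONSE", "SA", 4) // another update: still no additional shadow subs
}

Run as-is, the sketch prints the shadow-sub line a single time even though the queue subscription is updated three times, which mirrors what the guarded call in processLeafSub now does when a spoke re-sends LS+ for the same queue group with a new weight.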