diff --git a/server/leafnode.go b/server/leafnode.go index 05d94db2..39170e3c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2101,8 +2101,11 @@ func (c *client) processLeafSub(argo []byte) (err error) { spoke := c.isSpokeLeafNode() c.mu.Unlock() - if err := c.addShadowSubscriptions(acc, sub); err != nil { - c.Errorf(err.Error()) + // Only add in shadow subs if a new sub or qsub. + if osub == nil { + if err := c.addShadowSubscriptions(acc, sub); err != nil { + c.Errorf(err.Error()) + } } // If we are not solicited, treat leaf node subscriptions similar to a diff --git a/server/leafnode_test.go b/server/leafnode_test.go index ade5ce85..10caf8cb 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5111,7 +5111,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2) defer sc.shutdown() - // Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO. + // Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC. var lnTmpl = ` listen: 127.0.0.1:-1 server_name: %s @@ -5381,3 +5381,183 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived) } + +func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *testing.T) { + var tmpl = ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leaf { listen: 127.0.0.1:-1 } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + EFG { + users = [ { user: "efg", pass: "p" } ] + jetstream: enabled + exports [ { stream: "RESPONSE" } ] + } + STL { + users = [ { user: "stl", pass: "p" } ] + imports [ { stream: { account: EFG, subject: "RESPONSE"} } ] + } + KSC { + users = [ { user: "ksc", pass: "p" } ] + imports [ { stream: { account: EFG, subject: "RESPONSE"} } ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + c := createJetStreamClusterWithTemplate(t, tmpl, "US-CENTRAL", 3) + defer c.shutdown() + + // Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC. + var lnTmpl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + {{leaf}} + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }} + ` + + var leafFrag = ` + leaf { + listen: 127.0.0.1:-1 + remotes [ { urls: [ %s ] } ] + }` + + genLeafTmpl := func(tmpl string) string { + t.Helper() + + var ln []string + for _, s := range c.servers { + lno := s.getOpts().LeafNode + ln = append(ln, fmt.Sprintf("nats://ksc:p@%s:%d", lno.Host, lno.Port)) + } + return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln, ", ")), 1) + } + + tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1) + tmpl = genLeafTmpl(tmpl) + + ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false) + ln.waitOnClusterReady() + defer ln.shutdown() + + for _, s := range ln.servers { + checkLeafNodeConnectedCount(t, s, 1) + } + + // Create 10 subscribers. + var rsubs []*nats.Subscription + + closeSubs := func(subs []*nats.Subscription) { + for _, sub := range subs { + sub.Unsubscribe() + } + } + + checkAllRespReceived := func() error { + t.Helper() + var total int + for _, sub := range rsubs { + n, _, err := sub.Pending() + require_NoError(t, err) + total += n + } + if total == 100 { + return nil + } + return fmt.Errorf("Not all responses received: %d vs %d", total, 100) + } + + s := ln.randomServer() + for i := 0; i < 4; i++ { + nc, _ := jsClientConnect(t, s) + defer nc.Close() + sub, err := nc.QueueSubscribeSync("RESPONSE", "SA") + require_NoError(t, err) + nc.Flush() + rsubs = append(rsubs, sub) + } + + // Now connect and send responses from EFG in cloud. + nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p")) + for i := 0; i < 100; i++ { + require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) + } + nc.Flush() + + // Make sure all received. + checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived) + + checkAccountInterest := func(s *Server, accName string) *SublistResult { + t.Helper() + acc, err := s.LookupAccount(accName) + require_NoError(t, err) + acc.mu.RLock() + r := acc.sl.Match("RESPONSE") + acc.mu.RUnlock() + return r + } + + checkInterest := func() error { + t.Helper() + for _, s := range c.servers { + if r := checkAccountInterest(s, "KSC"); len(r.psubs)+len(r.qsubs) > 0 { + return fmt.Errorf("Subs still present for %q: %+v", "KSC", r) + } + if r := checkAccountInterest(s, "EFG"); len(r.psubs)+len(r.qsubs) > 0 { + return fmt.Errorf("Subs still present for %q: %+v", "EFG", r) + } + } + return nil + } + + // Now unsub them and create new ones on a different server. + closeSubs(rsubs) + rsubs = rsubs[:0] + + // Also restart the server that we had all the rsubs on. + s.Shutdown() + s.WaitForShutdown() + s = ln.restartServer(s) + ln.waitOnClusterReady() + ln.waitOnServerCurrent(s) + + checkFor(t, time.Second, 200*time.Millisecond, checkInterest) + + for i := 0; i < 4; i++ { + nc, _ := jsClientConnect(t, s) + defer nc.Close() + sub, err := nc.QueueSubscribeSync("RESPONSE", "SA") + require_NoError(t, err) + nc.Flush() + rsubs = append(rsubs, sub) + } + + for i := 0; i < 100; i++ { + require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) + } + nc.Flush() + + // Make sure all received. + checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived) + + closeSubs(rsubs) + checkFor(t, time.Second, 200*time.Millisecond, checkInterest) +}