diff --git a/server/accounts.go b/server/accounts.go index c5ae07f7..e812ae87 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -484,6 +484,21 @@ func (a *Account) TotalSubs() int { return int(a.sl.Count()) } +// SubscriptionInterest returns true if this account has a matching subscription +// for the given `subject`. Works only for literal subjects. +// TODO: Add support for wildcards +func (a *Account) SubscriptionInterest(subject string) bool { + var interest bool + a.mu.RLock() + if a.sl != nil { + if res := a.sl.Match(subject); len(res.psubs)+len(res.qsubs) > 0 { + interest = true + } + } + a.mu.RUnlock() + return interest +} + // addClient keeps our accounting of local active clients or leafnodes updated. // Returns previous total. func (a *Account) addClient(c *client) int { diff --git a/server/client_test.go b/server/client_test.go index aaff319d..5da8f46c 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1818,6 +1818,7 @@ func TestResponsePermissions(t *testing.T) { svcNC := natsConnect(t, fmt.Sprintf("nats://service:pwd@%s:%d", opts.Host, opts.Port)) defer svcNC.Close() reqSub := natsSubSync(t, svcNC, "request") + natsFlush(t, svcNC) nc := natsConnect(t, fmt.Sprintf("nats://ivan:pwd@%s:%d", opts.Host, opts.Port)) defer nc.Close() diff --git a/test/cluster_test.go b/test/cluster_test.go index 448b6980..a2e01f12 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -72,6 +72,20 @@ func checkExpectedSubs(expected int, servers ...*server.Server) error { return nil } +func checkSubInterest(t *testing.T, s *server.Server, accName, subject string, timeout time.Duration) { + t.Helper() + checkFor(t, timeout, 15*time.Millisecond, func() error { + acc, err := s.LookupAccount(accName) + if err != nil { + return fmt.Errorf("error looking up account %q: %v", accName, err) + } + if acc.SubscriptionInterest(subject) { + return nil + } + return fmt.Errorf("no subscription interest for account %q on %q", accName, subject) + }) +} + func runThreeServers(t *testing.T) (srvA, srvB, srvC *server.Server, optsA, optsB, optsC *server.Options) { srvA, optsA = RunServerWithConfig("./configs/srv_a.conf") srvB, optsB = RunServerWithConfig("./configs/srv_b.conf") diff --git a/test/operator_test.go b/test/operator_test.go index a69e7b3c..eb2d3cf3 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -381,6 +381,7 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) { // Use this to check for message. checkForMsg := func() { + t.Helper() select { case <-ch: case <-time.After(2 * time.Second): @@ -388,6 +389,9 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) { } } + // Wait for "foo" interest to be propagated to s2's account `accPub` + checkSubInterest(t, s2, accPub, "foo", 2*time.Second) + // Create second client and send message from this one. Interest should be here. url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port) nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP)) @@ -423,6 +427,8 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) { checkClusterFormed(t, s, s2) + checkSubInterest(t, s2, accPub, "foo", 2*time.Second) + // Reconnect and make sure this works. If accounts blown away this will fail. url2 = fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port) nc2, err = nats.Connect(url2, createUserCreds(t, s2, accKP))