Fix flappers

- TestResponsePermissions: ensure subscription for service is
registered by server before sending requests.
- TestReloadDoesNotWipeAccountsWithOperatorMode: wait for subject
propagation.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-05-28 12:00:37 -06:00
parent f859edaf4f
commit b0e43b6aa9
4 changed files with 36 additions and 0 deletions

View File

@@ -484,6 +484,21 @@ func (a *Account) TotalSubs() int {
return int(a.sl.Count())
}
// SubscriptionInterest returns true if this account has a matching subscription
// for the given `subject`. Works only for literal subjects.
// TODO: Add support for wildcards
func (a *Account) SubscriptionInterest(subject string) bool {
var interest bool
a.mu.RLock()
if a.sl != nil {
if res := a.sl.Match(subject); len(res.psubs)+len(res.qsubs) > 0 {
interest = true
}
}
a.mu.RUnlock()
return interest
}
// addClient keeps our accounting of local active clients or leafnodes updated.
// Returns previous total.
func (a *Account) addClient(c *client) int {

View File

@@ -1818,6 +1818,7 @@ func TestResponsePermissions(t *testing.T) {
svcNC := natsConnect(t, fmt.Sprintf("nats://service:pwd@%s:%d", opts.Host, opts.Port))
defer svcNC.Close()
reqSub := natsSubSync(t, svcNC, "request")
natsFlush(t, svcNC)
nc := natsConnect(t, fmt.Sprintf("nats://ivan:pwd@%s:%d", opts.Host, opts.Port))
defer nc.Close()

View File

@@ -72,6 +72,20 @@ func checkExpectedSubs(expected int, servers ...*server.Server) error {
return nil
}
func checkSubInterest(t *testing.T, s *server.Server, accName, subject string, timeout time.Duration) {
t.Helper()
checkFor(t, timeout, 15*time.Millisecond, func() error {
acc, err := s.LookupAccount(accName)
if err != nil {
return fmt.Errorf("error looking up account %q: %v", accName, err)
}
if acc.SubscriptionInterest(subject) {
return nil
}
return fmt.Errorf("no subscription interest for account %q on %q", accName, subject)
})
}
func runThreeServers(t *testing.T) (srvA, srvB, srvC *server.Server, optsA, optsB, optsC *server.Options) {
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")

View File

@@ -381,6 +381,7 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) {
// Use this to check for message.
checkForMsg := func() {
t.Helper()
select {
case <-ch:
case <-time.After(2 * time.Second):
@@ -388,6 +389,9 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) {
}
}
// Wait for "foo" interest to be propagated to s2's account `accPub`
checkSubInterest(t, s2, accPub, "foo", 2*time.Second)
// Create second client and send message from this one. Interest should be here.
url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port)
nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP))
@@ -423,6 +427,8 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) {
checkClusterFormed(t, s, s2)
checkSubInterest(t, s2, accPub, "foo", 2*time.Second)
// Reconnect and make sure this works. If accounts blown away this will fail.
url2 = fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port)
nc2, err = nats.Connect(url2, createUserCreds(t, s2, accKP))