Merge pull request #1457 from nats-io/sisubs

Don't leak service import subs on claim updates
This commit is contained in:
Derek Collison
2020-06-05 13:35:32 -07:00
committed by GitHub
4 changed files with 73 additions and 5 deletions

View File

@@ -2437,6 +2437,20 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
})
}
// Now make sure we shutdown the old service import subscriptions.
var sids [][]byte
a.mu.RLock()
c := a.ic
for _, si := range old.imports.services {
if c != nil && si.sub != nil && si.sub.sid != nil {
sids = append(sids, si.sub.sid)
}
}
a.mu.RUnlock()
for _, sid := range sids {
c.processUnsub(sid)
}
// Now do limits if they are present.
a.mu.Lock()
a.msubs = int32(ac.Limits.Subs)

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-beta.14"
VERSION = "2.2.0-beta.15"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1059,7 +1059,7 @@ func TestAccountClaimsUpdates(t *testing.T) {
sacc, sakp := createAccount(s)
s.setSystemAccount(sacc)
// Let's create a normal account with limits we can update.
// Let's create a normal account with limits we can update.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
@@ -1102,6 +1102,62 @@ func TestAccountClaimsUpdates(t *testing.T) {
}
}
func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sacc, sakp := createAccount(s)
s.setSystemAccount(sacc)
okp, _ := nkeys.FromSeed(oSeed)
// Let's create an account with service export.
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
ajwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, ajwt)
s.LookupAccount(pub)
// Now add an account with multiple service imports.
akp2, _ := nkeys.CreateAccount()
pub2, _ := akp2.PublicKey()
nac2 := jwt.NewAccountClaims(pub2)
nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
ajwt2, _ := nac2.Encode(okp)
addAccountToMemResolver(s, pub2, ajwt2)
s.LookupAccount(pub2)
startSubs := s.NumSubscriptions()
// Simulate a systems publisher so we can do an account claims update.
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Update the account several times
for i := 1; i <= 10; i++ {
nac2 = jwt.NewAccountClaims(pub2)
nac2.Limits.Conn = int64(i)
nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
ajwt2, _ = nac2.Encode(okp)
// Publish to the system update subject.
claimUpdateSubj := fmt.Sprintf(accUpdateEventSubj, pub2)
nc.Publish(claimUpdateSubj, []byte(ajwt2))
}
nc.Flush()
if startSubs != s.NumSubscriptions() {
t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions())
}
}
func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()

View File

@@ -1148,9 +1148,7 @@ func (s *Server) reloadAuthorization() {
if acc == s.gacc {
return true
}
acc.mu.RLock()
accName := acc.Name
acc.mu.RUnlock()
accName := acc.GetName()
// Release server lock for following actions
s.mu.Unlock()
accClaims, claimJWT, _ := s.fetchAccountClaims(accName)