From d0f65c8a74ba753baf5c61c5528040473c05806f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 5 Jun 2020 13:20:01 -0700 Subject: [PATCH] Don't leak service import subs on claim updates Signed-off-by: Derek Collison --- server/accounts.go | 14 +++++++++++ server/const.go | 2 +- server/events_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++- server/reload.go | 4 +-- 4 files changed, 73 insertions(+), 5 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 089486d4..17664797 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2437,6 +2437,20 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) { }) } + // Now make sure we shutdown the old service import subscriptions. + var sids [][]byte + a.mu.RLock() + c := a.ic + for _, si := range old.imports.services { + if c != nil && si.sub != nil && si.sub.sid != nil { + sids = append(sids, si.sub.sid) + } + } + a.mu.RUnlock() + for _, sid := range sids { + c.processUnsub(sid) + } + // Now do limits if they are present. a.mu.Lock() a.msubs = int32(ac.Limits.Subs) diff --git a/server/const.go b/server/const.go index 5f1a2666..35a8c853 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-beta.14" + VERSION = "2.2.0-beta.15" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/events_test.go b/server/events_test.go index cef47bf2..4f6e0a6f 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1059,7 +1059,7 @@ func TestAccountClaimsUpdates(t *testing.T) { sacc, sakp := createAccount(s) s.setSystemAccount(sacc) - // Let's create a normal account with limits we can update. + // Let's create a normal account with limits we can update. okp, _ := nkeys.FromSeed(oSeed) akp, _ := nkeys.CreateAccount() pub, _ := akp.PublicKey() @@ -1102,6 +1102,62 @@ func TestAccountClaimsUpdates(t *testing.T) { } } +func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) { + s, opts := runTrustedServer(t) + defer s.Shutdown() + + sacc, sakp := createAccount(s) + s.setSystemAccount(sacc) + + okp, _ := nkeys.FromSeed(oSeed) + + // Let's create an account with service export. + akp, _ := nkeys.CreateAccount() + pub, _ := akp.PublicKey() + nac := jwt.NewAccountClaims(pub) + nac.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service}) + ajwt, _ := nac.Encode(okp) + addAccountToMemResolver(s, pub, ajwt) + s.LookupAccount(pub) + + // Now add an account with multiple service imports. + akp2, _ := nkeys.CreateAccount() + pub2, _ := akp2.PublicKey() + nac2 := jwt.NewAccountClaims(pub2) + nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service}) + ajwt2, _ := nac2.Encode(okp) + + addAccountToMemResolver(s, pub2, ajwt2) + s.LookupAccount(pub2) + + startSubs := s.NumSubscriptions() + + // Simulate a systems publisher so we can do an account claims update. + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(url, createUserCreds(t, s, sakp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Update the account several times + for i := 1; i <= 10; i++ { + nac2 = jwt.NewAccountClaims(pub2) + nac2.Limits.Conn = int64(i) + nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service}) + ajwt2, _ = nac2.Encode(okp) + + // Publish to the system update subject. + claimUpdateSubj := fmt.Sprintf(accUpdateEventSubj, pub2) + nc.Publish(claimUpdateSubj, []byte(ajwt2)) + } + nc.Flush() + + if startSubs != s.NumSubscriptions() { + t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions()) + } +} + func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) { s, opts := runTrustedServer(t) defer s.Shutdown() diff --git a/server/reload.go b/server/reload.go index 1bbd52d8..aab7ff77 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1148,9 +1148,7 @@ func (s *Server) reloadAuthorization() { if acc == s.gacc { return true } - acc.mu.RLock() - accName := acc.Name - acc.mu.RUnlock() + accName := acc.GetName() // Release server lock for following actions s.mu.Unlock() accClaims, claimJWT, _ := s.fetchAccountClaims(accName)