Enforce account limits on system account too

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-12-06 08:37:22 -08:00
parent 0bb8562930
commit b9aa2a3da4
4 changed files with 54 additions and 5 deletions

View File

@@ -400,7 +400,7 @@ func (c *client) reportErrRegisterAccount(acc *Account, err error) {
c.sendErr("Failed Account Registration")
}
// RegisterWithAccount will register the given user with a specific
// registerWithAccount will register the given user with a specific
// account. This will change the subject namespace.
func (c *client) registerWithAccount(acc *Account) error {
if acc == nil || acc.sl == nil {

View File

@@ -599,7 +599,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
// account activity.
// Lock should be held on entry.
func (s *Server) enableAccountTracking(a *Account) {
if a == nil || !s.eventsEnabled() || a == s.sys.account {
if a == nil || !s.eventsEnabled() {
return
}
@@ -620,7 +620,7 @@ const eventsHBInterval = 30 * time.Second
// account's local connections.
// Lock should be held on entry.
func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc {
if !s.eventsEnabled() || a == nil || a == s.gacc {
return
}
// Update timer first
@@ -652,7 +652,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
func (s *Server) accConnsUpdate(a *Account) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() || a == nil || a == s.sys.account {
if !s.eventsEnabled() || a == nil {
return
}
subj := fmt.Sprintf(accConnsEventSubj, a.Name)

View File

@@ -429,6 +429,56 @@ func TestSystemAccountConnectionLimits(t *testing.T) {
})
}
// Make sure connection limits apply to the system account itself.
func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) {
sa, optsA, sb, optsB, sakp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
okp, _ := nkeys.FromSeed(oSeed)
// Update system account to have 10 connections
pub, _ := sakp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Limits.Conn = 10
ajwt, _ := nac.Encode(okp)
addAccountToMemResolver(sa, pub, ajwt)
addAccountToMemResolver(sb, pub, ajwt)
// Update the accounts on each server with new claims to force update.
sysAccA := sa.SystemAccount()
sa.updateAccountWithClaimJWT(sysAccA, ajwt)
sysAccB := sb.SystemAccount()
sb.updateAccountWithClaimJWT(sysAccB, ajwt)
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
// Create a user on each server. Break on first failure.
for {
nca1, err := nats.Connect(urlA, createUserCreds(t, sa, sakp))
if err != nil {
break
}
defer nca1.Close()
ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, sakp))
if err != nil {
break
}
defer ncb1.Close()
}
checkFor(t, 1*time.Second, 50*time.Millisecond, func() error {
total := sa.NumClients() + sb.NumClients()
if total > int(nac.Limits.Conn) {
return fmt.Errorf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
}
return nil
})
}
// Test that the remote accounting works when a server is started some time later.
func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
sa, optsA, sb, optsB, _ := runTrustedCluster(t)

View File

@@ -382,7 +382,6 @@ func (s *Server) configureAccounts() error {
}
}
}
// Set the system account if it was configured.
if opts.SystemAccount != _EMPTY_ {
if _, err := s.lookupAccount(opts.SystemAccount); err != nil {