diff --git a/server/events.go b/server/events.go index 8306fe4f..b4818de3 100644 --- a/server/events.go +++ b/server/events.go @@ -678,10 +678,16 @@ func (s *Server) connsRequest(sub *subscription, _ *client, subject, reply strin s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) return } - acc, _ := s.lookupAccount(m.Account) + // Here we really only want to lookup the account if its local. We do not want to fetch this + // account if we have no interest in it. + var acc *Account + if v, ok := s.accounts.Load(m.Account); ok { + acc = v.(*Account) + } if acc == nil { return } + // We know this is a local connection. if nlc := acc.NumLocalConnections(); nlc > 0 { s.mu.Lock() s.sendAccConnsUpdate(acc, reply) @@ -733,7 +739,15 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply } // See if we have the account registered, if not drop it. - acc, _ := s.lookupAccount(m.Account) + // Make sure this does not force us to load this account here. + var acc *Account + if v, ok := s.accounts.Load(m.Account); ok { + acc = v.(*Account) + } + // Silently ignore these if we do not have local interest in the account. + if acc == nil { + return + } s.mu.Lock() defer s.mu.Unlock() @@ -742,23 +756,17 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply if !s.running || !s.eventsEnabled() { return } - // Double check that this is not us, should never happen, so error if it does. if m.Server.ID == s.info.ID { s.sys.client.Errorf("Processing our own account connection event message: ignored") return } - if acc == nil { - s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account) - return - } // If we are here we have interest in tracking this account. Update our accounting. acc.updateRemoteServer(&m) s.updateRemoteServer(&m.Server) } -// Setup tracking for this account. This allows us to track globally -// account activity. +// Setup tracking for this account. This allows us to track global account activity. // Lock should be held on entry. func (s *Server) enableAccountTracking(a *Account) { if a == nil || !s.eventsEnabled() { diff --git a/server/server.go b/server/server.go index ca7353ae..c164df8e 100644 --- a/server/server.go +++ b/server/server.go @@ -699,6 +699,11 @@ func (s *Server) numAccounts() int { return count } +// NumLoadedAccounts returns the number of loaded accounts. +func (s *Server) NumLoadedAccounts() int { + return s.numAccounts() +} + // LookupOrRegisterAccount will return the given account if known or create a new entry. func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) { if v, ok := s.accounts.Load(name); ok { diff --git a/test/operator_test.go b/test/operator_test.go index 197958e4..5f46cb32 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -427,7 +427,7 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) { checkForMsg() } -func TestReloadDoesUpdatesAccountsWithMemoryResolver(t *testing.T) { +func TestReloadDoesUpdateAccountsWithMemoryResolver(t *testing.T) { // We will run an operator mode server with a memory resolver. // Reloading should behave similar to configured accounts. @@ -578,3 +578,84 @@ func TestReloadFailsWithBadAccountsWithMemoryResolver(t *testing.T) { t.Fatalf("Got unexpected error on reload: %v", err) } } + +func TestConnsRequestDoesNotLoadAccountCheckingConnLimits(t *testing.T) { + // Create two accounts, system and normal account. + sysJWT, sysKP := createAccountForConfig(t) + sysPub, _ := sysKP.PublicKey() + + // Do this account by nad to add in connection limits + okp, _ := nkeys.FromSeed(oSeed) + accKP, _ := nkeys.CreateAccount() + accPub, _ := accKP.PublicKey() + nac := jwt.NewAccountClaims(accPub) + nac.Limits.Conn = 10 + accJWT, _ := nac.Encode(okp) + + cf := ` + listen: 127.0.0.1:-1 + cluster { + listen: 127.0.0.1:-1 + authorization { + timeout: 2.2 + } %s + } + + operator = "./configs/nkeys/op.jwt" + system_account = "%s" + + resolver = MEMORY + resolver_preload = { + %s : "%s" + %s : "%s" + } + ` + contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, accPub, accJWT), "\n\t", "\n", -1) + conf := createConfFile(t, []byte(contents)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + // Create a new server and route to main one. + routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port) + contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, accPub, accJWT), "\n\t", "\n", -1) + + conf2 := createConfFile(t, []byte(contents2)) + defer os.Remove(conf2) + + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s, s2) + + // Make sure that we do not have the account loaded. + // Just SYS and $G + if nla := s.NumLoadedAccounts(); nla != 2 { + t.Fatalf("Expected only 2 loaded accounts, got %d", nla) + } + if nla := s2.NumLoadedAccounts(); nla != 2 { + t.Fatalf("Expected only 2 loaded accounts, got %d", nla) + } + + // Now connect to first server on accPub. + nc, err := nats.Connect(s.ClientURL(), createUserCreds(t, s, accKP)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Just wait for the request for connections to move to S2 and cause a fetch. + // This is what we want to fix. + time.Sleep(100 * time.Millisecond) + + // We should have 3 here for sure. + if nla := s.NumLoadedAccounts(); nla != 3 { + t.Fatalf("Expected 3 loaded accounts, got %d", nla) + } + + // Now make sure that we still only have 2 loaded accounts on server 2. + if nla := s2.NumLoadedAccounts(); nla != 2 { + t.Fatalf("Expected only 2 loaded accounts, got %d", nla) + } +}