Do not fetch accounts on system events.

Noticed we would lookup accounts, but would also fetch them when tracking remote connections, etc.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-11-16 18:05:42 -08:00
parent d5a8f0ef24
commit 093b57ed40
3 changed files with 104 additions and 10 deletions

View File

@@ -678,10 +678,16 @@ func (s *Server) connsRequest(sub *subscription, _ *client, subject, reply strin
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
return
}
acc, _ := s.lookupAccount(m.Account)
// Here we really only want to lookup the account if its local. We do not want to fetch this
// account if we have no interest in it.
var acc *Account
if v, ok := s.accounts.Load(m.Account); ok {
acc = v.(*Account)
}
if acc == nil {
return
}
// We know this is a local connection.
if nlc := acc.NumLocalConnections(); nlc > 0 {
s.mu.Lock()
s.sendAccConnsUpdate(acc, reply)
@@ -733,7 +739,15 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply
}
// See if we have the account registered, if not drop it.
acc, _ := s.lookupAccount(m.Account)
// Make sure this does not force us to load this account here.
var acc *Account
if v, ok := s.accounts.Load(m.Account); ok {
acc = v.(*Account)
}
// Silently ignore these if we do not have local interest in the account.
if acc == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
@@ -742,23 +756,17 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply
if !s.running || !s.eventsEnabled() {
return
}
// Double check that this is not us, should never happen, so error if it does.
if m.Server.ID == s.info.ID {
s.sys.client.Errorf("Processing our own account connection event message: ignored")
return
}
if acc == nil {
s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account)
return
}
// If we are here we have interest in tracking this account. Update our accounting.
acc.updateRemoteServer(&m)
s.updateRemoteServer(&m.Server)
}
// Setup tracking for this account. This allows us to track globally
// account activity.
// Setup tracking for this account. This allows us to track global account activity.
// Lock should be held on entry.
func (s *Server) enableAccountTracking(a *Account) {
if a == nil || !s.eventsEnabled() {

View File

@@ -699,6 +699,11 @@ func (s *Server) numAccounts() int {
return count
}
// NumLoadedAccounts returns the number of loaded accounts.
func (s *Server) NumLoadedAccounts() int {
return s.numAccounts()
}
// LookupOrRegisterAccount will return the given account if known or create a new entry.
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
if v, ok := s.accounts.Load(name); ok {

View File

@@ -427,7 +427,7 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) {
checkForMsg()
}
func TestReloadDoesUpdatesAccountsWithMemoryResolver(t *testing.T) {
func TestReloadDoesUpdateAccountsWithMemoryResolver(t *testing.T) {
// We will run an operator mode server with a memory resolver.
// Reloading should behave similar to configured accounts.
@@ -578,3 +578,84 @@ func TestReloadFailsWithBadAccountsWithMemoryResolver(t *testing.T) {
t.Fatalf("Got unexpected error on reload: %v", err)
}
}
func TestConnsRequestDoesNotLoadAccountCheckingConnLimits(t *testing.T) {
// Create two accounts, system and normal account.
sysJWT, sysKP := createAccountForConfig(t)
sysPub, _ := sysKP.PublicKey()
// Do this account by nad to add in connection limits
okp, _ := nkeys.FromSeed(oSeed)
accKP, _ := nkeys.CreateAccount()
accPub, _ := accKP.PublicKey()
nac := jwt.NewAccountClaims(accPub)
nac.Limits.Conn = 10
accJWT, _ := nac.Encode(okp)
cf := `
listen: 127.0.0.1:-1
cluster {
listen: 127.0.0.1:-1
authorization {
timeout: 2.2
} %s
}
operator = "./configs/nkeys/op.jwt"
system_account = "%s"
resolver = MEMORY
resolver_preload = {
%s : "%s"
%s : "%s"
}
`
contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, accPub, accJWT), "\n\t", "\n", -1)
conf := createConfFile(t, []byte(contents))
defer os.Remove(conf)
s, opts := RunServerWithConfig(conf)
defer s.Shutdown()
// Create a new server and route to main one.
routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port)
contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, accPub, accJWT), "\n\t", "\n", -1)
conf2 := createConfFile(t, []byte(contents2))
defer os.Remove(conf2)
s2, _ := RunServerWithConfig(conf2)
defer s2.Shutdown()
checkClusterFormed(t, s, s2)
// Make sure that we do not have the account loaded.
// Just SYS and $G
if nla := s.NumLoadedAccounts(); nla != 2 {
t.Fatalf("Expected only 2 loaded accounts, got %d", nla)
}
if nla := s2.NumLoadedAccounts(); nla != 2 {
t.Fatalf("Expected only 2 loaded accounts, got %d", nla)
}
// Now connect to first server on accPub.
nc, err := nats.Connect(s.ClientURL(), createUserCreds(t, s, accKP))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Just wait for the request for connections to move to S2 and cause a fetch.
// This is what we want to fix.
time.Sleep(100 * time.Millisecond)
// We should have 3 here for sure.
if nla := s.NumLoadedAccounts(); nla != 3 {
t.Fatalf("Expected 3 loaded accounts, got %d", nla)
}
// Now make sure that we still only have 2 loaded accounts on server 2.
if nla := s2.NumLoadedAccounts(); nla != 2 {
t.Fatalf("Expected only 2 loaded accounts, got %d", nla)
}
}