We were not accounting for some newer internal clients (JETSTREAM, ACCOUNT, etc) when reloading authorization, etc.

We were also not copying over local state that has been added over the years to track different types of clients.
We also needed to make sure to reuse the account's internal client and the subscription id (acc.isid).

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-03-30 18:16:17 -07:00
parent 3933c1f3d8
commit 5182154cd2
2 changed files with 52 additions and 39 deletions

View File

@@ -16846,15 +16846,16 @@ func TestJetStreamImportReload(t *testing.T) {
ncB := natsConnect(t, s.ClientURL(), nats.UserInfo("user_b", "pwd"))
defer ncB.Close()
jsB, err := ncB.JetStream()
require_NoError(t, err)
_, err = jsB.AddStream(&nats.StreamConfig{Name: "news", Subjects: []string{"news.>"}})
require_NoError(t, err)
defer jsB.DeleteStream("foo")
require_NoError(t, ncB.Flush())
require_NoError(t, ncA.Publish("news.article", nil))
require_NoError(t, ncA.Flush())
si, err := jsB.StreamInfo("news")
require_NoError(t, err)
require_True(t, si.State.Msgs == 1)
@@ -16875,6 +16876,7 @@ func TestJetStreamImportReload(t *testing.T) {
require_NoError(t, ncA.Publish("news.article", nil))
require_NoError(t, ncA.Flush())
si, err = jsB.StreamInfo("news")
require_NoError(t, err)
require_True(t, si.State.Msgs == 1)

View File

@@ -1455,9 +1455,12 @@ func (s *Server) reloadAuthorization() {
oldAccounts := make(map[string]*Account)
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
acc.mu.Lock()
oldAccounts[acc.Name] = acc
acc.mu.RUnlock()
// Need to clear out eventing timers since they close over this account and not the new one.
clearTimer(&acc.etmr)
clearTimer(&acc.ctmr)
acc.mu.Unlock()
s.accounts.Delete(k)
return true
})
@@ -1482,8 +1485,18 @@ func (s *Server) reloadAuthorization() {
newAcc.sl = acc.sl
newAcc.rm = acc.rm
// Transfer internal client state. The configureAccounts call from above may have set up a new one.
// We need to use the old one, and the isid to not confuse internal subs.
newAcc.ic, newAcc.isid = acc.ic, acc.isid
// Transfer any JetStream state.
newAcc.js = acc.js
// Also transfer any internal accounting on different client types. We copy over all clients
// so need to copy this as well for proper accounting going forward.
newAcc.nrclients = acc.nrclients
newAcc.sysclients = acc.sysclients
newAcc.nleafs = acc.nleafs
newAcc.nrleafs = acc.nrleafs
// Process any reverse map entries.
if len(acc.imports.rrMap) > 0 {
newAcc.imports.rrMap = make(map[string][]*serviceRespEntry)
for k, v := range acc.imports.rrMap {
@@ -1546,33 +1559,51 @@ func (s *Server) reloadAuthorization() {
}
}
// Gather clients that changed accounts. We will close them and they
// will reconnect, doing the right thing.
var (
cclientsa [64]*client
cclients = cclientsa[:0]
clients = map[*client]struct{}{}
iClientsa [64]*client
iClients = iClientsa[:0]
clientsa [64]*client
clients = clientsa[:0]
routesa [64]*client
routes = routesa[:0]
)
// Gather clients that changed accounts. We will close them and they
// will reconnect, doing the right thing.
for _, client := range s.clients {
if s.clientHasMovedToDifferentAccount(client) {
cclients = append(cclients, client)
} else {
clients[client] = struct{}{}
clients = append(clients, client)
}
}
for _, route := range s.routes {
routes = append(routes, route)
}
// Check here for any system/internal clients which will not be in the servers map of normal clients.
if s.sys != nil && s.sys.account != nil && !s.opts.NoSystemAccount {
s.accounts.Store(s.sys.account.Name, s.sys.account)
}
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
// Check for sysclients accounting, ignore the system account.
if acc.sysclients > 0 && (s.sys == nil || s.sys.account != acc) {
for c := range acc.clients {
if c.kind != CLIENT && c.kind != LEAF {
clients = append(clients, c)
}
}
}
acc.mu.RUnlock()
return true
})
var resetCh chan struct{}
var sysAcc *Account
if s.sys != nil {
// can't hold the lock as go routine reading it may be waiting for lock as well
resetCh = s.sys.resetCh
sysAcc = s.sys.account
}
s.mu.Unlock()
@@ -1588,16 +1619,17 @@ func (s *Server) reloadAuthorization() {
client.closeConnection(ClientClosed)
}
for client := range clients {
for _, c := range clients {
// Disconnect any unauthorized clients.
if !s.isClientAuthorized(client) {
client.authViolation()
// Ignore internal clients.
if (c.kind == CLIENT || c.kind == LEAF) && !s.isClientAuthorized(c) {
c.authViolation()
continue
}
// Check to make sure account is correct.
client.swapAccountAfterReload()
c.swapAccountAfterReload()
// Remove any unauthorized subscriptions and check for account imports.
client.processSubsOnConfigReload(awcsti)
c.processSubsOnConfigReload(awcsti)
}
for _, route := range routes {
@@ -1621,27 +1653,6 @@ func (s *Server) reloadAuthorization() {
s.Errorf(err.Error())
}
}
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
for client, _ := range acc.clients {
if acc != sysAcc {
if _, ok := clients[client]; !ok {
iClients = append(iClients, client)
}
}
}
acc.mu.RUnlock()
return true
})
// process internal clients that don't need to authorize or switch accounts
for _, client := range iClients {
// TODO mh we seem to trigger the sublist remove, but unit test still fails
// Remove any unauthorized subscriptions and check for account imports.
client.processSubsOnConfigReload(awcsti)
}
}
// Returns true if given client current account has changed (or user