Fixed cluster permissions configuration reload

This is a rework of incorrect changes made in PR #4001.
This affects only the `dev` branch.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-05-18 18:56:39 -06:00
parent 646ae9eddf
commit 607b0ca7f3
4 changed files with 275 additions and 85 deletions

View File

@@ -2029,11 +2029,8 @@ func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
// import subjects.
func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.mu.Lock()
var (
infoJSON []byte
newPerms = s.getOpts().Cluster.Permissions
routes = make(map[uint64]*client, s.numRoutes())
)
newPerms := s.getOpts().Cluster.Permissions
routes := make(map[uint64]*client, s.numRoutes())
// Get all connected routes
s.forEachRoute(func(route *client) {
route.mu.Lock()
@@ -2048,8 +2045,7 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.routeInfo.Import = newPerms.Import
s.routeInfo.Export = newPerms.Export
}
// Copy the current route's INFO struct. We will need to modify it per-account
routeInfo := s.routeInfo
infoJSON := generateInfoJSON(&s.routeInfo)
s.mu.Unlock()
// Close connections for routes that don't understand async INFO.
@@ -2075,17 +2071,46 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
newPermsTester := &client{}
newPermsTester.setRoutePermissions(newPerms)
// For a given account and list of remotes, will send an INFO protocol so
// that remote updates its route permissions and will also send the RS+ or
// RS- for subscriptions that become or are no longer permitted.
update := func(accName string, sl *Sublist, remotes []*client) {
var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
subsNeedSUB []*subscription
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
var (
_localSubs [4096]*subscription
subsNeedSUB = map[*client][]*subscription{}
subsNeedUNSUB = map[*client][]*subscription{}
deleteRoutedSubs []*subscription
)
getRouteForAccount := func(accName string, poolIdx int) *client {
for _, r := range routes {
r.mu.Lock()
ok := (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) || r.route.noPool
r.mu.Unlock()
if ok {
return r
}
}
return nil
}
// First set the new permissions on all routes.
for _, route := range routes {
route.mu.Lock()
route.setRoutePermissions(newPerms)
route.mu.Unlock()
}
// Then, go over all accounts and gather local subscriptions that need to be
// sent over as SUB or removed as UNSUB, and routed subscriptions that need
// to be dropped due to export permissions.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
// Get the route handling this account. If no route or sublist, bail out.
route := getRouteForAccount(accName, poolIdx)
if route == nil || sl == nil {
return true
}
localSubs := _localSubs[:0]
sl.localSubs(&localSubs, false)
// Go through all local subscriptions
@@ -2097,64 +2122,53 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
if canImportNow {
// If we could not before, then will need to send a SUB protocol.
if !couldImportThen {
subsNeedSUB = append(subsNeedSUB, sub)
list := subsNeedSUB[route]
list = append(list, sub)
subsNeedSUB[route] = list
}
} else if couldImportThen {
// We were previously able to import this sub, but now
// we can't so we need to send an UNSUB protocol
subsNeedUNSUB = append(subsNeedUNSUB, sub)
list := subsNeedUNSUB[route]
list = append(list, sub)
subsNeedUNSUB[route] = list
}
}
for _, route := range remotes {
// We do this manually here, not invoke generateRouteInfoJSON()
routeInfo.RemoteAccount = accName
infoJSON = generateInfoJSON(&routeInfo)
route.mu.Lock()
route.setRoutePermissions(newPerms)
for _, sub := range route.subs {
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
deleteRoutedSubs = deleteRoutedSubs[:0]
route.mu.Lock()
for key, sub := range route.subs {
if an := strings.Fields(key)[0]; an != accName {
continue
}
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
route.sendRouteSubProtos(subsNeedSUB, false, nil)
route.sendRouteUnSubProtos(subsNeedUNSUB, false, nil)
route.mu.Unlock()
}
route.mu.Unlock()
// Remove as a batch all the subs that we have removed from each route.
sl.RemoveBatch(deleteRoutedSubs)
}
// Now go over all accounts and invoke the update function defined above.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
if sl == nil {
return true
}
var remotes []*client
for _, r := range routes {
r.mu.Lock()
if (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) {
remotes = append(remotes, r)
}
r.mu.Unlock()
}
update(accName, sl, remotes)
return true
})
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
for _, route := range routes {
route.mu.Lock()
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
if subs, ok := subsNeedSUB[route]; ok && len(subs) > 0 {
route.sendRouteSubProtos(subs, false, nil)
}
if unsubs, ok := subsNeedUNSUB[route]; ok && len(unsubs) > 0 {
route.sendRouteUnSubProtos(unsubs, false, nil)
}
route.mu.Unlock()
}
}
func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) {

View File

@@ -4525,12 +4525,12 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
poolSize string
accounts string
}{
{"regular", _EMPTY_, _EMPTY_},
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run(test.name, func(t *testing.T) {
t.Run("import "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
@@ -4604,6 +4604,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
natsFlush(t, ncA)
checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)
ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()
@@ -4613,7 +4614,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(250 * time.Millisecond); err == nil {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
@@ -4634,6 +4635,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
checkClusterFormed(t, srva, srvb)
checkSubNoInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)
// Should not receive on foo
@@ -4653,6 +4655,158 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
checkClusterFormed(t, srva, srvb)
checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)
// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))
check(sub1Foo, true)
check(sub2Foo, true)
// But make sure there are no more than what we expect
check(sub1Foo, false)
check(sub2Foo, false)
// And now "bar" should fail
natsPub(t, ncB, "bar", []byte("bar3"))
check(sub1Bar, false)
check(sub2Bar, false)
})
}
// Check export now
for _, test := range []struct {
name string
poolSize string
accounts string
}{
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run("export "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
name: "local"
listen: 127.0.0.1:-1
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
%s
%s
}
`
confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts)))
srva, optsA := RunServerWithConfig(confA)
defer srva.Shutdown()
confBTemplate := `
server_name: "B"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
listen: 127.0.0.1:-1
name: "local"
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
routes = [
"nats://127.0.0.1:%d"
]
%s
%s
}
`
confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts)))
srvb, _ := RunServerWithConfig(confB)
defer srvb.Shutdown()
checkClusterFormed(t, srva, srvb)
ncA := natsConnect(t, srva.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncA.Close()
sub1Foo := natsSubSync(t, ncA, "foo")
sub2Foo := natsSubSync(t, ncA, "foo")
sub1Bar := natsSubSync(t, ncA, "bar")
sub2Bar := natsSubSync(t, ncA, "bar")
natsFlush(t, ncA)
checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)
ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()
check := func(sub *nats.Subscription, expected bool) {
t.Helper()
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
}
// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo1"))
check(sub1Foo, true)
check(sub2Foo, true)
// But not on "bar"
natsPub(t, ncB, "bar", []byte("bar1"))
check(sub1Bar, false)
check(sub2Bar, false)
reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `["foo", "bar"]`, optsA.Cluster.Port, test.poolSize, test.accounts))
checkClusterFormed(t, srva, srvb)
checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)
// Should receive on foo and bar
natsPub(t, ncB, "foo", []byte("foo2"))
check(sub1Foo, true)
check(sub2Foo, true)
natsPub(t, ncB, "bar", []byte("bar2"))
check(sub1Bar, true)
check(sub2Bar, true)
// Remove "bar"
reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts))
checkClusterFormed(t, srva, srvb)
checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)
// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))

View File

@@ -927,31 +927,39 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
c.opts.Import = info.Import
c.opts.Export = info.Export
routeAcc, poolIdx, noPool := string(c.route.accName), c.route.poolIdx, c.route.noPool
c.mu.Unlock()
var acc *Account
var err error
if an := info.RouteAccount; an == _EMPTY_ {
acc = s.globalAccount()
} else if acc, err = s.LookupAccount(an); err != nil {
c.Errorf("Unable to lookup account %q to update remote route permissions: %v", an, err)
return
}
acc.mu.RLock()
sl := acc.sl
acc.mu.RUnlock()
if sl == nil {
return
}
var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
_allSubs [4096]*subscription
allSubs = _allSubs[:0]
)
sl.localSubs(&localSubs, false)
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, accPoolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
// Do this only for accounts handled by this route
if (accPoolIdx >= 0 && accPoolIdx == poolIdx) || (routeAcc == accName) || noPool {
localSubs := _localSubs[:0]
sl.localSubs(&localSubs, false)
if len(localSubs) > 0 {
allSubs = append(allSubs, localSubs...)
}
}
return true
})
if len(allSubs) == 0 {
return
}
c.mu.Lock()
c.sendRouteSubProtos(localSubs, false, func(sub *subscription) bool {
c.sendRouteSubProtos(allSubs, false, func(sub *subscription) bool {
subj := string(sub.subject)
// If the remote can now export but could not before, and this server can import this
// subject, then send SUB protocol.

View File

@@ -66,6 +66,20 @@ func checkSubInterest(t *testing.T, s *Server, accName, subject string, timeout
})
}
// checkSubNoInterest polls (every 15ms, up to timeout) until the given
// account reports no subscription interest on the given subject, failing
// the test if interest is still present when the timeout expires.
func checkSubNoInterest(t *testing.T, s *Server, accName, subject string, timeout time.Duration) {
	t.Helper()
	checkFor(t, timeout, 15*time.Millisecond, func() error {
		acc, lerr := s.LookupAccount(accName)
		if lerr != nil {
			return fmt.Errorf("error looking up account %q: %v", accName, lerr)
		}
		// Success means the account has no interest on this subject.
		if !acc.SubscriptionInterest(subject) {
			return nil
		}
		return fmt.Errorf("unexpected subscription interest for account %q on %q", accName, subject)
	})
}
func TestRouteConfig(t *testing.T) {
opts, err := ProcessConfigFile("./configs/cluster.conf")
if err != nil {