diff --git a/server/client.go b/server/client.go index 151e2894..55384674 100644 --- a/server/client.go +++ b/server/client.go @@ -313,10 +313,13 @@ type perm struct { } type permissions struct { + // Have these 2 first for memory alignment due to the use of atomic. + pcsz int32 + prun int32 sub perm pub perm resp *ResponsePermission - pcache map[string]bool + pcache sync.Map } // This is used to dynamically track responses and reply subjects @@ -838,7 +841,6 @@ func (c *client) setPermissions(perms *Permissions) { return } c.perms = &permissions{} - c.perms.pcache = make(map[string]bool) // Loop over publish permissions if perms.Publish != nil { @@ -914,7 +916,6 @@ func (c *client) mergePubDenyPermissions(denyPubs []string) { } if c.perms == nil { c.perms = &permissions{} - c.perms.pcache = make(map[string]bool) } if c.perms.pub.deny == nil { c.perms.pub.deny = NewSublistWithCache() @@ -2981,7 +2982,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g // Check if we are a leafnode and have perms to check. if client.kind == LEAF && client.perms != nil { - if !client.pubAllowed(string(subject)) { + if !client.pubAllowedFullCheck(string(subject), true, true) { client.mu.Unlock() client.Debugf("Not permitted to publish to %q", subject) return false @@ -3269,32 +3270,38 @@ func (c *client) pruneDenyCache() { // prunePubPermsCache will prune the cache via randomly // deleting items. Doing so pruneSize items at a time. func (c *client) prunePubPermsCache() { + const maxPruneAtOnce = 1000 r := 0 - for subject := range c.perms.pcache { - delete(c.perms.pcache, subject) - if r++; r > pruneSize { - break + c.perms.pcache.Range(func(k, _ interface{}) bool { + c.perms.pcache.Delete(k) + if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) || + (r > maxPruneAtOnce) { + return false } - } + return true + }) + atomic.AddInt32(&c.perms.pcsz, -int32(r)) + atomic.CompareAndSwapInt32(&c.perms.prun, 1, 0) } // pubAllowed checks on publish permissioning. // Lock should not be held. func (c *client) pubAllowed(subject string) bool { - return c.pubAllowedFullCheck(subject, true) + return c.pubAllowedFullCheck(subject, true, false) } // pubAllowedFullCheck checks on all publish permissioning depending // on the flag for dynamic reply permissions. -func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool { +func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bool { if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) { return true } // Check if published subject is allowed if we have permissions in place. - allowed, ok := c.perms.pcache[subject] + v, ok := c.perms.pcache.Load(subject) if ok { - return allowed + return v.(bool) } + var allowed bool // Cache miss, check allow then deny as needed. if c.perms.pub.allow != nil { r := c.perms.pub.allow.Match(subject) @@ -3313,7 +3320,9 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool { // dynamically, check to see if we are allowed here but avoid pcache. // We need to acquire the lock though. if !allowed && fullCheck && c.perms.resp != nil { - c.mu.Lock() + if !hasLock { + c.mu.Lock() + } if resp := c.replies[subject]; resp != nil { resp.n++ // Check if we have sent too many responses. @@ -3325,12 +3334,17 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool { allowed = true } } - c.mu.Unlock() + if !hasLock { + c.mu.Unlock() + } } else { // Update our cache here. - c.perms.pcache[string(subject)] = allowed - // Prune if needed. - if len(c.perms.pcache) > maxPermCacheSize { + c.perms.pcache.Store(string(subject), allowed) + // There is a case where we can invoke this from multiple go routines, + // (in deliverMsg() if sub.client is a LEAF), so we make sure to prune + // from only one go routine at a time. + if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize && + atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) { c.prunePubPermsCache() } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 77dbbbd5..e4a32e6d 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1293,6 +1293,93 @@ func TestLeafNodePermissions(t *testing.T) { } } +func TestLeafNodePermissionsConcurrentAccess(t *testing.T) { + lo1 := DefaultOptions() + lo1.LeafNode.Host = "127.0.0.1" + lo1.LeafNode.Port = -1 + ln1 := RunServer(lo1) + defer ln1.Shutdown() + + nc1 := natsConnect(t, ln1.ClientURL()) + defer nc1.Close() + + natsSub(t, nc1, "_INBOX.>", func(_ *nats.Msg) {}) + natsFlush(t, nc1) + + ch := make(chan struct{}, 1) + wg := sync.WaitGroup{} + wg.Add(2) + + publish := func(nc *nats.Conn) { + defer wg.Done() + + for { + select { + case <-ch: + return + default: + nc.Publish(nats.NewInbox(), []byte("hello")) + } + } + } + + go publish(nc1) + + u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port)) + lo2 := DefaultOptions() + lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond + lo2.LeafNode.connDelay = 500 * time.Millisecond + lo2.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{u}, + DenyExports: []string{"foo"}, + DenyImports: []string{"bar"}, + }, + } + ln2 := RunServer(lo2) + defer ln2.Shutdown() + + nc2 := natsConnect(t, ln2.ClientURL()) + defer nc2.Close() + + natsSub(t, nc2, "_INBOX.>", func(_ *nats.Msg) {}) + natsFlush(t, nc2) + + go publish(nc2) + + checkLeafNodeConnected(t, ln1) + checkLeafNodeConnected(t, ln2) + + time.Sleep(50 * time.Millisecond) + close(ch) + wg.Wait() +} + +func TestLeafNodePubAllowedPruning(t *testing.T) { + c := &client{} + c.setPermissions(&Permissions{Publish: &SubjectPermission{Allow: []string{"foo"}}}) + + gr := 100 + wg := sync.WaitGroup{} + wg.Add(gr) + for i := 0; i < gr; i++ { + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + c.pubAllowed(nats.NewInbox()) + } + }() + } + + wg.Wait() + if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize { + t.Fatalf("Expected size to be less than %v, got %v", maxPermCacheSize, n) + } + if n := atomic.LoadInt32(&c.perms.prun); n != 0 { + t.Fatalf("c.perms.prun should be 0, was %v", n) + } +} + func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) { lo1 := DefaultOptions() lo1.Accounts = []*Account{NewAccount("SYS")} diff --git a/server/route.go b/server/route.go index edeb4b00..d9e4614b 100644 --- a/server/route.go +++ b/server/route.go @@ -832,7 +832,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { func (c *client) canImport(subject string) bool { // Use pubAllowed() since this checks Publish permissions which // is what Import maps to. - return c.pubAllowedFullCheck(subject, false) + return c.pubAllowedFullCheck(subject, false, true) } // canExport is whether or not we will accept a SUB from the remote for a given subject.