mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[FIXED] Possible panic due to concurrent access to unlocked map
This could happen when a leafnode has permissions set and another connection (client, etc.) is about to assign a message to the leafnode while the leafnode itself is receiving messages: both then check permissions at the same time and access the unlocked permissions cache map concurrently. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -313,10 +313,13 @@ type perm struct {
|
||||
}
|
||||
|
||||
type permissions struct {
|
||||
// Have these 2 first for memory alignment due to the use of atomic.
|
||||
pcsz int32
|
||||
prun int32
|
||||
sub perm
|
||||
pub perm
|
||||
resp *ResponsePermission
|
||||
pcache map[string]bool
|
||||
pcache sync.Map
|
||||
}
|
||||
|
||||
// This is used to dynamically track responses and reply subjects
|
||||
@@ -838,7 +841,6 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
return
|
||||
}
|
||||
c.perms = &permissions{}
|
||||
c.perms.pcache = make(map[string]bool)
|
||||
|
||||
// Loop over publish permissions
|
||||
if perms.Publish != nil {
|
||||
@@ -914,7 +916,6 @@ func (c *client) mergePubDenyPermissions(denyPubs []string) {
|
||||
}
|
||||
if c.perms == nil {
|
||||
c.perms = &permissions{}
|
||||
c.perms.pcache = make(map[string]bool)
|
||||
}
|
||||
if c.perms.pub.deny == nil {
|
||||
c.perms.pub.deny = NewSublistWithCache()
|
||||
@@ -2981,7 +2982,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
|
||||
// Check if we are a leafnode and have perms to check.
|
||||
if client.kind == LEAF && client.perms != nil {
|
||||
if !client.pubAllowed(string(subject)) {
|
||||
if !client.pubAllowedFullCheck(string(subject), true, true) {
|
||||
client.mu.Unlock()
|
||||
client.Debugf("Not permitted to publish to %q", subject)
|
||||
return false
|
||||
@@ -3269,32 +3270,38 @@ func (c *client) pruneDenyCache() {
|
||||
// prunePubPermsCache will prune the cache via randomly
|
||||
// deleting items. Doing so pruneSize items at a time.
|
||||
func (c *client) prunePubPermsCache() {
|
||||
const maxPruneAtOnce = 1000
|
||||
r := 0
|
||||
for subject := range c.perms.pcache {
|
||||
delete(c.perms.pcache, subject)
|
||||
if r++; r > pruneSize {
|
||||
break
|
||||
c.perms.pcache.Range(func(k, _ interface{}) bool {
|
||||
c.perms.pcache.Delete(k)
|
||||
if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
|
||||
(r > maxPruneAtOnce) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
atomic.AddInt32(&c.perms.pcsz, -int32(r))
|
||||
atomic.CompareAndSwapInt32(&c.perms.prun, 1, 0)
|
||||
}
|
||||
|
||||
// pubAllowed checks on publish permissioning.
|
||||
// Lock should not be held.
|
||||
func (c *client) pubAllowed(subject string) bool {
|
||||
return c.pubAllowedFullCheck(subject, true)
|
||||
return c.pubAllowedFullCheck(subject, true, false)
|
||||
}
|
||||
|
||||
// pubAllowedFullCheck checks on all publish permissioning depending
|
||||
// on the flag for dynamic reply permissions.
|
||||
func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
|
||||
func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bool {
|
||||
if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) {
|
||||
return true
|
||||
}
|
||||
// Check if published subject is allowed if we have permissions in place.
|
||||
allowed, ok := c.perms.pcache[subject]
|
||||
v, ok := c.perms.pcache.Load(subject)
|
||||
if ok {
|
||||
return allowed
|
||||
return v.(bool)
|
||||
}
|
||||
var allowed bool
|
||||
// Cache miss, check allow then deny as needed.
|
||||
if c.perms.pub.allow != nil {
|
||||
r := c.perms.pub.allow.Match(subject)
|
||||
@@ -3313,7 +3320,9 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
|
||||
// dynamically, check to see if we are allowed here but avoid pcache.
|
||||
// We need to acquire the lock though.
|
||||
if !allowed && fullCheck && c.perms.resp != nil {
|
||||
c.mu.Lock()
|
||||
if !hasLock {
|
||||
c.mu.Lock()
|
||||
}
|
||||
if resp := c.replies[subject]; resp != nil {
|
||||
resp.n++
|
||||
// Check if we have sent too many responses.
|
||||
@@ -3325,12 +3334,17 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
|
||||
allowed = true
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if !hasLock {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
// Update our cache here.
|
||||
c.perms.pcache[string(subject)] = allowed
|
||||
// Prune if needed.
|
||||
if len(c.perms.pcache) > maxPermCacheSize {
|
||||
c.perms.pcache.Store(string(subject), allowed)
|
||||
// There is a case where we can invoke this from multiple go routines,
|
||||
// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
|
||||
// from only one go routine at a time.
|
||||
if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize &&
|
||||
atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
|
||||
c.prunePubPermsCache()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1293,6 +1293,93 @@ func TestLeafNodePermissions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodePermissionsConcurrentAccess(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
ln1 := RunServer(lo1)
|
||||
defer ln1.Shutdown()
|
||||
|
||||
nc1 := natsConnect(t, ln1.ClientURL())
|
||||
defer nc1.Close()
|
||||
|
||||
natsSub(t, nc1, "_INBOX.>", func(_ *nats.Msg) {})
|
||||
natsFlush(t, nc1)
|
||||
|
||||
ch := make(chan struct{}, 1)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
publish := func(nc *nats.Conn) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
return
|
||||
default:
|
||||
nc.Publish(nats.NewInbox(), []byte("hello"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go publish(nc1)
|
||||
|
||||
u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
|
||||
lo2.LeafNode.connDelay = 500 * time.Millisecond
|
||||
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
|
||||
{
|
||||
URLs: []*url.URL{u},
|
||||
DenyExports: []string{"foo"},
|
||||
DenyImports: []string{"bar"},
|
||||
},
|
||||
}
|
||||
ln2 := RunServer(lo2)
|
||||
defer ln2.Shutdown()
|
||||
|
||||
nc2 := natsConnect(t, ln2.ClientURL())
|
||||
defer nc2.Close()
|
||||
|
||||
natsSub(t, nc2, "_INBOX.>", func(_ *nats.Msg) {})
|
||||
natsFlush(t, nc2)
|
||||
|
||||
go publish(nc2)
|
||||
|
||||
checkLeafNodeConnected(t, ln1)
|
||||
checkLeafNodeConnected(t, ln2)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
close(ch)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestLeafNodePubAllowedPruning(t *testing.T) {
|
||||
c := &client{}
|
||||
c.setPermissions(&Permissions{Publish: &SubjectPermission{Allow: []string{"foo"}}})
|
||||
|
||||
gr := 100
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(gr)
|
||||
for i := 0; i < gr; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
c.pubAllowed(nats.NewInbox())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
|
||||
t.Fatalf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
|
||||
}
|
||||
if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
|
||||
t.Fatalf("c.perms.prun should be 0, was %v", n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.Accounts = []*Account{NewAccount("SYS")}
|
||||
|
||||
@@ -832,7 +832,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
|
||||
func (c *client) canImport(subject string) bool {
|
||||
// Use pubAllowed() since this checks Publish permissions which
|
||||
// is what Import maps to.
|
||||
return c.pubAllowedFullCheck(subject, false)
|
||||
return c.pubAllowedFullCheck(subject, false, true)
|
||||
}
|
||||
|
||||
// canExport is whether or not we will accept a SUB from the remote for a given subject.
|
||||
|
||||
Reference in New Issue
Block a user