mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Moved to atomics to detect if we have mapped subjects for an account since check for each inbound message.
If an account has many connections on a server under heavy load this could be contended. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -30,6 +30,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/jwt/v2"
|
"github.com/nats-io/jwt/v2"
|
||||||
@@ -72,6 +73,7 @@ type Account struct {
|
|||||||
lqws map[string]int32
|
lqws map[string]int32
|
||||||
usersRevoked map[string]int64
|
usersRevoked map[string]int64
|
||||||
mappings []*mapping
|
mappings []*mapping
|
||||||
|
hasMapped atomic.Bool
|
||||||
lmu sync.RWMutex
|
lmu sync.RWMutex
|
||||||
lleafs []*client
|
lleafs []*client
|
||||||
leafClusters map[string]uint64
|
leafClusters map[string]uint64
|
||||||
@@ -291,6 +293,8 @@ func (a *Account) shallowCopy(na *Account) {
|
|||||||
if len(na.mappings) > 0 && na.prand == nil {
|
if len(na.mappings) > 0 && na.prand == nil {
|
||||||
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
}
|
}
|
||||||
|
na.hasMapped.Store(len(na.mappings) > 0)
|
||||||
|
|
||||||
// JetStream
|
// JetStream
|
||||||
na.jsLimits = a.jsLimits
|
na.jsLimits = a.jsLimits
|
||||||
// Server config account limits.
|
// Server config account limits.
|
||||||
@@ -703,6 +707,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
|
|||||||
}
|
}
|
||||||
// If we did not replace add to the end.
|
// If we did not replace add to the end.
|
||||||
a.mappings = append(a.mappings, m)
|
a.mappings = append(a.mappings, m)
|
||||||
|
a.hasMapped.Store(len(a.mappings) > 0)
|
||||||
|
|
||||||
// If we have connected leafnodes make sure to update.
|
// If we have connected leafnodes make sure to update.
|
||||||
if a.nleafs > 0 {
|
if a.nleafs > 0 {
|
||||||
@@ -729,6 +734,7 @@ func (a *Account) RemoveMapping(src string) bool {
|
|||||||
a.mappings[i] = a.mappings[len(a.mappings)-1]
|
a.mappings[i] = a.mappings[len(a.mappings)-1]
|
||||||
a.mappings[len(a.mappings)-1] = nil // gc
|
a.mappings[len(a.mappings)-1] = nil // gc
|
||||||
a.mappings = a.mappings[:len(a.mappings)-1]
|
a.mappings = a.mappings[:len(a.mappings)-1]
|
||||||
|
a.hasMapped.Store(len(a.mappings) > 0)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -740,28 +746,17 @@ func (a *Account) hasMappings() bool {
|
|||||||
if a == nil {
|
if a == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
a.mu.RLock()
|
return a.hasMapped.Load()
|
||||||
hm := a.hasMappingsLocked()
|
|
||||||
a.mu.RUnlock()
|
|
||||||
return hm
|
|
||||||
}
|
|
||||||
|
|
||||||
// Indicates we have mapping entries.
|
|
||||||
// The account has been verified to be non-nil.
|
|
||||||
// Read or Write lock held on entry.
|
|
||||||
func (a *Account) hasMappingsLocked() bool {
|
|
||||||
return len(a.mappings) > 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This performs the logic to map to a new dest subject based on mappings.
|
// This performs the logic to map to a new dest subject based on mappings.
|
||||||
// Should only be called from processInboundClientMsg or service import processing.
|
// Should only be called from processInboundClientMsg or service import processing.
|
||||||
func (a *Account) selectMappedSubject(dest string) (string, bool) {
|
func (a *Account) selectMappedSubject(dest string) (string, bool) {
|
||||||
a.mu.RLock()
|
if !a.hasMappings() {
|
||||||
if len(a.mappings) == 0 {
|
|
||||||
a.mu.RUnlock()
|
|
||||||
return dest, false
|
return dest, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.mu.RLock()
|
||||||
// In case we have to tokenize for subset matching.
|
// In case we have to tokenize for subset matching.
|
||||||
tsa := [32]string{}
|
tsa := [32]string{}
|
||||||
tts := tsa[:0]
|
tts := tsa[:0]
|
||||||
|
|||||||
@@ -983,7 +983,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
|
|||||||
acc.mu.RLock()
|
acc.mu.RLock()
|
||||||
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
|
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
|
||||||
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p",
|
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p",
|
||||||
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappingsLocked(), acc)
|
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc)
|
||||||
acc.mu.RUnlock()
|
acc.mu.RUnlock()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user