mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #1055 from nats-io/sublist_cache
Allow sublist cache do be disabled globally for all accounts.
This commit is contained in:
@@ -120,7 +120,6 @@ type exportMap struct {
|
||||
func NewAccount(name string) *Account {
|
||||
a := &Account{
|
||||
Name: name,
|
||||
sl: NewSublist(),
|
||||
limits: limits{-1, -1, -1, -1, 0, 0},
|
||||
}
|
||||
return a
|
||||
|
||||
@@ -579,14 +579,14 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
// Loop over publish permissions
|
||||
if perms.Publish != nil {
|
||||
if len(perms.Publish.Allow) > 0 {
|
||||
c.perms.pub.allow = NewSublist()
|
||||
c.perms.pub.allow = NewSublistWithCache()
|
||||
}
|
||||
for _, pubSubject := range perms.Publish.Allow {
|
||||
sub := &subscription{subject: []byte(pubSubject)}
|
||||
c.perms.pub.allow.Insert(sub)
|
||||
}
|
||||
if len(perms.Publish.Deny) > 0 {
|
||||
c.perms.pub.deny = NewSublist()
|
||||
c.perms.pub.deny = NewSublistWithCache()
|
||||
}
|
||||
for _, pubSubject := range perms.Publish.Deny {
|
||||
sub := &subscription{subject: []byte(pubSubject)}
|
||||
@@ -597,14 +597,14 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
// Loop over subscribe permissions
|
||||
if perms.Subscribe != nil {
|
||||
if len(perms.Subscribe.Allow) > 0 {
|
||||
c.perms.sub.allow = NewSublist()
|
||||
c.perms.sub.allow = NewSublistWithCache()
|
||||
}
|
||||
for _, subSubject := range perms.Subscribe.Allow {
|
||||
sub := &subscription{subject: []byte(subSubject)}
|
||||
c.perms.sub.allow.Insert(sub)
|
||||
}
|
||||
if len(perms.Subscribe.Deny) > 0 {
|
||||
c.perms.sub.deny = NewSublist()
|
||||
c.perms.sub.deny = NewSublistWithCache()
|
||||
// Also hold onto this array for later.
|
||||
c.darray = perms.Subscribe.Deny
|
||||
}
|
||||
@@ -633,7 +633,7 @@ func (c *client) checkExpiration(claims *jwt.ClaimsData) {
|
||||
// messages based on a deny clause for subscriptions.
|
||||
// Lock should be held.
|
||||
func (c *client) loadMsgDenyFilter() {
|
||||
c.mperms = &msgDeny{NewSublist(), make(map[string]bool)}
|
||||
c.mperms = &msgDeny{NewSublistWithCache(), make(map[string]bool)}
|
||||
for _, sub := range c.darray {
|
||||
c.mperms.deny.Insert(&subscription{subject: []byte(sub)})
|
||||
}
|
||||
|
||||
@@ -1695,7 +1695,7 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
return nil
|
||||
} else {
|
||||
// Plain sub, assume optimistic sends, create entry.
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()}
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublistWithCache()}
|
||||
newe = true
|
||||
}
|
||||
// This is when a sub or queue sub is supposed to be in
|
||||
@@ -1796,7 +1796,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
} else if queue == nil {
|
||||
return nil
|
||||
} else {
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()}
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublistWithCache()}
|
||||
newe = true
|
||||
useSl = true
|
||||
}
|
||||
@@ -2594,7 +2594,7 @@ func (c *client) gatewayAllSubsReceiveStart(info *Info) {
|
||||
e.mode = Transitioning
|
||||
e.Unlock()
|
||||
} else {
|
||||
e := &outsie{sl: NewSublist()}
|
||||
e := &outsie{sl: NewSublistWithCache()}
|
||||
e.mode = Transitioning
|
||||
c.mu.Lock()
|
||||
c.gw.outsim.Store(account, e)
|
||||
|
||||
@@ -147,6 +147,7 @@ type Options struct {
|
||||
Debug bool `json:"-"`
|
||||
NoLog bool `json:"-"`
|
||||
NoSigs bool `json:"-"`
|
||||
NoSublistCache bool `json:"-"`
|
||||
Logtime bool `json:"-"`
|
||||
MaxConn int `json:"max_connections"`
|
||||
MaxSubs int `json:"max_subscriptions,omitempty"`
|
||||
@@ -423,6 +424,8 @@ func (o *Options) ProcessConfigFile(configFile string) error {
|
||||
case "logtime":
|
||||
o.Logtime = v.(bool)
|
||||
trackExplicitVal(o, &o.inConfig, "Logtime", o.Logtime)
|
||||
case "disable_sublist_cache", "no_sublist_cache":
|
||||
o.NoSublistCache = v.(bool)
|
||||
case "accounts":
|
||||
err := parseAccounts(tk, o, &errors, &warnings)
|
||||
if err != nil {
|
||||
|
||||
@@ -1907,3 +1907,54 @@ func TestHandleUnknownTopLevelConfigurationField(t *testing.T) {
|
||||
t.Fatal("Expected error, got none")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSublistNoCacheConfig(t *testing.T) {
|
||||
confFileName := createConfFile(t, []byte(`
|
||||
disable_sublist_cache: true
|
||||
`))
|
||||
defer os.Remove(confFileName)
|
||||
opts, err := ProcessConfigFile(confFileName)
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v", err)
|
||||
}
|
||||
if !opts.NoSublistCache {
|
||||
t.Fatalf("Expected sublist cache to be disabled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSublistNoCacheConfigOnAccounts(t *testing.T) {
|
||||
confFileName := createConfFile(t, []byte(`
|
||||
listen: "127.0.0.1:-1"
|
||||
disable_sublist_cache: true
|
||||
|
||||
accounts {
|
||||
synadia {
|
||||
users [ {nkey : UBAAQWTW6CG2G6ANGNKB5U2B7HRWHSGMZEZX3AQSAJOQDAUGJD46LD2E} ]
|
||||
}
|
||||
nats.io {
|
||||
users [ {nkey : UC6NLCN7AS34YOJVCYD4PJ3QB7QGLYG5B5IMBT25VW5K4TNUJODM7BOX} ]
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer os.Remove(confFileName)
|
||||
|
||||
s, _ := RunServerWithConfig(confFileName)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Check that all account sublists do not have caching enabled.
|
||||
ta := s.numReservedAccounts() + 2
|
||||
if la := s.numAccounts(); la != ta {
|
||||
t.Fatalf("Expected to have a server with %d active accounts, got %v", ta, la)
|
||||
}
|
||||
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
acc := v.(*Account)
|
||||
if acc == nil {
|
||||
t.Fatalf("Expected non-nil sublist for account")
|
||||
}
|
||||
if acc.sl.CacheEnabled() {
|
||||
t.Fatalf("Expected the account sublist to not have caching enabled")
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -765,7 +765,12 @@ func (s *Server) shouldTrackSubscriptions() bool {
|
||||
// Lock should be held on entry.
|
||||
func (s *Server) registerAccount(acc *Account) {
|
||||
if acc.sl == nil {
|
||||
acc.sl = NewSublist()
|
||||
opts := s.getOpts()
|
||||
if opts != nil && opts.NoSublistCache {
|
||||
acc.sl = NewSublistNoCache()
|
||||
} else {
|
||||
acc.sl = NewSublistWithCache()
|
||||
}
|
||||
}
|
||||
if acc.maxnae == 0 {
|
||||
acc.maxnae = DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS
|
||||
|
||||
@@ -97,14 +97,27 @@ func newLevel() *level {
|
||||
return &level{nodes: make(map[string]*node)}
|
||||
}
|
||||
|
||||
// NewSublist will create a default sublist
|
||||
func NewSublist() *Sublist {
|
||||
return &Sublist{root: newLevel(), cache: &sync.Map{}}
|
||||
// In general caching is recommended however in some extreme cases where
|
||||
// interest changes are high, suppressing the cache can help.
|
||||
// https://github.com/nats-io/nats-server/issues/941
|
||||
// FIXME(dlc) - should be more dynamic at some point based on cache thrashing.
|
||||
|
||||
// NewSublist will create a default sublist with caching enabled per the flag.
|
||||
func NewSublist(enableCache bool) *Sublist {
|
||||
if enableCache {
|
||||
return &Sublist{root: newLevel(), cache: &sync.Map{}}
|
||||
}
|
||||
return &Sublist{root: newLevel(), cacheNum: slNoCache}
|
||||
}
|
||||
|
||||
// NewSublistNoCache will create a default sublist without caching enabled.
|
||||
// NewSublistWithCache will create a default sublist with caching enabled.
|
||||
func NewSublistWithCache() *Sublist {
|
||||
return NewSublist(true)
|
||||
}
|
||||
|
||||
// NewSublistNoCache will create a default sublist with caching disabled.
|
||||
func NewSublistNoCache() *Sublist {
|
||||
return &Sublist{root: newLevel(), cacheNum: slNoCache}
|
||||
return NewSublist(false)
|
||||
}
|
||||
|
||||
// CacheEnabled returns whether or not caching is enabled for this sublist.
|
||||
|
||||
@@ -115,12 +115,12 @@ func newRemoteQSub(subject, queue string, num int32) *subscription {
|
||||
}
|
||||
|
||||
func TestSublistInit(t *testing.T) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
verifyCount(s, 0, t)
|
||||
}
|
||||
|
||||
func TestSublistInsertCount(t *testing.T) {
|
||||
testSublistInsertCount(t, NewSublist())
|
||||
testSublistInsertCount(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistInsertCountNoCache(t *testing.T) {
|
||||
@@ -135,7 +135,7 @@ func testSublistInsertCount(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistSimple(t *testing.T) {
|
||||
testSublistSimple(t, NewSublist())
|
||||
testSublistSimple(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistSimpleNoCache(t *testing.T) {
|
||||
@@ -152,7 +152,7 @@ func testSublistSimple(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistSimpleMultiTokens(t *testing.T) {
|
||||
testSublistSimpleMultiTokens(t, NewSublist())
|
||||
testSublistSimpleMultiTokens(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistSimpleMultiTokensNoCache(t *testing.T) {
|
||||
@@ -169,7 +169,7 @@ func testSublistSimpleMultiTokens(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistPartialWildcard(t *testing.T) {
|
||||
testSublistPartialWildcard(t, NewSublist())
|
||||
testSublistPartialWildcard(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistPartialWildcardNoCache(t *testing.T) {
|
||||
@@ -188,7 +188,7 @@ func testSublistPartialWildcard(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistPartialWildcardAtEnd(t *testing.T) {
|
||||
testSublistPartialWildcardAtEnd(t, NewSublist())
|
||||
testSublistPartialWildcardAtEnd(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistPartialWildcardAtEndNoCache(t *testing.T) {
|
||||
@@ -207,7 +207,7 @@ func testSublistPartialWildcardAtEnd(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistFullWildcard(t *testing.T) {
|
||||
testSublistFullWildcard(t, NewSublist())
|
||||
testSublistFullWildcard(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistFullWildcardNoCache(t *testing.T) {
|
||||
@@ -226,7 +226,7 @@ func testSublistFullWildcard(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemove(t *testing.T) {
|
||||
testSublistRemove(t, NewSublist())
|
||||
testSublistRemove(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveNoCache(t *testing.T) {
|
||||
@@ -249,7 +249,7 @@ func testSublistRemove(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveWildcard(t *testing.T) {
|
||||
testSublistRemoveWildcard(t, NewSublist())
|
||||
testSublistRemoveWildcard(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveWildcardNoCache(t *testing.T) {
|
||||
@@ -278,7 +278,7 @@ func testSublistRemoveWildcard(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveCleanup(t *testing.T) {
|
||||
testSublistRemoveCleanup(t, NewSublist())
|
||||
testSublistRemoveCleanup(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveCleanupNoCache(t *testing.T) {
|
||||
@@ -297,7 +297,7 @@ func testSublistRemoveCleanup(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveCleanupWildcards(t *testing.T) {
|
||||
testSublistRemoveCleanupWildcards(t, NewSublist())
|
||||
testSublistRemoveCleanupWildcards(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveCleanupWildcardsNoCache(t *testing.T) {
|
||||
@@ -316,7 +316,7 @@ func testSublistRemoveCleanupWildcards(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveWithLargeSubs(t *testing.T) {
|
||||
testSublistRemoveWithLargeSubs(t, NewSublist())
|
||||
testSublistRemoveWithLargeSubs(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveWithLargeSubsNoCache(t *testing.T) {
|
||||
@@ -343,7 +343,7 @@ func testSublistRemoveWithLargeSubs(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveByClient(t *testing.T) {
|
||||
testSublistRemoveByClient(t, NewSublist())
|
||||
testSublistRemoveByClient(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveByClientNoCache(t *testing.T) {
|
||||
@@ -380,7 +380,7 @@ func testSublistRemoveByClient(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistInvalidSubjectsInsert(t *testing.T) {
|
||||
testSublistInvalidSubjectsInsert(t, NewSublist())
|
||||
testSublistInvalidSubjectsInsert(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistInvalidSubjectsInsertNoCache(t *testing.T) {
|
||||
@@ -415,7 +415,7 @@ func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistCache(t *testing.T) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
|
||||
// Test add a remove logistics
|
||||
subject := "a.b.c.d"
|
||||
@@ -457,7 +457,7 @@ func TestSublistCache(t *testing.T) {
|
||||
})
|
||||
|
||||
// Test that adding to a wildcard properly adds to the cache.
|
||||
s = NewSublist()
|
||||
s = NewSublistWithCache()
|
||||
s.Insert(newSub("foo.*"))
|
||||
s.Insert(newSub("foo.bar"))
|
||||
r = s.Match("foo.baz")
|
||||
@@ -470,7 +470,7 @@ func TestSublistCache(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSublistBasicQueueResults(t *testing.T) {
|
||||
testSublistBasicQueueResults(t, NewSublist())
|
||||
testSublistBasicQueueResults(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistBasicQueueResultsNoCache(t *testing.T) {
|
||||
@@ -666,7 +666,7 @@ func TestSubjectIsLiteral(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSublistBadSubjectOnRemove(t *testing.T) {
|
||||
testSublistBadSubjectOnRemove(t, NewSublist())
|
||||
testSublistBadSubjectOnRemove(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistBadSubjectOnRemoveNoCache(t *testing.T) {
|
||||
@@ -693,7 +693,7 @@ func testSublistBadSubjectOnRemove(t *testing.T, s *Sublist) {
|
||||
|
||||
// This is from bug report #18
|
||||
func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) {
|
||||
testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublist())
|
||||
testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistTwoTokenPubMatchSingleTokenSubNoCache(t *testing.T) {
|
||||
@@ -711,7 +711,7 @@ func testSublistTwoTokenPubMatchSingleTokenSub(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistInsertWithWildcardsAsLiterals(t *testing.T) {
|
||||
testSublistInsertWithWildcardsAsLiterals(t, NewSublist())
|
||||
testSublistInsertWithWildcardsAsLiterals(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistInsertWithWildcardsAsLiteralsNoCache(t *testing.T) {
|
||||
@@ -733,7 +733,7 @@ func testSublistInsertWithWildcardsAsLiterals(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) {
|
||||
testSublistRemoveWithWildcardsAsLiterals(t, NewSublist())
|
||||
testSublistRemoveWithWildcardsAsLiterals(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoveWithWildcardsAsLiteralsNoCache(t *testing.T) {
|
||||
@@ -759,7 +759,7 @@ func testSublistRemoveWithWildcardsAsLiterals(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRaceOnRemove(t *testing.T) {
|
||||
testSublistRaceOnRemove(t, NewSublist())
|
||||
testSublistRaceOnRemove(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRaceOnRemoveNoCache(t *testing.T) {
|
||||
@@ -840,7 +840,7 @@ func testSublistRaceOnRemove(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistRaceOnInsert(t *testing.T) {
|
||||
testSublistRaceOnInsert(t, NewSublist())
|
||||
testSublistRaceOnInsert(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRaceOnInsertNoCache(t *testing.T) {
|
||||
@@ -942,7 +942,7 @@ func TestSublistRaceOnMatch(t *testing.T) {
|
||||
// Remote subscriptions for queue subscribers will be weighted such that a single subscription
|
||||
// is received, but represents all of the queue subscribers on the remote side.
|
||||
func TestSublistRemoteQueueSubscriptions(t *testing.T) {
|
||||
testSublistRemoteQueueSubscriptions(t, NewSublist())
|
||||
testSublistRemoteQueueSubscriptions(t, NewSublistWithCache())
|
||||
}
|
||||
|
||||
func TestSublistRemoteQueueSubscriptionsNoCache(t *testing.T) {
|
||||
@@ -993,7 +993,7 @@ func testSublistRemoteQueueSubscriptions(t *testing.T, s *Sublist) {
|
||||
}
|
||||
|
||||
func TestSublistSharedEmptyResult(t *testing.T) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
r1 := s.Match("foo")
|
||||
verifyLen(r1.psubs, 0, t)
|
||||
verifyQLen(r1.qsubs, 0, t)
|
||||
@@ -1024,7 +1024,7 @@ func TestSublistNoCacheStats(t *testing.T) {
|
||||
// -- Benchmarks Setup --
|
||||
|
||||
var benchSublistSubs []*subscription
|
||||
var benchSublistSl = NewSublist()
|
||||
var benchSublistSl = NewSublistWithCache()
|
||||
|
||||
func init() {
|
||||
initSublist := false
|
||||
@@ -1069,7 +1069,7 @@ func addWildcards() {
|
||||
// -- Benchmarks Setup End --
|
||||
|
||||
func Benchmark______________________SublistInsert(b *testing.B) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
for i, l := 0, len(benchSublistSubs); i < b.N; i++ {
|
||||
index := i % l
|
||||
s.Insert(benchSublistSubs[index])
|
||||
@@ -1203,7 +1203,7 @@ func Benchmark_____SublistMatch10kSubsWithNoCache(b *testing.B) {
|
||||
}
|
||||
|
||||
func removeTest(b *testing.B, singleSubject, doBatch bool, qgroup string) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
subject := "foo"
|
||||
|
||||
subs := make([]*subscription, 0, b.N)
|
||||
@@ -1254,7 +1254,7 @@ func Benchmark_________SublistRemove1TokenQGBatch(b *testing.B) {
|
||||
}
|
||||
|
||||
func removeMultiTest(b *testing.B, singleSubject, doBatch bool) {
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
subject := "foo"
|
||||
var swg, fwg sync.WaitGroup
|
||||
swg.Add(b.N)
|
||||
@@ -1329,7 +1329,7 @@ func cacheContentionTest(b *testing.B, numMatchers, numAdders, numRemovers int)
|
||||
quitCh := make(chan struct{})
|
||||
|
||||
// Set up a new sublist. subjects will be foo.bar.baz.N
|
||||
s := NewSublist()
|
||||
s := NewSublistWithCache()
|
||||
mu.Lock()
|
||||
for i := 0; i < 10000; i++ {
|
||||
sub := newSub(fmt.Sprintf("foo.bar.baz.%d", i))
|
||||
|
||||
Reference in New Issue
Block a user