diff --git a/server/sublist.go b/server/sublist.go index 3be44d16..c93bde27 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -36,11 +36,13 @@ const ( // Sublist related errors var ( - ErrInvalidSubject = errors.New("sublist: Invalid Subject") - ErrNotFound = errors.New("sublist: No Matches Found") + ErrInvalidSubject = errors.New("sublist: invalid subject") + ErrNotFound = errors.New("sublist: no matches found") ) const ( + // slNoCache for cacheNum means cache is disabled. + slNoCache = -22 // cacheMax is used to bound limit the frontend cache slCacheMax = 1024 // If we run a sweeper we will drain to this count. @@ -64,7 +66,7 @@ type Sublist struct { inserts uint64 removes uint64 root *level - cache sync.Map + cache *sync.Map cacheNum int32 ccSweep int32 count uint32 @@ -97,7 +99,17 @@ func newLevel() *level { // NewSublist will create a default sublist func NewSublist() *Sublist { - return &Sublist{root: newLevel()} + return &Sublist{root: newLevel(), cache: &sync.Map{}} +} + +// NewSublistNoCache will create a default sublist without caching enabled. +func NewSublistNoCache() *Sublist { + return &Sublist{root: newLevel(), cacheNum: slNoCache} +} + +// CacheEnabled returns whether or not caching is enabled for this sublist. +func (s *Sublist) CacheEnabled() bool { + return atomic.LoadInt32(&s.cacheNum) != slNoCache } // Insert adds a subscription into the sublist @@ -226,6 +238,9 @@ func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult { // addToCache will add the new entry to the existing cache // entries if needed. Assumes write lock is held. func (s *Sublist) addToCache(subject string, sub *subscription) { + if s.cache == nil { + return + } // If literal we can direct match. if subjectIsLiteral(subject) { if v, ok := s.cache.Load(subject); ok { @@ -247,6 +262,9 @@ func (s *Sublist) addToCache(subject string, sub *subscription) { // removeFromCache will remove the sub from any active cache entries. // Assumes write lock is held. func (s *Sublist) removeFromCache(subject string, sub *subscription) { + if s.cache == nil { + return + } // If literal we can direct match. if subjectIsLiteral(subject) { // Load for accounting @@ -268,15 +286,21 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) { }) } +// a place holder for an empty result. +var emptyResult = &SublistResult{} + // Match will match all entries to the literal subject. // It will return a set of results for both normal and queue subscribers. func (s *Sublist) Match(subject string) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. - if r, ok := s.cache.Load(subject); ok { - atomic.AddUint64(&s.cacheHits, 1) - return r.(*SublistResult) + ce := atomic.LoadInt32(&s.cacheNum) + if ce > 0 { + if r, ok := s.cache.Load(subject); ok { + atomic.AddUint64(&s.cacheHits, 1) + return r.(*SublistResult) + } } tsa := [32]string{} @@ -295,10 +319,18 @@ func (s *Sublist) Match(subject string) *SublistResult { // Get result from the main structure and place into the shared cache. // Hold the read lock to avoid race between match and store. + var n int32 + s.RLock() matchLevel(s.root, tokens, result) - s.cache.Store(subject, result) - n := atomic.AddInt32(&s.cacheNum, 1) + // Check for empty result. + if len(result.psubs) == 0 && len(result.qsubs) == 0 { + result = emptyResult + } + if ce != slNoCache { + s.cache.Store(subject, result) + n = atomic.AddInt32(&s.cacheNum, 1) + } s.RUnlock() // Reduce the cache count if we have exceeded our set maximum. @@ -314,11 +346,13 @@ func (s *Sublist) Match(subject string) *SublistResult { func (s *Sublist) reduceCacheCount() { defer atomic.StoreInt32(&s.ccSweep, 0) // If we are over the cache limit randomly drop until under the limit. + s.Lock() s.cache.Range(func(k, v interface{}) bool { s.cache.Delete(k.(string)) n := atomic.AddInt32(&s.cacheNum, -1) return n >= slCacheSweep }) + s.Unlock() } // Helper function for auto-expanding remote qsubs. diff --git a/server/sublist_test.go b/server/sublist_test.go index 0d8e9689..7f0c4e9c 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -28,6 +28,7 @@ import ( "github.com/nats-io/nuid" ) +// FIXME(dlc) - this is also used by monitor_test. Not needed with t.Helper. func stackFatalf(t *testing.T, f string, args ...interface{}) { lines := make([]string, 0, 32) msg := fmt.Sprintf(f, args...) @@ -46,35 +47,41 @@ func stackFatalf(t *testing.T, f string, args ...interface{}) { } func verifyCount(s *Sublist, count uint32, t *testing.T) { + t.Helper() if s.Count() != count { - stackFatalf(t, "Count is %d, should be %d", s.Count(), count) + t.Fatalf("Count is %d, should be %d", s.Count(), count) } } func verifyLen(r []*subscription, l int, t *testing.T) { + t.Helper() if len(r) != l { - stackFatalf(t, "Results len is %d, should be %d", len(r), l) + t.Fatalf("Results len is %d, should be %d", len(r), l) } } func verifyQLen(r [][]*subscription, l int, t *testing.T) { + t.Helper() if len(r) != l { - stackFatalf(t, "Queue Results len is %d, should be %d", len(r), l) + t.Fatalf("Queue Results len is %d, should be %d", len(r), l) } } func verifyNumLevels(s *Sublist, expected int, t *testing.T) { + t.Helper() dl := s.numLevels() if dl != expected { - stackFatalf(t, "NumLevels is %d, should be %d", dl, expected) + t.Fatalf("NumLevels is %d, should be %d", dl, expected) } } func verifyQMember(qsubs [][]*subscription, val *subscription, t *testing.T) { + t.Helper() verifyMember(qsubs[findQSlot(val.queue, qsubs)], val, t) } func verifyMember(r []*subscription, val *subscription, t *testing.T) { + t.Helper() for _, v := range r { if v == nil { continue @@ -83,7 +90,7 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) { return } } - stackFatalf(t, "Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue) + t.Fatalf("Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue) } // Helpers to generate test subscriptions. @@ -113,7 +120,14 @@ func TestSublistInit(t *testing.T) { } func TestSublistInsertCount(t *testing.T) { - s := NewSublist() + testSublistInsertCount(t, NewSublist()) +} + +func TestSublistInsertCountNoCache(t *testing.T) { + testSublistInsertCount(t, NewSublistNoCache()) +} + +func testSublistInsertCount(t *testing.T, s *Sublist) { s.Insert(newSub("foo")) s.Insert(newSub("bar")) s.Insert(newSub("foo.bar")) @@ -121,7 +135,14 @@ func TestSublistInsertCount(t *testing.T) { } func TestSublistSimple(t *testing.T) { - s := NewSublist() + testSublistSimple(t, NewSublist()) +} + +func TestSublistSimpleNoCache(t *testing.T) { + testSublistSimple(t, NewSublistNoCache()) +} + +func testSublistSimple(t *testing.T, s *Sublist) { subject := "foo" sub := newSub(subject) s.Insert(sub) @@ -131,7 +152,14 @@ func TestSublistSimple(t *testing.T) { } func TestSublistSimpleMultiTokens(t *testing.T) { - s := NewSublist() + testSublistSimpleMultiTokens(t, NewSublist()) +} + +func TestSublistSimpleMultiTokensNoCache(t *testing.T) { + testSublistSimpleMultiTokens(t, NewSublistNoCache()) +} + +func testSublistSimpleMultiTokens(t *testing.T, s *Sublist) { subject := "foo.bar.baz" sub := newSub(subject) s.Insert(sub) @@ -141,7 +169,14 @@ func TestSublistSimpleMultiTokens(t *testing.T) { } func TestSublistPartialWildcard(t *testing.T) { - s := NewSublist() + testSublistPartialWildcard(t, NewSublist()) +} + +func TestSublistPartialWildcardNoCache(t *testing.T) { + testSublistPartialWildcard(t, NewSublistNoCache()) +} + +func testSublistPartialWildcard(t *testing.T, s *Sublist) { lsub := newSub("a.b.c") psub := newSub("a.*.c") s.Insert(lsub) @@ -153,7 +188,14 @@ func TestSublistPartialWildcard(t *testing.T) { } func TestSublistPartialWildcardAtEnd(t *testing.T) { - s := NewSublist() + testSublistPartialWildcardAtEnd(t, NewSublist()) +} + +func TestSublistPartialWildcardAtEndNoCache(t *testing.T) { + testSublistPartialWildcardAtEnd(t, NewSublistNoCache()) +} + +func testSublistPartialWildcardAtEnd(t *testing.T, s *Sublist) { lsub := newSub("a.b.c") psub := newSub("a.b.*") s.Insert(lsub) @@ -165,7 +207,14 @@ func TestSublistPartialWildcardAtEnd(t *testing.T) { } func TestSublistFullWildcard(t *testing.T) { - s := NewSublist() + testSublistFullWildcard(t, NewSublist()) +} + +func TestSublistFullWildcardNoCache(t *testing.T) { + testSublistFullWildcard(t, NewSublistNoCache()) +} + +func testSublistFullWildcard(t *testing.T, s *Sublist) { lsub := newSub("a.b.c") fsub := newSub("a.>") s.Insert(lsub) @@ -177,7 +226,14 @@ func TestSublistFullWildcard(t *testing.T) { } func TestSublistRemove(t *testing.T) { - s := NewSublist() + testSublistRemove(t, NewSublist()) +} + +func TestSublistRemoveNoCache(t *testing.T) { + testSublistRemove(t, NewSublistNoCache()) +} + +func testSublistRemove(t *testing.T, s *Sublist) { subject := "a.b.c.d" sub := newSub(subject) s.Insert(sub) @@ -193,7 +249,14 @@ func TestSublistRemove(t *testing.T) { } func TestSublistRemoveWildcard(t *testing.T) { - s := NewSublist() + testSublistRemoveWildcard(t, NewSublist()) +} + +func TestSublistRemoveWildcardNoCache(t *testing.T) { + testSublistRemoveWildcard(t, NewSublistNoCache()) +} + +func testSublistRemoveWildcard(t *testing.T, s *Sublist) { subject := "a.b.c.d" sub := newSub(subject) psub := newSub("a.b.*.d") @@ -215,7 +278,14 @@ func TestSublistRemoveWildcard(t *testing.T) { } func TestSublistRemoveCleanup(t *testing.T) { - s := NewSublist() + testSublistRemoveCleanup(t, NewSublist()) +} + +func TestSublistRemoveCleanupNoCache(t *testing.T) { + testSublistRemoveCleanup(t, NewSublistNoCache()) +} + +func testSublistRemoveCleanup(t *testing.T, s *Sublist) { literal := "a.b.c.d.e.f" depth := len(strings.Split(literal, tsep)) sub := newSub(literal) @@ -227,7 +297,14 @@ func TestSublistRemoveCleanup(t *testing.T) { } func TestSublistRemoveCleanupWildcards(t *testing.T) { - s := NewSublist() + testSublistRemoveCleanupWildcards(t, NewSublist()) +} + +func TestSublistRemoveCleanupWildcardsNoCache(t *testing.T) { + testSublistRemoveCleanupWildcards(t, NewSublistNoCache()) +} + +func testSublistRemoveCleanupWildcards(t *testing.T, s *Sublist) { subject := "a.b.*.d.e.>" depth := len(strings.Split(subject, tsep)) sub := newSub(subject) @@ -239,8 +316,15 @@ func TestSublistRemoveCleanupWildcards(t *testing.T) { } func TestSublistRemoveWithLargeSubs(t *testing.T) { + testSublistRemoveWithLargeSubs(t, NewSublist()) +} + +func TestSublistRemoveWithLargeSubsNoCache(t *testing.T) { + testSublistRemoveWithLargeSubs(t, NewSublistNoCache()) +} + +func testSublistRemoveWithLargeSubs(t *testing.T, s *Sublist) { subject := "foo" - s := NewSublist() for i := 0; i < plistMin*2; i++ { sub := newSub(subject) s.Insert(sub) @@ -259,7 +343,14 @@ func TestSublistRemoveWithLargeSubs(t *testing.T) { } func TestSublistRemoveByClient(t *testing.T) { - s := NewSublist() + testSublistRemoveByClient(t, NewSublist()) +} + +func TestSublistRemoveByClientNoCache(t *testing.T) { + testSublistRemoveByClient(t, NewSublistNoCache()) +} + +func testSublistRemoveByClient(t *testing.T, s *Sublist) { c := &client{} for i := 0; i < 10; i++ { subject := fmt.Sprintf("a.b.c.d.e.f.%d", i) @@ -281,14 +372,22 @@ func TestSublistRemoveByClient(t *testing.T) { if genid == atomic.LoadUint64(&s.genid) { t.Fatalf("GenId should have been changed after removal of subs") } - if cc := s.CacheCount(); cc != 0 { - t.Fatalf("Cache should be zero, got %d", cc) + if s.CacheEnabled() { + if cc := s.CacheCount(); cc != 0 { + t.Fatalf("Cache should be zero, got %d", cc) + } } } func TestSublistInvalidSubjectsInsert(t *testing.T) { - s := NewSublist() + testSublistInvalidSubjectsInsert(t, NewSublist()) +} +func TestSublistInvalidSubjectsInsertNoCache(t *testing.T) { + testSublistInvalidSubjectsInsert(t, NewSublistNoCache()) +} + +func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) { // Insert, or subscriptions, can have wildcards, but not empty tokens, // and can not have a FWC that is not the terminal token. @@ -371,8 +470,14 @@ func TestSublistCache(t *testing.T) { } func TestSublistBasicQueueResults(t *testing.T) { - s := NewSublist() + testSublistBasicQueueResults(t, NewSublist()) +} +func TestSublistBasicQueueResultsNoCache(t *testing.T) { + testSublistBasicQueueResults(t, NewSublistNoCache()) +} + +func testSublistBasicQueueResults(t *testing.T, s *Sublist) { // Test some basics subject := "foo" sub := newSub(subject) @@ -561,10 +666,17 @@ func TestSubjectIsLiteral(t *testing.T) { } func TestSublistBadSubjectOnRemove(t *testing.T) { + testSublistBadSubjectOnRemove(t, NewSublist()) +} + +func TestSublistBadSubjectOnRemoveNoCache(t *testing.T) { + testSublistBadSubjectOnRemove(t, NewSublistNoCache()) +} + +func testSublistBadSubjectOnRemove(t *testing.T, s *Sublist) { bad := "a.b..d" sub := newSub(bad) - s := NewSublist() if err := s.Insert(sub); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) } @@ -581,7 +693,14 @@ func TestSublistBadSubjectOnRemove(t *testing.T) { // This is from bug report #18 func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) { - s := NewSublist() + testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublist()) +} + +func TestSublistTwoTokenPubMatchSingleTokenSubNoCache(t *testing.T) { + testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublistNoCache()) +} + +func testSublistTwoTokenPubMatchSingleTokenSub(t *testing.T, s *Sublist) { sub := newSub("foo") s.Insert(sub) r := s.Match("foo") @@ -592,7 +711,14 @@ func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) { } func TestSublistInsertWithWildcardsAsLiterals(t *testing.T) { - s := NewSublist() + testSublistInsertWithWildcardsAsLiterals(t, NewSublist()) +} + +func TestSublistInsertWithWildcardsAsLiteralsNoCache(t *testing.T) { + testSublistInsertWithWildcardsAsLiterals(t, NewSublistNoCache()) +} + +func testSublistInsertWithWildcardsAsLiterals(t *testing.T, s *Sublist) { subjects := []string{"foo.*-", "foo.>-"} for _, subject := range subjects { sub := newSub(subject) @@ -607,7 +733,14 @@ func TestSublistInsertWithWildcardsAsLiterals(t *testing.T) { } func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) { - s := NewSublist() + testSublistRemoveWithWildcardsAsLiterals(t, NewSublist()) +} + +func TestSublistRemoveWithWildcardsAsLiteralsNoCache(t *testing.T) { + testSublistRemoveWithWildcardsAsLiterals(t, NewSublistNoCache()) +} + +func testSublistRemoveWithWildcardsAsLiterals(t *testing.T, s *Sublist) { subjects := []string{"foo.*-", "foo.>-"} for _, subject := range subjects { sub := newSub(subject) @@ -626,8 +759,14 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) { } func TestSublistRaceOnRemove(t *testing.T) { - s := NewSublist() + testSublistRaceOnRemove(t, NewSublist()) +} +func TestSublistRaceOnRemoveNoCache(t *testing.T) { + testSublistRaceOnRemove(t, NewSublistNoCache()) +} + +func testSublistRaceOnRemove(t *testing.T, s *Sublist) { var ( total = 100 subs = make(map[int]*subscription, total) // use map for randomness @@ -701,8 +840,14 @@ func TestSublistRaceOnRemove(t *testing.T) { } func TestSublistRaceOnInsert(t *testing.T) { - s := NewSublist() + testSublistRaceOnInsert(t, NewSublist()) +} +func TestSublistRaceOnInsertNoCache(t *testing.T) { + testSublistRaceOnInsert(t, NewSublistNoCache()) +} + +func testSublistRaceOnInsert(t *testing.T, s *Sublist) { var ( total = 100 subs = make(map[int]*subscription, total) // use map for randomness @@ -755,7 +900,7 @@ func TestSublistRaceOnInsert(t *testing.T) { } func TestSublistRaceOnMatch(t *testing.T) { - s := NewSublist() + s := NewSublistNoCache() s.Insert(newQSub("foo.*", "workers")) s.Insert(newQSub("foo.bar", "workers")) s.Insert(newSub("foo.*")) @@ -782,8 +927,6 @@ func TestSublistRaceOnMatch(t *testing.T) { } } } - // Empty cache to maximize chance for race - s.cache.Delete("foo.bar") } } go f() @@ -799,7 +942,14 @@ func TestSublistRaceOnMatch(t *testing.T) { // Remote subscriptions for queue subscribers will be weighted such that a single subscription // is received, but represents all of the queue subscribers on the remote side. func TestSublistRemoteQueueSubscriptions(t *testing.T) { - s := NewSublist() + testSublistRemoteQueueSubscriptions(t, NewSublist()) +} + +func TestSublistRemoteQueueSubscriptionsNoCache(t *testing.T) { + testSublistRemoteQueueSubscriptions(t, NewSublistNoCache()) +} + +func testSublistRemoteQueueSubscriptions(t *testing.T, s *Sublist) { // Normals s1 := newQSub("foo", "bar") s2 := newQSub("foo", "bar") @@ -842,6 +992,21 @@ func TestSublistRemoteQueueSubscriptions(t *testing.T) { verifyLen(r.qsubs[0], 2, t) } +func TestSublistSharedEmptyResult(t *testing.T) { + s := NewSublist() + r1 := s.Match("foo") + verifyLen(r1.psubs, 0, t) + verifyQLen(r1.qsubs, 0, t) + + r2 := s.Match("bar") + verifyLen(r2.psubs, 0, t) + verifyQLen(r2.qsubs, 0, t) + + if r1 != r2 { + t.Fatalf("Expected empty result to be a shared result set") + } +} + // -- Benchmarks Setup -- var benchSublistSubs []*subscription @@ -897,6 +1062,14 @@ func Benchmark______________________SublistInsert(b *testing.B) { } } +func Benchmark_______________SublistInsertNoCache(b *testing.B) { + s := NewSublistNoCache() + for i, l := 0, len(benchSublistSubs); i < b.N; i++ { + index := i % l + s.Insert(benchSublistSubs[index]) + } +} + func benchSublistTokens(b *testing.B, tokens string) { for i := 0; i < b.N; i++ { benchSublistSl.Match(tokens) @@ -1001,7 +1174,7 @@ func Benchmark________________SublistMatchLiteral(b *testing.B) { func Benchmark_____SublistMatch10kSubsWithNoCache(b *testing.B) { var nsubs = 512 - s := NewSublist() + s := NewSublistNoCache() subject := "foo" for i := 0; i < nsubs; i++ { s.Insert(newSub(subject)) @@ -1012,7 +1185,6 @@ func Benchmark_____SublistMatch10kSubsWithNoCache(b *testing.B) { if len(r.psubs) != nsubs { b.Fatalf("Results len is %d, should be %d", len(r.psubs), nsubs) } - s.cache.Delete(subject) } }