From 03aacecb8136358d81939e2a8113f85969950812 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 3 May 2020 14:31:34 -0700 Subject: [PATCH] Changed cache back to simple map. We were using a sync.Map. This did provide a benefit with massive contention from lots of Go routines. However this is only about 2x in the crazy extremes now and with a normal map and read locks we can assist the RemoveBatch which was a cause for performance issues. Signed-off-by: Derek Collison --- server/sublist.go | 152 +++++++++++++---------------------------- server/sublist_test.go | 38 ----------- 2 files changed, 47 insertions(+), 143 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index fa0a4d0a..e862483e 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -43,8 +43,6 @@ var ( ) const ( - // slNoCache for cacheNum means cache is disabled. - slNoCache = -22 // cacheMax is used to bound limit the frontend cache slCacheMax = 1024 // If we run a sweeper we will drain to this count. @@ -68,8 +66,7 @@ type Sublist struct { inserts uint64 removes uint64 root *level - cache *sync.Map - cacheNum int32 + cache map[string]*SublistResult ccSweep int32 notify *notifyMaps count uint32 @@ -115,9 +112,9 @@ func newLevel() *level { // NewSublist will create a default sublist with caching enabled per the flag. func NewSublist(enableCache bool) *Sublist { if enableCache { - return &Sublist{root: newLevel(), cache: &sync.Map{}} + return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)} } - return &Sublist{root: newLevel(), cacheNum: slNoCache} + return &Sublist{root: newLevel()} } // NewSublistWithCache will create a default sublist with caching enabled. @@ -132,7 +129,10 @@ func NewSublistNoCache() *Sublist { // CacheEnabled returns whether or not caching is enabled for this sublist. func (s *Sublist) CacheEnabled() bool { - return atomic.LoadInt32(&s.cacheNum) != slNoCache + s.RLock() + enabled := s.cache != nil + s.RUnlock() + return enabled } // RegisterNotification will register for notifications when interest for the given @@ -424,26 +424,23 @@ func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult { // addToCache will add the new entry to the existing cache // entries if needed. Assumes write lock is held. +// Assumes write lock is held. func (s *Sublist) addToCache(subject string, sub *subscription) { if s.cache == nil { return } // If literal we can direct match. if subjectIsLiteral(subject) { - if v, ok := s.cache.Load(subject); ok { - r := v.(*SublistResult) - s.cache.Store(subject, r.addSubToResult(sub)) + if r := s.cache[subject]; r != nil { + s.cache[subject] = r.addSubToResult(sub) } return } - s.cache.Range(func(k, v interface{}) bool { - key := k.(string) - r := v.(*SublistResult) + for key, r := range s.cache { if matchLiteral(key, subject) { - s.cache.Store(key, r.addSubToResult(sub)) + s.cache[key] = r.addSubToResult(sub) } - return true - }) + } } // removeFromCache will remove the sub from any active cache entries. @@ -454,24 +451,15 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) { } // If literal we can direct match. if subjectIsLiteral(subject) { - // Load for accounting - if _, ok := s.cache.Load(subject); ok { - s.cache.Delete(subject) - atomic.AddInt32(&s.cacheNum, -1) - } + delete(s.cache, subject) return } // Wildcard here. - s.cache.Range(func(k, v interface{}) bool { - key := k.(string) + for key := range s.cache { if matchLiteral(key, subject) { - // Since someone else may be referecing, can't modify the list - // safely, just let it re-populate. - s.cache.Delete(key) - atomic.AddInt32(&s.cacheNum, -1) + delete(s.cache, key) } - return true - }) + } } // a place holder for an empty result. @@ -491,11 +479,16 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. - if atomic.LoadInt32(&s.cacheNum) > 0 { - if r, ok := s.cache.Load(subject); ok { - atomic.AddUint64(&s.cacheHits, 1) - return r.(*SublistResult) - } + if doLock { + s.RLock() + } + r, ok := s.cache[subject] + if doLock { + s.RUnlock() + } + if ok { + atomic.AddUint64(&s.cacheHits, 1) + return r } tsa := [32]string{} @@ -514,10 +507,10 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { // Get result from the main structure and place into the shared cache. // Hold the read lock to avoid race between match and store. - var n int32 + var n int if doLock { - s.RLock() + s.Lock() } matchLevel(s.root, tokens, result) @@ -526,11 +519,11 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { result = emptyResult } if s.cache != nil { - s.cache.Store(subject, result) - n = atomic.AddInt32(&s.cacheNum, 1) + s.cache[subject] = result + n = len(s.cache) } if doLock { - s.RUnlock() + s.Unlock() } // Reduce the cache count if we have exceeded our set maximum. @@ -547,11 +540,12 @@ func (s *Sublist) reduceCacheCount() { defer atomic.StoreInt32(&s.ccSweep, 0) // If we are over the cache limit randomly drop until under the limit. s.Lock() - s.cache.Range(func(k, v interface{}) bool { - s.cache.Delete(k.(string)) - n := atomic.AddInt32(&s.cacheNum, -1) - return n >= slCacheSweep - }) + for key := range s.cache { + delete(s.cache, key) + if len(s.cache) <= slCacheSweep { + break + } + } s.Unlock() } @@ -761,7 +755,6 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { // Turn off our cache. s.cache = nil - atomic.StoreInt32(&s.cacheNum, 0) for _, sub := range subs { if err := s.remove(sub, false, false); err != nil { return err @@ -769,61 +762,10 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { } // Turn caching back on here. atomic.AddUint64(&s.genid, 1) - s.cache = &sync.Map{} + s.cache = make(map[string]*SublistResult) return nil } -func (s *Sublist) checkNodeForClientSubs(n *node, c *client) { - var removed uint32 - for _, sub := range n.psubs { - if sub.client == c { - if s.removeFromNode(n, sub) { - s.removeFromCache(string(sub.subject), sub) - removed++ - } - } - } - // Queue subscriptions - for _, qr := range n.qsubs { - for _, sub := range qr { - if sub.client == c { - if s.removeFromNode(n, sub) { - s.removeFromCache(string(sub.subject), sub) - removed++ - } - } - } - } - s.count -= removed - s.removes += uint64(removed) -} - -func (s *Sublist) removeClientSubs(l *level, c *client) { - for _, n := range l.nodes { - s.checkNodeForClientSubs(n, c) - s.removeClientSubs(n.next, c) - } - if l.pwc != nil { - s.checkNodeForClientSubs(l.pwc, c) - s.removeClientSubs(l.pwc.next, c) - } - if l.fwc != nil { - s.checkNodeForClientSubs(l.fwc, c) - s.removeClientSubs(l.fwc.next, c) - } -} - -// RemoveAllForClient will remove all subscriptions for a given client. -func (s *Sublist) RemoveAllForClient(c *client) { - s.Lock() - removes := s.removes - s.removeClientSubs(s.root, c) - if s.removes != removes { - atomic.AddUint64(&s.genid, 1) - } - s.Unlock() -} - // pruneNode is used to prune an empty node from the tree. func (l *level) pruneNode(n *node, t string) { if n == nil { @@ -897,7 +839,10 @@ func (s *Sublist) Count() uint32 { // CacheCount returns the number of result sets in the cache. func (s *Sublist) CacheCount() int { - return int(atomic.LoadInt32(&s.cacheNum)) + s.RLock() + cc := len(s.cache) + s.RUnlock() + return cc } // SublistStats are public stats for the sublist @@ -940,14 +885,13 @@ func (s *Sublist) Stats() *SublistStats { s.RLock() cache := s.cache + cc := len(s.cache) st.NumSubs = s.count st.NumInserts = s.inserts st.NumRemoves = s.removes s.RUnlock() - if cn := atomic.LoadInt32(&s.cacheNum); cn > 0 { - st.NumCache = uint32(cn) - } + st.NumCache = uint32(cc) st.NumMatches = atomic.LoadUint64(&s.matches) if st.NumMatches > 0 { st.CacheHitRate = float64(atomic.LoadUint64(&s.cacheHits)) / float64(st.NumMatches) @@ -957,16 +901,14 @@ func (s *Sublist) Stats() *SublistStats { // If this is called frequently, which it should not be, this could hurt performance. if cache != nil { tot, max, clen := 0, 0, 0 - s.cache.Range(func(k, v interface{}) bool { + for _, r := range s.cache { clen++ - r := v.(*SublistResult) l := len(r.psubs) + len(r.qsubs) tot += l if l > max { max = l } - return true - }) + } st.totFanout = tot st.cacheCnt = clen st.MaxFanout = uint32(max) diff --git a/server/sublist_test.go b/server/sublist_test.go index 939ee648..272196a6 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "testing" "time" @@ -343,43 +342,6 @@ func testSublistRemoveWithLargeSubs(t *testing.T, s *Sublist) { verifyLen(r.psubs, plistMin*2-3, t) } -func TestSublistRemoveByClient(t *testing.T) { - testSublistRemoveByClient(t, NewSublistWithCache()) -} - -func TestSublistRemoveByClientNoCache(t *testing.T) { - testSublistRemoveByClient(t, NewSublistNoCache()) -} - -func testSublistRemoveByClient(t *testing.T, s *Sublist) { - c := &client{} - for i := 0; i < 10; i++ { - subject := fmt.Sprintf("a.b.c.d.e.f.%d", i) - sub := &subscription{client: c, subject: []byte(subject)} - s.Insert(sub) - } - verifyCount(s, 10, t) - s.Insert(&subscription{client: c, subject: []byte(">")}) - s.Insert(&subscription{client: c, subject: []byte("foo.*")}) - s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")}) - s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")}) - s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")}) - s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")}) - verifyCount(s, 16, t) - genid := atomic.LoadUint64(&s.genid) - s.RemoveAllForClient(c) - verifyCount(s, 0, t) - // genid should be different - if genid == atomic.LoadUint64(&s.genid) { - t.Fatalf("GenId should have been changed after removal of subs") - } - if s.CacheEnabled() { - if cc := s.CacheCount(); cc != 0 { - t.Fatalf("Cache should be zero, got %d", cc) - } - } -} - func TestSublistInvalidSubjectsInsert(t *testing.T) { testSublistInvalidSubjectsInsert(t, NewSublistWithCache()) }