Changed cache back to simple map.

We were using a sync.Map. This did provide a benefit with massive contention from lots of Go routines. However this is only about 2x in the crazy extremes now and with a normal map and read locks we can assist the RemoveBatch which was a cause for performance issues.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-03 14:31:34 -07:00
parent d3a447e6e9
commit 03aacecb81
2 changed files with 47 additions and 143 deletions

View File

@@ -43,8 +43,6 @@ var (
)
const (
// slNoCache for cacheNum means cache is disabled.
slNoCache = -22
// cacheMax is used to bound limit the frontend cache
slCacheMax = 1024
// If we run a sweeper we will drain to this count.
@@ -68,8 +66,7 @@ type Sublist struct {
inserts uint64
removes uint64
root *level
cache *sync.Map
cacheNum int32
cache map[string]*SublistResult
ccSweep int32
notify *notifyMaps
count uint32
@@ -115,9 +112,9 @@ func newLevel() *level {
// NewSublist will create a default sublist with caching enabled per the flag.
func NewSublist(enableCache bool) *Sublist {
if enableCache {
return &Sublist{root: newLevel(), cache: &sync.Map{}}
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
}
return &Sublist{root: newLevel(), cacheNum: slNoCache}
return &Sublist{root: newLevel()}
}
// NewSublistWithCache will create a default sublist with caching enabled.
@@ -132,7 +129,10 @@ func NewSublistNoCache() *Sublist {
// CacheEnabled returns whether or not caching is enabled for this sublist.
func (s *Sublist) CacheEnabled() bool {
return atomic.LoadInt32(&s.cacheNum) != slNoCache
s.RLock()
enabled := s.cache != nil
s.RUnlock()
return enabled
}
// RegisterNotification will register for notifications when interest for the given
@@ -424,26 +424,23 @@ func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult {
// addToCache will add the new entry to the existing cache
// entries if needed. Assumes write lock is held.
// Assumes write lock is held.
func (s *Sublist) addToCache(subject string, sub *subscription) {
if s.cache == nil {
return
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
if v, ok := s.cache.Load(subject); ok {
r := v.(*SublistResult)
s.cache.Store(subject, r.addSubToResult(sub))
if r := s.cache[subject]; r != nil {
s.cache[subject] = r.addSubToResult(sub)
}
return
}
s.cache.Range(func(k, v interface{}) bool {
key := k.(string)
r := v.(*SublistResult)
for key, r := range s.cache {
if matchLiteral(key, subject) {
s.cache.Store(key, r.addSubToResult(sub))
s.cache[key] = r.addSubToResult(sub)
}
return true
})
}
}
// removeFromCache will remove the sub from any active cache entries.
@@ -454,24 +451,15 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
// Load for accounting
if _, ok := s.cache.Load(subject); ok {
s.cache.Delete(subject)
atomic.AddInt32(&s.cacheNum, -1)
}
delete(s.cache, subject)
return
}
// Wildcard here.
s.cache.Range(func(k, v interface{}) bool {
key := k.(string)
for key := range s.cache {
if matchLiteral(key, subject) {
// Since someone else may be referecing, can't modify the list
// safely, just let it re-populate.
s.cache.Delete(key)
atomic.AddInt32(&s.cacheNum, -1)
delete(s.cache, key)
}
return true
})
}
}
// a place holder for an empty result.
@@ -491,11 +479,16 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
atomic.AddUint64(&s.matches, 1)
// Check cache first.
if atomic.LoadInt32(&s.cacheNum) > 0 {
if r, ok := s.cache.Load(subject); ok {
atomic.AddUint64(&s.cacheHits, 1)
return r.(*SublistResult)
}
if doLock {
s.RLock()
}
r, ok := s.cache[subject]
if doLock {
s.RUnlock()
}
if ok {
atomic.AddUint64(&s.cacheHits, 1)
return r
}
tsa := [32]string{}
@@ -514,10 +507,10 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
// Get result from the main structure and place into the shared cache.
// Hold the read lock to avoid race between match and store.
var n int32
var n int
if doLock {
s.RLock()
s.Lock()
}
matchLevel(s.root, tokens, result)
@@ -526,11 +519,11 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
result = emptyResult
}
if s.cache != nil {
s.cache.Store(subject, result)
n = atomic.AddInt32(&s.cacheNum, 1)
s.cache[subject] = result
n = len(s.cache)
}
if doLock {
s.RUnlock()
s.Unlock()
}
// Reduce the cache count if we have exceeded our set maximum.
@@ -547,11 +540,12 @@ func (s *Sublist) reduceCacheCount() {
defer atomic.StoreInt32(&s.ccSweep, 0)
// If we are over the cache limit randomly drop until under the limit.
s.Lock()
s.cache.Range(func(k, v interface{}) bool {
s.cache.Delete(k.(string))
n := atomic.AddInt32(&s.cacheNum, -1)
return n >= slCacheSweep
})
for key := range s.cache {
delete(s.cache, key)
if len(s.cache) <= slCacheSweep {
break
}
}
s.Unlock()
}
@@ -761,7 +755,6 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
// Turn off our cache.
s.cache = nil
atomic.StoreInt32(&s.cacheNum, 0)
for _, sub := range subs {
if err := s.remove(sub, false, false); err != nil {
return err
@@ -769,61 +762,10 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
}
// Turn caching back on here.
atomic.AddUint64(&s.genid, 1)
s.cache = &sync.Map{}
s.cache = make(map[string]*SublistResult)
return nil
}
func (s *Sublist) checkNodeForClientSubs(n *node, c *client) {
var removed uint32
for _, sub := range n.psubs {
if sub.client == c {
if s.removeFromNode(n, sub) {
s.removeFromCache(string(sub.subject), sub)
removed++
}
}
}
// Queue subscriptions
for _, qr := range n.qsubs {
for _, sub := range qr {
if sub.client == c {
if s.removeFromNode(n, sub) {
s.removeFromCache(string(sub.subject), sub)
removed++
}
}
}
}
s.count -= removed
s.removes += uint64(removed)
}
func (s *Sublist) removeClientSubs(l *level, c *client) {
for _, n := range l.nodes {
s.checkNodeForClientSubs(n, c)
s.removeClientSubs(n.next, c)
}
if l.pwc != nil {
s.checkNodeForClientSubs(l.pwc, c)
s.removeClientSubs(l.pwc.next, c)
}
if l.fwc != nil {
s.checkNodeForClientSubs(l.fwc, c)
s.removeClientSubs(l.fwc.next, c)
}
}
// RemoveAllForClient will remove all subscriptions for a given client.
func (s *Sublist) RemoveAllForClient(c *client) {
s.Lock()
removes := s.removes
s.removeClientSubs(s.root, c)
if s.removes != removes {
atomic.AddUint64(&s.genid, 1)
}
s.Unlock()
}
// pruneNode is used to prune an empty node from the tree.
func (l *level) pruneNode(n *node, t string) {
if n == nil {
@@ -897,7 +839,10 @@ func (s *Sublist) Count() uint32 {
// CacheCount returns the number of result sets in the cache.
func (s *Sublist) CacheCount() int {
return int(atomic.LoadInt32(&s.cacheNum))
s.RLock()
cc := len(s.cache)
s.RUnlock()
return cc
}
// SublistStats are public stats for the sublist
@@ -940,14 +885,13 @@ func (s *Sublist) Stats() *SublistStats {
s.RLock()
cache := s.cache
cc := len(s.cache)
st.NumSubs = s.count
st.NumInserts = s.inserts
st.NumRemoves = s.removes
s.RUnlock()
if cn := atomic.LoadInt32(&s.cacheNum); cn > 0 {
st.NumCache = uint32(cn)
}
st.NumCache = uint32(cc)
st.NumMatches = atomic.LoadUint64(&s.matches)
if st.NumMatches > 0 {
st.CacheHitRate = float64(atomic.LoadUint64(&s.cacheHits)) / float64(st.NumMatches)
@@ -957,16 +901,14 @@ func (s *Sublist) Stats() *SublistStats {
// If this is called frequently, which it should not be, this could hurt performance.
if cache != nil {
tot, max, clen := 0, 0, 0
s.cache.Range(func(k, v interface{}) bool {
for _, r := range s.cache {
clen++
r := v.(*SublistResult)
l := len(r.psubs) + len(r.qsubs)
tot += l
if l > max {
max = l
}
return true
})
}
st.totFanout = tot
st.cacheCnt = clen
st.MaxFanout = uint32(max)

View File

@@ -22,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -343,43 +342,6 @@ func testSublistRemoveWithLargeSubs(t *testing.T, s *Sublist) {
verifyLen(r.psubs, plistMin*2-3, t)
}
func TestSublistRemoveByClient(t *testing.T) {
testSublistRemoveByClient(t, NewSublistWithCache())
}
func TestSublistRemoveByClientNoCache(t *testing.T) {
testSublistRemoveByClient(t, NewSublistNoCache())
}
func testSublistRemoveByClient(t *testing.T, s *Sublist) {
c := &client{}
for i := 0; i < 10; i++ {
subject := fmt.Sprintf("a.b.c.d.e.f.%d", i)
sub := &subscription{client: c, subject: []byte(subject)}
s.Insert(sub)
}
verifyCount(s, 10, t)
s.Insert(&subscription{client: c, subject: []byte(">")})
s.Insert(&subscription{client: c, subject: []byte("foo.*")})
s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")})
s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")})
s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")})
s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")})
verifyCount(s, 16, t)
genid := atomic.LoadUint64(&s.genid)
s.RemoveAllForClient(c)
verifyCount(s, 0, t)
// genid should be different
if genid == atomic.LoadUint64(&s.genid) {
t.Fatalf("GenId should have been changed after removal of subs")
}
if s.CacheEnabled() {
if cc := s.CacheCount(); cc != 0 {
t.Fatalf("Cache should be zero, got %d", cc)
}
}
}
func TestSublistInvalidSubjectsInsert(t *testing.T) {
testSublistInvalidSubjectsInsert(t, NewSublistWithCache())
}