Allow disabling of shared cache with new constuctor. Also share empty results.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-04-22 16:34:39 -07:00
parent bc11c1c284
commit da2dab92d1
2 changed files with 248 additions and 42 deletions

View File

@@ -36,11 +36,13 @@ const (
// Sublist related errors
var (
ErrInvalidSubject = errors.New("sublist: Invalid Subject")
ErrNotFound = errors.New("sublist: No Matches Found")
ErrInvalidSubject = errors.New("sublist: invalid subject")
ErrNotFound = errors.New("sublist: no matches found")
)
const (
// slNoCache for cacheNum means cache is disabled.
slNoCache = -22
// cacheMax is used to bound limit the frontend cache
slCacheMax = 1024
// If we run a sweeper we will drain to this count.
@@ -64,7 +66,7 @@ type Sublist struct {
inserts uint64
removes uint64
root *level
cache sync.Map
cache *sync.Map
cacheNum int32
ccSweep int32
count uint32
@@ -97,7 +99,17 @@ func newLevel() *level {
// NewSublist will create a default sublist
func NewSublist() *Sublist {
return &Sublist{root: newLevel()}
return &Sublist{root: newLevel(), cache: &sync.Map{}}
}
// NewSublistNoCache will create a default sublist without caching enabled.
func NewSublistNoCache() *Sublist {
return &Sublist{root: newLevel(), cacheNum: slNoCache}
}
// CacheEnabled returns whether or not caching is enabled for this sublist.
func (s *Sublist) CacheEnabled() bool {
return atomic.LoadInt32(&s.cacheNum) != slNoCache
}
// Insert adds a subscription into the sublist
@@ -226,6 +238,9 @@ func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult {
// addToCache will add the new entry to the existing cache
// entries if needed. Assumes write lock is held.
func (s *Sublist) addToCache(subject string, sub *subscription) {
if s.cache == nil {
return
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
if v, ok := s.cache.Load(subject); ok {
@@ -247,6 +262,9 @@ func (s *Sublist) addToCache(subject string, sub *subscription) {
// removeFromCache will remove the sub from any active cache entries.
// Assumes write lock is held.
func (s *Sublist) removeFromCache(subject string, sub *subscription) {
if s.cache == nil {
return
}
// If literal we can direct match.
if subjectIsLiteral(subject) {
// Load for accounting
@@ -268,15 +286,21 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
})
}
// a place holder for an empty result.
var emptyResult = &SublistResult{}
// Match will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) Match(subject string) *SublistResult {
atomic.AddUint64(&s.matches, 1)
// Check cache first.
if r, ok := s.cache.Load(subject); ok {
atomic.AddUint64(&s.cacheHits, 1)
return r.(*SublistResult)
ce := atomic.LoadInt32(&s.cacheNum)
if ce > 0 {
if r, ok := s.cache.Load(subject); ok {
atomic.AddUint64(&s.cacheHits, 1)
return r.(*SublistResult)
}
}
tsa := [32]string{}
@@ -295,10 +319,18 @@ func (s *Sublist) Match(subject string) *SublistResult {
// Get result from the main structure and place into the shared cache.
// Hold the read lock to avoid race between match and store.
var n int32
s.RLock()
matchLevel(s.root, tokens, result)
s.cache.Store(subject, result)
n := atomic.AddInt32(&s.cacheNum, 1)
// Check for empty result.
if len(result.psubs) == 0 && len(result.qsubs) == 0 {
result = emptyResult
}
if ce != slNoCache {
s.cache.Store(subject, result)
n = atomic.AddInt32(&s.cacheNum, 1)
}
s.RUnlock()
// Reduce the cache count if we have exceeded our set maximum.
@@ -314,11 +346,13 @@ func (s *Sublist) Match(subject string) *SublistResult {
func (s *Sublist) reduceCacheCount() {
defer atomic.StoreInt32(&s.ccSweep, 0)
// If we are over the cache limit randomly drop until under the limit.
s.Lock()
s.cache.Range(func(k, v interface{}) bool {
s.cache.Delete(k.(string))
n := atomic.AddInt32(&s.cacheNum, -1)
return n >= slCacheSweep
})
s.Unlock()
}
// Helper function for auto-expanding remote qsubs.

View File

@@ -28,6 +28,7 @@ import (
"github.com/nats-io/nuid"
)
// FIXME(dlc) - this is also used by monitor_test. Not needed with t.Helper.
func stackFatalf(t *testing.T, f string, args ...interface{}) {
lines := make([]string, 0, 32)
msg := fmt.Sprintf(f, args...)
@@ -46,35 +47,41 @@ func stackFatalf(t *testing.T, f string, args ...interface{}) {
}
func verifyCount(s *Sublist, count uint32, t *testing.T) {
t.Helper()
if s.Count() != count {
stackFatalf(t, "Count is %d, should be %d", s.Count(), count)
t.Fatalf("Count is %d, should be %d", s.Count(), count)
}
}
func verifyLen(r []*subscription, l int, t *testing.T) {
t.Helper()
if len(r) != l {
stackFatalf(t, "Results len is %d, should be %d", len(r), l)
t.Fatalf("Results len is %d, should be %d", len(r), l)
}
}
func verifyQLen(r [][]*subscription, l int, t *testing.T) {
t.Helper()
if len(r) != l {
stackFatalf(t, "Queue Results len is %d, should be %d", len(r), l)
t.Fatalf("Queue Results len is %d, should be %d", len(r), l)
}
}
func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
t.Helper()
dl := s.numLevels()
if dl != expected {
stackFatalf(t, "NumLevels is %d, should be %d", dl, expected)
t.Fatalf("NumLevels is %d, should be %d", dl, expected)
}
}
func verifyQMember(qsubs [][]*subscription, val *subscription, t *testing.T) {
t.Helper()
verifyMember(qsubs[findQSlot(val.queue, qsubs)], val, t)
}
func verifyMember(r []*subscription, val *subscription, t *testing.T) {
t.Helper()
for _, v := range r {
if v == nil {
continue
@@ -83,7 +90,7 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) {
return
}
}
stackFatalf(t, "Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue)
t.Fatalf("Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue)
}
// Helpers to generate test subscriptions.
@@ -113,7 +120,14 @@ func TestSublistInit(t *testing.T) {
}
func TestSublistInsertCount(t *testing.T) {
s := NewSublist()
testSublistInsertCount(t, NewSublist())
}
func TestSublistInsertCountNoCache(t *testing.T) {
testSublistInsertCount(t, NewSublistNoCache())
}
func testSublistInsertCount(t *testing.T, s *Sublist) {
s.Insert(newSub("foo"))
s.Insert(newSub("bar"))
s.Insert(newSub("foo.bar"))
@@ -121,7 +135,14 @@ func TestSublistInsertCount(t *testing.T) {
}
func TestSublistSimple(t *testing.T) {
s := NewSublist()
testSublistSimple(t, NewSublist())
}
func TestSublistSimpleNoCache(t *testing.T) {
testSublistSimple(t, NewSublistNoCache())
}
func testSublistSimple(t *testing.T, s *Sublist) {
subject := "foo"
sub := newSub(subject)
s.Insert(sub)
@@ -131,7 +152,14 @@ func TestSublistSimple(t *testing.T) {
}
func TestSublistSimpleMultiTokens(t *testing.T) {
s := NewSublist()
testSublistSimpleMultiTokens(t, NewSublist())
}
func TestSublistSimpleMultiTokensNoCache(t *testing.T) {
testSublistSimpleMultiTokens(t, NewSublistNoCache())
}
func testSublistSimpleMultiTokens(t *testing.T, s *Sublist) {
subject := "foo.bar.baz"
sub := newSub(subject)
s.Insert(sub)
@@ -141,7 +169,14 @@ func TestSublistSimpleMultiTokens(t *testing.T) {
}
func TestSublistPartialWildcard(t *testing.T) {
s := NewSublist()
testSublistPartialWildcard(t, NewSublist())
}
func TestSublistPartialWildcardNoCache(t *testing.T) {
testSublistPartialWildcard(t, NewSublistNoCache())
}
func testSublistPartialWildcard(t *testing.T, s *Sublist) {
lsub := newSub("a.b.c")
psub := newSub("a.*.c")
s.Insert(lsub)
@@ -153,7 +188,14 @@ func TestSublistPartialWildcard(t *testing.T) {
}
func TestSublistPartialWildcardAtEnd(t *testing.T) {
s := NewSublist()
testSublistPartialWildcardAtEnd(t, NewSublist())
}
func TestSublistPartialWildcardAtEndNoCache(t *testing.T) {
testSublistPartialWildcardAtEnd(t, NewSublistNoCache())
}
func testSublistPartialWildcardAtEnd(t *testing.T, s *Sublist) {
lsub := newSub("a.b.c")
psub := newSub("a.b.*")
s.Insert(lsub)
@@ -165,7 +207,14 @@ func TestSublistPartialWildcardAtEnd(t *testing.T) {
}
func TestSublistFullWildcard(t *testing.T) {
s := NewSublist()
testSublistFullWildcard(t, NewSublist())
}
func TestSublistFullWildcardNoCache(t *testing.T) {
testSublistFullWildcard(t, NewSublistNoCache())
}
func testSublistFullWildcard(t *testing.T, s *Sublist) {
lsub := newSub("a.b.c")
fsub := newSub("a.>")
s.Insert(lsub)
@@ -177,7 +226,14 @@ func TestSublistFullWildcard(t *testing.T) {
}
func TestSublistRemove(t *testing.T) {
s := NewSublist()
testSublistRemove(t, NewSublist())
}
func TestSublistRemoveNoCache(t *testing.T) {
testSublistRemove(t, NewSublistNoCache())
}
func testSublistRemove(t *testing.T, s *Sublist) {
subject := "a.b.c.d"
sub := newSub(subject)
s.Insert(sub)
@@ -193,7 +249,14 @@ func TestSublistRemove(t *testing.T) {
}
func TestSublistRemoveWildcard(t *testing.T) {
s := NewSublist()
testSublistRemoveWildcard(t, NewSublist())
}
func TestSublistRemoveWildcardNoCache(t *testing.T) {
testSublistRemoveWildcard(t, NewSublistNoCache())
}
func testSublistRemoveWildcard(t *testing.T, s *Sublist) {
subject := "a.b.c.d"
sub := newSub(subject)
psub := newSub("a.b.*.d")
@@ -215,7 +278,14 @@ func TestSublistRemoveWildcard(t *testing.T) {
}
func TestSublistRemoveCleanup(t *testing.T) {
s := NewSublist()
testSublistRemoveCleanup(t, NewSublist())
}
func TestSublistRemoveCleanupNoCache(t *testing.T) {
testSublistRemoveCleanup(t, NewSublistNoCache())
}
func testSublistRemoveCleanup(t *testing.T, s *Sublist) {
literal := "a.b.c.d.e.f"
depth := len(strings.Split(literal, tsep))
sub := newSub(literal)
@@ -227,7 +297,14 @@ func TestSublistRemoveCleanup(t *testing.T) {
}
func TestSublistRemoveCleanupWildcards(t *testing.T) {
s := NewSublist()
testSublistRemoveCleanupWildcards(t, NewSublist())
}
func TestSublistRemoveCleanupWildcardsNoCache(t *testing.T) {
testSublistRemoveCleanupWildcards(t, NewSublistNoCache())
}
func testSublistRemoveCleanupWildcards(t *testing.T, s *Sublist) {
subject := "a.b.*.d.e.>"
depth := len(strings.Split(subject, tsep))
sub := newSub(subject)
@@ -239,8 +316,15 @@ func TestSublistRemoveCleanupWildcards(t *testing.T) {
}
func TestSublistRemoveWithLargeSubs(t *testing.T) {
testSublistRemoveWithLargeSubs(t, NewSublist())
}
func TestSublistRemoveWithLargeSubsNoCache(t *testing.T) {
testSublistRemoveWithLargeSubs(t, NewSublistNoCache())
}
func testSublistRemoveWithLargeSubs(t *testing.T, s *Sublist) {
subject := "foo"
s := NewSublist()
for i := 0; i < plistMin*2; i++ {
sub := newSub(subject)
s.Insert(sub)
@@ -259,7 +343,14 @@ func TestSublistRemoveWithLargeSubs(t *testing.T) {
}
func TestSublistRemoveByClient(t *testing.T) {
s := NewSublist()
testSublistRemoveByClient(t, NewSublist())
}
func TestSublistRemoveByClientNoCache(t *testing.T) {
testSublistRemoveByClient(t, NewSublistNoCache())
}
func testSublistRemoveByClient(t *testing.T, s *Sublist) {
c := &client{}
for i := 0; i < 10; i++ {
subject := fmt.Sprintf("a.b.c.d.e.f.%d", i)
@@ -281,14 +372,22 @@ func TestSublistRemoveByClient(t *testing.T) {
if genid == atomic.LoadUint64(&s.genid) {
t.Fatalf("GenId should have been changed after removal of subs")
}
if cc := s.CacheCount(); cc != 0 {
t.Fatalf("Cache should be zero, got %d", cc)
if s.CacheEnabled() {
if cc := s.CacheCount(); cc != 0 {
t.Fatalf("Cache should be zero, got %d", cc)
}
}
}
func TestSublistInvalidSubjectsInsert(t *testing.T) {
s := NewSublist()
testSublistInvalidSubjectsInsert(t, NewSublist())
}
func TestSublistInvalidSubjectsInsertNoCache(t *testing.T) {
testSublistInvalidSubjectsInsert(t, NewSublistNoCache())
}
func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) {
// Insert, or subscriptions, can have wildcards, but not empty tokens,
// and can not have a FWC that is not the terminal token.
@@ -371,8 +470,14 @@ func TestSublistCache(t *testing.T) {
}
func TestSublistBasicQueueResults(t *testing.T) {
s := NewSublist()
testSublistBasicQueueResults(t, NewSublist())
}
func TestSublistBasicQueueResultsNoCache(t *testing.T) {
testSublistBasicQueueResults(t, NewSublistNoCache())
}
func testSublistBasicQueueResults(t *testing.T, s *Sublist) {
// Test some basics
subject := "foo"
sub := newSub(subject)
@@ -561,10 +666,17 @@ func TestSubjectIsLiteral(t *testing.T) {
}
func TestSublistBadSubjectOnRemove(t *testing.T) {
testSublistBadSubjectOnRemove(t, NewSublist())
}
func TestSublistBadSubjectOnRemoveNoCache(t *testing.T) {
testSublistBadSubjectOnRemove(t, NewSublistNoCache())
}
func testSublistBadSubjectOnRemove(t *testing.T, s *Sublist) {
bad := "a.b..d"
sub := newSub(bad)
s := NewSublist()
if err := s.Insert(sub); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
@@ -581,7 +693,14 @@ func TestSublistBadSubjectOnRemove(t *testing.T) {
// This is from bug report #18
func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) {
s := NewSublist()
testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublist())
}
func TestSublistTwoTokenPubMatchSingleTokenSubNoCache(t *testing.T) {
testSublistTwoTokenPubMatchSingleTokenSub(t, NewSublistNoCache())
}
func testSublistTwoTokenPubMatchSingleTokenSub(t *testing.T, s *Sublist) {
sub := newSub("foo")
s.Insert(sub)
r := s.Match("foo")
@@ -592,7 +711,14 @@ func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) {
}
func TestSublistInsertWithWildcardsAsLiterals(t *testing.T) {
s := NewSublist()
testSublistInsertWithWildcardsAsLiterals(t, NewSublist())
}
func TestSublistInsertWithWildcardsAsLiteralsNoCache(t *testing.T) {
testSublistInsertWithWildcardsAsLiterals(t, NewSublistNoCache())
}
func testSublistInsertWithWildcardsAsLiterals(t *testing.T, s *Sublist) {
subjects := []string{"foo.*-", "foo.>-"}
for _, subject := range subjects {
sub := newSub(subject)
@@ -607,7 +733,14 @@ func TestSublistInsertWithWildcardsAsLiterals(t *testing.T) {
}
func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) {
s := NewSublist()
testSublistRemoveWithWildcardsAsLiterals(t, NewSublist())
}
func TestSublistRemoveWithWildcardsAsLiteralsNoCache(t *testing.T) {
testSublistRemoveWithWildcardsAsLiterals(t, NewSublistNoCache())
}
func testSublistRemoveWithWildcardsAsLiterals(t *testing.T, s *Sublist) {
subjects := []string{"foo.*-", "foo.>-"}
for _, subject := range subjects {
sub := newSub(subject)
@@ -626,8 +759,14 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) {
}
func TestSublistRaceOnRemove(t *testing.T) {
s := NewSublist()
testSublistRaceOnRemove(t, NewSublist())
}
func TestSublistRaceOnRemoveNoCache(t *testing.T) {
testSublistRaceOnRemove(t, NewSublistNoCache())
}
func testSublistRaceOnRemove(t *testing.T, s *Sublist) {
var (
total = 100
subs = make(map[int]*subscription, total) // use map for randomness
@@ -701,8 +840,14 @@ func TestSublistRaceOnRemove(t *testing.T) {
}
func TestSublistRaceOnInsert(t *testing.T) {
s := NewSublist()
testSublistRaceOnInsert(t, NewSublist())
}
func TestSublistRaceOnInsertNoCache(t *testing.T) {
testSublistRaceOnInsert(t, NewSublistNoCache())
}
func testSublistRaceOnInsert(t *testing.T, s *Sublist) {
var (
total = 100
subs = make(map[int]*subscription, total) // use map for randomness
@@ -755,7 +900,7 @@ func TestSublistRaceOnInsert(t *testing.T) {
}
func TestSublistRaceOnMatch(t *testing.T) {
s := NewSublist()
s := NewSublistNoCache()
s.Insert(newQSub("foo.*", "workers"))
s.Insert(newQSub("foo.bar", "workers"))
s.Insert(newSub("foo.*"))
@@ -782,8 +927,6 @@ func TestSublistRaceOnMatch(t *testing.T) {
}
}
}
// Empty cache to maximize chance for race
s.cache.Delete("foo.bar")
}
}
go f()
@@ -799,7 +942,14 @@ func TestSublistRaceOnMatch(t *testing.T) {
// Remote subscriptions for queue subscribers will be weighted such that a single subscription
// is received, but represents all of the queue subscribers on the remote side.
func TestSublistRemoteQueueSubscriptions(t *testing.T) {
s := NewSublist()
testSublistRemoteQueueSubscriptions(t, NewSublist())
}
func TestSublistRemoteQueueSubscriptionsNoCache(t *testing.T) {
testSublistRemoteQueueSubscriptions(t, NewSublistNoCache())
}
func testSublistRemoteQueueSubscriptions(t *testing.T, s *Sublist) {
// Normals
s1 := newQSub("foo", "bar")
s2 := newQSub("foo", "bar")
@@ -842,6 +992,21 @@ func TestSublistRemoteQueueSubscriptions(t *testing.T) {
verifyLen(r.qsubs[0], 2, t)
}
func TestSublistSharedEmptyResult(t *testing.T) {
s := NewSublist()
r1 := s.Match("foo")
verifyLen(r1.psubs, 0, t)
verifyQLen(r1.qsubs, 0, t)
r2 := s.Match("bar")
verifyLen(r2.psubs, 0, t)
verifyQLen(r2.qsubs, 0, t)
if r1 != r2 {
t.Fatalf("Expected empty result to be a shared result set")
}
}
// -- Benchmarks Setup --
var benchSublistSubs []*subscription
@@ -897,6 +1062,14 @@ func Benchmark______________________SublistInsert(b *testing.B) {
}
}
func Benchmark_______________SublistInsertNoCache(b *testing.B) {
s := NewSublistNoCache()
for i, l := 0, len(benchSublistSubs); i < b.N; i++ {
index := i % l
s.Insert(benchSublistSubs[index])
}
}
func benchSublistTokens(b *testing.B, tokens string) {
for i := 0; i < b.N; i++ {
benchSublistSl.Match(tokens)
@@ -1001,7 +1174,7 @@ func Benchmark________________SublistMatchLiteral(b *testing.B) {
func Benchmark_____SublistMatch10kSubsWithNoCache(b *testing.B) {
var nsubs = 512
s := NewSublist()
s := NewSublistNoCache()
subject := "foo"
for i := 0; i < nsubs; i++ {
s.Insert(newSub(subject))
@@ -1012,7 +1185,6 @@ func Benchmark_____SublistMatch10kSubsWithNoCache(b *testing.B) {
if len(r.psubs) != nsubs {
b.Fatalf("Results len is %d, should be %d", len(r.psubs), nsubs)
}
s.cache.Delete(subject)
}
}