diff --git a/sublist.go b/sublist.go index c7df354c..4be91eb1 100644 --- a/sublist.go +++ b/sublist.go @@ -16,6 +16,7 @@ type Sublist struct { root *level count uint32 cache *hashmap.HashMap + cmax int } type node struct { @@ -38,13 +39,19 @@ func newLevel() *level { return &level{nodes: h} } +// defaultCacheMax is used to bound limit the frontend cache +const defaultCacheMax = 1024 + +// New will create a default sublist func New() *Sublist { return &Sublist{ root: newLevel(), cache: hashmap.New(), + cmax: defaultCacheMax, } } +// Common byte variables for wildcards and token separator. var ( _PWC = byte('*') _FWC = byte('>') @@ -154,7 +161,7 @@ func (s *Sublist) Match(subject []byte) []interface{} { s.mu.RUnlock() if r != nil { - return r.([] interface{}) + return r.([]interface{}) } // Cache miss @@ -177,10 +184,11 @@ func (s *Sublist) Match(subject []byte) []interface{} { s.mu.Lock() matchLevel(s.root, toks, &results) - // FIXME: This can overwhelm memory, but can't find a fast enough solution. - // LRU is too slow, LFU and time.Now() is also too slow. Can try PLRU or - // possible 2-way, although 2-way also uses time. We could have a go routine - // for getting time, then on a fetch we just &timeNow in an atomic way. + // We use random eviction to bound the size of the cache. + // RR is used for speed purposes here. + if int(s.cache.Count()) >= s.cmax { + s.cache.RemoveRandom() + } s.cache.Set(subject, results) s.mu.Unlock() @@ -344,7 +352,7 @@ func matchLiteral(literal, subject []byte) bool { // Skip token for { if li >= len(literal) || literal[li] == _SEP { - break; + break } li += 1 } diff --git a/sublist_test.go b/sublist_test.go index b9a69f9f..3186d8e1 100644 --- a/sublist_test.go +++ b/sublist_test.go @@ -2,6 +2,7 @@ package gnatsd import ( "bytes" + "fmt" "runtime" "runtime/debug" "strings" @@ -208,6 +209,25 @@ func TestMatchLiterals(t *testing.T) { checkBool(matchLiteral([]byte("foo.bar"), []byte("bar.>")), false, t) } +func TestCacheBounds(t *testing.T) { + s := New() + s.Insert([]byte("cache.>"), "foo") + + tmpl := "cache.test.%d" + loop := s.cmax + 100 + + for i := 0; i < loop ; i++ { + sub := []byte(fmt.Sprintf(tmpl, i)) + s.Match(sub) + } + cs := int(s.cache.Count()) + if cs > s.cmax { + t.Fatalf("Cache is growing past limit: %d vs %d\n", cs, s.cmax) + } +} + +// -- Benchmarks Setup -- + var subs [][]byte var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"} var sl = New() @@ -244,6 +264,9 @@ func addWildcards() { sl.Insert([]byte("cloud.*.*.router.*"), "traffic") } +// -- Benchmarks Setup End -- + + func Benchmark______________________Insert(b *testing.B) { b.SetBytes(1) s := New()