diff --git a/sublist.go b/sublist.go index c106376c..a06b3876 100644 --- a/sublist.go +++ b/sublist.go @@ -3,10 +3,8 @@ package gnatsd import ( - // "bytes" - // "fmt" + "sync" - // "github.com/apcera/gnatsd/hash" "github.com/apcera/gnatsd/hashmap" ) @@ -21,8 +19,10 @@ type level struct { } type Sublist struct { + lck sync.RWMutex root *level count uint32 + cache *hashmap.HashMap } func newNode() *node { @@ -34,7 +34,10 @@ func newLevel() *level { } func New() *Sublist { - return &Sublist{root: newLevel()} + return &Sublist{ + root: newLevel(), + cache: hashmap.New(), + } } var ( @@ -58,6 +61,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { tsa := [16][]byte{} toks := split(subject, tsa[:0]) + s.lck.Lock() l := s.root var n *node @@ -74,7 +78,6 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { n = v.(*node) } } - if n == nil { n = newNode() switch t[0] { @@ -93,14 +96,38 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { } n.subs = append(n.subs, sub) s.count++ + // FIXME: Do something more intelligent here + s.cache = hashmap.New() + s.lck.Unlock() } func (s *Sublist) Match(subject []byte) []interface{} { - tsa := [16][]byte{} - toks := split(subject, tsa[:0]) - // FIXME: Let them pass in? + + s.lck.RLock() + r := s.cache.Get(subject) + s.lck.RUnlock() + + if r != nil { + return r.([]interface{}) + } + + tsa := [32][]byte{} + toks := tsa[:0] + + start := 0 + for i, b := range subject { + if b == _SEP { + toks = append(toks, subject[start:i]) + start = i + 1 + } + } + toks = append(toks, subject[start:]) results := make([]interface{}, 0, 4) + + s.lck.Lock() matchLevel(s.root, toks, &results) + s.cache.Set(subject, results) + s.lck.Unlock() return results } @@ -145,6 +172,8 @@ type lnt struct { func (s *Sublist) Remove(subject []byte, sub interface{}) { tsa := [16][]byte{} toks := split(subject, tsa[:0]) + + s.lck.Lock() l := s.root var n *node @@ -153,6 +182,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { for _, t := range toks { if l == nil { + s.lck.Unlock() return } switch t[0] { @@ -175,6 +205,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { } } if !s.removeFromNode(n, sub) { + s.lck.Unlock() return } for i := len(levels) - 1; i >= 0; i-- { @@ -183,6 +214,9 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { l.pruneNode(n, t) } } + // FIXME: Do something more intelligent here + s.cache = hashmap.New() + s.lck.Unlock() } func (l *level) pruneNode(n *node, t []byte) { diff --git a/sublist_test.go b/sublist_test.go index aed6e6c4..f75ccbd0 100644 --- a/sublist_test.go +++ b/sublist_test.go @@ -2,7 +2,10 @@ package gnatsd import ( "bytes" + "runtime" + "strings" "testing" + "time" ) func verifyCount(s *Sublist, count uint32, t *testing.T) { @@ -168,4 +171,116 @@ func TestRemoveCleanupWildcards(t *testing.T) { verifyNumLevels(s, 0, t) } +func TestCacheBehavior(t *testing.T) { + s := New() + literal := []byte("a.b.c") + fwc := []byte("a.>") + a, b := "a", "b" + s.Insert(literal, a) + r := s.Match(literal) + verifyLen(r, 1, t) + s.Insert(fwc, b) + r = s.Match(literal) + verifyLen(r, 2, t) + verifyMember(r, a, t) + verifyMember(r, b, t) + s.Remove(fwc, b) + r = s.Match(literal) + verifyLen(r, 1, t) + verifyMember(r, a, t) +} +var subs [][]byte +var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"} +var sl = New() +var results = make([]interface{}, 0, 64) + +func init() { + subs = make([][]byte, 0, 256*1024) + subsInit("") + for i := 0; i < len(subs); i++ { + sl.Insert(subs[i], subs[i]) + } + addWildcards() + println("Sublist holding ", sl.Count(), " subscriptions") +} + +func subsInit(pre string) { + var sub string + for _, t := range toks { + if len(pre) > 0 { + sub = pre + "." + t + } else { + sub = t + } + subs = append(subs, []byte(sub)) + if (len(strings.Split(sub, ".")) < 5) { + subsInit(sub) + } + } +} + +func addWildcards() { + sl.Insert([]byte("cloud.>"), "paas") + sl.Insert([]byte("cloud.continuum.component.>"), "health") + sl.Insert([]byte("cloud.*.*.router.*"), "traffic") +} + +func Benchmark______________________Insert(b *testing.B) { + b.SetBytes(1) + s := New() + for i, l := 0, len(subs); i < b.N; i++ { + index := i % l + s.Insert(subs[index], subs[index]) + } +} + +func Benchmark____________MatchSingleToken(b *testing.B) { + b.SetBytes(1) + s := []byte("apcera") + for i := 0; i < b.N; i++ { + sl.Match(s) + } +} + +func Benchmark______________MatchTwoTokens(b *testing.B) { + b.SetBytes(1) + s := []byte("apcera.continuum") + for i := 0; i < b.N; i++ { + sl.Match(s) + } +} + +func Benchmark_MatchFourTokensSingleResult(b *testing.B) { + b.SetBytes(1) + s := []byte("apcera.continuum.component.router") + for i := 0; i < b.N; i++ { + sl.Match(s) + } +} + +func Benchmark_MatchFourTokensMultiResults(b *testing.B) { + b.SetBytes(1) + s := []byte("cloud.continuum.component.router") + for i := 0; i < b.N; i++ { + sl.Match(s) + } +} + +func Benchmark_______MissOnLastTokenOfFive(b *testing.B) { + b.SetBytes(1) + s := []byte("apcera.continuum.component.router.ZZZZ") + for i := 0; i < b.N; i++ { + sl.Match(s) + } +} + +func _BenchmarkRSS(b *testing.B) { + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + println("HEAP:", m.HeapObjects) + println("ALLOC:", m.Alloc) + println("TOTAL ALLOC:", m.TotalAlloc) + time.Sleep(30 * 1e9) +}