diff --git a/sublist.go b/sublist.go index 052ee176..9da76028 100644 --- a/sublist.go +++ b/sublist.go @@ -4,6 +4,7 @@ package gnatsd import ( "sync" + "time" "github.com/apcera/gnatsd/hash" "github.com/apcera/gnatsd/hashmap" @@ -17,6 +18,15 @@ type Sublist struct { count uint32 cache *hashmap.HashMap cmax int + stats stats +} + +type stats struct { + inserts uint64 + removes uint64 + matches uint64 + cacheHits uint64 + since time.Time } // A node contains subscriptions and a pointer to the next level. @@ -54,6 +64,7 @@ func New() *Sublist { root: newLevel(), cache: hashmap.New(), cmax: defaultCacheMax, + stats: stats{since: time.Now()}, } } @@ -115,6 +126,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { } n.subs = append(n.subs, sub) s.count++ + s.stats.inserts++ s.addToCache(subject, sub) s.mu.Unlock() } @@ -162,10 +174,12 @@ func (s *Sublist) removeFromCache(subject []byte, sub interface{}) { // slice of results. func (s *Sublist) Match(subject []byte) []interface{} { s.mu.RLock() + s.stats.matches++ r := s.cache.Get(subject) s.mu.RUnlock() if r != nil { + s.stats.cacheHits++ return r.([]interface{}) } @@ -282,6 +296,10 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { s.mu.Unlock() return } + + s.count-- + s.stats.removes++ + for i := len(levels) - 1; i >= 0; i-- { l, n, t := levels[i].l, levels[i].n, levels[i].t if n.isEmpty() { @@ -336,7 +354,6 @@ func (s *Sublist) removeFromNode(n *node, sub interface{}) bool { } for i, v := range n.subs { if v == sub { - s.count-- num := len(n.subs) a := n.subs copy(a[i:num-1], a[i+1:num]) @@ -357,9 +374,11 @@ func matchLiteral(literal, subject []byte) bool { } switch b { case _PWC: - // Skip token + // Skip token in literal + ll := len(literal) for { - if li >= len(literal) || literal[li] == _SEP { + if li >= ll || literal[li] == _SEP { + li -= 1 break } li += 1 @@ -379,6 +398,45 @@ func matchLiteral(literal, subject []byte) bool { // Count return the number of stored items in the HashMap. func (s *Sublist) Count() uint32 { return s.count } +// Stats for the sublist +type Stats struct { + NumSubs uint32 + NumCache uint32 + NumInserts uint64 + NumRemoves uint64 + NumMatches uint64 + CacheHitRate float64 + MaxFanout uint32 + AvgFanout float64 + StatsTime time.Time +} + +// Stats will return a stats structure for the current state. +func (s *Sublist) Stats() *Stats { + st := &Stats{} + st.NumSubs = s.count + st.NumCache = s.cache.Count() + st.NumInserts = s.stats.inserts + st.NumRemoves = s.stats.removes + st.NumMatches = s.stats.matches + st.CacheHitRate = float64(s.stats.cacheHits) / float64(s.stats.matches) + // whip through cache for fanout stats + // FIXME, creating all each time could be expensive, should do a cb version. + tot, max := 0, 0 + all := s.cache.All() + for _, r := range all { + l := len(r.([]interface{})) + tot += l + if l > max { + max = l + } + } + st.MaxFanout = uint32(max) + st.AvgFanout = float64(tot) / float64(len(all)) + st.StatsTime = s.stats.since + return st +} + // numLevels will return the maximum number of levels // contained in the Sublist tree. func (s *Sublist) numLevels() int { diff --git a/sublist_test.go b/sublist_test.go index 6ecd0bd4..1d2adfeb 100644 --- a/sublist_test.go +++ b/sublist_test.go @@ -207,6 +207,8 @@ func TestMatchLiterals(t *testing.T) { checkBool(matchLiteral([]byte("foo.bar"), []byte(">")), true, t) checkBool(matchLiteral([]byte("foo.bar"), []byte("foo.>")), true, t) checkBool(matchLiteral([]byte("foo.bar"), []byte("bar.>")), false, t) + checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.>")), true, t) + checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.*.*")), true, t) } func TestCacheBounds(t *testing.T) { @@ -216,16 +218,67 @@ func TestCacheBounds(t *testing.T) { tmpl := "cache.test.%d" loop := s.cmax + 100 - for i := 0; i < loop ; i++ { + for i := 0; i < loop; i++ { sub := []byte(fmt.Sprintf(tmpl, i)) s.Match(sub) } cs := int(s.cache.Count()) - if cs > s.cmax { + if cs > s.cmax { t.Fatalf("Cache is growing past limit: %d vs %d\n", cs, s.cmax) } } +func TestStats(t *testing.T) { + s := New() + s.Insert([]byte("stats.>"), "fwc") + tmpl := "stats.test.%d" + loop := 255 + total := uint32(loop+1) + + for i := 0; i < loop ; i++ { + sub := []byte(fmt.Sprintf(tmpl, i)) + s.Insert(sub, "l") + } + + stats := s.Stats() + if time.Since(stats.StatsTime) > 50*time.Millisecond { + t.Fatalf("StatsTime seems incorrect: %+v\n", stats.StatsTime) + } + if stats.NumSubs != total { + t.Fatalf("Wrong stats for NumSubs: %d vs %d\n", stats.NumSubs, total) + } + if stats.NumInserts != uint64(total) { + t.Fatalf("Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, total) + } + if stats.NumRemoves != 0 { + t.Fatalf("Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0) + } + if stats.NumMatches != 0 { + t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0) + } + + for i := 0; i < loop ; i++ { + s.Match([]byte("stats.test.22")) + } + s.Insert([]byte("stats.*.*"), "pwc") + s.Match([]byte("stats.test.22")) + + stats = s.Stats() + if stats.NumMatches != uint64(loop+1) { + t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, loop+1) + } + expectedCacheHitRate := 255.0/256.0 + if stats.CacheHitRate != expectedCacheHitRate { + t.Fatalf("Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, expectedCacheHitRate) + } + if stats.MaxFanout != 3 { + t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.MaxFanout, 3) + } + if stats.AvgFanout != 2.5 { + t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.AvgFanout, 2.5) + } +} + // -- Benchmarks Setup -- var subs [][]byte @@ -266,7 +319,6 @@ func addWildcards() { // -- Benchmarks Setup End -- - func Benchmark______________________Insert(b *testing.B) { b.SetBytes(1) s := New()