mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added stats, fixed matchLiteral bug
This commit is contained in:
64
sublist.go
64
sublist.go
@@ -4,6 +4,7 @@ package gnatsd
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/apcera/gnatsd/hash"
|
||||
"github.com/apcera/gnatsd/hashmap"
|
||||
@@ -17,6 +18,15 @@ type Sublist struct {
|
||||
count uint32
|
||||
cache *hashmap.HashMap
|
||||
cmax int
|
||||
stats stats
|
||||
}
|
||||
|
||||
type stats struct {
|
||||
inserts uint64
|
||||
removes uint64
|
||||
matches uint64
|
||||
cacheHits uint64
|
||||
since time.Time
|
||||
}
|
||||
|
||||
// A node contains subscriptions and a pointer to the next level.
|
||||
@@ -54,6 +64,7 @@ func New() *Sublist {
|
||||
root: newLevel(),
|
||||
cache: hashmap.New(),
|
||||
cmax: defaultCacheMax,
|
||||
stats: stats{since: time.Now()},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +126,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
|
||||
}
|
||||
n.subs = append(n.subs, sub)
|
||||
s.count++
|
||||
s.stats.inserts++
|
||||
s.addToCache(subject, sub)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
@@ -162,10 +174,12 @@ func (s *Sublist) removeFromCache(subject []byte, sub interface{}) {
|
||||
// slice of results.
|
||||
func (s *Sublist) Match(subject []byte) []interface{} {
|
||||
s.mu.RLock()
|
||||
s.stats.matches++
|
||||
r := s.cache.Get(subject)
|
||||
s.mu.RUnlock()
|
||||
|
||||
if r != nil {
|
||||
s.stats.cacheHits++
|
||||
return r.([]interface{})
|
||||
}
|
||||
|
||||
@@ -282,6 +296,10 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
s.count--
|
||||
s.stats.removes++
|
||||
|
||||
for i := len(levels) - 1; i >= 0; i-- {
|
||||
l, n, t := levels[i].l, levels[i].n, levels[i].t
|
||||
if n.isEmpty() {
|
||||
@@ -336,7 +354,6 @@ func (s *Sublist) removeFromNode(n *node, sub interface{}) bool {
|
||||
}
|
||||
for i, v := range n.subs {
|
||||
if v == sub {
|
||||
s.count--
|
||||
num := len(n.subs)
|
||||
a := n.subs
|
||||
copy(a[i:num-1], a[i+1:num])
|
||||
@@ -357,9 +374,11 @@ func matchLiteral(literal, subject []byte) bool {
|
||||
}
|
||||
switch b {
|
||||
case _PWC:
|
||||
// Skip token
|
||||
// Skip token in literal
|
||||
ll := len(literal)
|
||||
for {
|
||||
if li >= len(literal) || literal[li] == _SEP {
|
||||
if li >= ll || literal[li] == _SEP {
|
||||
li -= 1
|
||||
break
|
||||
}
|
||||
li += 1
|
||||
@@ -379,6 +398,45 @@ func matchLiteral(literal, subject []byte) bool {
|
||||
// Count return the number of stored items in the HashMap.
|
||||
func (s *Sublist) Count() uint32 { return s.count }
|
||||
|
||||
// Stats for the sublist
|
||||
type Stats struct {
|
||||
NumSubs uint32
|
||||
NumCache uint32
|
||||
NumInserts uint64
|
||||
NumRemoves uint64
|
||||
NumMatches uint64
|
||||
CacheHitRate float64
|
||||
MaxFanout uint32
|
||||
AvgFanout float64
|
||||
StatsTime time.Time
|
||||
}
|
||||
|
||||
// Stats will return a stats structure for the current state.
|
||||
func (s *Sublist) Stats() *Stats {
|
||||
st := &Stats{}
|
||||
st.NumSubs = s.count
|
||||
st.NumCache = s.cache.Count()
|
||||
st.NumInserts = s.stats.inserts
|
||||
st.NumRemoves = s.stats.removes
|
||||
st.NumMatches = s.stats.matches
|
||||
st.CacheHitRate = float64(s.stats.cacheHits) / float64(s.stats.matches)
|
||||
// whip through cache for fanout stats
|
||||
// FIXME, creating all each time could be expensive, should do a cb version.
|
||||
tot, max := 0, 0
|
||||
all := s.cache.All()
|
||||
for _, r := range all {
|
||||
l := len(r.([]interface{}))
|
||||
tot += l
|
||||
if l > max {
|
||||
max = l
|
||||
}
|
||||
}
|
||||
st.MaxFanout = uint32(max)
|
||||
st.AvgFanout = float64(tot) / float64(len(all))
|
||||
st.StatsTime = s.stats.since
|
||||
return st
|
||||
}
|
||||
|
||||
// numLevels will return the maximum number of levels
|
||||
// contained in the Sublist tree.
|
||||
func (s *Sublist) numLevels() int {
|
||||
|
||||
@@ -207,6 +207,8 @@ func TestMatchLiterals(t *testing.T) {
|
||||
checkBool(matchLiteral([]byte("foo.bar"), []byte(">")), true, t)
|
||||
checkBool(matchLiteral([]byte("foo.bar"), []byte("foo.>")), true, t)
|
||||
checkBool(matchLiteral([]byte("foo.bar"), []byte("bar.>")), false, t)
|
||||
checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.>")), true, t)
|
||||
checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.*.*")), true, t)
|
||||
}
|
||||
|
||||
func TestCacheBounds(t *testing.T) {
|
||||
@@ -216,16 +218,67 @@ func TestCacheBounds(t *testing.T) {
|
||||
tmpl := "cache.test.%d"
|
||||
loop := s.cmax + 100
|
||||
|
||||
for i := 0; i < loop ; i++ {
|
||||
for i := 0; i < loop; i++ {
|
||||
sub := []byte(fmt.Sprintf(tmpl, i))
|
||||
s.Match(sub)
|
||||
}
|
||||
cs := int(s.cache.Count())
|
||||
if cs > s.cmax {
|
||||
if cs > s.cmax {
|
||||
t.Fatalf("Cache is growing past limit: %d vs %d\n", cs, s.cmax)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStats(t *testing.T) {
|
||||
s := New()
|
||||
s.Insert([]byte("stats.>"), "fwc")
|
||||
tmpl := "stats.test.%d"
|
||||
loop := 255
|
||||
total := uint32(loop+1)
|
||||
|
||||
for i := 0; i < loop ; i++ {
|
||||
sub := []byte(fmt.Sprintf(tmpl, i))
|
||||
s.Insert(sub, "l")
|
||||
}
|
||||
|
||||
stats := s.Stats()
|
||||
if time.Since(stats.StatsTime) > 50*time.Millisecond {
|
||||
t.Fatalf("StatsTime seems incorrect: %+v\n", stats.StatsTime)
|
||||
}
|
||||
if stats.NumSubs != total {
|
||||
t.Fatalf("Wrong stats for NumSubs: %d vs %d\n", stats.NumSubs, total)
|
||||
}
|
||||
if stats.NumInserts != uint64(total) {
|
||||
t.Fatalf("Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, total)
|
||||
}
|
||||
if stats.NumRemoves != 0 {
|
||||
t.Fatalf("Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0)
|
||||
}
|
||||
if stats.NumMatches != 0 {
|
||||
t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0)
|
||||
}
|
||||
|
||||
for i := 0; i < loop ; i++ {
|
||||
s.Match([]byte("stats.test.22"))
|
||||
}
|
||||
s.Insert([]byte("stats.*.*"), "pwc")
|
||||
s.Match([]byte("stats.test.22"))
|
||||
|
||||
stats = s.Stats()
|
||||
if stats.NumMatches != uint64(loop+1) {
|
||||
t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, loop+1)
|
||||
}
|
||||
expectedCacheHitRate := 255.0/256.0
|
||||
if stats.CacheHitRate != expectedCacheHitRate {
|
||||
t.Fatalf("Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, expectedCacheHitRate)
|
||||
}
|
||||
if stats.MaxFanout != 3 {
|
||||
t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.MaxFanout, 3)
|
||||
}
|
||||
if stats.AvgFanout != 2.5 {
|
||||
t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.AvgFanout, 2.5)
|
||||
}
|
||||
}
|
||||
|
||||
// -- Benchmarks Setup --
|
||||
|
||||
var subs [][]byte
|
||||
@@ -266,7 +319,6 @@ func addWildcards() {
|
||||
|
||||
// -- Benchmarks Setup End --
|
||||
|
||||
|
||||
func Benchmark______________________Insert(b *testing.B) {
|
||||
b.SetBytes(1)
|
||||
s := New()
|
||||
|
||||
Reference in New Issue
Block a user