Added locking and simple cache mechanism

This commit is contained in:
Derek Collison
2012-10-31 07:25:39 -07:00
parent 01885a9106
commit c7dbb8bd0e
2 changed files with 157 additions and 8 deletions

View File

@@ -3,10 +3,8 @@
package gnatsd
import (
// "bytes"
// "fmt"
"sync"
// "github.com/apcera/gnatsd/hash"
"github.com/apcera/gnatsd/hashmap"
)
@@ -21,8 +19,10 @@ type level struct {
}
type Sublist struct {
lck sync.RWMutex
root *level
count uint32
cache *hashmap.HashMap
}
func newNode() *node {
@@ -34,7 +34,10 @@ func newLevel() *level {
}
func New() *Sublist {
return &Sublist{root: newLevel()}
return &Sublist{
root: newLevel(),
cache: hashmap.New(),
}
}
var (
@@ -58,6 +61,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
tsa := [16][]byte{}
toks := split(subject, tsa[:0])
s.lck.Lock()
l := s.root
var n *node
@@ -74,7 +78,6 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
n = v.(*node)
}
}
if n == nil {
n = newNode()
switch t[0] {
@@ -93,14 +96,38 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
}
n.subs = append(n.subs, sub)
s.count++
// FIXME: Do something more intelligent here
s.cache = hashmap.New()
s.lck.Unlock()
}
func (s *Sublist) Match(subject []byte) []interface{} {
tsa := [16][]byte{}
toks := split(subject, tsa[:0])
// FIXME: Let them pass in?
s.lck.RLock()
r := s.cache.Get(subject)
s.lck.RUnlock()
if r != nil {
return r.([]interface{})
}
tsa := [32][]byte{}
toks := tsa[:0]
start := 0
for i, b := range subject {
if b == _SEP {
toks = append(toks, subject[start:i])
start = i + 1
}
}
toks = append(toks, subject[start:])
results := make([]interface{}, 0, 4)
s.lck.Lock()
matchLevel(s.root, toks, &results)
s.cache.Set(subject, results)
s.lck.Unlock()
return results
}
@@ -145,6 +172,8 @@ type lnt struct {
func (s *Sublist) Remove(subject []byte, sub interface{}) {
tsa := [16][]byte{}
toks := split(subject, tsa[:0])
s.lck.Lock()
l := s.root
var n *node
@@ -153,6 +182,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
for _, t := range toks {
if l == nil {
s.lck.Unlock()
return
}
switch t[0] {
@@ -175,6 +205,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
}
}
if !s.removeFromNode(n, sub) {
s.lck.Unlock()
return
}
for i := len(levels) - 1; i >= 0; i-- {
@@ -183,6 +214,9 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
l.pruneNode(n, t)
}
}
// FIXME: Do something more intelligent here
s.cache = hashmap.New()
s.lck.Unlock()
}
func (l *level) pruneNode(n *node, t []byte) {

View File

@@ -2,7 +2,10 @@ package gnatsd
import (
"bytes"
"runtime"
"strings"
"testing"
"time"
)
func verifyCount(s *Sublist, count uint32, t *testing.T) {
@@ -168,4 +171,116 @@ func TestRemoveCleanupWildcards(t *testing.T) {
verifyNumLevels(s, 0, t)
}
func TestCacheBehavior(t *testing.T) {
s := New()
literal := []byte("a.b.c")
fwc := []byte("a.>")
a, b := "a", "b"
s.Insert(literal, a)
r := s.Match(literal)
verifyLen(r, 1, t)
s.Insert(fwc, b)
r = s.Match(literal)
verifyLen(r, 2, t)
verifyMember(r, a, t)
verifyMember(r, b, t)
s.Remove(fwc, b)
r = s.Match(literal)
verifyLen(r, 1, t)
verifyMember(r, a, t)
}
var subs [][]byte
var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"}
var sl = New()
var results = make([]interface{}, 0, 64)
func init() {
subs = make([][]byte, 0, 256*1024)
subsInit("")
for i := 0; i < len(subs); i++ {
sl.Insert(subs[i], subs[i])
}
addWildcards()
println("Sublist holding ", sl.Count(), " subscriptions")
}
func subsInit(pre string) {
var sub string
for _, t := range toks {
if len(pre) > 0 {
sub = pre + "." + t
} else {
sub = t
}
subs = append(subs, []byte(sub))
if (len(strings.Split(sub, ".")) < 5) {
subsInit(sub)
}
}
}
func addWildcards() {
sl.Insert([]byte("cloud.>"), "paas")
sl.Insert([]byte("cloud.continuum.component.>"), "health")
sl.Insert([]byte("cloud.*.*.router.*"), "traffic")
}
func Benchmark______________________Insert(b *testing.B) {
b.SetBytes(1)
s := New()
for i, l := 0, len(subs); i < b.N; i++ {
index := i % l
s.Insert(subs[index], subs[index])
}
}
func Benchmark____________MatchSingleToken(b *testing.B) {
b.SetBytes(1)
s := []byte("apcera")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark______________MatchTwoTokens(b *testing.B) {
b.SetBytes(1)
s := []byte("apcera.continuum")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_MatchFourTokensSingleResult(b *testing.B) {
b.SetBytes(1)
s := []byte("apcera.continuum.component.router")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_MatchFourTokensMultiResults(b *testing.B) {
b.SetBytes(1)
s := []byte("cloud.continuum.component.router")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_______MissOnLastTokenOfFive(b *testing.B) {
b.SetBytes(1)
s := []byte("apcera.continuum.component.router.ZZZZ")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func _BenchmarkRSS(b *testing.B) {
runtime.GC()
var m runtime.MemStats
runtime.ReadMemStats(&m)
println("HEAP:", m.HeapObjects)
println("ALLOC:", m.Alloc)
println("TOTAL ALLOC:", m.TotalAlloc)
time.Sleep(30 * 1e9)
}