Added code to handle cache changes more efficiently

This commit is contained in:
Derek Collison
2012-11-06 16:07:39 -08:00
parent cf04fa9393
commit c3b46f57db

View File

@@ -5,9 +5,19 @@ package gnatsd
import (
"sync"
"github.com/apcera/gnatsd/hash"
"github.com/apcera/gnatsd/hashmap"
)
// A Sublist stores and efficiently retrieves subscriptions. It uses a
// trie structure and an efficient LRU cache to achieve quick lookups.
type Sublist struct {
mu sync.RWMutex
root *level
count uint32
cache *hashmap.HashMap
}
type node struct {
next *level
subs []interface{}
@@ -18,19 +28,14 @@ type level struct {
pwc, fwc *node
}
type Sublist struct {
lck sync.RWMutex
root *level
count uint32
cache *hashmap.HashMap
}
func newNode() *node {
return &node{subs: make([]interface{}, 0, 4)}
}
func newLevel() *level {
return &level{nodes: hashmap.New()}
h := hashmap.New()
h.Hash = hash.FNV1A
return &level{nodes: h}
}
func New() *Sublist {
@@ -46,6 +51,7 @@ var (
_SEP = byte('.')
)
// split will split a subject into tokens
func split(subject []byte, tokens [][]byte) [][]byte {
start := 0
for i, b := range subject {
@@ -61,7 +67,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
tsa := [16][]byte{}
toks := split(subject, tsa[:0])
s.lck.Lock()
s.mu.Lock()
l := s.root
var n *node
@@ -96,21 +102,64 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) {
}
n.subs = append(n.subs, sub)
s.count++
// FIXME: Do something more intelligent here
s.cache = hashmap.New()
s.lck.Unlock()
s.addToCache(subject, sub)
s.mu.Unlock()
}
// addToCache will add the new entry to existing cache
// entries if needed.
func (s *Sublist) addToCache(subject []byte, sub interface{}) {
if s.cache.Count() == 0 {
return
}
all := s.cache.AllKeys()
for _, k := range all {
if !matchLiteral(k, subject) {
continue
}
r := s.cache.Get(k)
if r == nil {
continue
}
res := r.([]interface{})
res = append(res, sub)
s.cache.Set(k, res)
}
}
// removeFromCache will remove the sub from any active cache entries
func (s *Sublist) removeFromCache(subject []byte, sub interface{}) {
if s.cache.Count() == 0 {
return
}
all := s.cache.AllKeys()
for _, k := range all {
if !matchLiteral(k, subject) {
continue
}
r := s.cache.Get(k)
if r == nil {
continue
}
s.cache.Remove(k)
}
}
// Match will match all entries to the literal subject. It will return a
// slice of results.
func (s *Sublist) Match(subject []byte) []interface{} {
s.lck.RLock()
s.mu.RLock()
r := s.cache.Get(subject)
s.lck.RUnlock()
s.mu.RUnlock()
if r != nil {
return r.([]interface{})
return r.([] interface{})
}
// Cache miss
// Process subject into tokens, this is performed
// unlocked, so can be parallel.
tsa := [32][]byte{}
toks := tsa[:0]
@@ -124,13 +173,22 @@ func (s *Sublist) Match(subject []byte) []interface{} {
toks = append(toks, subject[start:])
results := make([]interface{}, 0, 4)
s.lck.Lock()
// Lookup and add entry to hash.
s.mu.Lock()
matchLevel(s.root, toks, &results)
// FIXME: This can overwhelm memory, but can't find a fast enough solution.
// LRU is too slow, LFU and time.Now() is also too slow. Can try PLRU or
// possible 2-way, although 2-way also uses time. We could have a go routine
// for getting time, then on a fetch we just &timeNow in an atomic way.
s.cache.Set(subject, results)
s.lck.Unlock()
s.mu.Unlock()
return results
}
// matchLevel is used to recursively descend into the trie when there
// is a cache miss.
func matchLevel(l *level, toks [][]byte, results *[]interface{}) {
var pwc, n *node
for i, t := range toks {
@@ -163,17 +221,20 @@ func matchLevel(l *level, toks [][]byte, results *[]interface{}) {
return
}
// lnt is used to track descent into a removal for pruning.
type lnt struct {
l *level
n *node
t []byte
}
// Remove will remove any item associated with key. It will track descent
// into the trie and prune upon successful removal.
func (s *Sublist) Remove(subject []byte, sub interface{}) {
tsa := [16][]byte{}
toks := split(subject, tsa[:0])
s.lck.Lock()
s.mu.Lock()
l := s.root
var n *node
@@ -182,7 +243,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
for _, t := range toks {
if l == nil {
s.lck.Unlock()
s.mu.Unlock()
return
}
switch t[0] {
@@ -205,7 +266,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
}
}
if !s.removeFromNode(n, sub) {
s.lck.Unlock()
s.mu.Unlock()
return
}
for i := len(levels) - 1; i >= 0; i-- {
@@ -214,9 +275,8 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) {
l.pruneNode(n, t)
}
}
// FIXME: Do something more intelligent here
s.cache = hashmap.New()
s.lck.Unlock()
s.removeFromCache(subject, sub)
s.mu.Unlock()
}
func (l *level) pruneNode(n *node, t []byte) {
@@ -241,6 +301,7 @@ func (n *node) isEmpty() bool {
return false
}
// Return the number of nodes for the given level.
func (l *level) numNodes() uint32 {
num := l.nodes.Count()
if l.pwc != nil {
@@ -252,6 +313,7 @@ func (l *level) numNodes() uint32 {
return num
}
// Remove the sub for the given node.
func (s *Sublist) removeFromNode(n *node, sub interface{}) bool {
if n == nil {
return false
@@ -269,8 +331,40 @@ func (s *Sublist) removeFromNode(n *node, sub interface{}) bool {
return false
}
// matchLiteral is used to test literal subjects, those that do not have any
// wildcards, with a target subject. This is used in the cache layer.
func matchLiteral(literal, subject []byte) bool {
li := 0
for _, b := range(subject) {
if li >= len(literal) {
return false
}
switch b {
case _PWC:
// Skip token
for {
if li >= len(literal) || literal[li] == _SEP {
break;
}
li += 1
}
case _FWC:
return true
default:
if b != literal[li] {
return false
}
}
li += 1
}
return true
}
// Count return the number of stored items in the HashMap.
func (s *Sublist) Count() uint32 { return s.count }
// DebugNumLevels will return the number of levels contained in
// the HashMap.
func (s *Sublist) DebugNumLevels() int {
return visitLevel(s.root, 0)
}