From c3b46f57dbdfcd759a026dfb4339644b3cc1dddb Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 6 Nov 2012 16:07:39 -0800 Subject: [PATCH] Added code to handle cache changes more efficiently --- sublist.go | 140 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 117 insertions(+), 23 deletions(-) diff --git a/sublist.go b/sublist.go index a06b3876..c7df354c 100644 --- a/sublist.go +++ b/sublist.go @@ -5,9 +5,19 @@ package gnatsd import ( "sync" + "github.com/apcera/gnatsd/hash" "github.com/apcera/gnatsd/hashmap" ) +// A Sublist stores and efficiently retrieves subscriptions. It uses a +// trie structure and an efficient LRU cache to achieve quick lookups. +type Sublist struct { + mu sync.RWMutex + root *level + count uint32 + cache *hashmap.HashMap +} + type node struct { next *level subs []interface{} @@ -18,19 +28,14 @@ type level struct { pwc, fwc *node } -type Sublist struct { - lck sync.RWMutex - root *level - count uint32 - cache *hashmap.HashMap -} - func newNode() *node { return &node{subs: make([]interface{}, 0, 4)} } func newLevel() *level { - return &level{nodes: hashmap.New()} + h := hashmap.New() + h.Hash = hash.FNV1A + return &level{nodes: h} } func New() *Sublist { @@ -46,6 +51,7 @@ var ( _SEP = byte('.') ) +// split will split a subject into tokens func split(subject []byte, tokens [][]byte) [][]byte { start := 0 for i, b := range subject { @@ -61,7 +67,7 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { tsa := [16][]byte{} toks := split(subject, tsa[:0]) - s.lck.Lock() + s.mu.Lock() l := s.root var n *node @@ -96,21 +102,64 @@ func (s *Sublist) Insert(subject []byte, sub interface{}) { } n.subs = append(n.subs, sub) s.count++ - // FIXME: Do something more intelligent here - s.cache = hashmap.New() - s.lck.Unlock() + s.addToCache(subject, sub) + s.mu.Unlock() } +// addToCache will add the new entry to existing cache +// entries if needed. +func (s *Sublist) addToCache(subject []byte, sub interface{}) { + if s.cache.Count() == 0 { + return + } + all := s.cache.AllKeys() + for _, k := range all { + if !matchLiteral(k, subject) { + continue + } + r := s.cache.Get(k) + if r == nil { + continue + } + res := r.([]interface{}) + res = append(res, sub) + s.cache.Set(k, res) + } +} + +// removeFromCache will remove the sub from any active cache entries +func (s *Sublist) removeFromCache(subject []byte, sub interface{}) { + if s.cache.Count() == 0 { + return + } + all := s.cache.AllKeys() + for _, k := range all { + if !matchLiteral(k, subject) { + continue + } + r := s.cache.Get(k) + if r == nil { + continue + } + s.cache.Remove(k) + } +} + +// Match will match all entries to the literal subject. It will return a +// slice of results. func (s *Sublist) Match(subject []byte) []interface{} { - s.lck.RLock() + s.mu.RLock() r := s.cache.Get(subject) - s.lck.RUnlock() + s.mu.RUnlock() if r != nil { - return r.([]interface{}) + return r.([] interface{}) } + // Cache miss + // Process subject into tokens, this is performed + // unlocked, so can be parallel. tsa := [32][]byte{} toks := tsa[:0] @@ -124,13 +173,22 @@ func (s *Sublist) Match(subject []byte) []interface{} { toks = append(toks, subject[start:]) results := make([]interface{}, 0, 4) - s.lck.Lock() + // Lookup and add entry to hash. + s.mu.Lock() matchLevel(s.root, toks, &results) + + // FIXME: This can overwhelm memory, but can't find a fast enough solution. + // LRU is too slow, LFU and time.Now() is also too slow. Can try PLRU or + // possible 2-way, although 2-way also uses time. We could have a go routine + // for getting time, then on a fetch we just &timeNow in an atomic way. s.cache.Set(subject, results) - s.lck.Unlock() + s.mu.Unlock() + return results } +// matchLevel is used to recursively descend into the trie when there +// is a cache miss. func matchLevel(l *level, toks [][]byte, results *[]interface{}) { var pwc, n *node for i, t := range toks { @@ -163,17 +221,20 @@ func matchLevel(l *level, toks [][]byte, results *[]interface{}) { return } +// lnt is used to track descent into a removal for pruning. type lnt struct { l *level n *node t []byte } +// Remove will remove any item associated with key. It will track descent +// into the trie and prune upon successful removal. func (s *Sublist) Remove(subject []byte, sub interface{}) { tsa := [16][]byte{} toks := split(subject, tsa[:0]) - s.lck.Lock() + s.mu.Lock() l := s.root var n *node @@ -182,7 +243,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { for _, t := range toks { if l == nil { - s.lck.Unlock() + s.mu.Unlock() return } switch t[0] { @@ -205,7 +266,7 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { } } if !s.removeFromNode(n, sub) { - s.lck.Unlock() + s.mu.Unlock() return } for i := len(levels) - 1; i >= 0; i-- { @@ -214,9 +275,8 @@ func (s *Sublist) Remove(subject []byte, sub interface{}) { l.pruneNode(n, t) } } - // FIXME: Do something more intelligent here - s.cache = hashmap.New() - s.lck.Unlock() + s.removeFromCache(subject, sub) + s.mu.Unlock() } func (l *level) pruneNode(n *node, t []byte) { @@ -241,6 +301,7 @@ func (n *node) isEmpty() bool { return false } +// Return the number of nodes for the given level. func (l *level) numNodes() uint32 { num := l.nodes.Count() if l.pwc != nil { @@ -252,6 +313,7 @@ func (l *level) numNodes() uint32 { return num } +// Remove the sub for the given node. func (s *Sublist) removeFromNode(n *node, sub interface{}) bool { if n == nil { return false @@ -269,8 +331,40 @@ func (s *Sublist) removeFromNode(n *node, sub interface{}) bool { return false } +// matchLiteral is used to test literal subjects, those that do not have any +// wildcards, with a target subject. This is used in the cache layer. +func matchLiteral(literal, subject []byte) bool { + li := 0 + for _, b := range(subject) { + if li >= len(literal) { + return false + } + switch b { + case _PWC: + // Skip token + for { + if li >= len(literal) || literal[li] == _SEP { + break; + } + li += 1 + } + case _FWC: + return true + default: + if b != literal[li] { + return false + } + } + li += 1 + } + return true +} + +// Count return the number of stored items in the HashMap. func (s *Sublist) Count() uint32 { return s.count } +// DebugNumLevels will return the number of levels contained in +// the HashMap. func (s *Sublist) DebugNumLevels() int { return visitLevel(s.root, 0) }