Queue subscriber performance

Reworked sublist to sort out normal subscribers from queue subscribers into
a result set that can be cached and easily iterated over.
This commit is contained in:
Derek Collison
2016-04-03 13:04:06 -07:00
parent fcd87b77c2
commit 96d9ce5048
5 changed files with 278 additions and 113 deletions

View File

@@ -65,7 +65,8 @@ type readCache struct {
genid uint64
inMsgs int64
inBytes int64
subs map[string][]*subscription
results map[string]*SublistResult
prand *rand.Rand
}
func (c *client) String() (id string) {
@@ -744,8 +745,8 @@ func (c *client) processMsg(msg []byte) {
srv := c.srv
// Create cache subs map if needed.
if c.cache.subs == nil && srv != nil {
c.cache.subs = make(map[string][]*subscription)
if c.cache.results == nil && srv != nil {
c.cache.results = make(map[string]*SublistResult)
c.cache.genid = atomic.LoadUint64(&srv.sl.genid)
}
@@ -765,7 +766,7 @@ func (c *client) processMsg(msg []byte) {
}
var genid uint64
var r []*subscription
var r *SublistResult
var ok bool
subject := string(c.pa.subject)
@@ -774,19 +775,20 @@ func (c *client) processMsg(msg []byte) {
genid = atomic.LoadUint64(&srv.sl.genid)
}
if genid == c.cache.genid && c.cache.subs != nil {
r, ok = c.cache.subs[subject]
if genid == c.cache.genid && c.cache.results != nil {
r, ok = c.cache.results[subject]
} else {
// reset
c.cache.subs = make(map[string][]*subscription)
c.cache.results = make(map[string]*SublistResult)
c.cache.genid = genid
}
if !ok {
r = srv.sl.Match(subject)
c.cache.subs[subject] = r
c.cache.results[subject] = r
}
if len(r) <= 0 {
// Check for no interest, short circuit if so.
if len(r.psubs) <= 0 && len(r.qsubs) <= 0 {
return
}
@@ -799,7 +801,6 @@ func (c *client) processMsg(msg []byte) {
si := len(msgh)
isRoute := c.typ == ROUTER
var rmap map[string]struct{}
// If we are a route and we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics. If the match is a queue
@@ -814,37 +815,15 @@ func (c *client) processMsg(msg []byte) {
}
}
var qmap map[string][]*subscription
// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}
// Loop over all subscriptions that match.
for _, sub := range r {
// Process queue group subscriptions by gathering them all up
// here. We will pick the winners when we are done processing
// all of the subscriptions.
if sub.queue != nil {
// Queue subscriptions handled from routes directly above.
if isRoute {
continue
}
// FIXME(dlc), this can be more efficient
if qmap == nil {
qmap = make(map[string][]*subscription)
}
qname := string(sub.queue)
qsubs := qmap[qname]
if qsubs == nil {
qsubs = make([]*subscription, 0, 4)
}
qsubs = append(qsubs, sub)
qmap[qname] = qsubs
continue
}
// Loop over all normal subscriptions that match.
// Process normal, non-queue group subscriptions.
// If this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing.
// Also enforce 1-Hop.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
@@ -870,14 +849,22 @@ func (c *client) processMsg(msg []byte) {
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
if qmap != nil {
for _, qsubs := range qmap {
index := rand.Intn(len(qsubs))
// Now process any queue subs we have if not a route
if !isRoute {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.cache.prand == nil {
c.cache.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
index := c.cache.prand.Intn(len(qsubs))
sub := qsubs[index]
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)

View File

@@ -33,10 +33,10 @@ func TestSplitBufferSubOp(t *testing.T) {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
if r == nil || len(r) != 1 {
if r == nil || len(r.psubs) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
sub := r[0]
sub := r.psubs[0]
if !bytes.Equal(sub.subject, []byte("foo")) {
t.Fatalf("Subject did not match expected 'foo' : '%s'\n", sub.subject)
}
@@ -77,7 +77,7 @@ func TestSplitBufferUnsubOp(t *testing.T) {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
if r != nil && len(r) != 0 {
if r != nil && len(r.psubs) != 0 {
t.Fatalf("Should be no subscriptions in results: %+v\n", r)
}
}

View File

@@ -7,6 +7,7 @@
package server
import (
"bytes"
"errors"
"strings"
"sync"
@@ -30,6 +31,12 @@ var (
// cacheMax is used to bound limit the frontend cache
const slCacheMax = 1024
// A result structure better optimized for queue subs.
type SublistResult struct {
psubs []*subscription
qsubs [][]*subscription // don't make this a map, too expensive to iterate
}
// A Sublist stores and efficiently retrieves subscriptions.
type Sublist struct {
sync.RWMutex
@@ -38,15 +45,16 @@ type Sublist struct {
cacheHits uint64
inserts uint64
removes uint64
cache map[string][]*subscription
cache map[string]*SublistResult
root *level
count uint32
}
// A node contains subscriptions and a pointer to the next level.
type node struct {
next *level
subs []*subscription
next *level
psubs []*subscription
qsubs [][]*subscription
}
// A level represents a group of nodes and special pointers to
@@ -58,7 +66,7 @@ type level struct {
// Create a new default node.
func newNode() *node {
return &node{subs: make([]*subscription, 0, 4)}
return &node{psubs: make([]*subscription, 0, 4)}
}
// Create a new default level. We use FNV1A as the hash
@@ -69,7 +77,7 @@ func newLevel() *level {
// New will create a default sublist
func NewSublist() *Sublist {
return &Sublist{root: newLevel(), cache: make(map[string][]*subscription)}
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
}
// Insert adds a subscription into the sublist
@@ -124,7 +132,17 @@ func (s *Sublist) Insert(sub *subscription) error {
}
l = n.next
}
n.subs = append(n.subs, sub)
if sub.queue == nil {
n.psubs = append(n.psubs, sub)
} else {
// This is a queue subscription
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i] = append(n.qsubs[i], sub)
} else {
n.qsubs = append(n.qsubs, []*subscription{sub})
}
}
s.count++
s.inserts++
@@ -135,15 +153,34 @@ func (s *Sublist) Insert(sub *subscription) error {
return nil
}
// Deep copy
func copyResult(r *SublistResult) *SublistResult {
nr := &SublistResult{}
nr.psubs = append([]*subscription(nil), r.psubs...)
for _, qr := range r.qsubs {
nqr := append([]*subscription(nil), qr...)
nr.qsubs = append(nr.qsubs, nqr)
}
return nr
}
// addToCache will add the new entry to existing cache
// entries if needed. Assumes write lock is held.
func (s *Sublist) addToCache(subject string, sub *subscription) {
for k, results := range s.cache {
for k, r := range s.cache {
if matchLiteral(k, subject) {
// Copy since others may have a reference.
nr := make([]*subscription, len(results), len(results)+1)
copy(nr, results)
s.cache[k] = append(nr, sub)
nr := copyResult(r)
if sub.queue == nil {
nr.psubs = append(nr.psubs, sub)
} else {
if i := findQSliceForSub(sub, nr.qsubs); i >= 0 {
nr.qsubs[i] = append(nr.qsubs[i], sub)
} else {
nr.qsubs = append(nr.qsubs, []*subscription{sub})
}
}
s.cache[k] = nr
}
}
}
@@ -162,10 +199,8 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
}
// Match will match all entries to the literal subject.
// It will return a slice of results.
// Note that queue subscribers will only have one member selected
// and returned for each queue group.
func (s *Sublist) Match(subject string) []*subscription {
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) Match(subject string) *SublistResult {
s.RLock()
atomic.AddUint64(&s.matches, 1)
rc, ok := s.cache[subject]
@@ -186,16 +221,14 @@ func (s *Sublist) Match(subject string) []*subscription {
}
tokens = append(tokens, subject[start:])
// FIXME(dlc) - Make pool?
results := []*subscription{}
s.RLock()
results = matchLevel(s.root, tokens, results)
s.RUnlock()
// FIXME(dlc) - Make shared pool between sublist and client readLoop?
result := &SublistResult{}
s.Lock()
matchLevel(s.root, tokens, result)
// Add to our cache
s.cache[subject] = results
s.cache[subject] = result
// Bound the number of entries to sublistMaxCache
if len(s.cache) > slCacheMax {
for k, _ := range s.cache {
@@ -205,21 +238,53 @@ func (s *Sublist) Match(subject string) []*subscription {
}
s.Unlock()
return results
return result
}
// This will add in a node's results to the total results.
func addNodeToResults(n *node, results *SublistResult) {
results.psubs = append(results.psubs, n.psubs...)
for _, qr := range n.qsubs {
if len(qr) == 0 {
continue
}
// Need to find matching list in results
if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 {
results.qsubs[i] = append(results.qsubs[i], qr...)
} else {
results.qsubs = append(results.qsubs, qr)
}
}
}
// We do not use a map here since we want iteration to be past when
// processing publishes in L1 on client. So we need to walk sequentially
// for now. Keep an eye on this in case we start getting large number of
// different queue subscribers for the same subject.
func findQSliceForSub(sub *subscription, qsl [][]*subscription) int {
if sub.queue == nil {
return -1
}
for i, qr := range qsl {
if len(qr) > 0 && bytes.Equal(sub.queue, qr[0].queue) {
return i
}
}
return -1
}
// matchLevel is used to recursively descend into the trie.
func matchLevel(l *level, toks []string, results []*subscription) []*subscription {
func matchLevel(l *level, toks []string, results *SublistResult) {
var pwc, n *node
for i, t := range toks {
if l == nil {
return results
return
}
if l.fwc != nil {
results = append(results, l.fwc.subs...)
addNodeToResults(l.fwc, results)
}
if pwc = l.pwc; pwc != nil {
results = matchLevel(pwc.next, toks[i+1:], results)
matchLevel(pwc.next, toks[i+1:], results)
}
n = l.nodes[t]
if n != nil {
@@ -229,12 +294,11 @@ func matchLevel(l *level, toks []string, results []*subscription) []*subscriptio
}
}
if n != nil {
results = append(results, n.subs...)
addNodeToResults(n, results)
}
if pwc != nil {
results = append(results, pwc.subs...)
addNodeToResults(pwc, results)
}
return results
}
// lnt is used to track descent into levels for a removal for pruning.
@@ -328,7 +392,7 @@ func (l *level) pruneNode(n *node, t string) {
// isEmpty will test if the node has any entries. Used
// in pruning.
func (n *node) isEmpty() bool {
if len(n.subs) == 0 {
if len(n.psubs) == 0 && len(n.qsubs) == 0 {
if n.next == nil || n.next.numNodes() == 0 {
return true
}
@@ -348,20 +412,43 @@ func (l *level) numNodes() int {
return num
}
// Removes a sub from a list.
func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) {
for i := 0; i < len(sl); i++ {
if sl[i] == sub {
last := len(sl) - 1
sl[i] = sl[last]
sl[last] = nil
sl = sl[:last]
return shrinkAsNeeded(sl), true
}
}
return sl, false
}
// Remove the sub for the given node.
func (s *Sublist) removeFromNode(n *node, sub *subscription) bool {
func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
if n == nil {
return false
}
sl := n.subs
for i := 0; i < len(sl); i++ {
if sl[i] == sub {
sl[i] = sl[len(sl)-1]
sl[len(sl)-1] = nil
sl = sl[:len(sl)-1]
n.subs = shrinkAsNeeded(sl)
return true
if sub.queue == nil {
n.psubs, found = removeSubFromList(sub, n.psubs)
return found
}
// We have a queue group subscription here
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i], found = removeSubFromList(sub, n.qsubs[i])
if len(n.qsubs[i]) == 0 {
last := len(n.qsubs) - 1
n.qsubs[i] = n.qsubs[last]
n.qsubs[last] = nil
n.qsubs = n.qsubs[:last]
if len(n.qsubs) == 0 {
n.qsubs = nil
}
}
return found
}
return false
}
@@ -424,8 +511,8 @@ func (s *Sublist) Stats() *SublistStats {
}
// whip through cache for fanout stats
tot, max := 0, 0
for _, results := range s.cache {
l := len(results)
for _, r := range s.cache {
l := len(r.psubs) + len(r.qsubs)
tot += l
if l > max {
max = l

View File

@@ -35,11 +35,17 @@ func verifyCount(s *Sublist, count uint32, t *testing.T) {
}
func verifyLen(r []*subscription, l int, t *testing.T) {
if r == nil || len(r) != l {
if len(r) != l {
stackFatalf(t, "Results len is %d, should be %d", len(r), l)
}
}
func verifyQLen(r [][]*subscription, l int, t *testing.T) {
if len(r) != l {
stackFatalf(t, "Queue Results len is %d, should be %d", len(r), l)
}
}
func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
dl := s.numLevels()
if dl != expected {
@@ -59,11 +65,15 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) {
stackFatalf(t, "Value '%+v' not found in results", val)
}
// Helper to generate test subscriptions.
// Helpera to generate test subscriptions.
func newSub(subject string) *subscription {
return &subscription{subject: []byte(subject)}
}
func newQSub(subject, queue string) *subscription {
return &subscription{subject: []byte(subject), queue: []byte(queue)}
}
func TestSublistInit(t *testing.T) {
s := NewSublist()
verifyCount(s, 0, t)
@@ -83,8 +93,8 @@ func TestSublistSimple(t *testing.T) {
sub := newSub(subject)
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyMember(r, sub, t)
verifyLen(r.psubs, 1, t)
verifyMember(r.psubs, sub, t)
}
func TestSublistSimpleMultiTokens(t *testing.T) {
@@ -93,8 +103,8 @@ func TestSublistSimpleMultiTokens(t *testing.T) {
sub := newSub(subject)
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyMember(r, sub, t)
verifyLen(r.psubs, 1, t)
verifyMember(r.psubs, sub, t)
}
func TestSublistPartialWildcard(t *testing.T) {
@@ -104,9 +114,9 @@ func TestSublistPartialWildcard(t *testing.T) {
s.Insert(lsub)
s.Insert(psub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, psub, t)
verifyLen(r.psubs, 2, t)
verifyMember(r.psubs, lsub, t)
verifyMember(r.psubs, psub, t)
}
func TestSublistPartialWildcardAtEnd(t *testing.T) {
@@ -116,9 +126,9 @@ func TestSublistPartialWildcardAtEnd(t *testing.T) {
s.Insert(lsub)
s.Insert(psub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, psub, t)
verifyLen(r.psubs, 2, t)
verifyMember(r.psubs, lsub, t)
verifyMember(r.psubs, psub, t)
}
func TestSublistFullWildcard(t *testing.T) {
@@ -128,9 +138,9 @@ func TestSublistFullWildcard(t *testing.T) {
s.Insert(lsub)
s.Insert(fsub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, fsub, t)
verifyLen(r.psubs, 2, t)
verifyMember(r.psubs, lsub, t)
verifyMember(r.psubs, fsub, t)
}
func TestSublistRemove(t *testing.T) {
@@ -140,13 +150,13 @@ func TestSublistRemove(t *testing.T) {
s.Insert(sub)
verifyCount(s, 1, t)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyLen(r.psubs, 1, t)
s.Remove(newSub("a.b.c"))
verifyCount(s, 1, t)
s.Remove(sub)
verifyCount(s, 0, t)
r = s.Match(subject)
verifyLen(r, 0, t)
verifyLen(r.psubs, 0, t)
}
func TestSublistRemoveWildcard(t *testing.T) {
@@ -160,7 +170,7 @@ func TestSublistRemoveWildcard(t *testing.T) {
s.Insert(fsub)
verifyCount(s, 3, t)
r := s.Match(subject)
verifyLen(r, 3, t)
verifyLen(r.psubs, 3, t)
s.Remove(sub)
verifyCount(s, 2, t)
s.Remove(fsub)
@@ -168,7 +178,7 @@ func TestSublistRemoveWildcard(t *testing.T) {
s.Remove(psub)
verifyCount(s, 0, t)
r = s.Match(subject)
verifyLen(r, 0, t)
verifyLen(r.psubs, 0, t)
}
func TestSublistRemoveCleanup(t *testing.T) {
@@ -234,12 +244,12 @@ func TestSublistCache(t *testing.T) {
fsub := newSub("a.b.>")
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyLen(r.psubs, 1, t)
s.Insert(psub)
s.Insert(fsub)
verifyCount(s, 3, t)
r = s.Match(subject)
verifyLen(r, 3, t)
verifyLen(r.psubs, 3, t)
s.Remove(sub)
verifyCount(s, 2, t)
s.Remove(fsub)
@@ -253,7 +263,7 @@ func TestSublistCache(t *testing.T) {
}
r = s.Match(subject)
verifyLen(r, 0, t)
verifyLen(r.psubs, 0, t)
for i := 0; i < 2*slCacheMax; i++ {
s.Match(fmt.Sprintf("foo-%d\n", i))
@@ -264,6 +274,87 @@ func TestSublistCache(t *testing.T) {
}
}
func TestSublistBasicQueueResults(t *testing.T) {
s := NewSublist()
// Test some basics
subject := "foo"
sub := newSub(subject)
sub1 := newQSub(subject, "bar")
sub2 := newQSub(subject, "baz")
s.Insert(sub1)
r := s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 1, t)
verifyMember(r.qsubs[0], sub1, t)
s.Insert(sub2)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 1, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
s.Insert(sub)
r = s.Match(subject)
verifyLen(r.psubs, 1, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 1, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyMember(r.psubs, sub, t)
s.Insert(sub1)
s.Insert(sub2)
r = s.Match(subject)
verifyLen(r.psubs, 1, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 2, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyMember(r.psubs, sub, t)
// Now removal
s.Remove(sub)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 2, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
s.Remove(sub1)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
s.Remove(sub1) // Last one
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 2, t) // this is sub2/baz now
verifyMember(r.qsubs[0], sub2, t)
s.Remove(sub2)
s.Remove(sub2)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 0, t)
}
func checkBool(b, expected bool, t *testing.T) {
if b != expected {
dbg.PrintStack()
@@ -322,10 +413,10 @@ func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) {
sub := newSub("foo")
s.Insert(sub)
r := s.Match("foo")
verifyLen(r, 1, t)
verifyMember(r, sub, t)
verifyLen(r.psubs, 1, t)
verifyMember(r.psubs, sub, t)
r = s.Match("foo.bar")
verifyLen(r, 0, t)
verifyLen(r.psubs, 0, t)
}
// -- Benchmarks Setup --

View File

@@ -183,7 +183,7 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
for i := 0; i < numSent; i++ {
if results[i] != ExpectedMsgCount {
t.Fatalf("Received incorrect number of messages, [%d] for seq: %d\n", results[i], i)
t.Fatalf("Received incorrect number of messages, [%d] vs [%d] for seq: %d\n", results[i], ExpectedMsgCount, i)
}
}