From 96d9ce5048b98a9e92ab37d419fec6e62836ff8d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 3 Apr 2016 13:04:06 -0700 Subject: [PATCH] Queue subscriber performance Reworked sublist to sort out normal subscribers from queue subscribers into a result set that can be cached and easily iterated over. --- server/client.go | 73 +++++++--------- server/split_test.go | 6 +- server/sublist.go | 169 +++++++++++++++++++++++++++--------- server/sublist_test.go | 141 ++++++++++++++++++++++++------ test/client_cluster_test.go | 2 +- 5 files changed, 278 insertions(+), 113 deletions(-) diff --git a/server/client.go b/server/client.go index 84528d23..976b3146 100644 --- a/server/client.go +++ b/server/client.go @@ -65,7 +65,8 @@ type readCache struct { genid uint64 inMsgs int64 inBytes int64 - subs map[string][]*subscription + results map[string]*SublistResult + prand *rand.Rand } func (c *client) String() (id string) { @@ -744,8 +745,8 @@ func (c *client) processMsg(msg []byte) { srv := c.srv // Create cache subs map if needed. - if c.cache.subs == nil && srv != nil { - c.cache.subs = make(map[string][]*subscription) + if c.cache.results == nil && srv != nil { + c.cache.results = make(map[string]*SublistResult) c.cache.genid = atomic.LoadUint64(&srv.sl.genid) } @@ -765,7 +766,7 @@ func (c *client) processMsg(msg []byte) { } var genid uint64 - var r []*subscription + var r *SublistResult var ok bool subject := string(c.pa.subject) @@ -774,19 +775,20 @@ func (c *client) processMsg(msg []byte) { genid = atomic.LoadUint64(&srv.sl.genid) } - if genid == c.cache.genid && c.cache.subs != nil { - r, ok = c.cache.subs[subject] + if genid == c.cache.genid && c.cache.results != nil { + r, ok = c.cache.results[subject] } else { // reset - c.cache.subs = make(map[string][]*subscription) + c.cache.results = make(map[string]*SublistResult) c.cache.genid = genid } if !ok { r = srv.sl.Match(subject) - c.cache.subs[subject] = r + c.cache.results[subject] = r } - if len(r) <= 0 { + // Check for no interest, short circuit if so. + if len(r.psubs) <= 0 && len(r.qsubs) <= 0 { return } @@ -799,7 +801,6 @@ func (c *client) processMsg(msg []byte) { si := len(msgh) isRoute := c.typ == ROUTER - var rmap map[string]struct{} // If we are a route and we have a queue subscription, deliver direct // since they are sent direct via L2 semantics. If the match is a queue @@ -814,37 +815,15 @@ func (c *client) processMsg(msg []byte) { } } - var qmap map[string][]*subscription + // Used to only send normal subscriptions once across a given route. + var rmap map[string]struct{} - // Loop over all subscriptions that match. - for _, sub := range r { - // Process queue group subscriptions by gathering them all up - // here. We will pick the winners when we are done processing - // all of the subscriptions. - if sub.queue != nil { - // Queue subscriptions handled from routes directly above. - if isRoute { - continue - } - // FIXME(dlc), this can be more efficient - if qmap == nil { - qmap = make(map[string][]*subscription) - } - qname := string(sub.queue) - qsubs := qmap[qname] - if qsubs == nil { - qsubs = make([]*subscription, 0, 4) - } - qsubs = append(qsubs, sub) - qmap[qname] = qsubs - continue - } + // Loop over all normal subscriptions that match. - // Process normal, non-queue group subscriptions. - - // If this is a send to a ROUTER, make sure we only send it - // once. The other side will handle the appropriate re-processing. - // Also enforce 1-Hop. + for _, sub := range r.psubs { + // Check if this is a send to a ROUTER, make sure we only send it + // once. The other side will handle the appropriate re-processing + // and fan-out. Also enforce 1-Hop semantics, so no routing to another. if sub.client.typ == ROUTER { // Skip if sourced from a ROUTER and going to another ROUTER. // This is 1-Hop semantics for ROUTERs. @@ -870,14 +849,22 @@ func (c *client) processMsg(msg []byte) { rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() } - + // Normal delivery mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } - if qmap != nil { - for _, qsubs := range qmap { - index := rand.Intn(len(qsubs)) + // Now process any queue subs we have if not a route + if !isRoute { + // Check to see if we have our own rand yet. Global rand + // has contention with lots of clients, etc. + if c.cache.prand == nil { + c.cache.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + // Process queue subs + for i := 0; i < len(r.qsubs); i++ { + qsubs := r.qsubs[i] + index := c.cache.prand.Intn(len(qsubs)) sub := qsubs[index] mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) diff --git a/server/split_test.go b/server/split_test.go index 610e7a7f..cdaa1c50 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -33,10 +33,10 @@ func TestSplitBufferSubOp(t *testing.T) { t.Fatalf("Expected OP_START state vs %d\n", c.state) } r := s.sl.Match("foo") - if r == nil || len(r) != 1 { + if r == nil || len(r.psubs) != 1 { t.Fatalf("Did not match subscription properly: %+v\n", r) } - sub := r[0] + sub := r.psubs[0] if !bytes.Equal(sub.subject, []byte("foo")) { t.Fatalf("Subject did not match expected 'foo' : '%s'\n", sub.subject) } @@ -77,7 +77,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { t.Fatalf("Expected OP_START state vs %d\n", c.state) } r := s.sl.Match("foo") - if r != nil && len(r) != 0 { + if r != nil && len(r.psubs) != 0 { t.Fatalf("Should be no subscriptions in results: %+v\n", r) } } diff --git a/server/sublist.go b/server/sublist.go index 2569a2a9..75d3710c 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -7,6 +7,7 @@ package server import ( + "bytes" "errors" "strings" "sync" @@ -30,6 +31,12 @@ var ( // cacheMax is used to bound limit the frontend cache const slCacheMax = 1024 +// A result structure better optimized for queue subs. +type SublistResult struct { + psubs []*subscription + qsubs [][]*subscription // don't make this a map, too expensive to iterate +} + // A Sublist stores and efficiently retrieves subscriptions. type Sublist struct { sync.RWMutex @@ -38,15 +45,16 @@ type Sublist struct { cacheHits uint64 inserts uint64 removes uint64 - cache map[string][]*subscription + cache map[string]*SublistResult root *level count uint32 } // A node contains subscriptions and a pointer to the next level. type node struct { - next *level - subs []*subscription + next *level + psubs []*subscription + qsubs [][]*subscription } // A level represents a group of nodes and special pointers to @@ -58,7 +66,7 @@ type level struct { // Create a new default node. func newNode() *node { - return &node{subs: make([]*subscription, 0, 4)} + return &node{psubs: make([]*subscription, 0, 4)} } // Create a new default level. We use FNV1A as the hash @@ -69,7 +77,7 @@ func newLevel() *level { // New will create a default sublist func NewSublist() *Sublist { - return &Sublist{root: newLevel(), cache: make(map[string][]*subscription)} + return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)} } // Insert adds a subscription into the sublist @@ -124,7 +132,17 @@ func (s *Sublist) Insert(sub *subscription) error { } l = n.next } - n.subs = append(n.subs, sub) + if sub.queue == nil { + n.psubs = append(n.psubs, sub) + } else { + // This is a queue subscription + if i := findQSliceForSub(sub, n.qsubs); i >= 0 { + n.qsubs[i] = append(n.qsubs[i], sub) + } else { + n.qsubs = append(n.qsubs, []*subscription{sub}) + } + } + s.count++ s.inserts++ @@ -135,15 +153,34 @@ func (s *Sublist) Insert(sub *subscription) error { return nil } +// Deep copy +func copyResult(r *SublistResult) *SublistResult { + nr := &SublistResult{} + nr.psubs = append([]*subscription(nil), r.psubs...) + for _, qr := range r.qsubs { + nqr := append([]*subscription(nil), qr...) + nr.qsubs = append(nr.qsubs, nqr) + } + return nr +} + // addToCache will add the new entry to existing cache // entries if needed. Assumes write lock is held. func (s *Sublist) addToCache(subject string, sub *subscription) { - for k, results := range s.cache { + for k, r := range s.cache { if matchLiteral(k, subject) { // Copy since others may have a reference. - nr := make([]*subscription, len(results), len(results)+1) - copy(nr, results) - s.cache[k] = append(nr, sub) + nr := copyResult(r) + if sub.queue == nil { + nr.psubs = append(nr.psubs, sub) + } else { + if i := findQSliceForSub(sub, nr.qsubs); i >= 0 { + nr.qsubs[i] = append(nr.qsubs[i], sub) + } else { + nr.qsubs = append(nr.qsubs, []*subscription{sub}) + } + } + s.cache[k] = nr } } } @@ -162,10 +199,8 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) { } // Match will match all entries to the literal subject. -// It will return a slice of results. -// Note that queue subscribers will only have one member selected -// and returned for each queue group. -func (s *Sublist) Match(subject string) []*subscription { +// It will return a set of results for both normal and queue subscribers. +func (s *Sublist) Match(subject string) *SublistResult { s.RLock() atomic.AddUint64(&s.matches, 1) rc, ok := s.cache[subject] @@ -186,16 +221,14 @@ func (s *Sublist) Match(subject string) []*subscription { } tokens = append(tokens, subject[start:]) - // FIXME(dlc) - Make pool? - results := []*subscription{} - - s.RLock() - results = matchLevel(s.root, tokens, results) - s.RUnlock() + // FIXME(dlc) - Make shared pool between sublist and client readLoop? + result := &SublistResult{} s.Lock() + matchLevel(s.root, tokens, result) + // Add to our cache - s.cache[subject] = results + s.cache[subject] = result // Bound the number of entries to sublistMaxCache if len(s.cache) > slCacheMax { for k, _ := range s.cache { @@ -205,21 +238,53 @@ func (s *Sublist) Match(subject string) []*subscription { } s.Unlock() - return results + return result +} + +// This will add in a node's results to the total results. +func addNodeToResults(n *node, results *SublistResult) { + results.psubs = append(results.psubs, n.psubs...) + for _, qr := range n.qsubs { + if len(qr) == 0 { + continue + } + // Need to find matching list in results + if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 { + results.qsubs[i] = append(results.qsubs[i], qr...) + } else { + results.qsubs = append(results.qsubs, qr) + } + } +} + +// We do not use a map here since we want iteration to be past when +// processing publishes in L1 on client. So we need to walk sequentially +// for now. Keep an eye on this in case we start getting large number of +// different queue subscribers for the same subject. +func findQSliceForSub(sub *subscription, qsl [][]*subscription) int { + if sub.queue == nil { + return -1 + } + for i, qr := range qsl { + if len(qr) > 0 && bytes.Equal(sub.queue, qr[0].queue) { + return i + } + } + return -1 } // matchLevel is used to recursively descend into the trie. -func matchLevel(l *level, toks []string, results []*subscription) []*subscription { +func matchLevel(l *level, toks []string, results *SublistResult) { var pwc, n *node for i, t := range toks { if l == nil { - return results + return } if l.fwc != nil { - results = append(results, l.fwc.subs...) + addNodeToResults(l.fwc, results) } if pwc = l.pwc; pwc != nil { - results = matchLevel(pwc.next, toks[i+1:], results) + matchLevel(pwc.next, toks[i+1:], results) } n = l.nodes[t] if n != nil { @@ -229,12 +294,11 @@ func matchLevel(l *level, toks []string, results []*subscription) []*subscriptio } } if n != nil { - results = append(results, n.subs...) + addNodeToResults(n, results) } if pwc != nil { - results = append(results, pwc.subs...) + addNodeToResults(pwc, results) } - return results } // lnt is used to track descent into levels for a removal for pruning. @@ -328,7 +392,7 @@ func (l *level) pruneNode(n *node, t string) { // isEmpty will test if the node has any entries. Used // in pruning. func (n *node) isEmpty() bool { - if len(n.subs) == 0 { + if len(n.psubs) == 0 && len(n.qsubs) == 0 { if n.next == nil || n.next.numNodes() == 0 { return true } @@ -348,20 +412,43 @@ func (l *level) numNodes() int { return num } +// Removes a sub from a list. +func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) { + for i := 0; i < len(sl); i++ { + if sl[i] == sub { + last := len(sl) - 1 + sl[i] = sl[last] + sl[last] = nil + sl = sl[:last] + return shrinkAsNeeded(sl), true + } + } + return sl, false +} + // Remove the sub for the given node. -func (s *Sublist) removeFromNode(n *node, sub *subscription) bool { +func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) { if n == nil { return false } - sl := n.subs - for i := 0; i < len(sl); i++ { - if sl[i] == sub { - sl[i] = sl[len(sl)-1] - sl[len(sl)-1] = nil - sl = sl[:len(sl)-1] - n.subs = shrinkAsNeeded(sl) - return true + if sub.queue == nil { + n.psubs, found = removeSubFromList(sub, n.psubs) + return found + } + + // We have a queue group subscription here + if i := findQSliceForSub(sub, n.qsubs); i >= 0 { + n.qsubs[i], found = removeSubFromList(sub, n.qsubs[i]) + if len(n.qsubs[i]) == 0 { + last := len(n.qsubs) - 1 + n.qsubs[i] = n.qsubs[last] + n.qsubs[last] = nil + n.qsubs = n.qsubs[:last] + if len(n.qsubs) == 0 { + n.qsubs = nil + } } + return found } return false } @@ -424,8 +511,8 @@ func (s *Sublist) Stats() *SublistStats { } // whip through cache for fanout stats tot, max := 0, 0 - for _, results := range s.cache { - l := len(results) + for _, r := range s.cache { + l := len(r.psubs) + len(r.qsubs) tot += l if l > max { max = l diff --git a/server/sublist_test.go b/server/sublist_test.go index 3d34b0c8..ce7c0d7b 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -35,11 +35,17 @@ func verifyCount(s *Sublist, count uint32, t *testing.T) { } func verifyLen(r []*subscription, l int, t *testing.T) { - if r == nil || len(r) != l { + if len(r) != l { stackFatalf(t, "Results len is %d, should be %d", len(r), l) } } +func verifyQLen(r [][]*subscription, l int, t *testing.T) { + if len(r) != l { + stackFatalf(t, "Queue Results len is %d, should be %d", len(r), l) + } +} + func verifyNumLevels(s *Sublist, expected int, t *testing.T) { dl := s.numLevels() if dl != expected { @@ -59,11 +65,15 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) { stackFatalf(t, "Value '%+v' not found in results", val) } -// Helper to generate test subscriptions. +// Helpera to generate test subscriptions. func newSub(subject string) *subscription { return &subscription{subject: []byte(subject)} } +func newQSub(subject, queue string) *subscription { + return &subscription{subject: []byte(subject), queue: []byte(queue)} +} + func TestSublistInit(t *testing.T) { s := NewSublist() verifyCount(s, 0, t) @@ -83,8 +93,8 @@ func TestSublistSimple(t *testing.T) { sub := newSub(subject) s.Insert(sub) r := s.Match(subject) - verifyLen(r, 1, t) - verifyMember(r, sub, t) + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub, t) } func TestSublistSimpleMultiTokens(t *testing.T) { @@ -93,8 +103,8 @@ func TestSublistSimpleMultiTokens(t *testing.T) { sub := newSub(subject) s.Insert(sub) r := s.Match(subject) - verifyLen(r, 1, t) - verifyMember(r, sub, t) + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub, t) } func TestSublistPartialWildcard(t *testing.T) { @@ -104,9 +114,9 @@ func TestSublistPartialWildcard(t *testing.T) { s.Insert(lsub) s.Insert(psub) r := s.Match("a.b.c") - verifyLen(r, 2, t) - verifyMember(r, lsub, t) - verifyMember(r, psub, t) + verifyLen(r.psubs, 2, t) + verifyMember(r.psubs, lsub, t) + verifyMember(r.psubs, psub, t) } func TestSublistPartialWildcardAtEnd(t *testing.T) { @@ -116,9 +126,9 @@ func TestSublistPartialWildcardAtEnd(t *testing.T) { s.Insert(lsub) s.Insert(psub) r := s.Match("a.b.c") - verifyLen(r, 2, t) - verifyMember(r, lsub, t) - verifyMember(r, psub, t) + verifyLen(r.psubs, 2, t) + verifyMember(r.psubs, lsub, t) + verifyMember(r.psubs, psub, t) } func TestSublistFullWildcard(t *testing.T) { @@ -128,9 +138,9 @@ func TestSublistFullWildcard(t *testing.T) { s.Insert(lsub) s.Insert(fsub) r := s.Match("a.b.c") - verifyLen(r, 2, t) - verifyMember(r, lsub, t) - verifyMember(r, fsub, t) + verifyLen(r.psubs, 2, t) + verifyMember(r.psubs, lsub, t) + verifyMember(r.psubs, fsub, t) } func TestSublistRemove(t *testing.T) { @@ -140,13 +150,13 @@ func TestSublistRemove(t *testing.T) { s.Insert(sub) verifyCount(s, 1, t) r := s.Match(subject) - verifyLen(r, 1, t) + verifyLen(r.psubs, 1, t) s.Remove(newSub("a.b.c")) verifyCount(s, 1, t) s.Remove(sub) verifyCount(s, 0, t) r = s.Match(subject) - verifyLen(r, 0, t) + verifyLen(r.psubs, 0, t) } func TestSublistRemoveWildcard(t *testing.T) { @@ -160,7 +170,7 @@ func TestSublistRemoveWildcard(t *testing.T) { s.Insert(fsub) verifyCount(s, 3, t) r := s.Match(subject) - verifyLen(r, 3, t) + verifyLen(r.psubs, 3, t) s.Remove(sub) verifyCount(s, 2, t) s.Remove(fsub) @@ -168,7 +178,7 @@ func TestSublistRemoveWildcard(t *testing.T) { s.Remove(psub) verifyCount(s, 0, t) r = s.Match(subject) - verifyLen(r, 0, t) + verifyLen(r.psubs, 0, t) } func TestSublistRemoveCleanup(t *testing.T) { @@ -234,12 +244,12 @@ func TestSublistCache(t *testing.T) { fsub := newSub("a.b.>") s.Insert(sub) r := s.Match(subject) - verifyLen(r, 1, t) + verifyLen(r.psubs, 1, t) s.Insert(psub) s.Insert(fsub) verifyCount(s, 3, t) r = s.Match(subject) - verifyLen(r, 3, t) + verifyLen(r.psubs, 3, t) s.Remove(sub) verifyCount(s, 2, t) s.Remove(fsub) @@ -253,7 +263,7 @@ func TestSublistCache(t *testing.T) { } r = s.Match(subject) - verifyLen(r, 0, t) + verifyLen(r.psubs, 0, t) for i := 0; i < 2*slCacheMax; i++ { s.Match(fmt.Sprintf("foo-%d\n", i)) @@ -264,6 +274,87 @@ func TestSublistCache(t *testing.T) { } } +func TestSublistBasicQueueResults(t *testing.T) { + s := NewSublist() + + // Test some basics + subject := "foo" + sub := newSub(subject) + sub1 := newQSub(subject, "bar") + sub2 := newQSub(subject, "baz") + + s.Insert(sub1) + r := s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 1, t) + verifyLen(r.qsubs[0], 1, t) + verifyMember(r.qsubs[0], sub1, t) + + s.Insert(sub2) + r = s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 2, t) + verifyLen(r.qsubs[0], 1, t) + verifyLen(r.qsubs[1], 1, t) + verifyMember(r.qsubs[0], sub1, t) + verifyMember(r.qsubs[1], sub2, t) + + s.Insert(sub) + r = s.Match(subject) + verifyLen(r.psubs, 1, t) + verifyQLen(r.qsubs, 2, t) + verifyLen(r.qsubs[0], 1, t) + verifyLen(r.qsubs[1], 1, t) + verifyMember(r.qsubs[0], sub1, t) + verifyMember(r.qsubs[1], sub2, t) + verifyMember(r.psubs, sub, t) + + s.Insert(sub1) + s.Insert(sub2) + + r = s.Match(subject) + verifyLen(r.psubs, 1, t) + verifyQLen(r.qsubs, 2, t) + verifyLen(r.qsubs[0], 2, t) + verifyLen(r.qsubs[1], 2, t) + verifyMember(r.qsubs[0], sub1, t) + verifyMember(r.qsubs[1], sub2, t) + verifyMember(r.psubs, sub, t) + + // Now removal + s.Remove(sub) + + r = s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 2, t) + verifyLen(r.qsubs[0], 2, t) + verifyLen(r.qsubs[1], 2, t) + verifyMember(r.qsubs[0], sub1, t) + verifyMember(r.qsubs[1], sub2, t) + + s.Remove(sub1) + r = s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 2, t) + verifyLen(r.qsubs[0], 1, t) + verifyLen(r.qsubs[1], 2, t) + verifyMember(r.qsubs[0], sub1, t) + verifyMember(r.qsubs[1], sub2, t) + + s.Remove(sub1) // Last one + r = s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 1, t) + verifyLen(r.qsubs[0], 2, t) // this is sub2/baz now + verifyMember(r.qsubs[0], sub2, t) + + s.Remove(sub2) + s.Remove(sub2) + r = s.Match(subject) + verifyLen(r.psubs, 0, t) + verifyQLen(r.qsubs, 0, t) +} + func checkBool(b, expected bool, t *testing.T) { if b != expected { dbg.PrintStack() @@ -322,10 +413,10 @@ func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) { sub := newSub("foo") s.Insert(sub) r := s.Match("foo") - verifyLen(r, 1, t) - verifyMember(r, sub, t) + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub, t) r = s.Match("foo.bar") - verifyLen(r, 0, t) + verifyLen(r.psubs, 0, t) } // -- Benchmarks Setup -- diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index ce145bcf..04f0c7d2 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -183,7 +183,7 @@ func TestServerRestartAndQueueSubs(t *testing.T) { for i := 0; i < numSent; i++ { if results[i] != ExpectedMsgCount { - t.Fatalf("Received incorrect number of messages, [%d] for seq: %d\n", results[i], i) + t.Fatalf("Received incorrect number of messages, [%d] vs [%d] for seq: %d\n", results[i], ExpectedMsgCount, i) } }