diff --git a/server/client.go b/server/client.go index d6f8c0a6..57d0597d 100644 --- a/server/client.go +++ b/server/client.go @@ -1402,11 +1402,11 @@ func (c *client) closeConnection() { srv.removeClient(c) // Remove clients subscriptions. - for _, sub := range subs { - srv.sl.Remove(sub) - // Forward on unsubscribes if we are not - // a router ourselves. - if c.typ != ROUTER { + srv.sl.RemoveBatch(subs) + if c.typ != ROUTER { + for _, sub := range subs { + // Forward on unsubscribes if we are not + // a router ourselves. srv.broadcastUnSubscribe(sub) } } diff --git a/server/sublist.go b/server/sublist.go index 93f6f017..73a77a51 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -64,8 +64,8 @@ type Sublist struct { // A node contains subscriptions and a pointer to the next level. type node struct { next *level - psubs []*subscription - qsubs [][]*subscription + psubs map[*subscription]*subscription + qsubs map[string](map[*subscription]*subscription) } // A level represents a group of nodes and special pointers to @@ -77,11 +77,10 @@ type level struct { // Create a new default node. func newNode() *node { - return &node{psubs: make([]*subscription, 0, 4)} + return &node{psubs: make(map[*subscription]*subscription)} } -// Create a new default level. We use FNV1A as the hash -// algorithm for the tokens, which should be short. +// Create a new default level. func newLevel() *level { return &level{nodes: make(map[string]*node)} } @@ -153,14 +152,19 @@ func (s *Sublist) Insert(sub *subscription) error { l = n.next } if sub.queue == nil { - n.psubs = append(n.psubs, sub) + n.psubs[sub] = sub } else { - // This is a queue subscription - if i := findQSliceForSub(sub, n.qsubs); i >= 0 { - n.qsubs[i] = append(n.qsubs[i], sub) - } else { - n.qsubs = append(n.qsubs, []*subscription{sub}) + if n.qsubs == nil { + n.qsubs = make(map[string]map[*subscription]*subscription) } + qname := string(sub.queue) + // This is a queue subscription + subs, ok := n.qsubs[qname] + if !ok { + subs = make(map[*subscription]*subscription) + n.qsubs[qname] = subs + } + subs[sub] = sub } s.count++ @@ -263,16 +267,26 @@ func (s *Sublist) Match(subject string) *SublistResult { // This will add in a node's results to the total results. func addNodeToResults(n *node, results *SublistResult) { - results.psubs = append(results.psubs, n.psubs...) - for _, qr := range n.qsubs { + for _, psub := range n.psubs { + results.psubs = append(results.psubs, psub) + } + //results.psubs = append(results.psubs, n.psubs...) + for qname, qr := range n.qsubs { if len(qr) == 0 { continue } + tsub := &subscription{subject: nil, queue: []byte(qname)} // Need to find matching list in results - if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 { - results.qsubs[i] = append(results.qsubs[i], qr...) + if i := findQSliceForSub(tsub, results.qsubs); i >= 0 { + for _, sub := range qr { + results.qsubs[i] = append(results.qsubs[i], sub) + } } else { - results.qsubs = append(results.qsubs, append([]*subscription(nil), qr...)) + var nqsub []*subscription + for _, sub := range qr { + nqsub = append(nqsub, sub) + } + results.qsubs = append(results.qsubs, nqsub) } } } @@ -328,8 +342,8 @@ type lnt struct { t string } -// Remove will remove a subscription. -func (s *Sublist) Remove(sub *subscription) error { +// Raw low level remove, can do batches with lock held outside. +func (s *Sublist) remove(sub *subscription, shouldLock bool) error { subject := string(sub.subject) tsa := [32]string{} tokens := tsa[:0] @@ -342,8 +356,10 @@ func (s *Sublist) Remove(sub *subscription) error { } tokens = append(tokens, subject[start:]) - s.Lock() - defer s.Unlock() + if shouldLock { + s.Lock() + defer s.Unlock() + } sfwc := false l := s.root @@ -400,6 +416,24 @@ func (s *Sublist) Remove(sub *subscription) error { return nil } +// Remove will remove a subscription. +func (s *Sublist) Remove(sub *subscription) error { + return s.remove(sub, true) +} + +// Remove will remove a subscription. +func (s *Sublist) RemoveBatch(subs []*subscription) error { + s.Lock() + defer s.Unlock() + + for _, sub := range subs { + if err := s.remove(sub, false); err != nil { + return err + } + } + return nil +} + // pruneNode is used to prune an empty node from the tree. func (l *level) pruneNode(n *node, t string) { if n == nil { @@ -437,61 +471,26 @@ func (l *level) numNodes() int { return num } -// Removes a sub from a list. -func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) { - for i := 0; i < len(sl); i++ { - if sl[i] == sub { - last := len(sl) - 1 - sl[i] = sl[last] - sl[last] = nil - sl = sl[:last] - return shrinkAsNeeded(sl), true - } - } - return sl, false -} - // Remove the sub for the given node. func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) { if n == nil { return false } if sub.queue == nil { - n.psubs, found = removeSubFromList(sub, n.psubs) + _, found = n.psubs[sub] + delete(n.psubs, sub) return found } // We have a queue group subscription here - if i := findQSliceForSub(sub, n.qsubs); i >= 0 { - n.qsubs[i], found = removeSubFromList(sub, n.qsubs[i]) - if len(n.qsubs[i]) == 0 { - last := len(n.qsubs) - 1 - n.qsubs[i] = n.qsubs[last] - n.qsubs[last] = nil - n.qsubs = n.qsubs[:last] - if len(n.qsubs) == 0 { - n.qsubs = nil - } - } - return found + qname := string(sub.queue) + qsub := n.qsubs[qname] + _, found = qsub[sub] + delete(qsub, sub) + if len(qsub) == 0 { + delete(n.qsubs, qname) } - return false -} - -// Checks if we need to do a resize. This is for very large growth then -// subsequent return to a more normal size from unsubscribe. -func shrinkAsNeeded(sl []*subscription) []*subscription { - lsl := len(sl) - csl := cap(sl) - // Don't bother if list not too big - if csl <= 8 { - return sl - } - pFree := float32(csl-lsl) / float32(csl) - if pFree > 0.50 { - return append([]*subscription(nil), sl...) - } - return sl + return found } // Count returns the number of subscriptions. diff --git a/server/sublist_test.go b/server/sublist_test.go index 3625a5b3..0715acea 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -65,6 +65,10 @@ func verifyNumLevels(s *Sublist, expected int, t *testing.T) { } } +func verifyQMember(qsubs [][]*subscription, val *subscription, t *testing.T) { + verifyMember(qsubs[findQSliceForSub(val, qsubs)], val, t) +} + func verifyMember(r []*subscription, val *subscription, t *testing.T) { for _, v := range r { if v == nil { @@ -74,16 +78,19 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) { return } } - stackFatalf(t, "Value '%+v' not found in results", val) + stackFatalf(t, "Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue) } -// Helpera to generate test subscriptions. +// Helpers to generate test subscriptions. func newSub(subject string) *subscription { return &subscription{subject: []byte(subject)} } func newQSub(subject, queue string) *subscription { - return &subscription{subject: []byte(subject), queue: []byte(queue)} + if queue != "" { + return &subscription{subject: []byte(subject), queue: []byte(queue)} + } + return newSub(subject) } func TestSublistInit(t *testing.T) { @@ -300,7 +307,7 @@ func TestSublistBasicQueueResults(t *testing.T) { verifyLen(r.psubs, 0, t) verifyQLen(r.qsubs, 1, t) verifyLen(r.qsubs[0], 1, t) - verifyMember(r.qsubs[0], sub1, t) + verifyQMember(r.qsubs, sub1, t) s.Insert(sub2) r = s.Match(subject) @@ -308,8 +315,8 @@ func TestSublistBasicQueueResults(t *testing.T) { verifyQLen(r.qsubs, 2, t) verifyLen(r.qsubs[0], 1, t) verifyLen(r.qsubs[1], 1, t) - verifyMember(r.qsubs[0], sub1, t) - verifyMember(r.qsubs[1], sub2, t) + verifyQMember(r.qsubs, sub1, t) + verifyQMember(r.qsubs, sub2, t) s.Insert(sub) r = s.Match(subject) @@ -317,20 +324,25 @@ func TestSublistBasicQueueResults(t *testing.T) { verifyQLen(r.qsubs, 2, t) verifyLen(r.qsubs[0], 1, t) verifyLen(r.qsubs[1], 1, t) - verifyMember(r.qsubs[0], sub1, t) - verifyMember(r.qsubs[1], sub2, t) + verifyQMember(r.qsubs, sub1, t) + verifyQMember(r.qsubs, sub2, t) verifyMember(r.psubs, sub, t) - s.Insert(sub1) - s.Insert(sub2) + sub3 := newQSub(subject, "bar") + sub4 := newQSub(subject, "baz") + + s.Insert(sub3) + s.Insert(sub4) r = s.Match(subject) verifyLen(r.psubs, 1, t) verifyQLen(r.qsubs, 2, t) verifyLen(r.qsubs[0], 2, t) verifyLen(r.qsubs[1], 2, t) - verifyMember(r.qsubs[0], sub1, t) - verifyMember(r.qsubs[1], sub2, t) + verifyQMember(r.qsubs, sub1, t) + verifyQMember(r.qsubs, sub2, t) + verifyQMember(r.qsubs, sub3, t) + verifyQMember(r.qsubs, sub4, t) verifyMember(r.psubs, sub, t) // Now removal @@ -341,27 +353,28 @@ func TestSublistBasicQueueResults(t *testing.T) { verifyQLen(r.qsubs, 2, t) verifyLen(r.qsubs[0], 2, t) verifyLen(r.qsubs[1], 2, t) - verifyMember(r.qsubs[0], sub1, t) - verifyMember(r.qsubs[1], sub2, t) + verifyQMember(r.qsubs, sub1, t) + verifyQMember(r.qsubs, sub2, t) s.Remove(sub1) r = s.Match(subject) verifyLen(r.psubs, 0, t) verifyQLen(r.qsubs, 2, t) - verifyLen(r.qsubs[0], 1, t) - verifyLen(r.qsubs[1], 2, t) - verifyMember(r.qsubs[0], sub1, t) - verifyMember(r.qsubs[1], sub2, t) + verifyLen(r.qsubs[findQSliceForSub(sub1, r.qsubs)], 1, t) + verifyLen(r.qsubs[findQSliceForSub(sub2, r.qsubs)], 2, t) + verifyQMember(r.qsubs, sub2, t) + verifyQMember(r.qsubs, sub3, t) + verifyQMember(r.qsubs, sub4, t) - s.Remove(sub1) // Last one + s.Remove(sub3) // Last one r = s.Match(subject) verifyLen(r.psubs, 0, t) verifyQLen(r.qsubs, 1, t) verifyLen(r.qsubs[0], 2, t) // this is sub2/baz now - verifyMember(r.qsubs[0], sub2, t) + verifyQMember(r.qsubs, sub2, t) s.Remove(sub2) - s.Remove(sub2) + s.Remove(sub4) r = s.Match(subject) verifyLen(r.psubs, 0, t) verifyQLen(r.qsubs, 0, t) @@ -804,15 +817,15 @@ func multiRead(b *testing.B, num int) { fwg.Wait() } -func Benchmark_____________Sublist10XMultipleReads(b *testing.B) { +func Benchmark____________Sublist10XMultipleReads(b *testing.B) { multiRead(b, 10) } -func Benchmark____________Sublist100XMultipleReads(b *testing.B) { +func Benchmark___________Sublist100XMultipleReads(b *testing.B) { multiRead(b, 100) } -func Benchmark_SublistMatchLiteral(b *testing.B) { +func Benchmark________________SublistMatchLiteral(b *testing.B) { b.StopTimer() cachedSubj := "foo.foo.foo.foo.foo.foo.foo.foo.foo.foo" subjects := []string{ @@ -847,3 +860,118 @@ func Benchmark_SublistMatchLiteral(b *testing.B) { } } } + +func removeTest(b *testing.B, singleSubject, doBatch bool, qgroup string) { + b.StopTimer() + s := NewSublist() + subject := "foo" + + subs := make([]*subscription, 0, b.N) + for i := 0; i < b.N; i++ { + var sub *subscription + if singleSubject { + sub = newQSub(subject, qgroup) + } else { + sub = newQSub(fmt.Sprintf("%s.%d\n", subject, i), qgroup) + } + s.Insert(sub) + subs = append(subs, sub) + } + + // Actual test on Remove + b.StartTimer() + if doBatch { + s.RemoveBatch(subs) + } else { + for _, sub := range subs { + s.Remove(sub) + } + } +} + +func Benchmark__________SublistRemove1TokenSingle(b *testing.B) { + removeTest(b, true, false, "") +} + +func Benchmark___________SublistRemove1TokenBatch(b *testing.B) { + removeTest(b, true, true, "") +} + +func Benchmark_________SublistRemove2TokensSingle(b *testing.B) { + removeTest(b, false, false, "") +} + +func Benchmark__________SublistRemove2TokensBatch(b *testing.B) { + removeTest(b, false, true, "") +} + +func Benchmark________SublistRemove1TokenQGSingle(b *testing.B) { + removeTest(b, true, false, "bar") +} + +func Benchmark_________SublistRemove1TokenQGBatch(b *testing.B) { + removeTest(b, true, true, "bar") +} + +func removeMultiTest(b *testing.B, singleSubject, doBatch bool) { + b.StopTimer() + s := NewSublist() + subject := "foo" + var swg, fwg sync.WaitGroup + swg.Add(b.N) + fwg.Add(b.N) + + // We will have b.N go routines each with 1k subscriptions. + sc := 1000 + + for i := 0; i < b.N; i++ { + go func() { + subs := make([]*subscription, 0, sc) + for n := 0; n < sc; n++ { + var sub *subscription + if singleSubject { + sub = newSub(subject) + } else { + sub = newSub(fmt.Sprintf("%s.%d\n", subject, n)) + } + s.Insert(sub) + subs = append(subs, sub) + } + // Wait to start test + swg.Done() + swg.Wait() + // Actual test on Remove + if doBatch { + s.RemoveBatch(subs) + } else { + for _, sub := range subs { + s.Remove(sub) + } + } + fwg.Done() + }() + } + swg.Wait() + b.StartTimer() + fwg.Wait() +} + +// Check contention rates for remove from multiple Go routines. +// Reason for BatchRemove. +func Benchmark_________SublistRemove1kSingleMulti(b *testing.B) { + removeMultiTest(b, true, false) +} + +// Batch version +func Benchmark__________SublistRemove1kBatchMulti(b *testing.B) { + removeMultiTest(b, true, true) +} + +func Benchmark__SublistRemove1kSingle2TokensMulti(b *testing.B) { + removeMultiTest(b, false, false) +} + +// Batch version +func Benchmark___SublistRemove1kBatch2TokensMulti(b *testing.B) { + removeMultiTest(b, false, true) +} diff --git a/test/bench_test.go b/test/bench_test.go index 67aa9e64..d1b7a99b 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -557,11 +557,11 @@ func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, s var sub = "x" var payload = "12345678" -func Benchmark_____FanOut_512x1000x1000(b *testing.B) { +func Benchmark___FanOut_512x1kx1k(b *testing.B) { doFanout(b, 1, 1000, 1000, sub, sizedString(512)) } -func Benchmark_____FanOut_8x1000x100(b *testing.B) { +func Benchmark__FanOut_8x1000x100(b *testing.B) { doFanout(b, 1, 1000, 100, sub, payload) }