Changed sublist to avoid quadratic time in removal with large N

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-04-14 11:25:47 -07:00
committed by Derek Collison
parent e6f200b698
commit b9c73e96ee
4 changed files with 222 additions and 95 deletions

View File

@@ -1402,11 +1402,11 @@ func (c *client) closeConnection() {
srv.removeClient(c)
// Remove clients subscriptions.
for _, sub := range subs {
srv.sl.Remove(sub)
// Forward on unsubscribes if we are not
// a router ourselves.
if c.typ != ROUTER {
srv.sl.RemoveBatch(subs)
if c.typ != ROUTER {
for _, sub := range subs {
// Forward on unsubscribes if we are not
// a router ourselves.
srv.broadcastUnSubscribe(sub)
}
}

View File

@@ -64,8 +64,8 @@ type Sublist struct {
// A node contains subscriptions and a pointer to the next level.
type node struct {
next *level
psubs []*subscription
qsubs [][]*subscription
psubs map[*subscription]*subscription
qsubs map[string](map[*subscription]*subscription)
}
// A level represents a group of nodes and special pointers to
@@ -77,11 +77,10 @@ type level struct {
// Create a new default node.
func newNode() *node {
return &node{psubs: make([]*subscription, 0, 4)}
return &node{psubs: make(map[*subscription]*subscription)}
}
// Create a new default level. We use FNV1A as the hash
// algorithm for the tokens, which should be short.
// Create a new default level.
func newLevel() *level {
return &level{nodes: make(map[string]*node)}
}
@@ -153,14 +152,19 @@ func (s *Sublist) Insert(sub *subscription) error {
l = n.next
}
if sub.queue == nil {
n.psubs = append(n.psubs, sub)
n.psubs[sub] = sub
} else {
// This is a queue subscription
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i] = append(n.qsubs[i], sub)
} else {
n.qsubs = append(n.qsubs, []*subscription{sub})
if n.qsubs == nil {
n.qsubs = make(map[string]map[*subscription]*subscription)
}
qname := string(sub.queue)
// This is a queue subscription
subs, ok := n.qsubs[qname]
if !ok {
subs = make(map[*subscription]*subscription)
n.qsubs[qname] = subs
}
subs[sub] = sub
}
s.count++
@@ -263,16 +267,26 @@ func (s *Sublist) Match(subject string) *SublistResult {
// This will add in a node's results to the total results.
func addNodeToResults(n *node, results *SublistResult) {
results.psubs = append(results.psubs, n.psubs...)
for _, qr := range n.qsubs {
for _, psub := range n.psubs {
results.psubs = append(results.psubs, psub)
}
//results.psubs = append(results.psubs, n.psubs...)
for qname, qr := range n.qsubs {
if len(qr) == 0 {
continue
}
tsub := &subscription{subject: nil, queue: []byte(qname)}
// Need to find matching list in results
if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 {
results.qsubs[i] = append(results.qsubs[i], qr...)
if i := findQSliceForSub(tsub, results.qsubs); i >= 0 {
for _, sub := range qr {
results.qsubs[i] = append(results.qsubs[i], sub)
}
} else {
results.qsubs = append(results.qsubs, append([]*subscription(nil), qr...))
var nqsub []*subscription
for _, sub := range qr {
nqsub = append(nqsub, sub)
}
results.qsubs = append(results.qsubs, nqsub)
}
}
}
@@ -328,8 +342,8 @@ type lnt struct {
t string
}
// Remove will remove a subscription.
func (s *Sublist) Remove(sub *subscription) error {
// Raw low level remove, can do batches with lock held outside.
func (s *Sublist) remove(sub *subscription, shouldLock bool) error {
subject := string(sub.subject)
tsa := [32]string{}
tokens := tsa[:0]
@@ -342,8 +356,10 @@ func (s *Sublist) Remove(sub *subscription) error {
}
tokens = append(tokens, subject[start:])
s.Lock()
defer s.Unlock()
if shouldLock {
s.Lock()
defer s.Unlock()
}
sfwc := false
l := s.root
@@ -400,6 +416,24 @@ func (s *Sublist) Remove(sub *subscription) error {
return nil
}
// Remove will remove a subscription.
func (s *Sublist) Remove(sub *subscription) error {
return s.remove(sub, true)
}
// Remove will remove a subscription.
func (s *Sublist) RemoveBatch(subs []*subscription) error {
s.Lock()
defer s.Unlock()
for _, sub := range subs {
if err := s.remove(sub, false); err != nil {
return err
}
}
return nil
}
// pruneNode is used to prune an empty node from the tree.
func (l *level) pruneNode(n *node, t string) {
if n == nil {
@@ -437,61 +471,26 @@ func (l *level) numNodes() int {
return num
}
// Removes a sub from a list.
func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) {
for i := 0; i < len(sl); i++ {
if sl[i] == sub {
last := len(sl) - 1
sl[i] = sl[last]
sl[last] = nil
sl = sl[:last]
return shrinkAsNeeded(sl), true
}
}
return sl, false
}
// Remove the sub for the given node.
func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
if n == nil {
return false
}
if sub.queue == nil {
n.psubs, found = removeSubFromList(sub, n.psubs)
_, found = n.psubs[sub]
delete(n.psubs, sub)
return found
}
// We have a queue group subscription here
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i], found = removeSubFromList(sub, n.qsubs[i])
if len(n.qsubs[i]) == 0 {
last := len(n.qsubs) - 1
n.qsubs[i] = n.qsubs[last]
n.qsubs[last] = nil
n.qsubs = n.qsubs[:last]
if len(n.qsubs) == 0 {
n.qsubs = nil
}
}
return found
qname := string(sub.queue)
qsub := n.qsubs[qname]
_, found = qsub[sub]
delete(qsub, sub)
if len(qsub) == 0 {
delete(n.qsubs, qname)
}
return false
}
// Checks if we need to do a resize. This is for very large growth then
// subsequent return to a more normal size from unsubscribe.
func shrinkAsNeeded(sl []*subscription) []*subscription {
lsl := len(sl)
csl := cap(sl)
// Don't bother if list not too big
if csl <= 8 {
return sl
}
pFree := float32(csl-lsl) / float32(csl)
if pFree > 0.50 {
return append([]*subscription(nil), sl...)
}
return sl
return found
}
// Count returns the number of subscriptions.

View File

@@ -65,6 +65,10 @@ func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
}
}
func verifyQMember(qsubs [][]*subscription, val *subscription, t *testing.T) {
verifyMember(qsubs[findQSliceForSub(val, qsubs)], val, t)
}
func verifyMember(r []*subscription, val *subscription, t *testing.T) {
for _, v := range r {
if v == nil {
@@ -74,16 +78,19 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) {
return
}
}
stackFatalf(t, "Value '%+v' not found in results", val)
stackFatalf(t, "Subscription (%p) for [%s : %s] not found in results", val, val.subject, val.queue)
}
// Helpera to generate test subscriptions.
// Helpers to generate test subscriptions.
func newSub(subject string) *subscription {
return &subscription{subject: []byte(subject)}
}
func newQSub(subject, queue string) *subscription {
return &subscription{subject: []byte(subject), queue: []byte(queue)}
if queue != "" {
return &subscription{subject: []byte(subject), queue: []byte(queue)}
}
return newSub(subject)
}
func TestSublistInit(t *testing.T) {
@@ -300,7 +307,7 @@ func TestSublistBasicQueueResults(t *testing.T) {
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 1, t)
verifyMember(r.qsubs[0], sub1, t)
verifyQMember(r.qsubs, sub1, t)
s.Insert(sub2)
r = s.Match(subject)
@@ -308,8 +315,8 @@ func TestSublistBasicQueueResults(t *testing.T) {
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 1, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyQMember(r.qsubs, sub1, t)
verifyQMember(r.qsubs, sub2, t)
s.Insert(sub)
r = s.Match(subject)
@@ -317,20 +324,25 @@ func TestSublistBasicQueueResults(t *testing.T) {
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 1, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyQMember(r.qsubs, sub1, t)
verifyQMember(r.qsubs, sub2, t)
verifyMember(r.psubs, sub, t)
s.Insert(sub1)
s.Insert(sub2)
sub3 := newQSub(subject, "bar")
sub4 := newQSub(subject, "baz")
s.Insert(sub3)
s.Insert(sub4)
r = s.Match(subject)
verifyLen(r.psubs, 1, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 2, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyQMember(r.qsubs, sub1, t)
verifyQMember(r.qsubs, sub2, t)
verifyQMember(r.qsubs, sub3, t)
verifyQMember(r.qsubs, sub4, t)
verifyMember(r.psubs, sub, t)
// Now removal
@@ -341,27 +353,28 @@ func TestSublistBasicQueueResults(t *testing.T) {
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 2, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyQMember(r.qsubs, sub1, t)
verifyQMember(r.qsubs, sub2, t)
s.Remove(sub1)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[0], 1, t)
verifyLen(r.qsubs[1], 2, t)
verifyMember(r.qsubs[0], sub1, t)
verifyMember(r.qsubs[1], sub2, t)
verifyLen(r.qsubs[findQSliceForSub(sub1, r.qsubs)], 1, t)
verifyLen(r.qsubs[findQSliceForSub(sub2, r.qsubs)], 2, t)
verifyQMember(r.qsubs, sub2, t)
verifyQMember(r.qsubs, sub3, t)
verifyQMember(r.qsubs, sub4, t)
s.Remove(sub1) // Last one
s.Remove(sub3) // Last one
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 2, t) // this is sub2/baz now
verifyMember(r.qsubs[0], sub2, t)
verifyQMember(r.qsubs, sub2, t)
s.Remove(sub2)
s.Remove(sub2)
s.Remove(sub4)
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 0, t)
@@ -804,15 +817,15 @@ func multiRead(b *testing.B, num int) {
fwg.Wait()
}
func Benchmark_____________Sublist10XMultipleReads(b *testing.B) {
func Benchmark____________Sublist10XMultipleReads(b *testing.B) {
multiRead(b, 10)
}
func Benchmark____________Sublist100XMultipleReads(b *testing.B) {
func Benchmark___________Sublist100XMultipleReads(b *testing.B) {
multiRead(b, 100)
}
func Benchmark_SublistMatchLiteral(b *testing.B) {
func Benchmark________________SublistMatchLiteral(b *testing.B) {
b.StopTimer()
cachedSubj := "foo.foo.foo.foo.foo.foo.foo.foo.foo.foo"
subjects := []string{
@@ -847,3 +860,118 @@ func Benchmark_SublistMatchLiteral(b *testing.B) {
}
}
}
func removeTest(b *testing.B, singleSubject, doBatch bool, qgroup string) {
b.StopTimer()
s := NewSublist()
subject := "foo"
subs := make([]*subscription, 0, b.N)
for i := 0; i < b.N; i++ {
var sub *subscription
if singleSubject {
sub = newQSub(subject, qgroup)
} else {
sub = newQSub(fmt.Sprintf("%s.%d\n", subject, i), qgroup)
}
s.Insert(sub)
subs = append(subs, sub)
}
// Actual test on Remove
b.StartTimer()
if doBatch {
s.RemoveBatch(subs)
} else {
for _, sub := range subs {
s.Remove(sub)
}
}
}
func Benchmark__________SublistRemove1TokenSingle(b *testing.B) {
removeTest(b, true, false, "")
}
func Benchmark___________SublistRemove1TokenBatch(b *testing.B) {
removeTest(b, true, true, "")
}
func Benchmark_________SublistRemove2TokensSingle(b *testing.B) {
removeTest(b, false, false, "")
}
func Benchmark__________SublistRemove2TokensBatch(b *testing.B) {
removeTest(b, false, true, "")
}
func Benchmark________SublistRemove1TokenQGSingle(b *testing.B) {
removeTest(b, true, false, "bar")
}
func Benchmark_________SublistRemove1TokenQGBatch(b *testing.B) {
removeTest(b, true, true, "bar")
}
func removeMultiTest(b *testing.B, singleSubject, doBatch bool) {
b.StopTimer()
s := NewSublist()
subject := "foo"
var swg, fwg sync.WaitGroup
swg.Add(b.N)
fwg.Add(b.N)
// We will have b.N go routines each with 1k subscriptions.
sc := 1000
for i := 0; i < b.N; i++ {
go func() {
subs := make([]*subscription, 0, sc)
for n := 0; n < sc; n++ {
var sub *subscription
if singleSubject {
sub = newSub(subject)
} else {
sub = newSub(fmt.Sprintf("%s.%d\n", subject, n))
}
s.Insert(sub)
subs = append(subs, sub)
}
// Wait to start test
swg.Done()
swg.Wait()
// Actual test on Remove
if doBatch {
s.RemoveBatch(subs)
} else {
for _, sub := range subs {
s.Remove(sub)
}
}
fwg.Done()
}()
}
swg.Wait()
b.StartTimer()
fwg.Wait()
}
// Check contention rates for remove from multiple Go routines.
// Reason for BatchRemove.
func Benchmark_________SublistRemove1kSingleMulti(b *testing.B) {
removeMultiTest(b, true, false)
}
// Batch version
func Benchmark__________SublistRemove1kBatchMulti(b *testing.B) {
removeMultiTest(b, true, true)
}
func Benchmark__SublistRemove1kSingle2TokensMulti(b *testing.B) {
removeMultiTest(b, false, false)
}
// Batch version
func Benchmark___SublistRemove1kBatch2TokensMulti(b *testing.B) {
removeMultiTest(b, false, true)
}

View File

@@ -557,11 +557,11 @@ func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, s
var sub = "x"
var payload = "12345678"
func Benchmark_____FanOut_512x1000x1000(b *testing.B) {
func Benchmark___FanOut_512x1kx1k(b *testing.B) {
doFanout(b, 1, 1000, 1000, sub, sizedString(512))
}
func Benchmark_____FanOut_8x1000x100(b *testing.B) {
func Benchmark__FanOut_8x1000x100(b *testing.B) {
doFanout(b, 1, 1000, 100, sub, payload)
}