Deduplicate *subscription in Sublist

This commit is contained in:
Neil Twigg
2023-01-06 13:19:20 +00:00
parent f5d939ec24
commit d853b0eb89

View File

@@ -84,8 +84,8 @@ type notifyMaps struct {
// A node contains subscriptions and a pointer to the next level.
type node struct {
next *level
psubs map[*subscription]*subscription
qsubs map[string](map[*subscription]*subscription)
psubs map[*subscription]struct{}
qsubs map[string]map[*subscription]struct{}
plist []*subscription
}
@@ -98,7 +98,7 @@ type level struct {
// Create a new default node.
func newNode() *node {
return &node{psubs: make(map[*subscription]*subscription)}
return &node{psubs: make(map[*subscription]struct{})}
}
// Create a new default level.
@@ -413,30 +413,30 @@ func (s *Sublist) Insert(sub *subscription) error {
l = n.next
}
if sub.queue == nil {
n.psubs[sub] = sub
n.psubs[sub] = struct{}{}
isnew = len(n.psubs) == 1
if n.plist != nil {
n.plist = append(n.plist, sub)
} else if len(n.psubs) > plistMin {
n.plist = make([]*subscription, 0, len(n.psubs))
// Populate
for _, psub := range n.psubs {
for psub := range n.psubs {
n.plist = append(n.plist, psub)
}
}
} else {
if n.qsubs == nil {
n.qsubs = make(map[string]map[*subscription]*subscription)
n.qsubs = make(map[string]map[*subscription]struct{})
}
qname := string(sub.queue)
// This is a queue subscription
subs, ok := n.qsubs[qname]
if !ok {
subs = make(map[*subscription]*subscription)
subs = make(map[*subscription]struct{})
n.qsubs[qname] = subs
isnew = true
}
subs[sub] = sub
subs[sub] = struct{}{}
}
s.count++
@@ -636,7 +636,7 @@ func addNodeToResults(n *node, results *SublistResult) {
if n.plist != nil {
results.psubs = append(results.psubs, n.plist...)
} else {
for _, psub := range n.psubs {
for psub := range n.psubs {
results.psubs = append(results.psubs, psub)
}
}
@@ -652,7 +652,7 @@ func addNodeToResults(n *node, results *SublistResult) {
nqsub := make([]*subscription, 0, len(qr))
results.qsubs = append(results.qsubs, nqsub)
}
for _, sub := range qr {
for sub := range qr {
if isRemoteQSub(sub) {
ns := atomic.LoadInt32(&sub.qw)
// Shadow these subscriptions
@@ -1435,13 +1435,13 @@ func (s *Sublist) addNodeToSubs(n *node, subs *[]*subscription, includeLeafHubs
addLocalSub(sub, subs, includeLeafHubs)
}
} else {
for _, sub := range n.psubs {
for sub := range n.psubs {
addLocalSub(sub, subs, includeLeafHubs)
}
}
// Queue subscriptions
for _, qr := range n.qsubs {
for _, sub := range qr {
for sub := range qr {
addLocalSub(sub, subs, includeLeafHubs)
}
}
@@ -1481,13 +1481,13 @@ func (s *Sublist) addAllNodeToSubs(n *node, subs *[]*subscription) {
if n.plist != nil {
*subs = append(*subs, n.plist...)
} else {
for _, sub := range n.psubs {
for sub := range n.psubs {
*subs = append(*subs, sub)
}
}
// Queue subscriptions
for _, qr := range n.qsubs {
for _, sub := range qr {
for sub := range qr {
*subs = append(*subs, sub)
}
}