Fixed queue sub logic for multiple queue groups

This commit is contained in:
Derek Collison
2012-11-20 14:55:48 -08:00
parent 6aee091958
commit bd43e276b7

View File

@@ -10,6 +10,7 @@ import (
"net"
"sync"
"time"
// "unsafe"
"github.com/apcera/gnatsd/hashmap"
)
@@ -322,8 +323,6 @@ func (c *client) processMsg(msg []byte) {
return
}
qsubsA := [32]*subscription{}
qsubs := qsubsA[:0]
scratch := [512]byte{}
msgh := scratch[:0]
@@ -339,20 +338,35 @@ func (c *client) processMsg(msg []byte) {
msgh = append(msgh, ' ')
si := len(msgh)
var qmap map[string][]*subscription
var qsubs []*subscription
for _, v := range r {
sub := v.(*subscription)
if sub.queue != nil {
if qmap == nil {
qmap = make(map[string][]*subscription)
}
//qname := *(*string)(unsafe.Pointer(&sub.queue))
qname := string(sub.queue)
qsubs = qmap[qname]
if qsubs == nil {
qsubs = make([]*subscription, 0, 4)
}
qsubs = append(qsubs, sub)
qmap[qname] = qsubs
continue
}
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
if len(qsubs) > 0 {
index := rand.Int() % len(qsubs)
sub := qsubs[index]
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
if qmap != nil {
for _, qsubs := range qmap {
index := rand.Int() % len(qsubs)
sub := qsubs[index]
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
}