Optimize subs over routes

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-11-11 19:08:42 -08:00
parent 6e049efbde
commit 24b99c74a3
3 changed files with 127 additions and 56 deletions

View File

@@ -211,11 +211,20 @@ type msgDeny struct {
dcache map[string]bool
}
// routeTarget collects information regarding routes and queue groups for
// sending information to a remote.
type routeTarget struct {
sub *subscription
qs []byte
_qs [32]byte
}
const (
maxResultCacheSize = 512
maxDenyPermCacheSize = 256
maxPermCacheSize = 128
pruneSize = 32
routeTargetInit = 8
)
// Used in readloop to cache hot subject lookups and group statistics.
@@ -227,6 +236,10 @@ type readCache struct {
// This is for routes to have their own L1 as well that is account aware.
rcache map[string]*routeCache
// This is for when we deliver messages across a route. We use this structure
// to make sure to only send one message and properly scope to queues as needed.
rts []routeTarget
prand *rand.Rand
msgs int
bytes int
@@ -1954,9 +1967,39 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
}
}
type routeTarget struct {
sub *subscription
qnames [][]byte
func (c *client) addSubToRouteTargets(sub *subscription) {
if c.in.rts == nil {
c.in.rts = make([]routeTarget, 0, routeTargetInit)
}
for i, _ := range c.in.rts {
rt := &c.in.rts[i]
if rt.sub.client == sub.client {
if sub.queue != nil {
rt.qs = append(rt.qs, sub.queue...)
rt.qs = append(rt.qs, ' ')
}
return
}
}
// If we are here we do not have the sub yet in our list
// If we have to grow do so here.
if len(c.in.rts) == cap(c.in.rts) {
c.in.rts = append(c.in.rts, routeTarget{})
}
var rt *routeTarget
lrts := len(c.in.rts)
c.in.rts = c.in.rts[:lrts+1]
rt = &c.in.rts[lrts]
rt.sub = sub
rt.qs = rt._qs[:0]
if sub.queue != nil {
rt.qs = append(rt.qs, sub.queue...)
rt.qs = append(rt.qs, ' ')
}
}
// This processes the sublist results for a given message.
@@ -1967,23 +2010,24 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
msgh = append(msgh, ' ')
si := len(msgh)
// For sending messages across routes
var rmap map[*client]*routeTarget
// For sending messages across routes. Reset it if we have one.
// We reuse this data structure.
if c.in.rts != nil {
c.in.rts = c.in.rts[:0]
}
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER. We now process
// these after everything else.
if sub.client != nil && sub.client.typ == ROUTER {
if rmap == nil {
rmap = map[*client]*routeTarget{}
}
if c.typ != ROUTER && rmap[sub.client] == nil {
rmap[sub.client] = &routeTarget{sub: sub}
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
continue
}
c.addSubToRouteTargets(sub)
continue
}
// Check for stream import mapped subs
// Check for stream import mapped subs. These apply to local subs only.
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh = c.msgb[1:msgHeadProtoLen]
@@ -2014,12 +2058,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
// Process queue subs
var bounce bool
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
// If we have a filter check that here. We could make this a map or someting more
// complex but linear search since we expect queues to be small should be faster
// complex but linear search since we expect queues to be small. Should be faster
// and more cache friendly.
if qf != nil && len(qsubs) > 0 {
tqn := qsubs[0].queue
@@ -2033,6 +2076,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
selectQSub:
// We will hold onto remote qsubs when we are coming from a route
// just in case we can no longer do local delivery.
var rsub *subscription
// Find a subscription that is able to deliver this message
@@ -2044,7 +2089,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
if sub == nil {
continue
}
// Sending to a remote route.
// Potentially sending to a remote sub across a route.
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
// We just came from a route, so skip and prefer local subs.
@@ -2054,18 +2099,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
continue
} else {
if rmap == nil {
rmap = map[*client]*routeTarget{}
rmap[sub.client] = &routeTarget{sub: sub, qnames: [][]byte{sub.queue}}
} else if rt := rmap[sub.client]; rt != nil {
rt.qnames = append(rt.qnames, sub.queue)
} else {
rmap[sub.client] = &routeTarget{sub: sub, qnames: [][]byte{sub.queue}}
}
c.addSubToRouteTargets(sub)
}
break
}
// Check for mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
@@ -2083,51 +2120,39 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
break
}
}
if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote.
bounce = true
if rmap == nil {
rmap = map[*client]*routeTarget{}
}
if rt := rmap[rsub.client]; rt != nil {
rt.qnames = append(rt.qnames, rsub.queue)
} else {
rmap[rsub.client] = &routeTarget{sub: rsub, qnames: [][]byte{rsub.queue}}
}
c.addSubToRouteTargets(rsub)
}
}
// Don't send messages to routes if we ourselves are a route.
if (c.typ != CLIENT && !bounce) || len(rmap) == 0 {
// If no messages for routes return here.
if len(c.in.rts) == 0 {
return
}
// Now process route connections.
for _, rt := range rmap {
// We address by index to avoimd struct copy. We have inline structs for memory
// layout and cache coherency.
for i, _ := range c.in.rts {
rt := &c.in.rts[i]
mh := c.msgb[:msgHeadProtoLen]
mh = append(mh, acc.Name...)
mh = append(mh, ' ')
mh = append(mh, subject...)
mh = append(mh, ' ')
// If we have queues the third token turns into marker
// that signals number of queues. The leading byte signifies
// whether a reply is present as well.
if len(rt.qnames) > 0 {
if reply != nil {
mh = append(mh, '+') // Signal that there is a reply.
} else {
mh = append(mh, '|') // Only queues
}
mh = append(mh, ' ')
if len(rt.qs) > 0 {
if reply != nil {
mh = append(mh, "+ "...) // Signal that there is a reply.
mh = append(mh, reply...)
mh = append(mh, ' ')
} else {
mh = append(mh, "| "...) // Only queues
}
for _, qn := range rt.qnames {
mh = append(mh, qn...)
mh = append(mh, ' ')
}
mh = append(mh, rt.qs...)
} else if reply != nil {
mh = append(mh, reply...)
mh = append(mh, ' ')

View File

@@ -532,14 +532,13 @@ func Benchmark__RoutedPubSub_100K(b *testing.B) {
}
func routeQueue(b *testing.B, numQueueSubs, size int) {
b.StopTimer()
s1, o1 := RunServerWithConfig("./configs/srv_a.conf")
defer s1.Shutdown()
s2, o2 := RunServerWithConfig("./configs/srv_b.conf")
defer s2.Shutdown()
sub := createClientConn(b, o1.Host, o1.Port)
defer sub.Close()
doDefaultConnect(b, sub)
for i := 0; i < numQueueSubs; i++ {
sendProto(b, sub, fmt.Sprintf("SUB foo bar %d\r\n", 100+i))
@@ -549,6 +548,7 @@ func routeQueue(b *testing.B, numQueueSubs, size int) {
payload := sizedString(size)
pub := createClientConn(b, o2.Host, o2.Port)
defer pub.Close()
doDefaultConnect(b, pub)
bw := bufio.NewWriterSize(pub, defaultSendBufSize)
@@ -556,7 +556,8 @@ func routeQueue(b *testing.B, numQueueSubs, size int) {
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload))
expected := len(fmt.Sprintf("MSG foo 100 %d\r\n%s\r\n", len(payload), payload)) * b.N
go drainConnection(b, sub, ch, expected)
b.StartTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := bw.Write(sendOp)
@@ -574,8 +575,6 @@ func routeQueue(b *testing.B, numQueueSubs, size int) {
<-ch
b.StopTimer()
pub.Close()
sub.Close()
}
func Benchmark____Routed2QueueSub(b *testing.B) {

View File

@@ -1284,3 +1284,50 @@ func TestNewRouteNoQueueSubscribersBounce(t *testing.T) {
t.Fatalf("Expect to get all 500 responses, got %d", numAnswers)
}
}
func TestNewRouteLargeDistinctQueueSubscribers(t *testing.T) {
srvA, srvB, optsA, optsB := runServers(t)
defer srvA.Shutdown()
defer srvB.Shutdown()
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
ncA, err := nats.Connect(urlA)
if err != nil {
t.Fatalf("Failed to create connection for ncA: %v\n", err)
}
defer ncA.Close()
ncB, err := nats.Connect(urlB)
if err != nil {
t.Fatalf("Failed to create connection for ncB: %v\n", err)
}
defer ncB.Close()
const nqsubs = 100
qsubs := make([]*nats.Subscription, 100)
// Create 100 queue subscribers on B all with different queue groups.
for i := 0; i < nqsubs; i++ {
qg := fmt.Sprintf("worker-%d", i)
qsubs[i], _ = ncB.QueueSubscribeSync("foo", qg)
}
ncB.Flush()
// Send 10 messages. We should receive 1000 responses.
for i := 0; i < 10; i++ {
ncA.Publish("foo", nil)
}
ncA.Flush()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
for i := 0; i < nqsubs; i++ {
if n, _, _ := qsubs[i].Pending(); n != 10 {
return fmt.Errorf("Number of messgaes is %d", n)
}
}
return nil
})
}