Merge pull request #930 from nats-io/route_send_subs_go_routine_threshold

Conditional send of routed subs from a go routine
This commit is contained in:
Ivan Kozlovic
2019-04-08 14:03:41 -06:00
committed by GitHub
3 changed files with 69 additions and 47 deletions

View File

@@ -22,6 +22,7 @@ import (
"math/rand"
"net"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -824,6 +825,13 @@ func (c *client) handlePartialWrite(pnb net.Buffers) {
// Lock must be held
func (c *client) flushOutbound() bool {
if c.flags.isSet(flushOutbound) {
// Another go-routine has set this and is either
// doing the write or waiting to re-acquire the
// lock post write. Release lock to give it a
// chance to complete.
c.mu.Unlock()
runtime.Gosched()
c.mu.Lock()
return false
}
c.flags.set(flushOutbound)

View File

@@ -92,6 +92,10 @@ const (
InfoProto = "INFO %s" + _CRLF_
)
// Used to decide if the sending of the route SUBs list should be
// done in place or in separate go routine.
const sendRouteSubsInGoRoutineThreshold = 1024 * 1024 // 1MB
// This will add a timer to watch over remote reply subjects in case
// they fail to receive a response. The duration will be taken from the
// accounts map timeout to match.
@@ -443,10 +447,7 @@ func (c *client) processRouteInfo(info *Info) {
c.Debugf("Registering remote route %q", info.ID)
// Send our subs to the other side.
s.startGoRoutine(func() {
s.sendSubsToRoute(c)
s.grWG.Done()
})
s.sendSubsToRoute(c)
// Send info about the known gateways to this route.
s.sendGatewayConfigsToRoute(c)
@@ -886,54 +887,74 @@ func (s *Server) sendSubsToRoute(route *client) {
accs := _accs[:0]
// copy accounts into array first
s.mu.Lock()
// Estimated size of all protocols. It does not have to be accurate at all.
eSize := 0
for _, a := range s.accounts {
accs = append(accs, a)
a.mu.RLock()
// Proto looks like: "RS+ <account name> <subject>[ <queue weight>]\r\n"
// If we wanted to have better estimates (or even accurate), we would
// collect the subs here instead of capturing the accounts and then
// later going over each account.
eSize += len(a.rm) * (4 + len(a.Name) + 256)
a.mu.RUnlock()
}
s.mu.Unlock()
var raw [4096]*subscription
var closed bool
sendSubs := func(accs []*Account) {
var raw [4096]*subscription
var closed bool
route.mu.Lock()
for _, a := range accs {
subs := raw[:0]
route.mu.Lock()
for _, a := range accs {
subs := raw[:0]
a.mu.RLock()
c := a.randomClient()
if c == nil {
a.mu.RUnlock()
continue
}
for key, n := range a.rm {
// FIXME(dlc) - Just pass rme around.
// Construct a sub on the fly. We need to place
// a client (or im) to properly set the account.
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
a.mu.RLock()
c := a.randomClient()
if c == nil {
a.mu.RUnlock()
continue
}
// TODO(dlc) - This code needs to change, but even if left alone could be more
// efficient with these tmp subs.
sub := &subscription{client: c, subject: subj, queue: qn, qw: n}
subs = append(subs, sub)
for key, n := range a.rm {
// FIXME(dlc) - Just pass rme around.
// Construct a sub on the fly. We need to place
// a client (or im) to properly set the account.
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
// TODO(dlc) - This code needs to change, but even if left alone could be more
// efficient with these tmp subs.
sub := &subscription{client: c, subject: subj, queue: qn, qw: n}
subs = append(subs, sub)
}
a.mu.RUnlock()
closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool {
return route.canImport(string(sub.subject))
})
if closed {
route.mu.Unlock()
return
}
}
a.mu.RUnlock()
closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool {
return route.canImport(string(sub.subject))
})
if closed {
route.mu.Unlock()
return
route.mu.Unlock()
if !closed {
route.Debugf("Sent local subscriptions to route")
}
}
route.mu.Unlock()
if !closed {
route.Debugf("Sent local subscriptions to route")
// Decide if we call above function in go routine or in place.
if eSize > sendRouteSubsInGoRoutineThreshold {
s.startGoRoutine(func() {
sendSubs(accs)
s.grWG.Done()
})
} else {
sendSubs(accs)
}
}

View File

@@ -144,13 +144,6 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
routeSend("PING\r\n")
routeExpect(pongRe)
// Routes now send their subs list from a go routine,
// so it is possible that if we don't wait we get
// the client SUB being forwarded, then for the UNSUB,
// we get the go routine kicking-in and send the SUB again
// (which is ok since it is idempotent on the receiving side)
time.Sleep(100 * time.Millisecond)
// Send SUB via client connection
send("SUB foo 22\r\n")