Simplify sendSubsToRoute()

Since we were creating subs on the fly, sub.im would always be nil.
We were passing a client because it was needed in sendRouteSubOrUnSubProtos().

This PR simply fills the buffer with each account's subscriptions.
There is also no need to have subs sent from different go routine
based on some threshold. Routes are no longer subject to max pending.

Some code has been made into a function so that they can be shared
by sendSubsToRoute() and sendRouteSubOrUnSubProtos(). The function
is simply adding to given buffer the RS+/- protocol.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-01-19 14:01:43 -07:00
parent 51a321e739
commit 42dcdd2eb2
4 changed files with 79 additions and 114 deletions

View File

@@ -876,19 +876,6 @@ func (a *Account) removeClient(c *client) int {
return n
}
func (a *Account) randomClient() *client {
if a.ic != nil {
return a.ic
}
var c *client
for c = range a.clients {
if c.acc == a {
break
}
}
return c
}
// AddServiceExport will configure the account with the defined export.
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
return a.AddServiceExportWithResponse(subject, Singleton, accounts)

View File

@@ -107,10 +107,6 @@ const (
)
const (
// Used to decide if the sending of the route SUBs list should be
// done in place or in separate go routine.
sendRouteSubsInGoRoutineThreshold = 1024 * 1024 // 1MB
// Warning when user configures cluster TLS insecure
clusterTLSInsecureWarning = "TLS certificate chain and hostname of solicited routes will not be verified. DO NOT USE IN PRODUCTION!"
)
@@ -1113,6 +1109,46 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
return nil
}
func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *subscription, isSubProto bool) []byte {
// If we have an origin cluster and the other side supports leafnode origin clusters
// send an LS+/LS- version instead.
if len(sub.origin) > 0 && c.route.lnoc {
if isSubProto {
buf = append(buf, lSubBytes...)
buf = append(buf, sub.origin...)
} else {
buf = append(buf, lUnsubBytes...)
}
buf = append(buf, ' ')
} else {
if isSubProto {
buf = append(buf, rSubBytes...)
} else {
buf = append(buf, rUnsubBytes...)
}
}
buf = append(buf, accName...)
buf = append(buf, ' ')
buf = append(buf, sub.subject...)
if len(sub.queue) > 0 {
buf = append(buf, ' ')
buf = append(buf, sub.queue...)
// Send our weight if we are a sub proto
if isSubProto {
buf = append(buf, ' ')
var b [12]byte
var i = len(b)
for l := sub.qw; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
buf = append(buf, b[i:]...)
}
}
buf = append(buf, CR_LF...)
return buf
}
// sendSubsToRoute will send over our subject interest to
// the remote side. For each account we will send the
// complete interest for all subjects, both normal as a binary
@@ -1128,65 +1164,48 @@ func (s *Server) sendSubsToRoute(route *client) {
a := v.(*Account)
accs = append(accs, a)
a.mu.RLock()
// Proto looks like: "RS+ <account name> <subject>[ <queue weight>]\r\n"
// If we wanted to have better estimates (or even accurate), we would
// collect the subs here instead of capturing the accounts and then
// later going over each account.
eSize += len(a.rm) * (4 + len(a.Name) + 256)
if ns := len(a.rm); ns > 0 {
// Proto looks like: "RS+ <account name> <subject>[ <queue> <weight>]\r\n"
eSize += ns * (len(rSubBytes) + len(a.Name) + 1 + 2)
for key := range a.rm {
// Key contains "<subject>[ <queue>]"
eSize += len(key)
// In case this is a queue, just add some bytes for the queue weight.
// If we want to be accurate, would have to check if "key" has a space,
// if so, then figure out how many bytes we need to represent the weight.
eSize += 5
}
}
a.mu.RUnlock()
return true
})
s.mu.Unlock()
sendSubs := func(accs []*Account) {
var raw [32]*subscription
buf := make([]byte, 0, eSize)
route.mu.Lock()
for _, a := range accs {
subs := raw[:0]
a.mu.RLock()
c := a.randomClient()
if c == nil {
nsubs := len(a.rm)
accName := a.Name
a.mu.RUnlock()
if nsubs > 0 {
route.Warnf("Ignoring account %q with %d subs, no clients", accName, nsubs)
}
route.mu.Lock()
for _, a := range accs {
a.mu.RLock()
for key, n := range a.rm {
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
// s[0] is the subject and already as a string, so use that
// instead of converting back `subj` to a string.
if !route.canImport(s[0]) {
continue
}
for key, n := range a.rm {
// FIXME(dlc) - Just pass rme around.
// Construct a sub on the fly. We need to place
// a client (or im) to properly set the account.
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
// TODO(dlc) - This code needs to change, but even if left alone could be more
// efficient with these tmp subs.
sub := &subscription{client: c, subject: subj, queue: qn, qw: n}
subs = append(subs, sub)
}
a.mu.RUnlock()
route.sendRouteSubProtos(subs, false, route.importFilter)
sub := subscription{subject: subj, queue: qn, qw: n}
buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true)
}
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
}
// Decide if we call above function in go routine or in place.
if eSize > sendRouteSubsInGoRoutineThreshold {
s.startGoRoutine(func() {
sendSubs(accs)
s.grWG.Done()
})
} else {
sendSubs(accs)
a.mu.RUnlock()
}
route.enqueueProto(buf)
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
}
// Sends SUBs protocols for the given subscriptions. If a filter is specified, it is
@@ -1240,46 +1259,10 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
}
as := len(buf)
// If we have an origin cluster and the other side supports leafnode origin clusters
// send an LS+/LS- version instead.
if len(sub.origin) > 0 && c.route.lnoc {
if isSubProto {
buf = append(buf, lSubBytes...)
buf = append(buf, sub.origin...)
} else {
buf = append(buf, lUnsubBytes...)
}
buf = append(buf, ' ')
} else {
if isSubProto {
buf = append(buf, rSubBytes...)
} else {
buf = append(buf, rUnsubBytes...)
}
}
buf = append(buf, accName...)
buf = append(buf, ' ')
buf = append(buf, sub.subject...)
if len(sub.queue) > 0 {
buf = append(buf, ' ')
buf = append(buf, sub.queue...)
// Send our weight if we are a sub proto
if isSubProto {
buf = append(buf, ' ')
var b [12]byte
var i = len(b)
for l := sub.qw; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
buf = append(buf, b[i:]...)
}
}
buf = c.addRouteSubOrUnsubProtoToBuf(buf, accName, sub, isSubProto)
if trace {
c.traceOutOp("", buf[as:])
c.traceOutOp("", buf[as:len(buf)-LEN_CR_LF])
}
buf = append(buf, CR_LF...)
}
c.enqueueProto(buf)

View File

@@ -1048,14 +1048,6 @@ func (s *Server) setSystemAccount(acc *Account) error {
if acc.imports.services == nil {
acc.imports.services = make(map[string]*serviceImport)
}
// Create a dummy internal client for fast lookup for
// randomClient used in route xfer of subs. Also will
// be stable with account.
if acc.ic == nil {
acc.ic = s.createInternalAccountClient()
acc.ic.acc = acc
}
acc.mu.Unlock()
s.sys = &internal{

View File

@@ -82,8 +82,11 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
// total number of subscriptions per server
totalPerServer := 100000
for i := 0; i < totalPerServer; i++ {
proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2)
for i := 0; i < totalPerServer/2; i++ {
proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2+1)
clientASend(proto)
clientBSend(proto)
proto = fmt.Sprintf("SUB bar.%d queue.%d %d\r\n", i, i, i*2+2)
clientASend(proto)
clientBSend(proto)
}