Merge pull request #1825 from nats-io/route_send_subs

Simplify sendSubsToRoute()
This commit is contained in:
Ivan Kozlovic
2021-01-20 10:17:36 -07:00
committed by GitHub
4 changed files with 79 additions and 114 deletions

View File

@@ -876,19 +876,6 @@ func (a *Account) removeClient(c *client) int {
return n
}
func (a *Account) randomClient() *client {
if a.ic != nil {
return a.ic
}
var c *client
for c = range a.clients {
if c.acc == a {
break
}
}
return c
}
// AddServiceExport will configure the account with the defined export.
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
return a.AddServiceExportWithResponse(subject, Singleton, accounts)

View File

@@ -107,10 +107,6 @@ const (
)
const (
// Used to decide if the sending of the route SUBs list should be
// done in place or in separate go routine.
sendRouteSubsInGoRoutineThreshold = 1024 * 1024 // 1MB
// Warning when user configures cluster TLS insecure
clusterTLSInsecureWarning = "TLS certificate chain and hostname of solicited routes will not be verified. DO NOT USE IN PRODUCTION!"
)
@@ -1113,6 +1109,46 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
return nil
}
func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *subscription, isSubProto bool) []byte {
// If we have an origin cluster and the other side supports leafnode origin clusters
// send an LS+/LS- version instead.
if len(sub.origin) > 0 && c.route.lnoc {
if isSubProto {
buf = append(buf, lSubBytes...)
buf = append(buf, sub.origin...)
} else {
buf = append(buf, lUnsubBytes...)
}
buf = append(buf, ' ')
} else {
if isSubProto {
buf = append(buf, rSubBytes...)
} else {
buf = append(buf, rUnsubBytes...)
}
}
buf = append(buf, accName...)
buf = append(buf, ' ')
buf = append(buf, sub.subject...)
if len(sub.queue) > 0 {
buf = append(buf, ' ')
buf = append(buf, sub.queue...)
// Send our weight if we are a sub proto
if isSubProto {
buf = append(buf, ' ')
var b [12]byte
var i = len(b)
for l := sub.qw; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
buf = append(buf, b[i:]...)
}
}
buf = append(buf, CR_LF...)
return buf
}
// sendSubsToRoute will send over our subject interest to
// the remote side. For each account we will send the
// complete interest for all subjects, both normal as a binary
@@ -1128,65 +1164,48 @@ func (s *Server) sendSubsToRoute(route *client) {
a := v.(*Account)
accs = append(accs, a)
a.mu.RLock()
// Proto looks like: "RS+ <account name> <subject>[ <queue weight>]\r\n"
// If we wanted to have better estimates (or even accurate), we would
// collect the subs here instead of capturing the accounts and then
// later going over each account.
eSize += len(a.rm) * (4 + len(a.Name) + 256)
if ns := len(a.rm); ns > 0 {
// Proto looks like: "RS+ <account name> <subject>[ <queue> <weight>]\r\n"
eSize += ns * (len(rSubBytes) + len(a.Name) + 1 + 2)
for key := range a.rm {
// Key contains "<subject>[ <queue>]"
eSize += len(key)
// In case this is a queue, just add some bytes for the queue weight.
// If we want to be accurate, would have to check if "key" has a space,
// if so, then figure out how many bytes we need to represent the weight.
eSize += 5
}
}
a.mu.RUnlock()
return true
})
s.mu.Unlock()
sendSubs := func(accs []*Account) {
var raw [32]*subscription
buf := make([]byte, 0, eSize)
route.mu.Lock()
for _, a := range accs {
subs := raw[:0]
a.mu.RLock()
c := a.randomClient()
if c == nil {
nsubs := len(a.rm)
accName := a.Name
a.mu.RUnlock()
if nsubs > 0 {
route.Warnf("Ignoring account %q with %d subs, no clients", accName, nsubs)
}
route.mu.Lock()
for _, a := range accs {
a.mu.RLock()
for key, n := range a.rm {
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
// s[0] is the subject and already as a string, so use that
// instead of converting back `subj` to a string.
if !route.canImport(s[0]) {
continue
}
for key, n := range a.rm {
// FIXME(dlc) - Just pass rme around.
// Construct a sub on the fly. We need to place
// a client (or im) to properly set the account.
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
// TODO(dlc) - This code needs to change, but even if left alone could be more
// efficient with these tmp subs.
sub := &subscription{client: c, subject: subj, queue: qn, qw: n}
subs = append(subs, sub)
}
a.mu.RUnlock()
route.sendRouteSubProtos(subs, false, route.importFilter)
sub := subscription{subject: subj, queue: qn, qw: n}
buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true)
}
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
}
// Decide if we call above function in go routine or in place.
if eSize > sendRouteSubsInGoRoutineThreshold {
s.startGoRoutine(func() {
sendSubs(accs)
s.grWG.Done()
})
} else {
sendSubs(accs)
a.mu.RUnlock()
}
route.enqueueProto(buf)
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
}
// Sends SUBs protocols for the given subscriptions. If a filter is specified, it is
@@ -1240,46 +1259,10 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
}
as := len(buf)
// If we have an origin cluster and the other side supports leafnode origin clusters
// send an LS+/LS- version instead.
if len(sub.origin) > 0 && c.route.lnoc {
if isSubProto {
buf = append(buf, lSubBytes...)
buf = append(buf, sub.origin...)
} else {
buf = append(buf, lUnsubBytes...)
}
buf = append(buf, ' ')
} else {
if isSubProto {
buf = append(buf, rSubBytes...)
} else {
buf = append(buf, rUnsubBytes...)
}
}
buf = append(buf, accName...)
buf = append(buf, ' ')
buf = append(buf, sub.subject...)
if len(sub.queue) > 0 {
buf = append(buf, ' ')
buf = append(buf, sub.queue...)
// Send our weight if we are a sub proto
if isSubProto {
buf = append(buf, ' ')
var b [12]byte
var i = len(b)
for l := sub.qw; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
buf = append(buf, b[i:]...)
}
}
buf = c.addRouteSubOrUnsubProtoToBuf(buf, accName, sub, isSubProto)
if trace {
c.traceOutOp("", buf[as:])
c.traceOutOp("", buf[as:len(buf)-LEN_CR_LF])
}
buf = append(buf, CR_LF...)
}
c.enqueueProto(buf)

View File

@@ -1048,14 +1048,6 @@ func (s *Server) setSystemAccount(acc *Account) error {
if acc.imports.services == nil {
acc.imports.services = make(map[string]*serviceImport)
}
// Create a dummy internal client for fast lookup for
// randomClient used in route xfer of subs. Also will
// be stable with account.
if acc.ic == nil {
acc.ic = s.createInternalAccountClient()
acc.ic.acc = acc
}
acc.mu.Unlock()
s.sys = &internal{

View File

@@ -82,8 +82,11 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
// total number of subscriptions per server
totalPerServer := 100000
for i := 0; i < totalPerServer; i++ {
proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2)
for i := 0; i < totalPerServer/2; i++ {
proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2+1)
clientASend(proto)
clientBSend(proto)
proto = fmt.Sprintf("SUB bar.%d queue.%d %d\r\n", i, i, i*2+2)
clientASend(proto)
clientBSend(proto)
}