diff --git a/server/accounts.go b/server/accounts.go index 89429b8b..880bb1fb 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -876,19 +876,6 @@ func (a *Account) removeClient(c *client) int { return n } -func (a *Account) randomClient() *client { - if a.ic != nil { - return a.ic - } - var c *client - for c = range a.clients { - if c.acc == a { - break - } - } - return c -} - // AddServiceExport will configure the account with the defined export. func (a *Account) AddServiceExport(subject string, accounts []*Account) error { return a.AddServiceExportWithResponse(subject, Singleton, accounts) diff --git a/server/route.go b/server/route.go index 42873021..589fe92d 100644 --- a/server/route.go +++ b/server/route.go @@ -107,10 +107,6 @@ const ( ) const ( - // Used to decide if the sending of the route SUBs list should be - // done in place or in separate go routine. - sendRouteSubsInGoRoutineThreshold = 1024 * 1024 // 1MB - // Warning when user configures cluster TLS insecure clusterTLSInsecureWarning = "TLS certificate chain and hostname of solicited routes will not be verified. DO NOT USE IN PRODUCTION!" ) @@ -1113,6 +1109,46 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } +func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *subscription, isSubProto bool) []byte { + // If we have an origin cluster and the other side supports leafnode origin clusters + // send an LS+/LS- version instead. + if len(sub.origin) > 0 && c.route.lnoc { + if isSubProto { + buf = append(buf, lSubBytes...) + buf = append(buf, sub.origin...) + } else { + buf = append(buf, lUnsubBytes...) + } + buf = append(buf, ' ') + } else { + if isSubProto { + buf = append(buf, rSubBytes...) + } else { + buf = append(buf, rUnsubBytes...) + } + } + buf = append(buf, accName...) + buf = append(buf, ' ') + buf = append(buf, sub.subject...) + if len(sub.queue) > 0 { + buf = append(buf, ' ') + buf = append(buf, sub.queue...) + // Send our weight if we are a sub proto + if isSubProto { + buf = append(buf, ' ') + var b [12]byte + var i = len(b) + for l := sub.qw; l > 0; l /= 10 { + i-- + b[i] = digits[l%10] + } + buf = append(buf, b[i:]...) + } + } + buf = append(buf, CR_LF...) + return buf +} + // sendSubsToRoute will send over our subject interest to // the remote side. For each account we will send the // complete interest for all subjects, both normal as a binary @@ -1128,65 +1164,48 @@ func (s *Server) sendSubsToRoute(route *client) { a := v.(*Account) accs = append(accs, a) a.mu.RLock() - // Proto looks like: "RS+ [ ]\r\n" - // If we wanted to have better estimates (or even accurate), we would - // collect the subs here instead of capturing the accounts and then - // later going over each account. - eSize += len(a.rm) * (4 + len(a.Name) + 256) + if ns := len(a.rm); ns > 0 { + // Proto looks like: "RS+ [ ]\r\n" + eSize += ns * (len(rSubBytes) + len(a.Name) + 1 + 2) + for key := range a.rm { + // Key contains "[ ]" + eSize += len(key) + // In case this is a queue, just add some bytes for the queue weight. + // If we want to be accurate, would have to check if "key" has a space, + // if so, then figure out how many bytes we need to represent the weight. + eSize += 5 + } + } a.mu.RUnlock() return true }) s.mu.Unlock() - sendSubs := func(accs []*Account) { - var raw [32]*subscription + buf := make([]byte, 0, eSize) - route.mu.Lock() - for _, a := range accs { - subs := raw[:0] - - a.mu.RLock() - c := a.randomClient() - if c == nil { - nsubs := len(a.rm) - accName := a.Name - a.mu.RUnlock() - if nsubs > 0 { - route.Warnf("Ignoring account %q with %d subs, no clients", accName, nsubs) - } + route.mu.Lock() + for _, a := range accs { + a.mu.RLock() + for key, n := range a.rm { + var subj, qn []byte + s := strings.Split(key, " ") + subj = []byte(s[0]) + if len(s) > 1 { + qn = []byte(s[1]) + } + // s[0] is the subject and already as a string, so use that + // instead of converting back `subj` to a string. + if !route.canImport(s[0]) { continue } - for key, n := range a.rm { - // FIXME(dlc) - Just pass rme around. - // Construct a sub on the fly. We need to place - // a client (or im) to properly set the account. - var subj, qn []byte - s := strings.Split(key, " ") - subj = []byte(s[0]) - if len(s) > 1 { - qn = []byte(s[1]) - } - // TODO(dlc) - This code needs to change, but even if left alone could be more - // efficient with these tmp subs. - sub := &subscription{client: c, subject: subj, queue: qn, qw: n} - subs = append(subs, sub) - } - a.mu.RUnlock() - - route.sendRouteSubProtos(subs, false, route.importFilter) + sub := subscription{subject: subj, queue: qn, qw: n} + buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true) } - route.mu.Unlock() - route.Debugf("Sent local subscriptions to route") - } - // Decide if we call above function in go routine or in place. - if eSize > sendRouteSubsInGoRoutineThreshold { - s.startGoRoutine(func() { - sendSubs(accs) - s.grWG.Done() - }) - } else { - sendSubs(accs) + a.mu.RUnlock() } + route.enqueueProto(buf) + route.mu.Unlock() + route.Debugf("Sent local subscriptions to route") } // Sends SUBs protocols for the given subscriptions. If a filter is specified, it is @@ -1240,46 +1259,10 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra } as := len(buf) - - // If we have an origin cluster and the other side supports leafnode origin clusters - // send an LS+/LS- version instead. - if len(sub.origin) > 0 && c.route.lnoc { - if isSubProto { - buf = append(buf, lSubBytes...) - buf = append(buf, sub.origin...) - } else { - buf = append(buf, lUnsubBytes...) - } - buf = append(buf, ' ') - } else { - if isSubProto { - buf = append(buf, rSubBytes...) - } else { - buf = append(buf, rUnsubBytes...) - } - } - buf = append(buf, accName...) - buf = append(buf, ' ') - buf = append(buf, sub.subject...) - if len(sub.queue) > 0 { - buf = append(buf, ' ') - buf = append(buf, sub.queue...) - // Send our weight if we are a sub proto - if isSubProto { - buf = append(buf, ' ') - var b [12]byte - var i = len(b) - for l := sub.qw; l > 0; l /= 10 { - i-- - b[i] = digits[l%10] - } - buf = append(buf, b[i:]...) - } - } + buf = c.addRouteSubOrUnsubProtoToBuf(buf, accName, sub, isSubProto) if trace { - c.traceOutOp("", buf[as:]) + c.traceOutOp("", buf[as:len(buf)-LEN_CR_LF]) } - buf = append(buf, CR_LF...) } c.enqueueProto(buf) diff --git a/server/server.go b/server/server.go index 1b8ac8dc..e592360c 100644 --- a/server/server.go +++ b/server/server.go @@ -1048,14 +1048,6 @@ func (s *Server) setSystemAccount(acc *Account) error { if acc.imports.services == nil { acc.imports.services = make(map[string]*serviceImport) } - - // Create a dummy internal client for fast lookup for - // randomClient used in route xfer of subs. Also will - // be stable with account. - if acc.ic == nil { - acc.ic = s.createInternalAccountClient() - acc.ic.acc = acc - } acc.mu.Unlock() s.sys = &internal{ diff --git a/test/norace_test.go b/test/norace_test.go index 32b734b4..5988bd58 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -82,8 +82,11 @@ func TestNoRaceRouteSendSubs(t *testing.T) { // total number of subscriptions per server totalPerServer := 100000 - for i := 0; i < totalPerServer; i++ { - proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2) + for i := 0; i < totalPerServer/2; i++ { + proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2+1) + clientASend(proto) + clientBSend(proto) + proto = fmt.Sprintf("SUB bar.%d queue.%d %d\r\n", i, i, i*2+2) clientASend(proto) clientBSend(proto) }