From 42dcdd2eb2e5bb0c0efdeef433bed6424ab54bf6 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 19 Jan 2021 14:01:43 -0700 Subject: [PATCH] Simplify sendSubsToRoute() Since we were creating subs on the fly, sub.im would always be nil. We were passing a client because it was needed in sendRouteSubOrUnSubProtos(). This PR simply fills the buffer with each account's subscriptions. There is also no need to have subs sent from different go routine based on some threshold. Routes are no longer subject to max pending. Some code has been made into a function so that they can be shared by sendSubsToRoute() and sendRouteSubOrUnSubProtos(). The function is simply adding to given buffer the RS+/- protocol. Signed-off-by: Ivan Kozlovic --- server/accounts.go | 13 ---- server/route.go | 165 ++++++++++++++++++++------------------------ server/server.go | 8 --- test/norace_test.go | 7 +- 4 files changed, 79 insertions(+), 114 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 89429b8b..880bb1fb 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -876,19 +876,6 @@ func (a *Account) removeClient(c *client) int { return n } -func (a *Account) randomClient() *client { - if a.ic != nil { - return a.ic - } - var c *client - for c = range a.clients { - if c.acc == a { - break - } - } - return c -} - // AddServiceExport will configure the account with the defined export. func (a *Account) AddServiceExport(subject string, accounts []*Account) error { return a.AddServiceExportWithResponse(subject, Singleton, accounts) diff --git a/server/route.go b/server/route.go index 42873021..589fe92d 100644 --- a/server/route.go +++ b/server/route.go @@ -107,10 +107,6 @@ const ( ) const ( - // Used to decide if the sending of the route SUBs list should be - // done in place or in separate go routine. - sendRouteSubsInGoRoutineThreshold = 1024 * 1024 // 1MB - // Warning when user configures cluster TLS insecure clusterTLSInsecureWarning = "TLS certificate chain and hostname of solicited routes will not be verified. DO NOT USE IN PRODUCTION!" ) @@ -1113,6 +1109,46 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } +func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *subscription, isSubProto bool) []byte { + // If we have an origin cluster and the other side supports leafnode origin clusters + // send an LS+/LS- version instead. + if len(sub.origin) > 0 && c.route.lnoc { + if isSubProto { + buf = append(buf, lSubBytes...) + buf = append(buf, sub.origin...) + } else { + buf = append(buf, lUnsubBytes...) + } + buf = append(buf, ' ') + } else { + if isSubProto { + buf = append(buf, rSubBytes...) + } else { + buf = append(buf, rUnsubBytes...) + } + } + buf = append(buf, accName...) + buf = append(buf, ' ') + buf = append(buf, sub.subject...) + if len(sub.queue) > 0 { + buf = append(buf, ' ') + buf = append(buf, sub.queue...) + // Send our weight if we are a sub proto + if isSubProto { + buf = append(buf, ' ') + var b [12]byte + var i = len(b) + for l := sub.qw; l > 0; l /= 10 { + i-- + b[i] = digits[l%10] + } + buf = append(buf, b[i:]...) + } + } + buf = append(buf, CR_LF...) + return buf +} + // sendSubsToRoute will send over our subject interest to // the remote side. For each account we will send the // complete interest for all subjects, both normal as a binary @@ -1128,65 +1164,48 @@ func (s *Server) sendSubsToRoute(route *client) { a := v.(*Account) accs = append(accs, a) a.mu.RLock() - // Proto looks like: "RS+ [ ]\r\n" - // If we wanted to have better estimates (or even accurate), we would - // collect the subs here instead of capturing the accounts and then - // later going over each account. - eSize += len(a.rm) * (4 + len(a.Name) + 256) + if ns := len(a.rm); ns > 0 { + // Proto looks like: "RS+ [ ]\r\n" + eSize += ns * (len(rSubBytes) + len(a.Name) + 1 + 2) + for key := range a.rm { + // Key contains "[ ]" + eSize += len(key) + // In case this is a queue, just add some bytes for the queue weight. + // If we want to be accurate, would have to check if "key" has a space, + // if so, then figure out how many bytes we need to represent the weight. + eSize += 5 + } + } a.mu.RUnlock() return true }) s.mu.Unlock() - sendSubs := func(accs []*Account) { - var raw [32]*subscription + buf := make([]byte, 0, eSize) - route.mu.Lock() - for _, a := range accs { - subs := raw[:0] - - a.mu.RLock() - c := a.randomClient() - if c == nil { - nsubs := len(a.rm) - accName := a.Name - a.mu.RUnlock() - if nsubs > 0 { - route.Warnf("Ignoring account %q with %d subs, no clients", accName, nsubs) - } + route.mu.Lock() + for _, a := range accs { + a.mu.RLock() + for key, n := range a.rm { + var subj, qn []byte + s := strings.Split(key, " ") + subj = []byte(s[0]) + if len(s) > 1 { + qn = []byte(s[1]) + } + // s[0] is the subject and already as a string, so use that + // instead of converting back `subj` to a string. + if !route.canImport(s[0]) { continue } - for key, n := range a.rm { - // FIXME(dlc) - Just pass rme around. - // Construct a sub on the fly. We need to place - // a client (or im) to properly set the account. - var subj, qn []byte - s := strings.Split(key, " ") - subj = []byte(s[0]) - if len(s) > 1 { - qn = []byte(s[1]) - } - // TODO(dlc) - This code needs to change, but even if left alone could be more - // efficient with these tmp subs. - sub := &subscription{client: c, subject: subj, queue: qn, qw: n} - subs = append(subs, sub) - } - a.mu.RUnlock() - - route.sendRouteSubProtos(subs, false, route.importFilter) + sub := subscription{subject: subj, queue: qn, qw: n} + buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true) } - route.mu.Unlock() - route.Debugf("Sent local subscriptions to route") - } - // Decide if we call above function in go routine or in place. - if eSize > sendRouteSubsInGoRoutineThreshold { - s.startGoRoutine(func() { - sendSubs(accs) - s.grWG.Done() - }) - } else { - sendSubs(accs) + a.mu.RUnlock() } + route.enqueueProto(buf) + route.mu.Unlock() + route.Debugf("Sent local subscriptions to route") } // Sends SUBs protocols for the given subscriptions. If a filter is specified, it is @@ -1240,46 +1259,10 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra } as := len(buf) - - // If we have an origin cluster and the other side supports leafnode origin clusters - // send an LS+/LS- version instead. - if len(sub.origin) > 0 && c.route.lnoc { - if isSubProto { - buf = append(buf, lSubBytes...) - buf = append(buf, sub.origin...) - } else { - buf = append(buf, lUnsubBytes...) - } - buf = append(buf, ' ') - } else { - if isSubProto { - buf = append(buf, rSubBytes...) - } else { - buf = append(buf, rUnsubBytes...) - } - } - buf = append(buf, accName...) - buf = append(buf, ' ') - buf = append(buf, sub.subject...) - if len(sub.queue) > 0 { - buf = append(buf, ' ') - buf = append(buf, sub.queue...) - // Send our weight if we are a sub proto - if isSubProto { - buf = append(buf, ' ') - var b [12]byte - var i = len(b) - for l := sub.qw; l > 0; l /= 10 { - i-- - b[i] = digits[l%10] - } - buf = append(buf, b[i:]...) - } - } + buf = c.addRouteSubOrUnsubProtoToBuf(buf, accName, sub, isSubProto) if trace { - c.traceOutOp("", buf[as:]) + c.traceOutOp("", buf[as:len(buf)-LEN_CR_LF]) } - buf = append(buf, CR_LF...) } c.enqueueProto(buf) diff --git a/server/server.go b/server/server.go index 1b8ac8dc..e592360c 100644 --- a/server/server.go +++ b/server/server.go @@ -1048,14 +1048,6 @@ func (s *Server) setSystemAccount(acc *Account) error { if acc.imports.services == nil { acc.imports.services = make(map[string]*serviceImport) } - - // Create a dummy internal client for fast lookup for - // randomClient used in route xfer of subs. Also will - // be stable with account. - if acc.ic == nil { - acc.ic = s.createInternalAccountClient() - acc.ic.acc = acc - } acc.mu.Unlock() s.sys = &internal{ diff --git a/test/norace_test.go b/test/norace_test.go index 32b734b4..5988bd58 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -82,8 +82,11 @@ func TestNoRaceRouteSendSubs(t *testing.T) { // total number of subscriptions per server totalPerServer := 100000 - for i := 0; i < totalPerServer; i++ { - proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2) + for i := 0; i < totalPerServer/2; i++ { + proto := fmt.Sprintf("SUB foo.%d %d\r\n", i, i*2+1) + clientASend(proto) + clientBSend(proto) + proto = fmt.Sprintf("SUB bar.%d queue.%d %d\r\n", i, i, i*2+2) clientASend(proto) clientBSend(proto) }