diff --git a/server/accounts.go b/server/accounts.go index e177597e..c161befa 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -189,6 +189,60 @@ func (a *Account) shallowCopy() *Account { return na } +// Called to track a remote server and connections and leafnodes it +// has for this account. +func (a *Account) updateRemoteServer(m *AccountNumConns) { + a.mu.Lock() + if a.strack == nil { + a.strack = make(map[string]sconns) + } + // This does not depend on receiving all updates since each one is idempotent. + // FIXME(dlc) - We should cleanup when these both go to zero. + prev := a.strack[m.Server.ID] + a.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)} + a.nrclients += int32(m.Conns) - prev.conns + a.nrleafs += int32(m.LeafNodes) - prev.leafs + a.mu.Unlock() +} + +// Removes tracking for a remote server that has shutdown. +func (a *Account) removeRemoteServer(sid string) { + a.mu.Lock() + if a.strack != nil { + prev := a.strack[sid] + delete(a.strack, sid) + a.nrclients -= prev.conns + a.nrleafs -= prev.leafs + } + a.mu.Unlock() +} + +// When querying for subject interest this is the number of +// expected responses. We need to actually check that the entry +// has active connections. +func (a *Account) expectedRemoteResponses() (expected int32) { + a.mu.RLock() + for _, sc := range a.strack { + if sc.conns > 0 || sc.leafs > 0 { + expected++ + } + } + a.mu.RUnlock() + return +} + +// Clears eventing and tracking for this account. +func (a *Account) clearEventing() { + a.mu.Lock() + a.nrclients = 0 + // Now clear state + clearTimer(&a.etmr) + clearTimer(&a.ctmr) + a.clients = nil + a.strack = nil + a.mu.Unlock() +} + // NumConnections returns active number of clients for this account for // all known servers. 
func (a *Account) NumConnections() int { @@ -198,6 +252,15 @@ func (a *Account) NumConnections() int { return nc } +// NumRemoteConnections returns the number of client or leaf connections that +// are not on this server. +func (a *Account) NumRemoteConnections() int { + a.mu.RLock() + nc := int(a.nrclients + a.nrleafs) + a.mu.RUnlock() + return nc +} + // NumLocalConnections returns active number of clients for this account // on this server. func (a *Account) NumLocalConnections() int { @@ -212,6 +275,15 @@ func (a *Account) numLocalConnections() int { return len(a.clients) - int(a.sysclients) - int(a.nleafs) } +// This is for extended local interest. +// Lock should not be held. +func (a *Account) numLocalAndLeafConnections() int { + a.mu.RLock() + nlc := len(a.clients) - int(a.sysclients) + a.mu.RUnlock() + return nlc +} + func (a *Account) numLocalLeafNodes() int { return int(a.nleafs) } @@ -613,8 +685,9 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c // numServiceRoutes returns the number of service routes on this account. func (a *Account) numServiceRoutes() int { a.mu.RLock() - defer a.mu.RUnlock() - return len(a.imports.services) + num := len(a.imports.services) + a.mu.RUnlock() + return num } // AddServiceImportWithClaim will add in the service import via the jwt claim. diff --git a/server/auth.go b/server/auth.go index 2d8f8ea1..f570681f 100644 --- a/server/auth.go +++ b/server/auth.go @@ -460,6 +460,8 @@ func (s *Server) isClientAuthorized(c *client) bool { // for pub/sub authorizations. if ok { c.RegisterUser(user) + // Generate an event if we have a system account and this is not the $G account. + s.accountConnectEvent(c) } return ok } @@ -581,13 +583,14 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { // Grab under lock but process after. 
var ( - juc *jwt.UserClaims - acc *Account - err error + juc *jwt.UserClaims + acc *Account + user *User + ok bool + err error ) s.mu.Lock() - // Check if we have trustedKeys defined in the server. If so we require a user jwt. if s.trustedKeys != nil { if c.opts.JWT == "" { @@ -609,6 +612,14 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { c.Debugf("User JWT no longer valid: %+v", vr) return false } + } else if s.users != nil { + if c.opts.Username != "" { + user, ok = s.users[c.opts.Username] + if !ok { + s.mu.Unlock() + return false + } + } } s.mu.Unlock() @@ -672,6 +683,18 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { return true } + if user != nil { + ok = comparePasswords(user.Password, c.opts.Password) + // If we are authorized, register the user which will properly setup any permissions + // for pub/sub authorizations. + if ok { + c.RegisterUser(user) + // Generate an event if we have a system account and this is not the $G account. + s.accountConnectEvent(c) + } + return ok + } + // FIXME(dlc) - Add ability to support remote account bindings via // other auth like user or nkey and tlsMapping. diff --git a/server/client.go b/server/client.go index fb317957..eddfb479 100644 --- a/server/client.go +++ b/server/client.go @@ -1704,7 +1704,7 @@ func splitArg(arg []byte) [][]byte { return args } -func (c *client) processSub(argo []byte) (err error) { +func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) { c.traceInOp("SUB", argo) // Indicate activity. @@ -1726,7 +1726,7 @@ func (c *client) processSub(argo []byte) (err error) { sub.queue = args[1] sub.sid = args[2] default: - return fmt.Errorf("processSub Parse Error: '%s'", arg) + return nil, fmt.Errorf("processSub Parse Error: '%s'", arg) } c.mu.Lock() @@ -1740,7 +1740,7 @@ func (c *client) processSub(argo []byte) (err error) { if c.nc == nil && kind != SYSTEM { c.mu.Unlock() - return nil + return sub, nil } // Check permissions if applicable. 
@@ -1749,17 +1749,18 @@ func (c *client) processSub(argo []byte) (err error) { c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) c.Errorf("Subscription Violation - %s, Subject %q, SID %s", c.getAuthUser(), sub.subject, sub.sid) - return nil + return nil, nil } // Check if we have a maximum on the number of subscriptions. if c.subsAtLimit() { c.mu.Unlock() c.maxSubsExceeded() - return nil + return nil, nil } - updateGWs := false + var updateGWs bool + var err error // Subscribe here. if c.subs[sid] == nil { @@ -1778,19 +1779,24 @@ func (c *client) processSub(argo []byte) (err error) { if err != nil { c.sendErr("Invalid Subject") - return nil + return nil, nil } else if c.opts.Verbose && kind != SYSTEM { c.sendOK() } // No account just return. if acc == nil { - return nil + return sub, nil } if err := c.addShadowSubscriptions(acc, sub); err != nil { c.Errorf(err.Error()) } + + if noForward { + return sub, nil + } + // If we are routing and this is a local sub, add to the route map for the associated account. if kind == CLIENT || kind == SYSTEM { srv.updateRouteSubscriptionMap(acc, sub, 1) @@ -1800,7 +1806,7 @@ func (c *client) processSub(argo []byte) (err error) { } // Now check on leafnode updates. srv.updateLeafNodes(acc, sub, 1) - return nil + return sub, nil } // If the client's account has stream imports and there are matches for @@ -2204,7 +2210,7 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool { if client.kind == SYSTEM { s := client.srv client.mu.Unlock() - s.deliverInternalMsg(sub, subject, c.pa.reply, msg[:msgSize]) + s.deliverInternalMsg(sub, c, subject, c.pa.reply, msg[:msgSize]) return true } @@ -2389,7 +2395,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool { } // If we are currently not allowed but we are tracking reply subjects - // dynamically, check to see if we are allowed here Avoid pcache. 
+ // dynamically, check to see if we are allowed here but avoid pcache. // We need to acquire the lock though. if !allowed && fullCheck && c.perms.resp != nil { c.mu.Lock() @@ -2516,7 +2522,7 @@ func (c *client) processInboundClientMsg(msg []byte) { // If we have an exported service and we are doing remote tracking, check this subject // to see if we need to report the latency. - if c.acc.exports.services != nil && c.rrTracking != nil { + if c.rrTracking != nil { c.mu.Lock() rl := c.rrTracking[string(c.pa.subject)] if rl != nil { diff --git a/server/errors.go b/server/errors.go index eb3be9e4..8d714b2e 100644 --- a/server/errors.go +++ b/server/errors.go @@ -124,6 +124,9 @@ var ( // ErrRevocation is returned when a credential has been revoked. ErrRevocation = errors.New("credentials have been revoked") + + // Used to signal an error that a server is not running. + ErrServerNotRunning = errors.New("server is not running") ) // configErr is a configuration error. diff --git a/server/events.go b/server/events.go index 968e342a..49705270 100644 --- a/server/events.go +++ b/server/events.go @@ -18,13 +18,13 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "math/rand" "strconv" "strings" "sync" "sync/atomic" "time" - "github.com/nats-io/jwt" "github.com/nats-io/nats-server/v2/server/pse" ) @@ -42,33 +42,42 @@ const ( serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s" + inboxRespSubj = "$SYS._INBOX.%s.%s" + + // FIXME(dlc) - Should account scope, even with wc for now, but later on + // we can then shard as needed. + accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS" + + // These are for exported debug services. These are local to this server only. + accSubsSubj = "$SYS.DEBUG.SUBSCRIBERS" shutdownEventTokens = 4 serverSubjectIndex = 2 accUpdateTokens = 5 accUpdateAccIndex = 2 - defaultEventsHBItvl = 30 * time.Second ) // FIXME(dlc) - make configurable. 
-var eventsHBInterval = defaultEventsHBItvl +var eventsHBInterval = 30 * time.Second // Used to send and receive messages from inside the server. type internal struct { - account *Account - client *client - seq uint64 - sid uint64 - servers map[string]*serverUpdate - sweeper *time.Timer - stmr *time.Timer - subs map[string]msgHandler - sendq chan *pubMsg - wg sync.WaitGroup - orphMax time.Duration - chkOrph time.Duration - statsz time.Duration - shash string + account *Account + client *client + seq uint64 + sid uint64 + servers map[string]*serverUpdate + sweeper *time.Timer + stmr *time.Timer + subs map[string]msgHandler + replies map[string]msgHandler + sendq chan *pubMsg + wg sync.WaitGroup + orphMax time.Duration + chkOrph time.Duration + statsz time.Duration + shash string + inboxPre string } // ServerStatsMsg is sent periodically with stats updates. @@ -280,6 +289,7 @@ func (s *Server) sendShutdownEvent() { s.sys.sendq = nil // Unhook all msgHandlers. Normal client cleanup will deal with subs, etc. s.sys.subs = nil + s.sys.replies = nil s.mu.Unlock() // Send to the internal queue and mark as last. sendq <- &pubMsg{nil, subj, _EMPTY_, nil, nil, true} @@ -332,8 +342,9 @@ func (s *Server) eventsRunning() bool { // a defined system account. func (s *Server) EventsEnabled() bool { s.mu.Lock() - defer s.mu.Unlock() - return s.eventsEnabled() + ee := s.eventsEnabled() + s.mu.Unlock() + return ee } // eventsEnabled will report if events are enabled. @@ -342,6 +353,19 @@ func (s *Server) eventsEnabled() bool { return s.sys != nil && s.sys.client != nil && s.sys.account != nil } +// TrackedRemoteServers returns how many remote servers we are tracking +// from a system events perspective. +func (s *Server) TrackedRemoteServers() int { + s.mu.Lock() + if !s.running || !s.eventsEnabled() { + s.mu.Unlock() + return -1 + } + ns := len(s.sys.servers) + s.mu.Unlock() + return ns +} + // Check for orphan servers who may have gone away without notification.
// This should be wrapChk() to setup common locking. func (s *Server) checkRemoteServers() { @@ -479,7 +502,14 @@ func (s *Server) initEventTracking() { sha.Write([]byte(s.info.ID)) s.sys.shash = fmt.Sprintf("%x", sha.Sum(nil))[:sysHashLen] - subject := fmt.Sprintf(accConnsEventSubj, "*") + // This will be for all inbox responses. + subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*") + if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } + s.sys.inboxPre = subject + // This is for remote updates for connection accounting. + subject = fmt.Sprintf(accConnsEventSubj, "*") if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } @@ -493,6 +523,10 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(subject, s.connsRequest); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for broad requests to respond with number of subscriptions for a given subject. + if _, err := s.sysSubscribe(accNumSubsReqSubj, s.nsubsRequest); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil { @@ -518,15 +552,26 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } - // For tracking remote lateny measurements. + // For tracking remote latency measurements. subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash) if _, err := s.sysSubscribe(subject, s.remoteLatencyUpdate); err != nil { s.Errorf("Error setting up internal latency tracking: %v", err) } + + // These are for system account exports for debugging from client applications. 
+ sacc := s.sys.account + + // This is for simple debugging of number of subscribers that exist in the system. + if _, err := s.sysSubscribeInternal(accSubsSubj, s.debugSubscribers); err != nil { + s.Errorf("Error setting up internal debug service for subscribers: %v", err) + } + if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil { + s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err) + } } // accountClaimUpdate will receive claim updates for accounts. -func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) accountClaimUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { @@ -547,19 +592,13 @@ func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, ms // Lock assume held. func (s *Server) processRemoteServerShutdown(sid string) { s.accounts.Range(func(k, v interface{}) bool { - a := v.(*Account) - a.mu.Lock() - prev := a.strack[sid] - delete(a.strack, sid) - a.nrclients -= prev.conns - a.nrleafs -= prev.leafs - a.mu.Unlock() + v.(*Account).removeRemoteServer(sid) return true }) } // remoteServerShutdownEvent is called when we get an event from another server shutting down. -func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { @@ -617,15 +656,7 @@ func (s *Server) shutdownEventing() { // Whip through all accounts. s.accounts.Range(func(k, v interface{}) bool { - a := v.(*Account) - a.mu.Lock() - a.nrclients = 0 - // Now clear state - clearTimer(&a.etmr) - clearTimer(&a.ctmr) - a.clients = nil - a.strack = nil - a.mu.Unlock() + v.(*Account).clearEventing() return true }) // Turn everything off here. 
@@ -633,7 +664,7 @@ func (s *Server) shutdownEventing() { } // Request for our local connection count. -func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) connsRequest(sub *subscription, _ *client, subject, reply string, msg []byte) { if !s.eventsRunning() { return } @@ -655,7 +686,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by // leafNodeConnected is an event we will receive when a leaf node for a given account // connects. -func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply string, msg []byte) { m := accNumConnsReq{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) @@ -676,7 +707,7 @@ func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg } // statszReq is a request for us to respond with current statz. -func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() || reply == _EMPTY_ { @@ -686,7 +717,7 @@ func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) } // remoteConnsUpdate gets called when we receive a remote update from another server. -func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) { +func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) { if !s.eventsRunning() { return } @@ -717,17 +748,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg return } // If we are here we have interest in tracking this account. Update our accounting. 
- acc.mu.Lock() - if acc.strack == nil { - acc.strack = make(map[string]sconns) - } - // This does not depend on receiving all updates since each one is idempotent. - prev := acc.strack[m.Server.ID] - acc.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)} - acc.nrclients += int32(m.Conns) - prev.conns - acc.nrleafs += int32(m.LeafNodes) - prev.leafs - acc.mu.Unlock() - + acc.updateRemoteServer(&m) s.updateRemoteServer(&m.Server) } @@ -775,12 +796,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) { } a.mu.RLock() - // If no limits set, don't update, no need to. - if a.mconns == jwt.NoLimit && a.mleafs == jwt.NoLimit { - a.mu.RUnlock() - return - } - // Build event with account name and number of local clients and leafnodes. m := AccountNumConns{ Account: a.Name, @@ -823,15 +838,21 @@ func (s *Server) accConnsUpdate(a *Account) { // This is a billing event. func (s *Server) accountConnectEvent(c *client) { s.mu.Lock() + gacc := s.gacc if !s.eventsEnabled() { s.mu.Unlock() return } s.mu.Unlock() - subj := fmt.Sprintf(connectEventSubj, c.acc.Name) c.mu.Lock() + // Ignore global account activity + if c.acc == nil || c.acc == gacc { + c.mu.Unlock() + return + } + m := ConnectEventMsg{ Client: ClientInfo{ Start: c.start, @@ -846,6 +866,7 @@ } c.mu.Unlock() + subj := fmt.Sprintf(connectEventSubj, c.acc.Name) s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m) } @@ -894,7 +915,6 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) c.mu.Unlock() subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name) - s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m) } @@ -940,9 +960,9 @@ func (s *Server) sendAuthErrorEvent(c *client) { // Internal message callback. If the msg is needed past the callback it is // required to be copied.
-type msgHandler func(sub *subscription, subject, reply string, msg []byte) +type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte) -func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) { +func (s *Server) deliverInternalMsg(sub *subscription, c *client, subject, reply, msg []byte) { s.mu.Lock() if !s.eventsEnabled() || s.sys.subs == nil { s.mu.Unlock() @@ -951,12 +971,21 @@ func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byt cb := s.sys.subs[string(sub.sid)] s.mu.Unlock() if cb != nil { - cb(sub, string(subject), string(reply), msg) + cb(sub, c, string(subject), string(reply), msg) } } // Create an internal subscription. No support for queue groups atm. func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { + return s.systemSubscribe(subject, false, cb) +} + +// Create an internal subscription but do not forward interest. +func (s *Server) sysSubscribeInternal(subject string, cb msgHandler) (*subscription, error) { + return s.systemSubscribe(subject, true, cb) +} + +func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandler) (*subscription, error) { if !s.eventsEnabled() { return nil, ErrNoSysAccount } @@ -971,13 +1000,7 @@ func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, err s.mu.Unlock() // Now create the subscription - if err := c.processSub([]byte(subject + " " + sid)); err != nil { - return nil, err - } - c.mu.Lock() - sub := c.subs[sid] - c.mu.Unlock() - return sub, nil + return c.processSub([]byte(subject+" "+sid), internalOnly) } func (s *Server) sysUnsubscribe(sub *subscription) { @@ -1003,7 +1026,7 @@ func remoteLatencySubjectForResponse(subject []byte) string { } // remoteLatencyUpdate is used to track remote latency measurements for tracking on exported services. 
-func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg []byte) { +func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ string, msg []byte) { if !s.eventsRunning() { return } @@ -1060,6 +1083,217 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg [ s.sendInternalAccountMsg(acc, lsub, &m1) } +// This is used for all inbox replies so that we do not send supercluster wide interest +// updates for every request. Same trick used in modern NATS clients. +func (s *Server) inboxReply(sub *subscription, c *client, subject, reply string, msg []byte) { + s.mu.Lock() + if !s.eventsEnabled() || s.sys.replies == nil { + s.mu.Unlock() + return + } + cb, ok := s.sys.replies[subject] + s.mu.Unlock() + + if ok && cb != nil { + cb(sub, c, subject, reply, msg) + } +} + +// Copied from go client. +// We could use serviceReply here instead to save some code. +// I prefer these semantics for the moment, when tracing you know +// what this is. +const ( + InboxPrefix = "$SYS._INBOX." + inboxPrefixLen = len(InboxPrefix) + respInboxPrefixLen = inboxPrefixLen + sysHashLen + 1 + replySuffixLen = 8 // Gives us 62^8 +) + +// Creates an internal inbox used for replies that will be processed by the global wc handler. +func (s *Server) newRespInbox() string { + var b [respInboxPrefixLen + replySuffixLen]byte + pres := b[:respInboxPrefixLen] + copy(pres, s.sys.inboxPre) + rn := rand.Int63() + for i, l := respInboxPrefixLen, rn; i < len(b); i++ { + b[i] = digits[l%base] + l /= base + } + return string(b[:]) +} + +// accNumSubsReq is sent when we need to gather remote info on subs. +type accNumSubsReq struct { + Account string `json:"acc"` + Subject string `json:"subject"` + Queue []byte `json:"queue,omitempty"` +} + +// helper function to total information from results to count subs. 
+func totalSubs(rr *SublistResult, qg []byte) (nsubs int32) { + if rr == nil { + return + } + checkSub := func(sub *subscription) { + // TODO(dlc) - This could be smarter. + if qg != nil && !bytes.Equal(qg, sub.queue) { + return + } + if sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode() { + nsubs++ + } + } + if qg == nil { + for _, sub := range rr.psubs { + checkSub(sub) + } + } + for _, qsub := range rr.qsubs { + for _, sub := range qsub { + checkSub(sub) + } + } + return +} + +// Allows users of large systems to debug active subscribers for a given subject. +// Payload should be the subject of interest. +func (s *Server) debugSubscribers(sub *subscription, c *client, subject, reply string, msg []byte) { + // Even though this is an internal only subscription, meaning interest was not forwarded, we could + // get one here from a GW in optimistic mode. Ignore for now. + // FIXME(dlc) - Should we send no interest here back to the GW? + if c.kind != CLIENT { + return + } + + var nsubs int32 + + // We could have a single subject or we could have a subject and a wildcard separated by whitespace. + args := strings.Split(strings.TrimSpace(string(msg)), " ") + if len(args) == 0 { + s.sendInternalAccountMsg(c.acc, reply, 0) + return + } + + tsubj := args[0] + var qgroup []byte + if len(args) > 1 { + qgroup = []byte(args[1]) + } + + if subjectIsLiteral(tsubj) { + // We will look up subscribers locally first then determine if we need to solicit other servers. + rr := c.acc.sl.Match(tsubj) + nsubs = totalSubs(rr, qgroup) + } else { + // We have a wildcard, so this is a bit slower path. 
+ var _subs [32]*subscription + subs := _subs[:0] + c.acc.sl.All(&subs) + for _, sub := range subs { + if subjectIsSubsetMatch(string(sub.subject), tsubj) { + if qgroup != nil && !bytes.Equal(qgroup, sub.queue) { + continue + } + if sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode() { + nsubs++ + } + } + } + } + + // We should have an idea of how many responses to expect from remote servers. + var expected = c.acc.expectedRemoteResponses() + + // If we are only local, go ahead and return. + if expected == 0 { + s.sendInternalAccountMsg(c.acc, reply, nsubs) + return + } + + // We need to solicit from others. + // To track status. + responses := int32(0) + done := make(chan (bool)) + + s.mu.Lock() + // Create direct reply inbox that we multiplex under the WC replies. + replySubj := s.newRespInbox() + // Store our handler. + s.sys.replies[replySubj] = func(sub *subscription, _ *client, subject, _ string, msg []byte) { + if n, err := strconv.Atoi(string(msg)); err == nil { + atomic.AddInt32(&nsubs, int32(n)) + } + if atomic.AddInt32(&responses, 1) >= expected { + select { + case done <- true: + default: + } + } + } + // Send the request to the other servers. + request := &accNumSubsReq{ + Account: c.acc.Name, + Subject: tsubj, + Queue: qgroup, + } + s.sendInternalMsg(accNumSubsReqSubj, replySubj, nil, request) + s.mu.Unlock() + + // FIXME(dlc) - We should rate limit here instead of blind Go routine. + go func() { + select { + case <-done: + case <-time.After(500 * time.Millisecond): + } + // Cleanup the WC entry. + s.mu.Lock() + delete(s.sys.replies, replySubj) + s.mu.Unlock() + // Send the response. + s.sendInternalAccountMsg(c.acc, reply, nsubs) + }() +} + +// Request for our local subscription count. This will come from a remote origin server +// that received the initial request. 
+func (s *Server) nsubsRequest(sub *subscription, _ *client, subject, reply string, msg []byte) { + if !s.eventsRunning() { + return + } + m := accNumSubsReq{} + if err := json.Unmarshal(msg, &m); err != nil { + s.sys.client.Errorf("Error unmarshalling account nsubs request message: %v", err) + return + } + // Grab account. + acc, _ := s.lookupAccount(m.Account) + if acc == nil || acc.numLocalAndLeafConnections() == 0 { + return + } + // We will look up subscribers locally first then determine if we need to solicit other servers. + var nsubs int32 + if subjectIsLiteral(m.Subject) { + rr := acc.sl.Match(m.Subject) + nsubs = totalSubs(rr, m.Queue) + } else { + // We have a wildcard, so this is a bit slower path. + var _subs [32]*subscription + subs := _subs[:0] + acc.sl.All(&subs) + for _, sub := range subs { + if (sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode()) && subjectIsSubsetMatch(string(sub.subject), m.Subject) { + if m.Queue != nil && !bytes.Equal(m.Queue, sub.queue) { + continue + } + nsubs++ + } + } + } + s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs) +} + // Helper to grab name for a client. func nameForClient(c *client) string { if c.user != nil { diff --git a/server/events_test.go b/server/events_test.go index 87374c80..2a21083c 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -538,7 +538,7 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) { received := make(chan *nats.Msg) // Create message callback handler. - cb := func(sub *subscription, subject, reply string, msg []byte) { + cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) { copy := append([]byte(nil), msg...) received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy} } @@ -612,7 +612,7 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) { // Listen for updates to the new account connection activity. 
received := make(chan *nats.Msg, 10) - cb := func(sub *subscription, subject, reply string, msg []byte) { + cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) { copy := append([]byte(nil), msg...) received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy} } @@ -1172,9 +1172,10 @@ func TestSystemAccountWithGateways(t *testing.T) { sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>") defer sub.Unsubscribe() nca.Flush() + // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 10, sa) + checkExpectedSubs(t, 13, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) @@ -1467,8 +1468,9 @@ func TestFetchAccountRace(t *testing.T) { } func TestConnectionUpdatesTimerProperlySet(t *testing.T) { + origEventsHBInterval := eventsHBInterval eventsHBInterval = 50 * time.Millisecond - defer func() { eventsHBInterval = defaultEventsHBItvl }() + defer func() { eventsHBInterval = origEventsHBInterval }() sa, _, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() @@ -1486,7 +1488,7 @@ func TestConnectionUpdatesTimerProperlySet(t *testing.T) { // Listen for HB updates... count := int32(0) - cb := func(sub *subscription, subject, reply string, msg []byte) { + cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) { atomic.AddInt32(&count, 1) } subj := fmt.Sprintf(accConnsEventSubj, pub) diff --git a/server/leafnode.go b/server/leafnode.go index dd9e2d42..f5c763e2 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -64,6 +64,10 @@ func (c *client) isSolicitedLeafNode() bool { return c.kind == LEAF && c.leaf.remote != nil } +func (c *client) isUnsolicitedLeafNode() bool { + return c.kind == LEAF && c.leaf.remote == nil +} + // This will spin up go routines to solicit the remote leaf node connections. 
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) { for _, r := range remotes { @@ -403,7 +407,6 @@ func (c *client) sendLeafConnect(tlsRequired bool) { cinfo.User = c.leaf.remote.username cinfo.Pass = c.leaf.remote.password } - b, err := json.Marshal(cinfo) if err != nil { c.Errorf("Error marshaling CONNECT to route: %v\n", err) @@ -1001,6 +1004,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) { n := c.leaf.smap[key] // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0. update := sub.queue != nil || n == 0 || n+delta <= 0 + n += delta if n > 0 { c.leaf.smap[key] = n diff --git a/server/parser.go b/server/parser.go index f03826b6..9f7b0430 100644 --- a/server/parser.go +++ b/server/parser.go @@ -407,7 +407,7 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - err = c.processSub(arg) + _, err = c.processSub(arg, false) case ROUTER: err = c.processRemoteSub(arg) case GATEWAY: diff --git a/server/server.go b/server/server.go index 18d63c93..fd9cf56a 100644 --- a/server/server.go +++ b/server/server.go @@ -327,6 +327,18 @@ func NewServer(opts *Options) (*Server, error) { return s, nil } +// ClientURL returns the URL used to connect clients. Helpful in testing +// when we designate a random client port (-1). +func (s *Server) ClientURL() string { + // FIXME(dlc) - should we add in user and pass if defined single? + opts := s.getOpts() + scheme := "nats://" + if opts.TLSConfig != nil { + scheme = "tls://" + } + return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port) +} + func validateOptions(o *Options) error { // Check that the trust configuration is correct. if err := validateTrustedOperators(o); err != nil { @@ -725,6 +737,14 @@ func (s *Server) setSystemAccount(acc *Account) error { return ErrAccountExists } + // This is here in an attempt to quiet the race detector and not have to place + // locks on fast path for inbound messages and checking service imports. 
+ acc.mu.Lock() + if acc.imports.services == nil { + acc.imports.services = make(map[string]*serviceImport) + } + acc.mu.Unlock() + s.sys = &internal{ account: acc, client: &client{srv: s, kind: SYSTEM, opts: internalOpts, msubs: -1, mpay: -1, start: time.Now(), last: time.Now()}, @@ -732,6 +752,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sid: 1, servers: make(map[string]*serverUpdate), subs: make(map[string]msgHandler), + replies: make(map[string]msgHandler), sendq: make(chan *pubMsg, internalSendQLen), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, diff --git a/server/sublist.go b/server/sublist.go index 379424a7..6b75c3fc 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -979,7 +979,7 @@ func matchLiteral(literal, subject string) bool { } func addLocalSub(sub *subscription, subs *[]*subscription) { - if sub != nil && sub.client != nil && sub.client.kind == CLIENT && sub.im == nil { + if sub != nil && sub.client != nil && (sub.client.kind == CLIENT || sub.client.kind == SYSTEM) && sub.im == nil { *subs = append(*subs, sub) } } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 118c83f7..1b9928e5 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -596,6 +596,7 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe &server.User{Username: "ngs", Password: "pass", Permissions: nil, Account: ngs}, &server.User{Username: "foo", Password: "pass", Permissions: nil, Account: foo}, &server.User{Username: "bar", Password: "pass", Permissions: nil, Account: bar}, + &server.User{Username: "sys", Password: "pass", Permissions: nil, Account: sys}, } return accounts, users } diff --git a/test/system_services_test.go b/test/system_services_test.go new file mode 100644 index 00000000..7b7c072f --- /dev/null +++ b/test/system_services_test.go @@ -0,0 +1,368 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this 
file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "fmt" + "math/rand" + "net/url" + "strconv" + "strings" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +const dbgSubs = "$SYS.DEBUG.SUBSCRIBERS" + +func (sc *supercluster) selectRandomServer() *server.Options { + ci := rand.Int31n(int32(len(sc.clusters))) + si := rand.Int31n(int32(len(sc.clusters[ci].servers))) + return sc.clusters[ci].opts[si] +} + +func (sc *supercluster) setupSystemServicesImports(t *testing.T, account string) { + t.Helper() + for _, c := range sc.clusters { + for _, s := range c.servers { + sysAcc := s.SystemAccount() + if sysAcc == nil { + t.Fatalf("System account not set") + } + acc, err := s.LookupAccount(account) + if err != nil { + t.Fatalf("Error looking up account '%s': %v", account, err) + } + if err := acc.AddServiceImport(sysAcc, dbgSubs, dbgSubs); err != nil { + t.Fatalf("Error adding subscribers debug service to '%s': %v", account, err) + } + } + } +} + +func numSubs(t *testing.T, msg *nats.Msg) int { + t.Helper() + if msg == nil || msg.Data == nil { + t.Fatalf("No response") + } + n, err := strconv.Atoi(string(msg.Data)) + if err != nil { + t.Fatalf("Got non-number response: %v", err) + } + return n +} + +func checkNumSubs(t *testing.T, msg *nats.Msg, expected int) { + t.Helper() + if n := numSubs(t, msg); n != expected { + t.Fatalf("Expected %d subscribers, got %d", expected, n) + } +} + +func TestSystemServiceSubscribers(t *testing.T) { + numServers, numClusters := 3, 3 + sc := 
createSuperCluster(t, numServers, numClusters) + defer sc.shutdown() + + sc.setupSystemServicesImports(t, "FOO") + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + checkInterest := func(expected int) { + t.Helper() + response, _ := nc.Request(dbgSubs, []byte("foo.bar"), time.Second) + checkNumSubs(t, response, expected) + } + + checkInterest(0) + + // Now add in local subscribers. + for i := 0; i < 5; i++ { + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + nc.SubscribeSync("foo.bar") + nc.SubscribeSync("foo.*") + nc.Flush() + } + + checkInterest(10) + + // Now create remote subscribers at random. + for i := 0; i < 90; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + nc.SubscribeSync("foo.bar") + nc.Flush() + } + + checkInterest(100) +} + +// Test that we can match wildcards. So sub may be foo.bar and we ask about foo.*, that should work. +func TestSystemServiceSubscribersWildcards(t *testing.T) { + numServers, numClusters := 3, 3 + sc := createSuperCluster(t, numServers, numClusters) + defer sc.shutdown() + + sc.setupSystemServicesImports(t, "FOO") + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + for i := 0; i < 50; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1)) + nc.SubscribeSync(fmt.Sprintf("%d", i+1)) + nc.Flush() + } + + response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second) + checkNumSubs(t, response, 50) + + response, _ = nc.Request(dbgSubs, []byte("foo.>"), time.Second) + checkNumSubs(t, response, 50) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.22"), time.Second) + checkNumSubs(t, response, 1) + + response, _ = nc.Request(dbgSubs, []byte("_INBOX.*.*"), time.Second) + hasInbox := numSubs(t, response) + + response, _ = nc.Request(dbgSubs, []byte(">"), time.Second) + checkNumSubs(t, response, 100+hasInbox) +} + +// Test that we can 
match on queue groups as well. Separate request payload with any whitespace. +func TestSystemServiceSubscribersQueueGroups(t *testing.T) { + numServers, numClusters := 3, 3 + sc := createSuperCluster(t, numServers, numClusters) + defer sc.shutdown() + + sc.setupSystemServicesImports(t, "FOO") + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + for i := 0; i < 10; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + subj := fmt.Sprintf("foo.bar.%d", i+1) + nc.QueueSubscribeSync(subj, "QG.11") + nc.QueueSubscribeSync("foo.baz", "QG.33") + nc.Flush() + } + + for i := 0; i < 23; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + subj := fmt.Sprintf("foo.bar.%d", i+1) + nc.QueueSubscribeSync(subj, "QG.22") + nc.QueueSubscribeSync("foo.baz", "QG.22") + nc.Flush() + } + + response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second) + checkNumSubs(t, response, 33) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.22 QG.22"), time.Second) + checkNumSubs(t, response, 1) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.2"), time.Second) + checkNumSubs(t, response, 2) + + response, _ = nc.Request(dbgSubs, []byte("foo.baz"), time.Second) + checkNumSubs(t, response, 33) + + response, _ = nc.Request(dbgSubs, []byte("foo.baz QG.22"), time.Second) + checkNumSubs(t, response, 23) + + // Now check qfilters work on wildcards too. 
+ response, _ = nc.Request(dbgSubs, []byte("foo.bar.> QG.11"), time.Second) + checkNumSubs(t, response, 10) + + response, _ = nc.Request(dbgSubs, []byte("*.baz QG.22"), time.Second) + checkNumSubs(t, response, 23) + + response, _ = nc.Request(dbgSubs, []byte("foo.*.2 QG.22"), time.Second) + checkNumSubs(t, response, 1) +} + +func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) { + numServers, numClusters := 3, 3 + sc := createSuperCluster(t, numServers, numClusters) + defer sc.shutdown() + + sc.setupSystemServicesImports(t, "FOO") + + ci := rand.Int31n(int32(len(sc.clusters))) + si := rand.Int31n(int32(len(sc.clusters[ci].servers))) + s, opts := sc.clusters[ci].servers[si], sc.clusters[ci].opts[si] + url := fmt.Sprintf("nats://%s:pass@%s:%d", "foo", opts.Host, opts.LeafNode.Port) + ls, lopts := runSolicitLeafServerToURL(url) + defer ls.Shutdown() + + checkLeafNodeConnected(t, s) + + // This is so we can test when the subs on a leafnode are flushed to the connected supercluster. 
+ fsubj := "__leaf.flush__" + fc := clientConnect(t, opts, "foo") + fc.Subscribe(fsubj, func(m *nats.Msg) { + m.Respond(nil) + }) + + lnc := clientConnect(t, lopts, "$G") + defer lnc.Close() + + flushLeaf := func() { + if _, err := lnc.Request(fsubj, nil, time.Second); err != nil { + t.Fatalf("Did not flush through to the supercluster: %v", err) + } + } + + for i := 0; i < 10; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1)) + nc.QueueSubscribeSync("foo.bar.baz", "QG.22") + nc.Flush() + } + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second) + checkNumSubs(t, response, 20) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second) + checkNumSubs(t, response, 1) + + lnc.SubscribeSync("foo.bar.3") + lnc.QueueSubscribeSync("foo.bar.baz", "QG.22") + + // We could flush here but that does not guarantee we have flushed through to the supercluster. + flushLeaf() + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second) + checkNumSubs(t, response, 2) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second) + checkNumSubs(t, response, 11) + + lnc.SubscribeSync("foo.bar.3") + lnc.QueueSubscribeSync("foo.bar.baz", "QG.22") + flushLeaf() + + // For now we do not see all the details behind a leafnode if the leafnode is not enabled. 
+	response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
+	checkNumSubs(t, response, 2)
+
+	response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second)
+	checkNumSubs(t, response, 11)
+}
+
+func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
+	o := DefaultTestOptions
+	o.Port = -1
+	fooAcc := server.NewAccount("FOO")
+	o.Accounts = []*server.Account{server.NewAccount("$SYS"), fooAcc}
+	o.SystemAccount = "$SYS"
+	o.Users = []*server.User{
+		&server.User{Username: "foo", Password: "pass", Permissions: nil, Account: fooAcc},
+	}
+	rurl, _ := url.Parse(surl)
+	sysUrl, _ := url.Parse(strings.Replace(surl, rurl.User.Username(), "sys", -1))
+	o.LeafNode.Remotes = []*server.RemoteLeafOpts{
+		{
+			URLs:         []*url.URL{rurl},
+			LocalAccount: "FOO",
+		},
+		{
+			URLs:         []*url.URL{sysUrl},
+			LocalAccount: "$SYS",
+		},
+	}
+	o.LeafNode.ReconnectInterval = 100 * time.Millisecond
+	return RunServer(&o), &o
+}
+
+func TestSystemServiceSubscribersLeafNodesWithSystem(t *testing.T) {
+	numServers, numClusters := 3, 3
+	sc := createSuperCluster(t, numServers, numClusters)
+	defer sc.shutdown()
+
+	sc.setupSystemServicesImports(t, "FOO")
+
+	ci := rand.Int31n(int32(len(sc.clusters)))
+	si := rand.Int31n(int32(len(sc.clusters[ci].servers)))
+	s, opts := sc.clusters[ci].servers[si], sc.clusters[ci].opts[si]
+	url := fmt.Sprintf("nats://%s:pass@%s:%d", "foo", opts.Host, opts.LeafNode.Port)
+	ls, lopts := runSolicitLeafServerWithSystemToURL(url)
+	defer ls.Shutdown()
+
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		if nln := s.NumLeafNodes(); nln != 2 {
+			return fmt.Errorf("Expected two connected leafnodes for server %q, got %d", s.ID(), nln)
+		}
+		return nil
+	})
+
+	// This is so we can test when the subs on a leafnode are flushed to the connected supercluster.
+ fsubj := "__leaf.flush__" + fc := clientConnect(t, opts, "foo") + fc.Subscribe(fsubj, func(m *nats.Msg) { + m.Respond(nil) + }) + + lnc := clientConnect(t, lopts, "foo") + defer lnc.Close() + + flushLeaf := func() { + if _, err := lnc.Request(fsubj, nil, time.Second); err != nil { + t.Fatalf("Did not flush through to the supercluster: %v", err) + } + } + + for i := 0; i < 10; i++ { + nc := clientConnect(t, sc.selectRandomServer(), "foo") + defer nc.Close() + nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1)) + nc.QueueSubscribeSync("foo.bar.baz", "QG.22") + nc.Flush() + } + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + response, _ := nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second) + checkNumSubs(t, response, 1) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second) + checkNumSubs(t, response, 20) + + lnc.SubscribeSync("foo.bar.3") + lnc.QueueSubscribeSync("foo.bar.baz", "QG.22") + flushLeaf() + + // Since we are doing real tracking now on the other side, this will be off by 1 since we are counting + // the leaf and the real sub. + response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second) + checkNumSubs(t, response, 3) + + response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second) + checkNumSubs(t, response, 12) +}