// Copyright 2018 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "encoding/json" "fmt" "math/rand" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/nats-io/gnatsd/server/pse" ) const ( connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS" accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" connsRespSubj = "$SYS._INBOX_.%s" accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS" shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR" serverStatsSubj = "$SYS.SERVER.%s.STATSZ" serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ" serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" shutdownEventTokens = 4 serverSubjectIndex = 2 accUpdateTokens = 5 accUpdateAccIndex = 2 ) // Used to send and receive messages from inside the server. type internal struct { account *Account client *client seq uint64 sid uint64 servers map[string]*serverUpdate sweeper *time.Timer stmr *time.Timer subs map[string]msgHandler sendq chan *pubMsg wg sync.WaitGroup orphMax time.Duration chkOrph time.Duration statsz time.Duration } // ServerStatsMsg is sent periodically with stats updates. type ServerStatsMsg struct { Server ServerInfo `json:"server"` Stats ServerStats `json:"statsz"` } // ConnectEventMsg is sent when a new connection is made that is part of an account. type ConnectEventMsg struct { Server ServerInfo `json:"server"` Client ClientInfo `json:"client"` } // DisconnectEventMsg is sent when a new connection previously defined from a // ConnectEventMsg is closed. type DisconnectEventMsg struct { Server ServerInfo `json:"server"` Client ClientInfo `json:"client"` Sent DataStats `json:"sent"` Received DataStats `json:"received"` Reason string `json:"reason"` } // AccountNumConns is an event that will be sent from a server that is tracking // a given account when the number of connections changes. It will also HB // updates in the absence of any changes. type AccountNumConns struct { Server ServerInfo `json:"server"` Account string `json:"acc"` Conns int `json:"conns"` TotalConns int `json:"total_conns"` } // accNumConnsReq is sent when we are starting to track an account for the first // time. We will request others send info to us about their local state. type accNumConnsReq struct { Server ServerInfo `json:"server"` Account string `json:"acc"` } type ServerInfo struct { Host string `json:"host"` ID string `json:"id"` Cluster string `json:"cluster,omitempty"` Version string `json:"ver"` Seq uint64 `json:"seq"` Time time.Time `json:"time"` } // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start time.Time `json:"start,omitempty"` Host string `json:"host,omitempty"` ID uint64 `json:"id"` Account string `json:"acc"` User string `json:"user,omitempty"` Name string `json:"name,omitempty"` Lang string `json:"lang,omitempty"` Version string `json:"ver,omitempty"` RTT string `json:"rtt,omitempty"` Stop *time.Time `json:"stop,omitempty"` } // Various statistics we will periodically send out. type ServerStats struct { Start time.Time `json:"start"` Mem int64 `json:"mem"` Cores int `json:"cores"` CPU float64 `json:"cpu"` Connections int `json:"connections"` TotalConnections uint64 `json:"total_connections"` ActiveAccounts int `json:"active_accounts"` NumSubs uint32 `json:"subscriptions"` Sent DataStats `json:"sent"` Received DataStats `json:"received"` SlowConsumers int64 `json:"slow_consumers"` Routes []*RouteStat `json:"routes,omitempty"` Gateways []*GatewayStat `json:"gateways,omitempty"` } type RouteStat struct { ID uint64 `json:"rid"` Sent DataStats `json:"sent"` Received DataStats `json:"received"` Pending int `json:"pending"` } type GatewayStat struct { ID uint64 `json:"gwid"` Name string `json:"name"` Sent DataStats `json:"sent"` Received DataStats `json:"received"` NumInbound int `json:"inbound_connections"` } // DataStats reports how may msg and bytes. Applicable for both sent and received. type DataStats struct { Msgs int64 `json:"msgs"` Bytes int64 `json:"bytes"` } // Used for internally queueing up messages that the server wants to send. type pubMsg struct { sub string rply string si *ServerInfo msg interface{} last bool } // Used to track server updates. type serverUpdate struct { seq uint64 ltime time.Time } // internalSendLoop will be responsible for serializing all messages that // a server wants to send. func (s *Server) internalSendLoop(wg *sync.WaitGroup) { defer wg.Done() s.mu.Lock() if s.sys == nil || s.sys.sendq == nil { s.mu.Unlock() return } c := s.sys.client sendq := s.sys.sendq id := s.info.ID host := s.info.Host seqp := &s.sys.seq var cluster string if s.gateway.enabled { cluster = s.getGatewayName() } s.mu.Unlock() for s.eventsRunning() { // Setup information for next message seq := atomic.AddUint64(seqp, 1) select { case pm := <-sendq: if pm.si != nil { pm.si.Host = host pm.si.Cluster = cluster pm.si.ID = id pm.si.Seq = seq pm.si.Version = VERSION pm.si.Time = time.Now() } var b []byte if pm.msg != nil { b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ") } // Prep internal structures needed to send message. c.pa.subject = []byte(pm.sub) c.pa.size = len(b) c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10)) c.pa.reply = []byte(pm.rply) // Add in NL b = append(b, _CRLF_...) c.processInboundClientMsg(b) c.flushClients() // See if we are doing graceful shutdown. if pm.last { return } case <-s.quitCh: return } } } // Will send a shutdown message. func (s *Server) sendShutdownEvent() { s.mu.Lock() if s.sys == nil || s.sys.sendq == nil { s.mu.Unlock() return } subj := fmt.Sprintf(shutdownEventSubj, s.info.ID) sendq := s.sys.sendq // Stop any more messages from queueing up. s.sys.sendq = nil // Unhook all msgHandlers. Normal client cleanup will deal with subs, etc. s.sys.subs = nil s.mu.Unlock() // Send to the internal queue and mark as last. sendq <- &pubMsg{subj, _EMPTY_, nil, nil, true} } // This will queue up a message to be sent. // Assumes lock is held on entry. func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface{}) { if s.sys == nil || s.sys.sendq == nil { return } sendq := s.sys.sendq // Don't hold lock while placing on the channel. s.mu.Unlock() sendq <- &pubMsg{sub, rply, si, msg, false} s.mu.Lock() } // Locked version of checking if events system running. Also checks server. func (s *Server) eventsRunning() bool { s.mu.Lock() defer s.mu.Unlock() return s.running && s.eventsEnabled() } // EventsEnabled will report if the server has internal events enabled via // a defined system account. func (s *Server) EventsEnabled() bool { s.mu.Lock() defer s.mu.Unlock() return s.eventsEnabled() } // eventsEnabled will report if events are enabled. // Lock should be held. func (s *Server) eventsEnabled() bool { return s.sys != nil && s.sys.client != nil && s.sys.account != nil } // Check for orphan servers who may have gone away without notification. // This should be wrapChk() to setup common locking. func (s *Server) checkRemoteServers() { now := time.Now() for sid, su := range s.sys.servers { if now.Sub(su.ltime) > s.sys.orphMax { s.Debugf("Detected orphan remote server: %q", sid) // Simulate it going away. s.processRemoteServerShutdown(sid) delete(s.sys.servers, sid) } } if s.sys.sweeper != nil { s.sys.sweeper.Reset(s.sys.chkOrph) } } // Grab RSS and PCPU func updateServerUsage(v *ServerStats) { var rss, vss int64 var pcpu float64 pse.ProcUsage(&pcpu, &rss, &vss) v.Mem = rss v.CPU = pcpu v.Cores = numCores } // Generate a route stat for our statz update. func routeStat(r *client) *RouteStat { if r == nil { return nil } r.mu.Lock() rs := &RouteStat{ ID: r.cid, Sent: DataStats{ Msgs: atomic.LoadInt64(&r.outMsgs), Bytes: atomic.LoadInt64(&r.outBytes), }, Received: DataStats{ Msgs: atomic.LoadInt64(&r.inMsgs), Bytes: atomic.LoadInt64(&r.inBytes), }, Pending: int(r.out.pb), } r.mu.Unlock() return rs } // Actual send method for statz updates. // Lock should be held. func (s *Server) sendStatsz(subj string) { m := ServerStatsMsg{} updateServerUsage(&m.Stats) m.Stats.Start = s.start m.Stats.Connections = len(s.clients) m.Stats.TotalConnections = s.totalClients m.Stats.ActiveAccounts = s.activeAccounts m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs) m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes) m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs) m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) m.Stats.NumSubs = s.numSubscriptions() for _, r := range s.routes { m.Stats.Routes = append(m.Stats.Routes, routeStat(r)) } if s.gateway.enabled { gw := s.gateway gw.RLock() for name, c := range gw.out { gs := &GatewayStat{Name: name} c.mu.Lock() gs.ID = c.cid gs.Sent = DataStats{ Msgs: atomic.LoadInt64(&c.outMsgs), Bytes: atomic.LoadInt64(&c.outBytes), } c.mu.Unlock() // Gather matching inbound connections gs.Received = DataStats{} for _, c := range gw.in { c.mu.Lock() if c.gw.name == name { gs.Received.Msgs += atomic.LoadInt64(&c.inMsgs) gs.Received.Bytes += atomic.LoadInt64(&c.inBytes) gs.NumInbound++ } c.mu.Unlock() } m.Stats.Gateways = append(m.Stats.Gateways, gs) } gw.RUnlock() } s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) } // Send out our statz update. // This should be wrapChk() to setup common locking. func (s *Server) heartbeatStatsz() { if s.sys.stmr != nil { s.sys.stmr.Reset(s.sys.statsz) } s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) } // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz)) } // Start a ticker that will fire periodically and check for orphaned servers. // This should be wrapChk() to setup common locking. func (s *Server) startRemoteServerSweepTimer() { s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.wrapChk(s.checkRemoteServers)) } // This will setup our system wide tracking subs. // For now we will setup one wildcard subscription to // monitor all accounts for changes in number of connections. // We can make this on a per account tracking basis if needed. // Tradeoff is subscription and interest graph events vs connect and // disconnect events, etc. func (s *Server) initEventTracking() { if !s.eventsEnabled() { return } subject := fmt.Sprintf(accConnsEventSubj, "*") if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // This will be for responses for account info that we send out. subject = fmt.Sprintf(connsRespSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for broad requests to respond with account info. subject = fmt.Sprintf(accConnsReqSubj, "*") if _, err := s.sysSubscribe(subject, s.connsRequest); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for account claims updates. subject = fmt.Sprintf(accUpdateEventSubj, "*") if _, err := s.sysSubscribe(subject, s.accountClaimUpdate); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for requests for our statsz. subject = fmt.Sprintf(serverStatsReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.statszReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for ping messages that will be sent to all servers for statsz. if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszPing); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } // accountClaimUpdate will receive claim updates for accounts. func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { return } toks := strings.Split(subject, tsep) if len(toks) < accUpdateTokens { s.Debugf("Received account claims update on bad subject %q", subject) return } accName := toks[accUpdateAccIndex] s.updateAccountWithClaimJWT(s.accounts[accName], string(msg)) } // processRemoteServerShutdown will update any affected accounts. // Will upidate the remote count for clients. // Lock assume held. func (s *Server) processRemoteServerShutdown(sid string) { for _, a := range s.accounts { a.mu.Lock() prev := a.strack[sid] delete(a.strack, sid) a.nrclients -= prev a.mu.Unlock() } } // remoteServerShutdownEvent is called when we get an event from another server shutting down. func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { return } toks := strings.Split(subject, tsep) if len(toks) < shutdownEventTokens { s.Debugf("Received remote server shutdown on bad subject %q", subject) return } sid := toks[serverSubjectIndex] su := s.sys.servers[sid] if su != nil { s.processRemoteServerShutdown(sid) } } // updateRemoteServer is called when we have an update from a remote server. // This allows us to track remote servers, respond to shutdown messages properly, // make sure that messages are ordered, and allow us to prune dead servers. // Lock should be held upon entry. func (s *Server) updateRemoteServer(ms *ServerInfo) { su := s.sys.servers[ms.ID] if su == nil { s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()} } else { // Should alwqys be going up. if ms.Seq <= su.seq { s.Errorf("Received out of order remote server update from: %q", ms.ID) return } su.seq = ms.Seq su.ltime = time.Now() } } // shutdownEventing will clean up all eventing state. func (s *Server) shutdownEventing() { if !s.eventsRunning() { return } s.mu.Lock() clearTimer(&s.sys.sweeper) clearTimer(&s.sys.stmr) s.mu.Unlock() // We will queue up a shutdown event and wait for the // internal send loop to exit. s.sendShutdownEvent() s.sys.wg.Wait() s.mu.Lock() defer s.mu.Unlock() // Whip through all accounts. for _, a := range s.accounts { a.mu.Lock() a.nrclients = 0 // Now clear state clearTimer(&a.etmr) clearTimer(&a.ctmr) a.clients = nil a.strack = nil a.mu.Unlock() } // Turn everything off here. s.sys = nil } // Request for our local connection count. func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { return } m := accNumConnsReq{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) return } acc, _ := s.lookupAccount(m.Account) if acc == nil { return } if nlc := acc.NumLocalConnections(); nlc > 0 { s.sendAccConnsUpdate(acc, reply) } } // random back off interval for broadcast statsz ping requests. const randomBackoff = 250 * time.Millisecond // statszPing will handle global requests for our server statsz. This will be a // broadcast msg so we want to be mindful of that. We will do a random backoff and // process in a separate go routine. func (s *Server) statszPing(sub *subscription, subject, reply string, msg []byte) { if !s.EventsEnabled() || reply == _EMPTY_ { return } s.startGoRoutine(func() { defer s.grWG.Done() delay := time.Duration(rand.Intn(int(randomBackoff))) select { case <-time.After(delay): s.mu.Lock() defer s.mu.Unlock() s.sendStatsz(reply) case <-s.quitCh: return } }) } // statszReq is a request for us to respond with current statz. func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() || reply == _EMPTY_ { return } s.sendStatsz(reply) } // remoteConnsUpdate gets called when we receive a remote update from another server. func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { return } m := AccountNumConns{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err) return } // Double check that this is not us, should never happen, so error if it does. if m.Server.ID == s.info.ID { s.sys.client.Errorf("Processing our own account connection event message: ignored") return } // See if we have the account registered, if not drop it. acc, _ := s.lookupAccount(m.Account) if acc == nil { s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account) return } // If we are here we have interest in tracking this account. Update our accounting. acc.mu.Lock() if acc.strack == nil { acc.strack = make(map[string]int) } // This does not depend on receiving all updates since each one is idempotent. prev := acc.strack[m.Server.ID] acc.strack[m.Server.ID] = m.Conns acc.nrclients += (m.Conns - prev) acc.mu.Unlock() s.updateRemoteServer(&m.Server) } // Setup tracking for this account. This allows us to track globally // account activity. // Lock should be held on entry. func (s *Server) enableAccountTracking(a *Account) { if a == nil || !s.eventsEnabled() { return } // TODO(ik): Generate payload although message may not be sent. // May need to ensure we do so only if there is a known interest. // This can get complicated with gateways. subj := fmt.Sprintf(accConnsReqSubj, a.Name) reply := fmt.Sprintf(connsRespSubj, s.info.ID) m := accNumConnsReq{Account: a.Name} s.sendInternalMsg(subj, reply, &m.Server, &m) } // FIXME(dlc) - make configurable. const eventsHBInterval = 30 * time.Second // sendAccConnsUpdate is called to send out our information on the // account's local connections. // Lock should be held on entry. func (s *Server) sendAccConnsUpdate(a *Account, subj string) { if !s.eventsEnabled() || a == nil || a == s.gacc { return } a.mu.Lock() // If no limits set, don't update, no need to. if a.mconns == 0 { a.mu.Unlock() return } // Build event with account name and number of local clients. m := AccountNumConns{ Account: a.Name, Conns: a.numLocalConnections(), TotalConns: len(s.clients), } a.mu.Unlock() s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) // Set timer to fire again unless we are at zero. a.mu.Lock() if a.numLocalConnections() == 0 { clearTimer(&a.etmr) } else { // Check to see if we have an HB running and update. if a.ctmr == nil { a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) }) } else { a.etmr.Reset(eventsHBInterval) } } a.mu.Unlock() } // accConnsUpdate is called whenever there is a change to the account's // number of active connections, or during a heartbeat. func (s *Server) accConnsUpdate(a *Account) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() || a == nil { return } subj := fmt.Sprintf(accConnsEventSubj, a.Name) s.sendAccConnsUpdate(a, subj) } // accountConnectEvent will send an account client connect event if there is interest. // This is a billing event. func (s *Server) accountConnectEvent(c *client) { s.mu.Lock() if !s.eventsEnabled() { s.mu.Unlock() return } s.mu.Unlock() subj := fmt.Sprintf(connectEventSubj, c.acc.Name) c.mu.Lock() m := ConnectEventMsg{ Client: ClientInfo{ Start: c.start, Host: c.host, ID: c.cid, Account: c.acc.Name, User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, }, } c.mu.Unlock() s.mu.Lock() s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) s.mu.Unlock() } // accountDisconnectEvent will send an account client disconnect event if there is interest. // This is a billing event. func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) { s.mu.Lock() gacc := s.gacc if !s.eventsEnabled() { s.mu.Unlock() return } s.mu.Unlock() c.mu.Lock() // Ignore global account activity if c.acc == nil || c.acc == gacc { c.mu.Unlock() return } m := DisconnectEventMsg{ Client: ClientInfo{ Start: c.start, Stop: &now, Host: c.host, ID: c.cid, Account: c.acc.Name, User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, RTT: c.getRTT(), }, Sent: DataStats{ Msgs: c.inMsgs, Bytes: c.inBytes, }, Received: DataStats{ Msgs: c.outMsgs, Bytes: c.outBytes, }, Reason: reason, } c.mu.Unlock() subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name) s.mu.Lock() s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) s.mu.Unlock() } func (s *Server) sendAuthErrorEvent(c *client) { s.mu.Lock() if !s.eventsEnabled() { s.mu.Unlock() return } s.mu.Unlock() now := time.Now() c.mu.Lock() m := DisconnectEventMsg{ Client: ClientInfo{ Start: c.start, Stop: &now, Host: c.host, ID: c.cid, Account: c.acc.Name, User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, RTT: c.getRTT(), }, Sent: DataStats{ Msgs: c.inMsgs, Bytes: c.inBytes, }, Received: DataStats{ Msgs: c.outMsgs, Bytes: c.outBytes, }, Reason: AuthenticationViolation.String(), } c.mu.Unlock() s.mu.Lock() subj := fmt.Sprintf(authErrorEventSubj, s.info.ID) s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) s.mu.Unlock() } // Internal message callback. If the msg is needed past the callback it is // required to be copied. type msgHandler func(sub *subscription, subject, reply string, msg []byte) func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) { s.mu.Lock() if !s.eventsEnabled() || s.sys.subs == nil { s.mu.Unlock() return } cb := s.sys.subs[string(sub.sid)] s.mu.Unlock() if cb != nil { cb(sub, string(subject), string(reply), msg) } } // Create an internal subscription. No support for queue groups atm. func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { if !s.eventsEnabled() { return nil, ErrNoSysAccount } if cb == nil { return nil, fmt.Errorf("Undefined message handler") } s.mu.Lock() sid := strconv.FormatInt(int64(s.sys.sid), 10) s.sys.subs[sid] = cb s.sys.sid++ c := s.sys.client s.mu.Unlock() // Now create the subscription if err := c.processSub([]byte(subject + " " + sid)); err != nil { return nil, err } c.mu.Lock() sub := c.subs[sid] c.mu.Unlock() return sub, nil } func (s *Server) sysUnsubscribe(sub *subscription) { if sub == nil || !s.eventsEnabled() { return } s.mu.Lock() acc := s.sys.account c := s.sys.client delete(s.sys.subs, string(sub.sid)) s.mu.Unlock() c.unsubscribe(acc, sub, true) } // flushClients will make sure toi flush any clients we may have // sent to during sendInternalMsg. func (c *client) flushClients() { last := time.Now() for cp := range c.pcd { // Queue up a flush for those in the set cp.mu.Lock() // Update last activity for message delivery cp.last = last cp.out.fsp-- cp.flushSignal() cp.mu.Unlock() delete(c.pcd, cp) } } // Helper to grab name for a client. func nameForClient(c *client) string { if c.user != nil { return c.user.Nkey } return "N/A" } // Helper to clear timers. func clearTimer(tp **time.Timer) { if t := *tp; t != nil { t.Stop() *tp = nil } } // Helper function to wrap functions with common test // to lock server and return if events not enabled. func (s *Server) wrapChk(f func()) func() { return func() { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { return } f() } }