diff --git a/server/events.go b/server/events.go index 00c1d3ed..c954467a 100644 --- a/server/events.go +++ b/server/events.go @@ -99,25 +99,26 @@ type inSysMsg struct { // Used to send and receive messages from inside the server. type internal struct { - account *Account - client *client - seq uint64 - sid int - servers map[string]*serverUpdate - sweeper *time.Timer - stmr *time.Timer - replies map[string]msgHandler - sendq *ipQueue[*pubMsg] - recvq *ipQueue[*inSysMsg] - resetCh chan struct{} - wg sync.WaitGroup - sq *sendq - orphMax time.Duration - chkOrph time.Duration - statsz time.Duration - cstatsz time.Duration - shash string - inboxPre string + account *Account + client *client + seq uint64 + sid int + servers map[string]*serverUpdate + sweeper *time.Timer + stmr *time.Timer + replies map[string]msgHandler + sendq *ipQueue[*pubMsg] + recvq *ipQueue[*inSysMsg] + resetCh chan struct{} + wg sync.WaitGroup + sq *sendq + orphMax time.Duration + chkOrph time.Duration + statsz time.Duration + cstatsz time.Duration + shash string + inboxPre string + remoteStatsSub *subscription } // ServerStatsMsg is sent periodically with stats updates. @@ -642,8 +643,6 @@ func (s *Server) checkRemoteServers() { // Grab RSS and PCPU // Server lock will be held but released. func (s *Server) updateServerUsage(v *ServerStats) { - s.mu.Unlock() - defer s.mu.Lock() var vss int64 pse.ProcUsage(&v.CPU, &v.Mem, &vss) v.Cores = runtime.NumCPU() @@ -679,6 +678,32 @@ func routeStat(r *client) *RouteStat { func (s *Server) sendStatsz(subj string) { var m ServerStatsMsg s.updateServerUsage(&m.Stats) + + s.mu.RLock() + defer s.mu.RUnlock() + + // Check that we have a system account, etc. + if s.sys == nil || s.sys.account == nil { + return + } + + // if we are running standalone, check for interest. + if s.standAloneMode() { + // Check if we even have interest in this subject. + sacc := s.sys.account + rr := sacc.sl.Match(subj) + totalSubs := len(rr.psubs) + len(rr.qsubs) + if totalSubs == 0 { + return + } else if totalSubs == 1 && len(rr.psubs) == 1 { + // For the broadcast subject we listen to that ourselves with no echo for remote updates. + // If we are the only ones listening do not send either. + if rr.psubs[0] == s.sys.remoteStatsSub { + return + } + } + } + m.Stats.Start = s.start m.Stats.Connections = len(s.clients) m.Stats.TotalConnections = s.totalClients @@ -722,14 +747,12 @@ func (s *Server) sendStatsz(subj string) { gw.RUnlock() } // Active Servers - m.Stats.ActiveServers = 1 - if s.sys != nil { - m.Stats.ActiveServers += len(s.sys.servers) - } + m.Stats.ActiveServers = len(s.sys.servers) + 1 + // JetStream if js := s.js; js != nil { jStat := &JetStreamVarz{} - s.mu.Unlock() + s.mu.RUnlock() js.mu.RLock() c := js.config c.StoreDir = _EMPTY_ @@ -771,7 +794,7 @@ func (s *Server) sendStatsz(subj string) { } } m.Stats.JetStream = jStat - s.mu.Lock() + s.mu.RLock() } // Send message. s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) @@ -790,13 +813,12 @@ func (s *Server) heartbeatStatsz() { } s.sys.stmr.Reset(s.sys.cstatsz) } - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) + // Do in separate Go routine. + go s.sendStatszUpdate() } func (s *Server) sendStatszUpdate() { - s.mu.Lock() - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) - s.mu.Unlock() + s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.ID())) } // This should be wrapChk() to setup common locking. @@ -897,8 +919,11 @@ func (s *Server) initEventTracking() { } // Listen for statsz from others. subject = fmt.Sprintf(serverStatsSubj, "*") - if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { + if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + } else { + // Keep track of this one. + s.sys.remoteStatsSub = sub } // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") @@ -1338,7 +1363,8 @@ func (s *Server) processNewServer(si *ServerInfo) { } } // Announce ourselves.. - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) + // Do this in a separate Go routine. + go s.sendStatszUpdate() } // If GW is enabled on this server and there are any leaf node connections, @@ -1612,9 +1638,7 @@ func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, re return } } - s.mu.Lock() s.sendStatsz(reply) - s.mu.Unlock() } var errSkipZreq = errors.New("filtered response") diff --git a/server/events_test.go b/server/events_test.go index 7e572a39..5398beda 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -2501,6 +2501,40 @@ func TestServerEventsAndDQSubscribers(t *testing.T) { checkSubsPending(t, sub, 10) } +func TestServerEventsStatszSingleServer(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + accounts { $SYS { users [{user: "admin", password: "p1d"}]} } + `)) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // Grab internal system client. + s.mu.RLock() + sysc := s.sys.client + wait := s.sys.cstatsz + 25*time.Millisecond + s.mu.RUnlock() + + // Wait for when first statsz would have gone out.. + time.Sleep(wait) + + sysc.mu.Lock() + outMsgs := sysc.stats.outMsgs + sysc.mu.Unlock() + + require_True(t, outMsgs == 0) + + // Connect as a system user and make sure if there is + // subscription interest that we will receive updates. + nc, _ := jsClientConnect(t, s, nats.UserInfo("admin", "p1d")) + defer nc.Close() + + sub, err := nc.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*")) + require_NoError(t, err) + + checkSubsPending(t, sub, 1) +} + func Benchmark_GetHash(b *testing.B) { b.StopTimer() // Get 100 random names diff --git a/server/gateway.go b/server/gateway.go index afc75683..8d194978 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -870,9 +870,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { // Announce ourselves again to new connections. if solicit && s.EventsEnabled() { - s.mu.Lock() - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) - s.mu.Unlock() + s.sendStatszUpdate() } }