mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Optimize statsz locking and only send if we know we have external interest.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -99,25 +99,26 @@ type inSysMsg struct {
|
||||
|
||||
// Used to send and receive messages from inside the server.
|
||||
type internal struct {
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid int
|
||||
servers map[string]*serverUpdate
|
||||
sweeper *time.Timer
|
||||
stmr *time.Timer
|
||||
replies map[string]msgHandler
|
||||
sendq *ipQueue[*pubMsg]
|
||||
recvq *ipQueue[*inSysMsg]
|
||||
resetCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
sq *sendq
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
cstatsz time.Duration
|
||||
shash string
|
||||
inboxPre string
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid int
|
||||
servers map[string]*serverUpdate
|
||||
sweeper *time.Timer
|
||||
stmr *time.Timer
|
||||
replies map[string]msgHandler
|
||||
sendq *ipQueue[*pubMsg]
|
||||
recvq *ipQueue[*inSysMsg]
|
||||
resetCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
sq *sendq
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
cstatsz time.Duration
|
||||
shash string
|
||||
inboxPre string
|
||||
remoteStatsSub *subscription
|
||||
}
|
||||
|
||||
// ServerStatsMsg is sent periodically with stats updates.
|
||||
@@ -642,8 +643,6 @@ func (s *Server) checkRemoteServers() {
|
||||
// Grab RSS and PCPU
|
||||
// Server lock will be held but released.
|
||||
func (s *Server) updateServerUsage(v *ServerStats) {
|
||||
s.mu.Unlock()
|
||||
defer s.mu.Lock()
|
||||
var vss int64
|
||||
pse.ProcUsage(&v.CPU, &v.Mem, &vss)
|
||||
v.Cores = runtime.NumCPU()
|
||||
@@ -679,6 +678,32 @@ func routeStat(r *client) *RouteStat {
|
||||
func (s *Server) sendStatsz(subj string) {
|
||||
var m ServerStatsMsg
|
||||
s.updateServerUsage(&m.Stats)
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
// Check that we have a system account, etc.
|
||||
if s.sys == nil || s.sys.account == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// if we are running standalone, check for interest.
|
||||
if s.standAloneMode() {
|
||||
// Check if we even have interest in this subject.
|
||||
sacc := s.sys.account
|
||||
rr := sacc.sl.Match(subj)
|
||||
totalSubs := len(rr.psubs) + len(rr.qsubs)
|
||||
if totalSubs == 0 {
|
||||
return
|
||||
} else if totalSubs == 1 && len(rr.psubs) == 1 {
|
||||
// For the broadcast subject we listen to that ourselves with no echo for remote updates.
|
||||
// If we are the only ones listening do not send either.
|
||||
if rr.psubs[0] == s.sys.remoteStatsSub {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m.Stats.Start = s.start
|
||||
m.Stats.Connections = len(s.clients)
|
||||
m.Stats.TotalConnections = s.totalClients
|
||||
@@ -722,14 +747,12 @@ func (s *Server) sendStatsz(subj string) {
|
||||
gw.RUnlock()
|
||||
}
|
||||
// Active Servers
|
||||
m.Stats.ActiveServers = 1
|
||||
if s.sys != nil {
|
||||
m.Stats.ActiveServers += len(s.sys.servers)
|
||||
}
|
||||
m.Stats.ActiveServers = len(s.sys.servers) + 1
|
||||
|
||||
// JetStream
|
||||
if js := s.js; js != nil {
|
||||
jStat := &JetStreamVarz{}
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
js.mu.RLock()
|
||||
c := js.config
|
||||
c.StoreDir = _EMPTY_
|
||||
@@ -771,7 +794,7 @@ func (s *Server) sendStatsz(subj string) {
|
||||
}
|
||||
}
|
||||
m.Stats.JetStream = jStat
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
}
|
||||
// Send message.
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
@@ -790,13 +813,12 @@ func (s *Server) heartbeatStatsz() {
|
||||
}
|
||||
s.sys.stmr.Reset(s.sys.cstatsz)
|
||||
}
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
|
||||
// Do in separate Go routine.
|
||||
go s.sendStatszUpdate()
|
||||
}
|
||||
|
||||
func (s *Server) sendStatszUpdate() {
|
||||
s.mu.Lock()
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
|
||||
s.mu.Unlock()
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.ID()))
|
||||
}
|
||||
|
||||
// This should be wrapChk() to setup common locking.
|
||||
@@ -897,8 +919,11 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
// Listen for statsz from others.
|
||||
subject = fmt.Sprintf(serverStatsSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
|
||||
if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
} else {
|
||||
// Keep track of this one.
|
||||
s.sys.remoteStatsSub = sub
|
||||
}
|
||||
// Listen for all server shutdowns.
|
||||
subject = fmt.Sprintf(shutdownEventSubj, "*")
|
||||
@@ -1338,7 +1363,8 @@ func (s *Server) processNewServer(si *ServerInfo) {
|
||||
}
|
||||
}
|
||||
// Announce ourselves..
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
|
||||
// Do this in a separate Go routine.
|
||||
go s.sendStatszUpdate()
|
||||
}
|
||||
|
||||
// If GW is enabled on this server and there are any leaf node connections,
|
||||
@@ -1612,9 +1638,7 @@ func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, re
|
||||
return
|
||||
}
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.sendStatsz(reply)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
var errSkipZreq = errors.New("filtered response")
|
||||
|
||||
Reference in New Issue
Block a user