[IMPROVED] Optimize statsz locking and sending in standalone mode. (#4235)

If we know we are in stand alone mode only send out statsz updates if we
know we have external interest.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves: #4234
This commit is contained in:
Derek Collison
2023-06-10 20:55:43 -07:00
committed by GitHub
3 changed files with 94 additions and 38 deletions

View File

@@ -99,25 +99,26 @@ type inSysMsg struct {
// Used to send and receive messages from inside the server.
type internal struct {
account *Account
client *client
seq uint64
sid int
servers map[string]*serverUpdate
sweeper *time.Timer
stmr *time.Timer
replies map[string]msgHandler
sendq *ipQueue[*pubMsg]
recvq *ipQueue[*inSysMsg]
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
orphMax time.Duration
chkOrph time.Duration
statsz time.Duration
cstatsz time.Duration
shash string
inboxPre string
account *Account
client *client
seq uint64
sid int
servers map[string]*serverUpdate
sweeper *time.Timer
stmr *time.Timer
replies map[string]msgHandler
sendq *ipQueue[*pubMsg]
recvq *ipQueue[*inSysMsg]
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
orphMax time.Duration
chkOrph time.Duration
statsz time.Duration
cstatsz time.Duration
shash string
inboxPre string
remoteStatsSub *subscription
}
// ServerStatsMsg is sent periodically with stats updates.
@@ -642,8 +643,6 @@ func (s *Server) checkRemoteServers() {
// Grab RSS and PCPU
// Server lock will be held but released.
func (s *Server) updateServerUsage(v *ServerStats) {
s.mu.Unlock()
defer s.mu.Lock()
var vss int64
pse.ProcUsage(&v.CPU, &v.Mem, &vss)
v.Cores = runtime.NumCPU()
@@ -679,6 +678,32 @@ func routeStat(r *client) *RouteStat {
func (s *Server) sendStatsz(subj string) {
var m ServerStatsMsg
s.updateServerUsage(&m.Stats)
s.mu.RLock()
defer s.mu.RUnlock()
// Check that we have a system account, etc.
if s.sys == nil || s.sys.account == nil {
return
}
// if we are running standalone, check for interest.
if s.standAloneMode() {
// Check if we even have interest in this subject.
sacc := s.sys.account
rr := sacc.sl.Match(subj)
totalSubs := len(rr.psubs) + len(rr.qsubs)
if totalSubs == 0 {
return
} else if totalSubs == 1 && len(rr.psubs) == 1 {
// For the broadcast subject we listen to that ourselves with no echo for remote updates.
// If we are the only ones listening do not send either.
if rr.psubs[0] == s.sys.remoteStatsSub {
return
}
}
}
m.Stats.Start = s.start
m.Stats.Connections = len(s.clients)
m.Stats.TotalConnections = s.totalClients
@@ -722,14 +747,12 @@ func (s *Server) sendStatsz(subj string) {
gw.RUnlock()
}
// Active Servers
m.Stats.ActiveServers = 1
if s.sys != nil {
m.Stats.ActiveServers += len(s.sys.servers)
}
m.Stats.ActiveServers = len(s.sys.servers) + 1
// JetStream
if js := s.js; js != nil {
jStat := &JetStreamVarz{}
s.mu.Unlock()
s.mu.RUnlock()
js.mu.RLock()
c := js.config
c.StoreDir = _EMPTY_
@@ -771,7 +794,7 @@ func (s *Server) sendStatsz(subj string) {
}
}
m.Stats.JetStream = jStat
s.mu.Lock()
s.mu.RLock()
}
// Send message.
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
@@ -790,13 +813,12 @@ func (s *Server) heartbeatStatsz() {
}
s.sys.stmr.Reset(s.sys.cstatsz)
}
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
// Do in separate Go routine.
go s.sendStatszUpdate()
}
func (s *Server) sendStatszUpdate() {
s.mu.Lock()
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
s.mu.Unlock()
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.ID()))
}
// This should be wrapChk() to setup common locking.
@@ -897,8 +919,11 @@ func (s *Server) initEventTracking() {
}
// Listen for statsz from others.
subject = fmt.Sprintf(serverStatsSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
} else {
// Keep track of this one.
s.sys.remoteStatsSub = sub
}
// Listen for all server shutdowns.
subject = fmt.Sprintf(shutdownEventSubj, "*")
@@ -1338,7 +1363,8 @@ func (s *Server) processNewServer(si *ServerInfo) {
}
}
// Announce ourselves..
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
// Do this in a separate Go routine.
go s.sendStatszUpdate()
}
// If GW is enabled on this server and there are any leaf node connections,
@@ -1612,9 +1638,7 @@ func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, re
return
}
}
s.mu.Lock()
s.sendStatsz(reply)
s.mu.Unlock()
}
var errSkipZreq = errors.New("filtered response")

View File

@@ -2501,6 +2501,40 @@ func TestServerEventsAndDQSubscribers(t *testing.T) {
checkSubsPending(t, sub, 10)
}
func TestServerEventsStatszSingleServer(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"
accounts { $SYS { users [{user: "admin", password: "p1d"}]} }
`))
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
// Grab internal system client.
s.mu.RLock()
sysc := s.sys.client
wait := s.sys.cstatsz + 25*time.Millisecond
s.mu.RUnlock()
// Wait for when first statsz would have gone out..
time.Sleep(wait)
sysc.mu.Lock()
outMsgs := sysc.stats.outMsgs
sysc.mu.Unlock()
require_True(t, outMsgs == 0)
// Connect as a system user and make sure if there is
// subscription interest that we will receive updates.
nc, _ := jsClientConnect(t, s, nats.UserInfo("admin", "p1d"))
defer nc.Close()
sub, err := nc.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*"))
require_NoError(t, err)
checkSubsPending(t, sub, 1)
}
func Benchmark_GetHash(b *testing.B) {
b.StopTimer()
// Get 100 random names

View File

@@ -870,9 +870,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
// Announce ourselves again to new connections.
if solicit && s.EventsEnabled() {
s.mu.Lock()
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
s.mu.Unlock()
s.sendStatszUpdate()
}
}