Merge pull request #2276 from nats-io/active_servers

Added active servers to statsz.
This commit is contained in:
Derek Collison
2021-06-10 11:01:29 -07:00
committed by GitHub
3 changed files with 79 additions and 36 deletions

View File

@@ -199,6 +199,7 @@ type ServerStats struct {
SlowConsumers int64 `json:"slow_consumers"`
Routes []*RouteStat `json:"routes,omitempty"`
Gateways []*GatewayStat `json:"gateways,omitempty"`
ActiveServers int `json:"active_servers,omitempty"`
JetStream *JetStreamVarz `json:"jetstream,omitempty"`
}
@@ -490,7 +491,6 @@ func (s *Server) checkRemoteServers() {
s.Debugf("Detected orphan remote server: %q", sid)
// Simulate it going away.
s.processRemoteServerShutdown(sid)
delete(s.sys.servers, sid)
}
}
if s.sys.sweeper != nil {
@@ -536,7 +536,7 @@ func routeStat(r *client) *RouteStat {
// Actual send method for statz updates.
// Lock should be held.
func (s *Server) sendStatsz(subj string) {
m := ServerStatsMsg{}
var m ServerStatsMsg
s.updateServerUsage(&m.Stats)
m.Stats.Start = s.start
m.Stats.Connections = len(s.clients)
@@ -548,33 +548,11 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
m.Stats.NumSubs = s.numSubscriptions()
if js := s.js; js != nil {
jStat := &JetStreamVarz{}
s.mu.Unlock()
js.mu.RLock()
c := js.config
c.StoreDir = _EMPTY_
jStat.Config = &c
js.mu.RUnlock()
jStat.Stats = js.usageStats()
if mg := js.getMetaGroup(); mg != nil {
if mg.Leader() {
jStat.Meta = s.raftNodeToClusterInfo(mg)
} else {
// Non-leaders only include a shortened version without peers.
jStat.Meta = &ClusterInfo{
Name: s.ClusterName(),
Leader: s.serverNameForNode(mg.GroupLeader()),
}
}
}
m.Stats.JetStream = jStat
s.mu.Lock()
}
// Routes
for _, r := range s.routes {
m.Stats.Routes = append(m.Stats.Routes, routeStat(r))
}
// Gateways
if s.gateway.enabled {
gw := s.gateway
gw.RLock()
@@ -602,6 +580,36 @@ func (s *Server) sendStatsz(subj string) {
}
gw.RUnlock()
}
// Active Servers
m.Stats.ActiveServers = 1
if s.sys != nil {
m.Stats.ActiveServers += len(s.sys.servers)
}
// JetStream
if js := s.js; js != nil {
jStat := &JetStreamVarz{}
s.mu.Unlock()
js.mu.RLock()
c := js.config
c.StoreDir = _EMPTY_
jStat.Config = &c
js.mu.RUnlock()
jStat.Stats = js.usageStats()
if mg := js.getMetaGroup(); mg != nil {
if mg.Leader() {
jStat.Meta = s.raftNodeToClusterInfo(mg)
} else {
// Non-leaders only include a shortened version without peers.
jStat.Meta = &ClusterInfo{
Name: s.ClusterName(),
Leader: s.serverNameForNode(mg.GroupLeader()),
}
}
}
m.Stats.JetStream = jStat
s.mu.Lock()
}
// Send message.
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
}
@@ -622,8 +630,8 @@ func (s *Server) heartbeatStatsz() {
// This should be wrapChk() to setup common locking.
func (s *Server) startStatszTimer() {
// We will start by sending out more of these and trail off to the statsz being the max.
s.sys.cstatsz = time.Second
// Send out the first one only after a second.
s.sys.cstatsz = 250 * time.Millisecond
// Send out the first one after 250ms.
s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz))
}
@@ -904,6 +912,7 @@ func (s *Server) processRemoteServerShutdown(sid string) {
}
return true
})
delete(s.sys.servers, sid)
}
// remoteServerShutdownEvent is called when we get an event from another server shutting down.
@@ -948,12 +957,6 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply
}
si := ssm.Server
node := string(getHash(si.Name))
if _, ok := s.nodeToInfo.Load(node); !ok {
// Since we have not seen this one they probably have not seen us so send out our update.
s.mu.Lock()
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
s.mu.Unlock()
}
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream})
}
@@ -986,6 +989,8 @@ func (s *Server) processNewServer(ms *ServerInfo) {
// Add to our nodeToName
node := string(getHash(ms.Name))
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream})
// Announce ourselves.
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
}
// If GW is enabled on this server and there are any leaf node connections,

View File

@@ -7125,6 +7125,44 @@ func TestJetStreamClusterVarzReporting(t *testing.T) {
}
}
func TestJetStreamClusterStatszActiveServers(t *testing.T) {
	sc := createJetStreamSuperCluster(t, 2, 2)
	defer sc.shutdown()

	// checkActive polls a random server's $SYS statsz endpoint until the
	// reported number of active servers matches expected, or the poll times out.
	checkActive := func(expected int) {
		t.Helper()
		checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
			s := sc.randomServer()
			nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
			if err != nil {
				// Return the error so checkFor can retry: right after a server
				// shutdown/restart, a connect may transiently fail. Calling
				// t.Fatalf here would abort the test and defeat the retry loop.
				return fmt.Errorf("failed to create system client: %v", err)
			}
			defer nc.Close()
			resp, err := nc.Request(serverStatsPingReqSubj, nil, time.Second)
			if err != nil {
				// Same reasoning: a request timeout during reconvergence is
				// expected and should be retried, not fatal.
				return fmt.Errorf("statsz request failed: %v", err)
			}
			var ssm ServerStatsMsg
			if err := json.Unmarshal(resp.Data, &ssm); err != nil {
				return fmt.Errorf("unmarshaling statsz response: %v", err)
			}
			if ssm.Stats.ActiveServers != expected {
				return fmt.Errorf("Wanted %d, got %d", expected, ssm.Stats.ActiveServers)
			}
			return nil
		})
	}

	// 2x2 supercluster => 4 servers total.
	checkActive(4)
	c := sc.randomCluster()
	ss := c.randomServer()
	ss.Shutdown()
	checkActive(3)
	c.restartServer(ss)
	checkActive(4)
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -330,11 +330,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) {
checkClusterFormed(t, servers...)
// Calculate in MB what we are using now.
const max = 64 * 1024 * 1024 // 64MB
const max = 80 * 1024 * 1024 // 80MB
runtime.ReadMemStats(&m)
used := m.TotalAlloc - pta
if used > max {
t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024))
t.Fatalf("Cluster using too much memory, expect < 80MB, got %dMB", used/(1024*1024))
}
for _, s := range servers {