diff --git a/server/events.go b/server/events.go index b395a698..70d60472 100644 --- a/server/events.go +++ b/server/events.go @@ -199,6 +199,7 @@ type ServerStats struct { SlowConsumers int64 `json:"slow_consumers"` Routes []*RouteStat `json:"routes,omitempty"` Gateways []*GatewayStat `json:"gateways,omitempty"` + ActiveServers int `json:"active_servers,omitempty"` JetStream *JetStreamVarz `json:"jetstream,omitempty"` } @@ -490,7 +491,6 @@ func (s *Server) checkRemoteServers() { s.Debugf("Detected orphan remote server: %q", sid) // Simulate it going away. s.processRemoteServerShutdown(sid) - delete(s.sys.servers, sid) } } if s.sys.sweeper != nil { @@ -536,7 +536,7 @@ func routeStat(r *client) *RouteStat { // Actual send method for statz updates. // Lock should be held. func (s *Server) sendStatsz(subj string) { - m := ServerStatsMsg{} + var m ServerStatsMsg s.updateServerUsage(&m.Stats) m.Stats.Start = s.start m.Stats.Connections = len(s.clients) @@ -548,33 +548,11 @@ func (s *Server) sendStatsz(subj string) { m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) m.Stats.NumSubs = s.numSubscriptions() - - if js := s.js; js != nil { - jStat := &JetStreamVarz{} - s.mu.Unlock() - js.mu.RLock() - c := js.config - c.StoreDir = _EMPTY_ - jStat.Config = &c - js.mu.RUnlock() - jStat.Stats = js.usageStats() - if mg := js.getMetaGroup(); mg != nil { - if mg.Leader() { - jStat.Meta = s.raftNodeToClusterInfo(mg) - } else { - // non leader only include a shortened version without peers - jStat.Meta = &ClusterInfo{ - Name: s.ClusterName(), - Leader: s.serverNameForNode(mg.GroupLeader()), - } - } - } - m.Stats.JetStream = jStat - s.mu.Lock() - } + // Routes for _, r := range s.routes { m.Stats.Routes = append(m.Stats.Routes, routeStat(r)) } + // Gateways if s.gateway.enabled { gw := s.gateway gw.RLock() @@ -602,6 +580,36 @@ func (s *Server) sendStatsz(subj string) { } gw.RUnlock() } + // Active Servers + m.Stats.ActiveServers = 1 + if s.sys != nil { + m.Stats.ActiveServers += len(s.sys.servers) + } + // JetStream + if js := s.js; js != nil { + jStat := &JetStreamVarz{} + s.mu.Unlock() + js.mu.RLock() + c := js.config + c.StoreDir = _EMPTY_ + jStat.Config = &c + js.mu.RUnlock() + jStat.Stats = js.usageStats() + if mg := js.getMetaGroup(); mg != nil { + if mg.Leader() { + jStat.Meta = s.raftNodeToClusterInfo(mg) + } else { + // non leader only include a shortened version without peers + jStat.Meta = &ClusterInfo{ + Name: s.ClusterName(), + Leader: s.serverNameForNode(mg.GroupLeader()), + } + } + } + m.Stats.JetStream = jStat + s.mu.Lock() + } + // Send message. s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) } @@ -622,8 +630,8 @@ func (s *Server) heartbeatStatsz() { // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { // We will start by sending out more of these and trail off to the statsz being the max. - s.sys.cstatsz = time.Second - // Send out the first one only after a second. + s.sys.cstatsz = 250 * time.Millisecond + // Send out the first one after 250ms. s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } @@ -904,6 +912,7 @@ func (s *Server) processRemoteServerShutdown(sid string) { } return true }) + delete(s.sys.servers, sid) } // remoteServerShutdownEvent is called when we get an event from another server shutting down. @@ -948,12 +957,6 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply } si := ssm.Server node := string(getHash(si.Name)) - if _, ok := s.nodeToInfo.Load(node); !ok { - // Since we have not seen this one they probably have not seen us so send out our update. - s.mu.Lock() - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) - s.mu.Unlock() - } s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream}) } @@ -986,6 +989,8 @@ func (s *Server) processNewServer(ms *ServerInfo) { // Add to our nodeToName node := string(getHash(ms.Name)) s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream}) + // Announce ourselves.. + s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) } // If GW is enabled on this server and there are any leaf node connections, diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9af2ddc0..d9800565 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7125,6 +7125,44 @@ func TestJetStreamClusterVarzReporting(t *testing.T) { } } +func TestJetStreamClusterStatszActiveServers(t *testing.T) { + sc := createJetStreamSuperCluster(t, 2, 2) + defer sc.shutdown() + + checkActive := func(expected int) { + t.Helper() + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + s := sc.randomServer() + nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Failed to create system client: %v", err) + } + defer nc.Close() + + resp, err := nc.Request(serverStatsPingReqSubj, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var ssm ServerStatsMsg + if err := json.Unmarshal(resp.Data, &ssm); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ssm.Stats.ActiveServers != expected { + return fmt.Errorf("Wanted %d, got %d", expected, ssm.Stats.ActiveServers) + } + return nil + }) + } + + checkActive(4) + c := sc.randomCluster() + ss := c.randomServer() + ss.Shutdown() + checkActive(3) + c.restartServer(ss) + checkActive(4) +} + // Support functions // Used to setup superclusters for tests. diff --git a/test/norace_test.go b/test/norace_test.go index 46880f23..24330ed2 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -330,11 +330,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) { checkClusterFormed(t, servers...) // Calculate in MB what we are using now. - const max = 64 * 1024 * 1024 // 64MB + const max = 80 * 1024 * 1024 // 80MB runtime.ReadMemStats(&m) used := m.TotalAlloc - pta if used > max { - t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024)) + t.Fatalf("Cluster using too much memory, expect < 80MB, got %dMB", used/(1024*1024)) } for _, s := range servers {