From cc63915f5451c18f268b2479134ec402bb4b5177 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 9 Jun 2021 19:14:37 -0700 Subject: [PATCH 1/3] Added active servers to statsz. This is generally useful but will also help with cli actions since we know from first response how many total responses are expected. Signed-off-by: Derek Collison --- server/events.go | 25 ++++++++++++--------- server/jetstream_cluster_test.go | 38 ++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/server/events.go b/server/events.go index 810951f4..44fc0af2 100644 --- a/server/events.go +++ b/server/events.go @@ -199,6 +199,7 @@ type ServerStats struct { SlowConsumers int64 `json:"slow_consumers"` Routes []*RouteStat `json:"routes,omitempty"` Gateways []*GatewayStat `json:"gateways,omitempty"` + ActiveServers int `json:"active_servers,omitempty"` } // RouteStat holds route statistics. @@ -489,7 +490,6 @@ func (s *Server) checkRemoteServers() { s.Debugf("Detected orphan remote server: %q", sid) // Simulate it going away. s.processRemoteServerShutdown(sid) - delete(s.sys.servers, sid) } } if s.sys.sweeper != nil { @@ -535,7 +535,7 @@ func routeStat(r *client) *RouteStat { // Actual send method for statz updates. // Lock should be held. func (s *Server) sendStatsz(subj string) { - m := ServerStatsMsg{} + var m ServerStatsMsg s.updateServerUsage(&m.Stats) m.Stats.Start = s.start m.Stats.Connections = len(s.clients) @@ -547,10 +547,11 @@ func (s *Server) sendStatsz(subj string) { m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) m.Stats.NumSubs = s.numSubscriptions() - + // Routes for _, r := range s.routes { m.Stats.Routes = append(m.Stats.Routes, routeStat(r)) } + // Gateways if s.gateway.enabled { gw := s.gateway gw.RLock() @@ -578,6 +579,11 @@ func (s *Server) sendStatsz(subj string) { } gw.RUnlock() } + // Active Servers + m.Stats.ActiveServers = 1 + if s.sys != nil { + m.Stats.ActiveServers += len(s.sys.servers) + } s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) } @@ -598,8 +604,8 @@ func (s *Server) heartbeatStatsz() { // This should be wrapChk() to setup common locking. func (s *Server) startStatszTimer() { // We will start by sending out more of these and trail off to the statsz being the max. - s.sys.cstatsz = time.Second - // Send out the first one only after a second. + s.sys.cstatsz = 250 * time.Millisecond + // Send out the first one after 250ms. s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } @@ -880,6 +886,7 @@ func (s *Server) processRemoteServerShutdown(sid string) { } return true }) + delete(s.sys.servers, sid) } // remoteServerShutdownEvent is called when we get an event from another server shutting down. @@ -924,12 +931,6 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply } si := ssm.Server node := string(getHash(si.Name)) - if _, ok := s.nodeToInfo.Load(node); !ok { - // Since we have not seen this one they probably have not seen us so send out our update. - s.mu.Lock() - s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) - s.mu.Unlock() - } s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream}) } @@ -962,6 +963,8 @@ func (s *Server) processNewServer(ms *ServerInfo) { // Add to our nodeToName node := string(getHash(ms.Name)) s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream}) + // Announce ourselves.. + s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) } // If GW is enabled on this server and there are any leaf node connections, diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9af2ddc0..d9800565 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7125,6 +7125,44 @@ func TestJetStreamClusterVarzReporting(t *testing.T) { } } +func TestJetStreamClusterStatszActiveServers(t *testing.T) { + sc := createJetStreamSuperCluster(t, 2, 2) + defer sc.shutdown() + + checkActive := func(expected int) { + t.Helper() + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + s := sc.randomServer() + nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Failed to create system client: %v", err) + } + defer nc.Close() + + resp, err := nc.Request(serverStatsPingReqSubj, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var ssm ServerStatsMsg + if err := json.Unmarshal(resp.Data, &ssm); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ssm.Stats.ActiveServers != expected { + return fmt.Errorf("Wanted %d, got %d", expected, ssm.Stats.ActiveServers) + } + return nil + }) + } + + checkActive(4) + c := sc.randomCluster() + ss := c.randomServer() + ss.Shutdown() + checkActive(3) + c.restartServer(ss) + checkActive(4) +} + // Support functions // Used to setup superclusters for tests. From 5f93ca09cdb802e57ba80c5e3273bfc74fe6e0bd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 10 Jun 2021 07:28:20 -0700 Subject: [PATCH 2/3] Bumped memory ceiling Signed-off-by: Derek Collison --- test/norace_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/norace_test.go b/test/norace_test.go index 46880f23..24330ed2 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -330,11 +330,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) { checkClusterFormed(t, servers...) // Calculate in MB what we are using now. - const max = 64 * 1024 * 1024 // 64MB + const max = 80 * 1024 * 1024 // 80MB runtime.ReadMemStats(&m) used := m.TotalAlloc - pta if used > max { - t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024)) + t.Fatalf("Cluster using too much memory, expect < 80MB, got %dMB", used/(1024*1024)) } for _, s := range servers { From 637973a1c76beadee618389b493f8d849a3a5b08 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 10 Jun 2021 10:04:41 -0700 Subject: [PATCH 3/3] fomatting Signed-off-by: Derek Collison --- server/events.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/events.go b/server/events.go index d2525c18..70d60472 100644 --- a/server/events.go +++ b/server/events.go @@ -585,7 +585,7 @@ func (s *Server) sendStatsz(subj string) { if s.sys != nil { m.Stats.ActiveServers += len(s.sys.servers) } - // JetStream + // JetStream if js := s.js; js != nil { jStat := &JetStreamVarz{} s.mu.Unlock() @@ -609,6 +609,7 @@ func (s *Server) sendStatsz(subj string) { m.Stats.JetStream = jStat s.mu.Lock() } + // Send message. s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) }