From 055703f4fabc46eb9722193f9ecbed8803795228 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 18 Mar 2022 10:27:09 +0100 Subject: [PATCH] ensures the cluster info in jsz is sent from the leader only The data from other nodes are usually wrong, this can be quite confusing for users so we now only send it when we are the leader Signed-off-by: R.I.Pienaar --- server/monitor.go | 48 +++++++++++++++++++++--------------------- server/monitor_test.go | 22 +++++++++++++++++-- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 0cf1d6ad..b9324486 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1341,7 +1341,10 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { v.Stats = js.usageStats() if mg := js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { - v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()} + v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()} + if ci.Leader == s.info.Name { + v.Meta.Replicas = ci.Replicas + } } } } @@ -2307,7 +2310,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { a.mu.RLock() defer a.mu.RUnlock() var vrIssues []ExtVrIssues - claim, _ := jwt.DecodeAccountClaims(a.claimJWT) //ignore error + claim, _ := jwt.DecodeAccountClaims(a.claimJWT) // ignore error if claim != nil { vr := jwt.ValidationResults{} claim.Validate(&vr) @@ -2606,33 +2609,26 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { opts.Accounts = true } - // Check if we want a response from the leader only. - if opts.LeaderOnly { - js, cc := s.getJetStreamCluster() - if js == nil { - // Ignore - return nil, fmt.Errorf("%w: no cluster", errSkipZreq) - } - // So if we have JS but no clustering, we are the leader so allow. - if cc != nil { - js.mu.RLock() - isLeader := cc.isLeader() - js.mu.RUnlock() - if !isLeader { - return nil, fmt.Errorf("%w: not leader", errSkipZreq) - } - } - } - jsi := &JSInfo{ ID: s.ID(), Now: time.Now().UTC(), } - if !s.JetStreamEnabled() { + + js := s.getJetStream() + if js == nil || !js.isEnabled() { jsi.Disabled = true return jsi, nil } - accounts := []*jsAccount{} + + js.mu.RLock() + isLeader := js.cluster == nil || js.cluster.isLeader() + js.mu.RUnlock() + + if opts.LeaderOnly && !isLeader { + return nil, fmt.Errorf("%w: not leader", errSkipZreq) + } + + var accounts []*jsAccount s.js.mu.RLock() jsi.Config = s.js.config @@ -2643,9 +2639,13 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { if mg := s.js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { - jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()} + jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()} + if isLeader { + jsi.Meta.Replicas = ci.Replicas + } } } + jsi.JetStreamStats = *s.js.usageStats() filterIdx := -1 @@ -2697,7 +2697,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { return jsi, nil } -// HandleJSz process HTTP requests for jetstream information. +// HandleJsz process HTTP requests for jetstream information. func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { s.mu.Lock() s.httpReqStats[JszPath]++ diff --git a/server/monitor_test.go b/server/monitor_test.go index 041638c1..72a82c2b 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1084,7 +1084,7 @@ func TestConnzSortedByStopTimeClosedConn(t *testing.T) { } checkClosedConns(t, s, 4, time.Second) - //Now adjust the Stop times for these with some random values. + // Now adjust the Stop times for these with some random values. s.mu.Lock() now := time.Now().UTC() ccs := s.closed.closedClients() @@ -1129,7 +1129,7 @@ func TestConnzSortedByReason(t *testing.T) { } checkClosedConns(t, s, 20, time.Second) - //Now adjust the Reasons for these with some random values. + // Now adjust the Reasons for these with some random values. s.mu.Lock() ccs := s.closed.closedClients() max := int(ServerShutdown) @@ -4231,6 +4231,24 @@ func TestMonitorJsz(t *testing.T) { t.Fatal("ReplicationLag expected to be present for my-stream-mirror stream") } }) + t.Run("cluster-info", func(t *testing.T) { + found := 0 + for i, url := range []string{monUrl1, monUrl2} { + info := readJsInfo(url + "") + if info.Meta.Replicas != nil { + found++ + if info.Meta.Leader != srvs[i].Name() { + t.Fatalf("received cluster info from non leader: leader %s, server: %s", info.Meta.Leader, srvs[i].Name()) + } + } + } + if found == 0 { + t.Fatalf("did not receive cluster info from any node") + } + if found > 1 { + t.Fatalf("received cluster info from multiple nodes") + } + }) t.Run("account-non-existing", func(t *testing.T) { for _, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "?acc=DOES_NOT_EXIT")