mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
ensures the cluster info in jsz is sent from the leader only
The data from other nodes are usually wrong, this can be quite confusing for users so we now only send it when we are the leader Signed-off-by: R.I.Pienaar <rip@devco.net>
This commit is contained in:
@@ -1341,7 +1341,10 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
|
||||
v.Stats = js.usageStats()
|
||||
if mg := js.getMetaGroup(); mg != nil {
|
||||
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
|
||||
v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
|
||||
v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()}
|
||||
if ci.Leader == s.info.Name {
|
||||
v.Meta.Replicas = ci.Replicas
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2307,7 +2310,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
var vrIssues []ExtVrIssues
|
||||
claim, _ := jwt.DecodeAccountClaims(a.claimJWT) //ignore error
|
||||
claim, _ := jwt.DecodeAccountClaims(a.claimJWT) // ignore error
|
||||
if claim != nil {
|
||||
vr := jwt.ValidationResults{}
|
||||
claim.Validate(&vr)
|
||||
@@ -2606,33 +2609,26 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
opts.Accounts = true
|
||||
}
|
||||
|
||||
// Check if we want a response from the leader only.
|
||||
if opts.LeaderOnly {
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil {
|
||||
// Ignore
|
||||
return nil, fmt.Errorf("%w: no cluster", errSkipZreq)
|
||||
}
|
||||
// So if we have JS but no clustering, we are the leader so allow.
|
||||
if cc != nil {
|
||||
js.mu.RLock()
|
||||
isLeader := cc.isLeader()
|
||||
js.mu.RUnlock()
|
||||
if !isLeader {
|
||||
return nil, fmt.Errorf("%w: not leader", errSkipZreq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
jsi := &JSInfo{
|
||||
ID: s.ID(),
|
||||
Now: time.Now().UTC(),
|
||||
}
|
||||
if !s.JetStreamEnabled() {
|
||||
|
||||
js := s.getJetStream()
|
||||
if js == nil || !js.isEnabled() {
|
||||
jsi.Disabled = true
|
||||
return jsi, nil
|
||||
}
|
||||
accounts := []*jsAccount{}
|
||||
|
||||
js.mu.RLock()
|
||||
isLeader := js.cluster == nil || js.cluster.isLeader()
|
||||
js.mu.RUnlock()
|
||||
|
||||
if opts.LeaderOnly && !isLeader {
|
||||
return nil, fmt.Errorf("%w: not leader", errSkipZreq)
|
||||
}
|
||||
|
||||
var accounts []*jsAccount
|
||||
|
||||
s.js.mu.RLock()
|
||||
jsi.Config = s.js.config
|
||||
@@ -2643,9 +2639,13 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
|
||||
if mg := s.js.getMetaGroup(); mg != nil {
|
||||
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
|
||||
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
|
||||
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()}
|
||||
if isLeader {
|
||||
jsi.Meta.Replicas = ci.Replicas
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
jsi.JetStreamStats = *s.js.usageStats()
|
||||
|
||||
filterIdx := -1
|
||||
@@ -2697,7 +2697,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
return jsi, nil
|
||||
}
|
||||
|
||||
// HandleJSz process HTTP requests for jetstream information.
|
||||
// HandleJsz process HTTP requests for jetstream information.
|
||||
func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
s.httpReqStats[JszPath]++
|
||||
|
||||
@@ -1084,7 +1084,7 @@ func TestConnzSortedByStopTimeClosedConn(t *testing.T) {
|
||||
}
|
||||
checkClosedConns(t, s, 4, time.Second)
|
||||
|
||||
//Now adjust the Stop times for these with some random values.
|
||||
// Now adjust the Stop times for these with some random values.
|
||||
s.mu.Lock()
|
||||
now := time.Now().UTC()
|
||||
ccs := s.closed.closedClients()
|
||||
@@ -1129,7 +1129,7 @@ func TestConnzSortedByReason(t *testing.T) {
|
||||
}
|
||||
checkClosedConns(t, s, 20, time.Second)
|
||||
|
||||
//Now adjust the Reasons for these with some random values.
|
||||
// Now adjust the Reasons for these with some random values.
|
||||
s.mu.Lock()
|
||||
ccs := s.closed.closedClients()
|
||||
max := int(ServerShutdown)
|
||||
@@ -4231,6 +4231,24 @@ func TestMonitorJsz(t *testing.T) {
|
||||
t.Fatal("ReplicationLag expected to be present for my-stream-mirror stream")
|
||||
}
|
||||
})
|
||||
t.Run("cluster-info", func(t *testing.T) {
|
||||
found := 0
|
||||
for i, url := range []string{monUrl1, monUrl2} {
|
||||
info := readJsInfo(url + "")
|
||||
if info.Meta.Replicas != nil {
|
||||
found++
|
||||
if info.Meta.Leader != srvs[i].Name() {
|
||||
t.Fatalf("received cluster info from non leader: leader %s, server: %s", info.Meta.Leader, srvs[i].Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
if found == 0 {
|
||||
t.Fatalf("did not receive cluster info from any node")
|
||||
}
|
||||
if found > 1 {
|
||||
t.Fatalf("received cluster info from multiple nodes")
|
||||
}
|
||||
})
|
||||
t.Run("account-non-existing", func(t *testing.T) {
|
||||
for _, url := range []string{monUrl1, monUrl2} {
|
||||
info := readJsInfo(url + "?acc=DOES_NOT_EXIT")
|
||||
|
||||
Reference in New Issue
Block a user