Merge pull request #2932 from ripienaar/jsz_cluster_on_leader

[IMPROVED] Ensures the cluster info in jsz is sent from the leader only
This commit is contained in:
R.I.Pienaar
2022-03-25 19:51:07 +01:00
committed by GitHub
2 changed files with 44 additions and 26 deletions

View File

@@ -1341,7 +1341,10 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
v.Stats = js.usageStats()
if mg := js.getMetaGroup(); mg != nil {
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()}
if ci.Leader == s.info.Name {
v.Meta.Replicas = ci.Replicas
}
}
}
}
@@ -2307,7 +2310,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
a.mu.RLock()
defer a.mu.RUnlock()
var vrIssues []ExtVrIssues
claim, _ := jwt.DecodeAccountClaims(a.claimJWT) //ignore error
claim, _ := jwt.DecodeAccountClaims(a.claimJWT) // ignore error
if claim != nil {
vr := jwt.ValidationResults{}
claim.Validate(&vr)
@@ -2606,33 +2609,26 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
opts.Accounts = true
}
// Check if we want a response from the leader only.
if opts.LeaderOnly {
js, cc := s.getJetStreamCluster()
if js == nil {
// Ignore
return nil, fmt.Errorf("%w: no cluster", errSkipZreq)
}
// So if we have JS but no clustering, we are the leader so allow.
if cc != nil {
js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()
if !isLeader {
return nil, fmt.Errorf("%w: not leader", errSkipZreq)
}
}
}
jsi := &JSInfo{
ID: s.ID(),
Now: time.Now().UTC(),
}
if !s.JetStreamEnabled() {
js := s.getJetStream()
if js == nil || !js.isEnabled() {
jsi.Disabled = true
return jsi, nil
}
accounts := []*jsAccount{}
js.mu.RLock()
isLeader := js.cluster == nil || js.cluster.isLeader()
js.mu.RUnlock()
if opts.LeaderOnly && !isLeader {
return nil, fmt.Errorf("%w: not leader", errSkipZreq)
}
var accounts []*jsAccount
s.js.mu.RLock()
jsi.Config = s.js.config
@@ -2643,9 +2639,13 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
if mg := s.js.getMetaGroup(); mg != nil {
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()}
if isLeader {
jsi.Meta.Replicas = ci.Replicas
}
}
}
jsi.JetStreamStats = *s.js.usageStats()
filterIdx := -1
@@ -2697,7 +2697,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
return jsi, nil
}
// HandleJSz process HTTP requests for jetstream information.
// HandleJsz process HTTP requests for jetstream information.
func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.httpReqStats[JszPath]++

View File

@@ -1084,7 +1084,7 @@ func TestConnzSortedByStopTimeClosedConn(t *testing.T) {
}
checkClosedConns(t, s, 4, time.Second)
//Now adjust the Stop times for these with some random values.
// Now adjust the Stop times for these with some random values.
s.mu.Lock()
now := time.Now().UTC()
ccs := s.closed.closedClients()
@@ -1129,7 +1129,7 @@ func TestConnzSortedByReason(t *testing.T) {
}
checkClosedConns(t, s, 20, time.Second)
//Now adjust the Reasons for these with some random values.
// Now adjust the Reasons for these with some random values.
s.mu.Lock()
ccs := s.closed.closedClients()
max := int(ServerShutdown)
@@ -4231,6 +4231,24 @@ func TestMonitorJsz(t *testing.T) {
t.Fatal("ReplicationLag expected to be present for my-stream-mirror stream")
}
})
t.Run("cluster-info", func(t *testing.T) {
found := 0
for i, url := range []string{monUrl1, monUrl2} {
info := readJsInfo(url + "")
if info.Meta.Replicas != nil {
found++
if info.Meta.Leader != srvs[i].Name() {
t.Fatalf("received cluster info from non leader: leader %s, server: %s", info.Meta.Leader, srvs[i].Name())
}
}
}
if found == 0 {
t.Fatalf("did not receive cluster info from any node")
}
if found > 1 {
t.Fatalf("received cluster info from multiple nodes")
}
})
t.Run("account-non-existing", func(t *testing.T) {
for _, url := range []string{monUrl1, monUrl2} {
info := readJsInfo(url + "?acc=DOES_NOT_EXIT")