diff --git a/server/monitor.go b/server/monitor.go index 8cefb2b1..49e3a1b8 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2638,6 +2638,7 @@ type JSzOptions struct { LeaderOnly bool `json:"leader_only,omitempty"` Offset int `json:"offset,omitempty"` Limit int `json:"limit,omitempty"` + RaftGroups bool `json:"raft,omitempty"` } // HealthzOptions are options passed to Healthz @@ -2654,15 +2655,24 @@ type ProfilezOptions struct { Debug int `json:"debug"` } +// StreamDetail shows information about the stream state and its consumers. type StreamDetail struct { - Name string `json:"name"` - Created time.Time `json:"created"` - Cluster *ClusterInfo `json:"cluster,omitempty"` - Config *StreamConfig `json:"config,omitempty"` - State StreamState `json:"state,omitempty"` - Consumer []*ConsumerInfo `json:"consumer_detail,omitempty"` - Mirror *StreamSourceInfo `json:"mirror,omitempty"` - Sources []*StreamSourceInfo `json:"sources,omitempty"` + Name string `json:"name"` + Created time.Time `json:"created"` + Cluster *ClusterInfo `json:"cluster,omitempty"` + Config *StreamConfig `json:"config,omitempty"` + State StreamState `json:"state,omitempty"` + Consumer []*ConsumerInfo `json:"consumer_detail,omitempty"` + Mirror *StreamSourceInfo `json:"mirror,omitempty"` + Sources []*StreamSourceInfo `json:"sources,omitempty"` + RaftGroup string `json:"stream_raft_group,omitempty"` + ConsumerRaftGroups []*RaftGroupDetail `json:"consumer_raft_groups,omitempty"` +} + +// RaftGroupDetail shows information details about the Raft group. +type RaftGroupDetail struct { + Name string `json:"name"` + RaftGroup string `json:"raft_group,omitempty"` } type AccountDetail struct { @@ -2698,7 +2708,7 @@ type JSInfo struct { AccountDetails []*AccountDetail `json:"account_details,omitempty"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -2737,7 +2747,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg if optStreams { for _, stream := range streams { - ci := s.js.clusterInfo(stream.raftGroup()) + rgroup := stream.raftGroup() + ci := s.js.clusterInfo(rgroup) var cfg *StreamConfig if optCfg { c := stream.config() @@ -2752,17 +2763,28 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg Mirror: stream.mirrorInfo(), Sources: stream.sourcesInfo(), } + if optRaft && rgroup != nil { + sdet.RaftGroup = rgroup.Name + sdet.ConsumerRaftGroups = make([]*RaftGroupDetail, 0) + } if optConsumers { for _, consumer := range stream.getPublicConsumers() { cInfo := consumer.info() if cInfo == nil { continue } - if !optCfg { cInfo.Config = nil } sdet.Consumer = append(sdet.Consumer, cInfo) + if optRaft { + crgroup := consumer.raftGroup() + if crgroup != nil { + sdet.ConsumerRaftGroups = append(sdet.ConsumerRaftGroups, + &RaftGroupDetail{cInfo.Name, crgroup.Name}, + ) + } + } } } detail.Streams = append(detail.Streams, sdet) @@ -2786,7 +2808,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil } // helper to get cluster info from node via dummy group @@ -2913,7 +2935,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } // if wanted, obtain accounts/streams/consumer for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups) jsi.AccountDetails = append(jsi.AccountDetails, detail) } return jsi, nil @@ -2952,6 +2974,10 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { if err != nil { return } + rgroups, err := decodeBool(w, r, "raft") + if err != nil { + return + } l, err := s.Jsz(&JSzOptions{ r.URL.Query().Get("acc"), @@ -2961,7 +2987,9 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { config, leader, offset, - limit}) + limit, + rgroups, + }) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) diff --git a/server/monitor_test.go b/server/monitor_test.go index cebd90c4..78d7ef2f 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4310,6 +4310,9 @@ func TestMonitorJsz(t *testing.T) { if info.AccountDetails[0].Streams[0].Consumer[0].Config != nil { t.Fatal("Config expected to not be present") } + if len(info.AccountDetails[0].Streams[0].ConsumerRaftGroups) != 0 { + t.Fatalf("expected consumer raft groups to not be returned by %s but got %v", url, info) + } } }) t.Run("config", func(t *testing.T) { @@ -4393,6 +4396,31 @@ func TestMonitorJsz(t *testing.T) { } } }) + t.Run("raftgroups", func(t *testing.T) { + for _, url := range []string{monUrl1, monUrl2} { + info := readJsInfo(url + "?acc=ACC&consumers=true&raft=true") + if len(info.AccountDetails) != 1 { + t.Fatalf("expected account ACC to be returned by %s but got %v", url, info) + } + if len(info.AccountDetails[0].Streams[0].Consumer) == 0 { + t.Fatalf("expected consumers to be returned by %s but got %v", url, info) + } + if len(info.AccountDetails[0].Streams[0].ConsumerRaftGroups) == 0 { + t.Fatalf("expected consumer raft groups to be returned by %s but got %v", url, info) + } + srgroup := info.AccountDetails[0].Streams[0].RaftGroup + if len(srgroup) == 0 { + t.Fatal("expected stream raft group info to be included") + } + crgroup := info.AccountDetails[0].Streams[0].ConsumerRaftGroups[0] + if crgroup.Name != "my-consumer-replicated" { + t.Fatalf("expected consumer name to be included in raft group info, got: %v", crgroup.Name) + } + if len(crgroup.RaftGroup) == 0 { + t.Fatal("expected consumer raft group info to be included") + } + } + }) } func TestMonitorReloadTLSConfig(t *testing.T) {