From 30ba3336634c5959b693bc68cdf51730973f2247 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 19 Mar 2020 20:03:43 -0400 Subject: [PATCH] Adding an option to include subscription details in monitoring responses. Applies to routez and connz and closed connections. Enable by specifying subs=detail Signed-off-by: Matthias Hanel --- server/monitor.go | 166 ++++++++++++++++++++++++++--------------- server/monitor_test.go | 66 +++++++++++++++- server/ring.go | 2 +- server/server.go | 4 +- 4 files changed, 172 insertions(+), 66 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index d7e4faa3..616a45e4 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -62,6 +62,9 @@ type ConnzOptions struct { // Subscriptions indicates if subscriptions should be included in the results. Subscriptions bool `json:"subscriptions"` + // SubscriptionsDetail indicates if subscription details should be included in the results + SubscriptionsDetail bool `json:"subscriptions_detail"` + // Offset is used for pagination. Connz() only returns connections starting at this // offset from the global results. Offset int `json:"offset"` @@ -98,30 +101,31 @@ const ( // ConnInfo has detailed information on a per connection basis. type ConnInfo struct { - Cid uint64 `json:"cid"` - IP string `json:"ip"` - Port int `json:"port"` - Start time.Time `json:"start"` - LastActivity time.Time `json:"last_activity"` - Stop *time.Time `json:"stop,omitempty"` - Reason string `json:"reason,omitempty"` - RTT string `json:"rtt,omitempty"` - Uptime string `json:"uptime"` - Idle string `json:"idle"` - Pending int `json:"pending_bytes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Name string `json:"name,omitempty"` - Lang string `json:"lang,omitempty"` - Version string `json:"version,omitempty"` - TLSVersion string `json:"tls_version,omitempty"` - TLSCipher string `json:"tls_cipher_suite,omitempty"` - AuthorizedUser string `json:"authorized_user,omitempty"` - Account string `json:"account,omitempty"` - Subs []string `json:"subscriptions_list,omitempty"` + Cid uint64 `json:"cid"` + IP string `json:"ip"` + Port int `json:"port"` + Start time.Time `json:"start"` + LastActivity time.Time `json:"last_activity"` + Stop *time.Time `json:"stop,omitempty"` + Reason string `json:"reason,omitempty"` + RTT string `json:"rtt,omitempty"` + Uptime string `json:"uptime"` + Idle string `json:"idle"` + Pending int `json:"pending_bytes"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Name string `json:"name,omitempty"` + Lang string `json:"lang,omitempty"` + Version string `json:"version,omitempty"` + TLSVersion string `json:"tls_version,omitempty"` + TLSCipher string `json:"tls_cipher_suite,omitempty"` + AuthorizedUser string `json:"authorized_user,omitempty"` + Account string `json:"account,omitempty"` + Subs []string `json:"subscriptions_list,omitempty"` + SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"` } // DefaultConnListSize is the default size of the connection list. @@ -132,12 +136,29 @@ const DefaultSubListSize = 1024 const defaultStackBufSize = 10000 +func newSubsDetailList(client *client) []SubDetail { + subsDetail := make([]SubDetail, 0, len(client.subs)) + for _, sub := range client.subs { + subsDetail = append(subsDetail, newSubDetail(sub)) + } + return subsDetail +} + +func newSubsList(client *client) []string { + subs := make([]string, 0, len(client.subs)) + for _, sub := range client.subs { + subs = append(subs, string(sub.subject)) + } + return subs +} + // Connz returns a Connz struct containing information about connections. func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { var ( sortOpt = ByCid auth bool subs bool + subsDet bool offset int limit = DefaultConnListSize cid = uint64(0) @@ -166,6 +187,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { acc = opts.Account subs = opts.Subscriptions + subsDet = opts.SubscriptionsDetail offset = opts.Offset if offset < 0 { offset = 0 @@ -297,10 +319,11 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { ci := &conns[i] ci.fill(client, client.nc, c.Now) // Fill in subscription data if requested. - if subs && len(client.subs) > 0 { - ci.Subs = make([]string, 0, len(client.subs)) - for _, sub := range client.subs { - ci.Subs = append(ci.Subs, string(sub.subject)) + if len(client.subs) > 0 { + if subsDet { + ci.SubsDetail = newSubsDetailList(client) + } else if subs { + ci.Subs = newSubsList(client) } } // Fill in user if auth requested. @@ -336,8 +359,15 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { cc = &cx } // Fill in subscription data if requested. - if subs && len(cc.subs) > 0 { - cc.Subs = cc.subs + if len(cc.subs) > 0 { + if subsDet { + cc.SubsDetail = cc.subs + } else if subs { + cc.Subs = make([]string, 0, len(cc.subs)) + for _, sub := range cc.subs { + cc.Subs = append(cc.Subs, sub.Subject) + } + } } // Fill in user if auth requested. if auth { @@ -526,6 +556,14 @@ func decodeState(w http.ResponseWriter, r *http.Request) (ConnState, error) { return 0, err } +func decodeSubs(w http.ResponseWriter, r *http.Request) (subs bool, subsDet bool, err error) { + subsDet = strings.ToLower(r.URL.Query().Get("subs")) == "detail" + if !subsDet { + subs, err = decodeBool(w, r, "subs") + } + return +} + // HandleConnz process HTTP requests for connection information. func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { sortOpt := SortOpt(r.URL.Query().Get("sort")) @@ -533,7 +571,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { if err != nil { return } - subs, err := decodeBool(w, r, "subs") + subs, subsDet, err := decodeSubs(w, r) if err != nil { return } @@ -558,15 +596,16 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { acc := r.URL.Query().Get("acc") connzOpts := &ConnzOptions{ - Sort: sortOpt, - Username: auth, - Subscriptions: subs, - Offset: offset, - Limit: limit, - CID: cid, - State: state, - User: user, - Account: acc, + Sort: sortOpt, + Username: auth, + Subscriptions: subs, + SubscriptionsDetail: subsDet, + Offset: offset, + Limit: limit, + CID: cid, + State: state, + User: user, + Account: acc, } s.mu.Lock() @@ -602,6 +641,8 @@ type Routez struct { type RoutezOptions struct { // Subscriptions indicates that Routez will return a route's subscriptions Subscriptions bool `json:"subscriptions"` + // SubscriptionsDetail indicates if subscription details should be included in the results + SubscriptionsDetail bool `json:"subscriptions_detail"` } // RouteInfo has detailed information on a per connection basis. @@ -622,6 +663,7 @@ type RouteInfo struct { OutBytes int64 `json:"out_bytes"` NumSubs uint32 `json:"subscriptions"` Subs []string `json:"subscriptions_list,omitempty"` + SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"` } // Routez returns a Routez struct containing information about routes. @@ -629,7 +671,9 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { rs := &Routez{Routes: []*RouteInfo{}} rs.Now = time.Now() - subs := routezOpts != nil && routezOpts.Subscriptions + if routezOpts == nil { + routezOpts = &RoutezOptions{} + } s.mu.Lock() rs.NumRoutes = len(s.routes) @@ -661,12 +705,14 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { RTT: r.getRTT(), } - if subs && len(r.subs) > 0 { - ri.Subs = make([]string, 0, len(r.subs)) - for _, sub := range r.subs { - ri.Subs = append(ri.Subs, string(sub.subject)) + if len(r.subs) > 0 { + if routezOpts.SubscriptionsDetail { + ri.SubsDetail = newSubsDetailList(r) + } else if routezOpts.Subscriptions { + ri.Subs = newSubsList(r) } } + switch conn := r.nc.(type) { case *net.TCPConn, *tls.Conn: addr := conn.RemoteAddr().(*net.TCPAddr) @@ -682,21 +728,19 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { // HandleRoutez process HTTP requests for route information. func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) { - subs, err := decodeBool(w, r, "subs") + subs, subsDetail, err := decodeSubs(w, r) if err != nil { return } - var opts *RoutezOptions - if subs { - opts = &RoutezOptions{Subscriptions: true} - } + + opts := RoutezOptions{Subscriptions: subs, SubscriptionsDetail: subsDetail} s.mu.Lock() s.httpReqStats[RoutezPath]++ s.mu.Unlock() // As of now, no error is ever returned. - rs, _ := s.Routez(opts) + rs, _ := s.Routez(&opts) b, err := json.MarshalIndent(rs, "", " ") if err != nil { s.Errorf("Error marshaling response to /routez request: %v", err) @@ -743,6 +787,17 @@ type SubDetail struct { Cid uint64 `json:"cid"` } +func newSubDetail(sub *subscription) SubDetail { + return SubDetail{ + Subject: string(sub.subject), + Queue: string(sub.queue), + Sid: string(sub.sid), + Msgs: sub.nm, + Max: sub.max, + Cid: sub.client.cid, + } +} + // Subsz returns a Subsz struct containing subjects statistics func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { var ( @@ -793,14 +848,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { continue } sub.client.mu.Lock() - details[i] = SubDetail{ - Subject: string(sub.subject), - Queue: string(sub.queue), - Sid: string(sub.sid), - Msgs: sub.nm, - Max: sub.max, - Cid: sub.client.cid, - } + details[i] = newSubDetail(sub) sub.client.mu.Unlock() i++ } diff --git a/server/monitor_test.go b/server/monitor_test.go index 7632a3aa..957b7108 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -301,6 +301,9 @@ func TestConnz(t *testing.T) { if len(ci.Subs) != 0 { t.Fatalf("Expected subs of 0, got %v\n", ci.Subs) } + if len(ci.SubsDetail) != 0 { + t.Fatalf("Expected subsdetail of 0, got %v\n", ci.SubsDetail) + } if ci.InMsgs != 1 { t.Fatalf("Expected InMsgs of 1, got %v\n", ci.InMsgs) } @@ -377,6 +380,57 @@ func TestConnzWithSubs(t *testing.T) { } } +func TestConnzWithSubsDetail(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + nc := createClientConnSubscribeAndPublish(t, s) + defer nc.Close() + + nc.Subscribe("hello.foo", func(m *nats.Msg) {}) + ensureServerActivityRecorded(t, nc) + + url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port) + for mode := 0; mode < 2; mode++ { + c := pollConz(t, s, mode, url+"connz?subs=detail", &ConnzOptions{SubscriptionsDetail: true}) + // Test inside details of each connection + ci := c.Conns[0] + if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" { + t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs) + } + } +} + +func TestClosedConnzWithSubsDetail(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + nc := createClientConnSubscribeAndPublish(t, s) + + nc.Subscribe("hello.foo", func(m *nats.Msg) {}) + ensureServerActivityRecorded(t, nc) + nc.Close() + + s.mu.Lock() + for len(s.clients) != 0 { + s.mu.Unlock() + <-time.After(100 * time.Millisecond) + s.mu.Lock() + } + s.mu.Unlock() + + url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port) + for mode := 0; mode < 2; mode++ { + c := pollConz(t, s, mode, url+"connz?state=closed&subs=detail", &ConnzOptions{State: ConnClosed, + SubscriptionsDetail: true}) + // Test inside details of each connection + ci := c.Conns[0] + if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" { + t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs) + } + } +} + func TestConnzWithCID(t *testing.T) { s := runMonitorServer() defer s.Shutdown() @@ -1226,10 +1280,10 @@ func TestConnzWithRoutes(t *testing.T) { checkExpectedSubs(t, 1, s, sc) // Now check routez - urls := []string{"routez", "routez?subs=1"} + urls := []string{"routez", "routez?subs=1", "routez?subs=detail"} for subs, urlSuffix := range urls { for mode := 0; mode < 2; mode++ { - rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1}) + rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1, SubscriptionsDetail: subs == 2}) if rz.NumRoutes != 1 { t.Fatalf("Expected 1 route, got %d\n", rz.NumRoutes) @@ -1250,10 +1304,14 @@ func TestConnzWithRoutes(t *testing.T) { if len(route.Subs) != 0 { t.Fatalf("There should not be subs, got %v", len(route.Subs)) } - } else { - if len(route.Subs) != 1 { + } else if subs == 1 { + if len(route.Subs) != 1 && len(route.SubsDetail) != 0 { t.Fatalf("There should be 1 sub, got %v", len(route.Subs)) } + } else if subs == 2 { + if len(route.SubsDetail) != 1 && len(route.Subs) != 0 { + t.Fatalf("There should be 1 sub, got %v", len(route.SubsDetail)) + } } } } diff --git a/server/ring.go b/server/ring.go index 194c7481..2673a170 100644 --- a/server/ring.go +++ b/server/ring.go @@ -16,7 +16,7 @@ package server // We wrap to hold onto optional items for /connz. type closedClient struct { ConnInfo - subs []string + subs []SubDetail user string acc string } diff --git a/server/server.go b/server/server.go index 94a8d5a3..ea422391 100644 --- a/server/server.go +++ b/server/server.go @@ -1828,9 +1828,9 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { // Do subs, do not place by default in main ConnInfo if len(c.subs) > 0 { - cc.subs = make([]string, 0, len(c.subs)) + cc.subs = make([]SubDetail, 0, len(c.subs)) for _, sub := range c.subs { - cc.subs = append(cc.subs, string(sub.subject)) + cc.subs = append(cc.subs, newSubDetail(sub)) } } // Hold user as well.