Merge pull request #1318 from nats-io/monitoring

[ADDED] Option to include subscription details in monitoring responses
This commit is contained in:
Ivan Kozlovic
2020-03-30 15:51:51 -06:00
committed by GitHub
4 changed files with 172 additions and 66 deletions

View File

@@ -62,6 +62,9 @@ type ConnzOptions struct {
// Subscriptions indicates if subscriptions should be included in the results.
Subscriptions bool `json:"subscriptions"`
// SubscriptionsDetail indicates if subscription details should be included in the results
SubscriptionsDetail bool `json:"subscriptions_detail"`
// Offset is used for pagination. Connz() only returns connections starting at this
// offset from the global results.
Offset int `json:"offset"`
@@ -98,30 +101,31 @@ const (
// ConnInfo has detailed information on a per connection basis.
type ConnInfo struct {
Cid uint64 `json:"cid"`
IP string `json:"ip"`
Port int `json:"port"`
Start time.Time `json:"start"`
LastActivity time.Time `json:"last_activity"`
Stop *time.Time `json:"stop,omitempty"`
Reason string `json:"reason,omitempty"`
RTT string `json:"rtt,omitempty"`
Uptime string `json:"uptime"`
Idle string `json:"idle"`
Pending int `json:"pending_bytes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
NumSubs uint32 `json:"subscriptions"`
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"version,omitempty"`
TLSVersion string `json:"tls_version,omitempty"`
TLSCipher string `json:"tls_cipher_suite,omitempty"`
AuthorizedUser string `json:"authorized_user,omitempty"`
Account string `json:"account,omitempty"`
Subs []string `json:"subscriptions_list,omitempty"`
Cid uint64 `json:"cid"`
IP string `json:"ip"`
Port int `json:"port"`
Start time.Time `json:"start"`
LastActivity time.Time `json:"last_activity"`
Stop *time.Time `json:"stop,omitempty"`
Reason string `json:"reason,omitempty"`
RTT string `json:"rtt,omitempty"`
Uptime string `json:"uptime"`
Idle string `json:"idle"`
Pending int `json:"pending_bytes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
NumSubs uint32 `json:"subscriptions"`
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"version,omitempty"`
TLSVersion string `json:"tls_version,omitempty"`
TLSCipher string `json:"tls_cipher_suite,omitempty"`
AuthorizedUser string `json:"authorized_user,omitempty"`
Account string `json:"account,omitempty"`
Subs []string `json:"subscriptions_list,omitempty"`
SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"`
}
// DefaultConnListSize is the default size of the connection list.
@@ -132,12 +136,29 @@ const DefaultSubListSize = 1024
const defaultStackBufSize = 10000
func newSubsDetailList(client *client) []SubDetail {
subsDetail := make([]SubDetail, 0, len(client.subs))
for _, sub := range client.subs {
subsDetail = append(subsDetail, newSubDetail(sub))
}
return subsDetail
}
func newSubsList(client *client) []string {
subs := make([]string, 0, len(client.subs))
for _, sub := range client.subs {
subs = append(subs, string(sub.subject))
}
return subs
}
// Connz returns a Connz struct containing information about connections.
func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
var (
sortOpt = ByCid
auth bool
subs bool
subsDet bool
offset int
limit = DefaultConnListSize
cid = uint64(0)
@@ -166,6 +187,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
acc = opts.Account
subs = opts.Subscriptions
subsDet = opts.SubscriptionsDetail
offset = opts.Offset
if offset < 0 {
offset = 0
@@ -297,10 +319,11 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
ci := &conns[i]
ci.fill(client, client.nc, c.Now)
// Fill in subscription data if requested.
if subs && len(client.subs) > 0 {
ci.Subs = make([]string, 0, len(client.subs))
for _, sub := range client.subs {
ci.Subs = append(ci.Subs, string(sub.subject))
if len(client.subs) > 0 {
if subsDet {
ci.SubsDetail = newSubsDetailList(client)
} else if subs {
ci.Subs = newSubsList(client)
}
}
// Fill in user if auth requested.
@@ -336,8 +359,15 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
cc = &cx
}
// Fill in subscription data if requested.
if subs && len(cc.subs) > 0 {
cc.Subs = cc.subs
if len(cc.subs) > 0 {
if subsDet {
cc.SubsDetail = cc.subs
} else if subs {
cc.Subs = make([]string, 0, len(cc.subs))
for _, sub := range cc.subs {
cc.Subs = append(cc.Subs, sub.Subject)
}
}
}
// Fill in user if auth requested.
if auth {
@@ -526,6 +556,14 @@ func decodeState(w http.ResponseWriter, r *http.Request) (ConnState, error) {
return 0, err
}
func decodeSubs(w http.ResponseWriter, r *http.Request) (subs bool, subsDet bool, err error) {
subsDet = strings.ToLower(r.URL.Query().Get("subs")) == "detail"
if !subsDet {
subs, err = decodeBool(w, r, "subs")
}
return
}
// HandleConnz process HTTP requests for connection information.
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
sortOpt := SortOpt(r.URL.Query().Get("sort"))
@@ -533,7 +571,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
subs, err := decodeBool(w, r, "subs")
subs, subsDet, err := decodeSubs(w, r)
if err != nil {
return
}
@@ -558,15 +596,16 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
acc := r.URL.Query().Get("acc")
connzOpts := &ConnzOptions{
Sort: sortOpt,
Username: auth,
Subscriptions: subs,
Offset: offset,
Limit: limit,
CID: cid,
State: state,
User: user,
Account: acc,
Sort: sortOpt,
Username: auth,
Subscriptions: subs,
SubscriptionsDetail: subsDet,
Offset: offset,
Limit: limit,
CID: cid,
State: state,
User: user,
Account: acc,
}
s.mu.Lock()
@@ -602,6 +641,8 @@ type Routez struct {
type RoutezOptions struct {
// Subscriptions indicates that Routez will return a route's subscriptions
Subscriptions bool `json:"subscriptions"`
// SubscriptionsDetail indicates if subscription details should be included in the results
SubscriptionsDetail bool `json:"subscriptions_detail"`
}
// RouteInfo has detailed information on a per connection basis.
@@ -622,6 +663,7 @@ type RouteInfo struct {
OutBytes int64 `json:"out_bytes"`
NumSubs uint32 `json:"subscriptions"`
Subs []string `json:"subscriptions_list,omitempty"`
SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"`
}
// Routez returns a Routez struct containing information about routes.
@@ -629,7 +671,9 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
rs := &Routez{Routes: []*RouteInfo{}}
rs.Now = time.Now()
subs := routezOpts != nil && routezOpts.Subscriptions
if routezOpts == nil {
routezOpts = &RoutezOptions{}
}
s.mu.Lock()
rs.NumRoutes = len(s.routes)
@@ -661,12 +705,14 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
RTT: r.getRTT(),
}
if subs && len(r.subs) > 0 {
ri.Subs = make([]string, 0, len(r.subs))
for _, sub := range r.subs {
ri.Subs = append(ri.Subs, string(sub.subject))
if len(r.subs) > 0 {
if routezOpts.SubscriptionsDetail {
ri.SubsDetail = newSubsDetailList(r)
} else if routezOpts.Subscriptions {
ri.Subs = newSubsList(r)
}
}
switch conn := r.nc.(type) {
case *net.TCPConn, *tls.Conn:
addr := conn.RemoteAddr().(*net.TCPAddr)
@@ -682,21 +728,19 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
// HandleRoutez process HTTP requests for route information.
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
subs, err := decodeBool(w, r, "subs")
subs, subsDetail, err := decodeSubs(w, r)
if err != nil {
return
}
var opts *RoutezOptions
if subs {
opts = &RoutezOptions{Subscriptions: true}
}
opts := RoutezOptions{Subscriptions: subs, SubscriptionsDetail: subsDetail}
s.mu.Lock()
s.httpReqStats[RoutezPath]++
s.mu.Unlock()
// As of now, no error is ever returned.
rs, _ := s.Routez(opts)
rs, _ := s.Routez(&opts)
b, err := json.MarshalIndent(rs, "", " ")
if err != nil {
s.Errorf("Error marshaling response to /routez request: %v", err)
@@ -743,6 +787,17 @@ type SubDetail struct {
Cid uint64 `json:"cid"`
}
func newSubDetail(sub *subscription) SubDetail {
return SubDetail{
Subject: string(sub.subject),
Queue: string(sub.queue),
Sid: string(sub.sid),
Msgs: sub.nm,
Max: sub.max,
Cid: sub.client.cid,
}
}
// Subsz returns a Subsz struct containing subjects statistics
func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
var (
@@ -797,14 +852,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
continue
}
sub.client.mu.Lock()
details[i] = SubDetail{
Subject: string(sub.subject),
Queue: string(sub.queue),
Sid: string(sub.sid),
Msgs: sub.nm,
Max: sub.max,
Cid: sub.client.cid,
}
details[i] = newSubDetail(sub)
sub.client.mu.Unlock()
i++
}

View File

@@ -301,6 +301,9 @@ func TestConnz(t *testing.T) {
if len(ci.Subs) != 0 {
t.Fatalf("Expected subs of 0, got %v\n", ci.Subs)
}
if len(ci.SubsDetail) != 0 {
t.Fatalf("Expected subsdetail of 0, got %v\n", ci.SubsDetail)
}
if ci.InMsgs != 1 {
t.Fatalf("Expected InMsgs of 1, got %v\n", ci.InMsgs)
}
@@ -377,6 +380,57 @@ func TestConnzWithSubs(t *testing.T) {
}
}
func TestConnzWithSubsDetail(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
nc := createClientConnSubscribeAndPublish(t, s)
defer nc.Close()
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
ensureServerActivityRecorded(t, nc)
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?subs=detail", &ConnzOptions{SubscriptionsDetail: true})
// Test inside details of each connection
ci := c.Conns[0]
if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" {
t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs)
}
}
}
func TestClosedConnzWithSubsDetail(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
nc := createClientConnSubscribeAndPublish(t, s)
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
ensureServerActivityRecorded(t, nc)
nc.Close()
s.mu.Lock()
for len(s.clients) != 0 {
s.mu.Unlock()
<-time.After(100 * time.Millisecond)
s.mu.Lock()
}
s.mu.Unlock()
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?state=closed&subs=detail", &ConnzOptions{State: ConnClosed,
SubscriptionsDetail: true})
// Test inside details of each connection
ci := c.Conns[0]
if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" {
t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs)
}
}
}
func TestConnzWithCID(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
@@ -1226,10 +1280,10 @@ func TestConnzWithRoutes(t *testing.T) {
checkExpectedSubs(t, 1, s, sc)
// Now check routez
urls := []string{"routez", "routez?subs=1"}
urls := []string{"routez", "routez?subs=1", "routez?subs=detail"}
for subs, urlSuffix := range urls {
for mode := 0; mode < 2; mode++ {
rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1})
rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1, SubscriptionsDetail: subs == 2})
if rz.NumRoutes != 1 {
t.Fatalf("Expected 1 route, got %d\n", rz.NumRoutes)
@@ -1250,10 +1304,14 @@ func TestConnzWithRoutes(t *testing.T) {
if len(route.Subs) != 0 {
t.Fatalf("There should not be subs, got %v", len(route.Subs))
}
} else {
if len(route.Subs) != 1 {
} else if subs == 1 {
if len(route.Subs) != 1 && len(route.SubsDetail) != 0 {
t.Fatalf("There should be 1 sub, got %v", len(route.Subs))
}
} else if subs == 2 {
if len(route.SubsDetail) != 1 && len(route.Subs) != 0 {
t.Fatalf("There should be 1 sub, got %v", len(route.SubsDetail))
}
}
}
}

View File

@@ -16,7 +16,7 @@ package server
// We wrap to hold onto optional items for /connz.
type closedClient struct {
ConnInfo
subs []string
subs []SubDetail
user string
acc string
}

View File

@@ -1828,9 +1828,9 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
// Do subs, do not place by default in main ConnInfo
if len(c.subs) > 0 {
cc.subs = make([]string, 0, len(c.subs))
cc.subs = make([]SubDetail, 0, len(c.subs))
for _, sub := range c.subs {
cc.subs = append(cc.subs, string(sub.subject))
cc.subs = append(cc.subs, newSubDetail(sub))
}
}
// Hold user as well.