diff --git a/server/monitor.go b/server/monitor.go index c11578cc..27ed78c4 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013 Apcera Inc. All rights reserved. +// Copyright 2013-2014 Apcera Inc. All rights reserved. package server @@ -10,6 +10,7 @@ import ( "os" "os/exec" "runtime" + "strconv" "time" "github.com/apcera/gnatsd/sublist" @@ -18,6 +19,8 @@ import ( // Connz represents detail information on current connections. type Connz struct { NumConns int `json:"num_connections"` + Offset int `json:"offset"` + Limit int `json:"limit"` Conns []*ConnInfo `json:"connections"` } @@ -37,11 +40,30 @@ type ConnInfo struct { // HandleConnz process HTTP requests for connection information. func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { - c := Connz{Conns: []*ConnInfo{}} + c := &Connz{Conns: []*ConnInfo{}} + + subs, _ := strconv.Atoi(r.URL.Query().Get("subs")) + c.Offset, _ = strconv.Atoi(r.URL.Query().Get("offset")) + c.Limit, _ = strconv.Atoi(r.URL.Query().Get("limit")) + if c.Limit == 0 { + c.Limit = 100 + } // Walk the list s.mu.Lock() + c.NumConns = len(s.clients) + + i := 0 for _, client := range s.clients { + if i >= c.Offset+c.Limit { + break + } + + i++ + if i <= c.Offset { + continue + } + ci := &ConnInfo{ Cid: client.cid, InMsgs: client.inMsgs, @@ -51,7 +73,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { NumSubs: client.subs.Count(), } - if subs := r.URL.Query().Get("subs"); subs == "1" { + if subs == 1 { ci.Subs = castToSliceString(client.subs.All()) } @@ -64,8 +86,6 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { } s.mu.Unlock() - c.NumConns = len(c.Conns) - b, err := json.MarshalIndent(c, "", " ") if err != nil { Logf("Error marshalling response to /connz request: %v", err) @@ -74,6 +94,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { } func castToSliceString(input []interface{}) []string { + output := make([]string, 0, len(input)) for _, line := range input { output = append(output, string(line.(*subscription).subject)) @@ -89,11 +110,11 @@ type Subsz struct { // HandleStats process HTTP requests for subjects stats. func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { - st := Subsz{SubjectStats: s.sl.Stats()} + st := &Subsz{SubjectStats: s.sl.Stats()} b, err := json.MarshalIndent(st, "", " ") if err != nil { - Logf("Error marshalling response to /stats request: %v", err) + Logf("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -123,10 +144,10 @@ type usage struct { // HandleVarz will process HTTP requests for server information. func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { - v := Varz{Start: s.start, Options: s.opts} + v := &Varz{Start: s.start, Options: s.opts} v.Uptime = time.Since(s.start).String() - updateUsage(&v) + updateUsage(v) s.mu.Lock() v.Connections = len(s.clients) diff --git a/server/server.go b/server/server.go index 1cad13e7..0245b971 100644 --- a/server/server.go +++ b/server/server.go @@ -347,7 +347,7 @@ func (s *Server) StartHTTPMonitoring() { mux.HandleFunc("/connz", s.HandleConnz) // Subz - mux.HandleFunc("/subsz", s.HandleSubsz) + mux.HandleFunc("/subscriptionsz", s.HandleSubsz) srv := &http.Server{ Addr: hp, diff --git a/test/monitor_test.go b/test/monitor_test.go index efa2fb25..ac076260 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -41,10 +41,10 @@ func TestNoMonitorPort(t *testing.T) { } func TestVarz(t *testing.T) { - s := runMonitorServer(server.DEFAULT_HTTP_PORT) + s := runMonitorServer(server.DEFAULT_HTTP_PORT + 5) defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT) + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+5) resp, err := http.Get(url + "varz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -107,10 +107,10 @@ func TestVarz(t *testing.T) { } func TestConnz(t *testing.T) { - s := runMonitorServer(server.DEFAULT_HTTP_PORT - 1) + s := runMonitorServer(server.DEFAULT_HTTP_PORT + 1) defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT-1) + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+1) resp, err := http.Get(url + "connz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -163,6 +163,14 @@ func TestConnz(t *testing.T) { t.Fatalf("Expected 1 connections in array, got %p\n", c.Conns) } + if c.Limit != 100 { + t.Fatalf("Expected limit of 100, got %v\n", c.Limit) + } + + if c.Offset != 0 { + t.Fatalf("Expected offset of 0, got %v\n", c.Offset) + } + // Test inside details of each connection ci := c.Conns[0] @@ -196,13 +204,13 @@ func TestConnz(t *testing.T) { } func TestConnzWithSubs(t *testing.T) { - s := runMonitorServer(server.DEFAULT_HTTP_PORT + 1) + s := runMonitorServer(server.DEFAULT_HTTP_PORT + 2) defer s.Shutdown() cl := createClientConnSubscribeAndPublish(t) defer cl.Close() - url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+1) + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+2) resp, err := http.Get(url + "connz?subs=1") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -228,15 +236,57 @@ func TestConnzWithSubs(t *testing.T) { } } -func TestSubsz(t *testing.T) { +func TestConnzWithOffsetAndLimit(t *testing.T) { s := runMonitorServer(server.DEFAULT_HTTP_PORT + 3) defer s.Shutdown() + cl1 := createClientConnSubscribeAndPublish(t) + defer cl1.Close() + + cl2 := createClientConnSubscribeAndPublish(t) + defer cl2.Close() + + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+3) + resp, err := http.Get(url + "connz?offset=1&limit=1") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + + c := server.Connz{} + if err := json.Unmarshal(body, &c); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + + if c.Limit != 1 { + t.Fatalf("Expected limit of 1, got %v\n", c.Limit) + } + + if c.Offset != 1 { + t.Fatalf("Expected offset of 1, got %v\n", c.Offset) + } + + if len(c.Conns) != 1 { + t.Fatalf("Expected conns of 1, got %v\n", len(c.Conns)) + } +} + +func TestSubsz(t *testing.T) { + s := runMonitorServer(server.DEFAULT_HTTP_PORT + 4) + defer s.Shutdown() + cl := createClientConnSubscribeAndPublish(t) defer cl.Close() - url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+3) - resp, err := http.Get(url + "subsz") + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+4) + resp, err := http.Get(url + "subscriptionsz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) }