mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
connz: limit and offset
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2013 Apcera Inc. All rights reserved.
|
||||
// Copyright 2013-2014 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/apcera/gnatsd/sublist"
|
||||
@@ -18,6 +19,8 @@ import (
|
||||
// Connz represents detail information on current connections.
|
||||
type Connz struct {
|
||||
NumConns int `json:"num_connections"`
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
Conns []*ConnInfo `json:"connections"`
|
||||
}
|
||||
|
||||
@@ -37,11 +40,30 @@ type ConnInfo struct {
|
||||
|
||||
// HandleConnz process HTTP requests for connection information.
|
||||
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
c := Connz{Conns: []*ConnInfo{}}
|
||||
c := &Connz{Conns: []*ConnInfo{}}
|
||||
|
||||
subs, _ := strconv.Atoi(r.URL.Query().Get("subs"))
|
||||
c.Offset, _ = strconv.Atoi(r.URL.Query().Get("offset"))
|
||||
c.Limit, _ = strconv.Atoi(r.URL.Query().Get("limit"))
|
||||
if c.Limit == 0 {
|
||||
c.Limit = 100
|
||||
}
|
||||
|
||||
// Walk the list
|
||||
s.mu.Lock()
|
||||
c.NumConns = len(s.clients)
|
||||
|
||||
i := 0
|
||||
for _, client := range s.clients {
|
||||
if i >= c.Offset+c.Limit {
|
||||
break
|
||||
}
|
||||
|
||||
i++
|
||||
if i <= c.Offset {
|
||||
continue
|
||||
}
|
||||
|
||||
ci := &ConnInfo{
|
||||
Cid: client.cid,
|
||||
InMsgs: client.inMsgs,
|
||||
@@ -51,7 +73,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
NumSubs: client.subs.Count(),
|
||||
}
|
||||
|
||||
if subs := r.URL.Query().Get("subs"); subs == "1" {
|
||||
if subs == 1 {
|
||||
ci.Subs = castToSliceString(client.subs.All())
|
||||
}
|
||||
|
||||
@@ -64,8 +86,6 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
c.NumConns = len(c.Conns)
|
||||
|
||||
b, err := json.MarshalIndent(c, "", " ")
|
||||
if err != nil {
|
||||
Logf("Error marshalling response to /connz request: %v", err)
|
||||
@@ -74,6 +94,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func castToSliceString(input []interface{}) []string {
|
||||
|
||||
output := make([]string, 0, len(input))
|
||||
for _, line := range input {
|
||||
output = append(output, string(line.(*subscription).subject))
|
||||
@@ -89,11 +110,11 @@ type Subsz struct {
|
||||
|
||||
// HandleStats process HTTP requests for subjects stats.
|
||||
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
st := Subsz{SubjectStats: s.sl.Stats()}
|
||||
st := &Subsz{SubjectStats: s.sl.Stats()}
|
||||
|
||||
b, err := json.MarshalIndent(st, "", " ")
|
||||
if err != nil {
|
||||
Logf("Error marshalling response to /stats request: %v", err)
|
||||
Logf("Error marshalling response to /subscriptionsz request: %v", err)
|
||||
}
|
||||
w.Write(b)
|
||||
}
|
||||
@@ -123,10 +144,10 @@ type usage struct {
|
||||
|
||||
// HandleVarz will process HTTP requests for server information.
|
||||
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
v := Varz{Start: s.start, Options: s.opts}
|
||||
v := &Varz{Start: s.start, Options: s.opts}
|
||||
v.Uptime = time.Since(s.start).String()
|
||||
|
||||
updateUsage(&v)
|
||||
updateUsage(v)
|
||||
|
||||
s.mu.Lock()
|
||||
v.Connections = len(s.clients)
|
||||
|
||||
@@ -347,7 +347,7 @@ func (s *Server) StartHTTPMonitoring() {
|
||||
mux.HandleFunc("/connz", s.HandleConnz)
|
||||
|
||||
// Subz
|
||||
mux.HandleFunc("/subsz", s.HandleSubsz)
|
||||
mux.HandleFunc("/subscriptionsz", s.HandleSubsz)
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: hp,
|
||||
|
||||
@@ -41,10 +41,10 @@ func TestNoMonitorPort(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVarz(t *testing.T) {
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT)
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 5)
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+5)
|
||||
resp, err := http.Get(url + "varz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -107,10 +107,10 @@ func TestVarz(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnz(t *testing.T) {
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT - 1)
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 1)
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT-1)
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+1)
|
||||
resp, err := http.Get(url + "connz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -163,6 +163,14 @@ func TestConnz(t *testing.T) {
|
||||
t.Fatalf("Expected 1 connections in array, got %p\n", c.Conns)
|
||||
}
|
||||
|
||||
if c.Limit != 100 {
|
||||
t.Fatalf("Expected limit of 100, got %v\n", c.Limit)
|
||||
}
|
||||
|
||||
if c.Offset != 0 {
|
||||
t.Fatalf("Expected offset of 0, got %v\n", c.Offset)
|
||||
}
|
||||
|
||||
// Test inside details of each connection
|
||||
ci := c.Conns[0]
|
||||
|
||||
@@ -196,13 +204,13 @@ func TestConnz(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnzWithSubs(t *testing.T) {
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 1)
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 2)
|
||||
defer s.Shutdown()
|
||||
|
||||
cl := createClientConnSubscribeAndPublish(t)
|
||||
defer cl.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+1)
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+2)
|
||||
resp, err := http.Get(url + "connz?subs=1")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -228,15 +236,57 @@ func TestConnzWithSubs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubsz(t *testing.T) {
|
||||
func TestConnzWithOffsetAndLimit(t *testing.T) {
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 3)
|
||||
defer s.Shutdown()
|
||||
|
||||
cl1 := createClientConnSubscribeAndPublish(t)
|
||||
defer cl1.Close()
|
||||
|
||||
cl2 := createClientConnSubscribeAndPublish(t)
|
||||
defer cl2.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+3)
|
||||
resp, err := http.Get(url + "connz?offset=1&limit=1")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Got an error reading the body: %v\n", err)
|
||||
}
|
||||
|
||||
c := server.Connz{}
|
||||
if err := json.Unmarshal(body, &c); err != nil {
|
||||
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
||||
}
|
||||
|
||||
if c.Limit != 1 {
|
||||
t.Fatalf("Expected limit of 1, got %v\n", c.Limit)
|
||||
}
|
||||
|
||||
if c.Offset != 1 {
|
||||
t.Fatalf("Expected offset of 1, got %v\n", c.Offset)
|
||||
}
|
||||
|
||||
if len(c.Conns) != 1 {
|
||||
t.Fatalf("Expected conns of 1, got %v\n", len(c.Conns))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubsz(t *testing.T) {
|
||||
s := runMonitorServer(server.DEFAULT_HTTP_PORT + 4)
|
||||
defer s.Shutdown()
|
||||
|
||||
cl := createClientConnSubscribeAndPublish(t)
|
||||
defer cl.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+3)
|
||||
resp, err := http.Get(url + "subsz")
|
||||
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+4)
|
||||
resp, err := http.Get(url + "subscriptionsz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user