mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add option to sort by pending size
This commit is contained in:
@@ -79,6 +79,8 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
sort.Sort(ByCid(pairs))
|
||||
case bySubs:
|
||||
sort.Sort(sort.Reverse(BySubs(pairs)))
|
||||
case byPending:
|
||||
sort.Sort(sort.Reverse(ByPending(pairs)))
|
||||
case byOutMsgs:
|
||||
sort.Sort(sort.Reverse(ByOutMsgs(pairs)))
|
||||
case byInMsgs:
|
||||
|
||||
@@ -8,6 +8,7 @@ type SortOpt string
|
||||
const (
|
||||
byCid SortOpt = "cid"
|
||||
bySubs = "subs"
|
||||
byPending = "pending"
|
||||
byOutMsgs = "msgs_to"
|
||||
byInMsgs = "msgs_from"
|
||||
byOutBytes = "bytes_to"
|
||||
@@ -43,6 +44,18 @@ func (d BySubs) Less(i, j int) bool {
|
||||
return d[i].Val.subs.Count() < d[j].Val.subs.Count()
|
||||
}
|
||||
|
||||
type ByPending []Pair
|
||||
|
||||
func (d ByPending) Len() int {
|
||||
return len(d)
|
||||
}
|
||||
func (d ByPending) Swap(i, j int) {
|
||||
d[i], d[j] = d[j], d[i]
|
||||
}
|
||||
func (d ByPending) Less(i, j int) bool {
|
||||
return d[i].Val.bw.Buffered() < d[j].Val.bw.Buffered()
|
||||
}
|
||||
|
||||
type ByOutMsgs []Pair
|
||||
|
||||
func (d ByOutMsgs) Len() int {
|
||||
|
||||
@@ -569,6 +569,46 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnzSortedByPending(t *testing.T) {
|
||||
s := runMonitorServer(DEFAULT_HTTP_PORT)
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i, _ := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
defer firstClient.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
|
||||
resp, err := http.Get(url + "connz?sort=pending")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Got an error reading the body: %v\n", err)
|
||||
}
|
||||
|
||||
c := Connz{}
|
||||
if err := json.Unmarshal(body, &c); err != nil {
|
||||
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
||||
}
|
||||
|
||||
if c.Conns[0].Pending < c.Conns[1].Pending ||
|
||||
c.Conns[0].Pending < c.Conns[2].Pending ||
|
||||
c.Conns[0].Pending < c.Conns[3].Pending {
|
||||
t.Fatalf("Expected conns sorted in descending order by number of pending, got %v < one of [%v, %v, %v]\n",
|
||||
c.Conns[0].Pending, c.Conns[1].Pending, c.Conns[2].Pending, c.Conns[3].Pending)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnzSortedBySubs(t *testing.T) {
|
||||
s := runMonitorServer(DEFAULT_HTTP_PORT)
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user