Added more sort options, fixed some broken ones.

Fixes #700, #701, #702

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-06-29 17:16:00 -07:00
parent 1aa928a88d
commit cd834a36fa
5 changed files with 306 additions and 22 deletions

View File

@@ -6,7 +6,7 @@ install:
- go get github.com/nats-io/go-nats
- go get github.com/mattn/goveralls
- go get github.com/wadey/gocovmerge
- go get -u honnef.co/go/tools/cmd/megacheck
- go get -u -insecure honnef.co/go/tools/cmd/megacheck
- go get -u github.com/client9/misspell/cmd/misspell
before_script:
- EXCLUDE_VENDOR=$(go list ./... | grep -v "/vendor/")

View File

@@ -128,7 +128,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
if opts != nil {
// If no sort option given or sort is by uptime, then sort by cid
if opts.Sort == "" || opts.Sort == ByUptime {
if opts.Sort == "" {
sortOpt = ByCid
} else {
sortOpt = opts.Sort
@@ -149,6 +149,15 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
// state
state = opts.State
// ByStop only makes sense on closed connections
if sortOpt == ByStop && state != ConnClosed {
return nil, fmt.Errorf("Sort by stop only valid on closed connections")
}
// ByReason is the same.
if sortOpt == ByReason && state != ConnClosed {
return nil, fmt.Errorf("Sort by reason only valid on closed connections")
}
// If searching by CID
if opts.CID > 0 {
cid = opts.CID
@@ -289,7 +298,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
}
switch sortOpt {
case ByCid:
case ByCid, ByStart:
sort.Sort(byCid{pconns})
case BySubs:
sort.Sort(sort.Reverse(bySubs{pconns}))
@@ -308,7 +317,11 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
case ByIdle:
sort.Sort(sort.Reverse(byIdle{pconns}))
case ByUptime:
sort.Sort(sort.Reverse(byCid{pconns}))
sort.Sort(byUptime{pconns, time.Now()})
case ByStop:
sort.Sort(sort.Reverse(byStop{pconns}))
case ByReason:
sort.Sort(byReason{pconns})
}
minoff := c.Offset

View File

@@ -13,6 +13,10 @@
package server
import (
"time"
)
// Represents a connection info list. We use pointers since it will be sorted.
type ConnInfos []*ConnInfo
@@ -26,6 +30,7 @@ type SortOpt string
// Possible sort options
const (
ByCid SortOpt = "cid" // By connection ID
ByStart SortOpt = "start" // By connection start time, same as CID
BySubs SortOpt = "subs" // By number of subscriptions
ByPending SortOpt = "pending" // By amount of data in bytes waiting to be sent to client
ByOutMsgs SortOpt = "msgs_to" // By number of messages sent
@@ -35,6 +40,9 @@ const (
ByLast SortOpt = "last" // By the last activity
ByIdle SortOpt = "idle" // By the amount of inactivity
ByUptime SortOpt = "uptime" // By the amount of time connections exist
ByStop SortOpt = "stop" // By the stop time for a closed connection
ByReason SortOpt = "reason" // By the reason for a closed connection
)
// Individual sort options provide the Less for sort.Interface. Len and Swap are on cList.
@@ -83,12 +91,55 @@ func (l byLast) Less(i, j int) bool {
// Idle time
type byIdle struct{ ConnInfos }
func (l byIdle) Less(i, j int) bool { return l.ConnInfos[i].Idle < l.ConnInfos[j].Idle }
func (l byIdle) Less(i, j int) bool {
ii := l.ConnInfos[i].LastActivity.Sub(l.ConnInfos[i].Start)
ij := l.ConnInfos[j].LastActivity.Sub(l.ConnInfos[j].Start)
return ii < ij
}
// Uptime
type byUptime struct {
ConnInfos
now time.Time
}
func (l byUptime) Less(i, j int) bool {
ci := l.ConnInfos[i]
cj := l.ConnInfos[j]
var upi, upj time.Duration
if ci.Stop == nil || ci.Stop.IsZero() {
upi = l.now.Sub(ci.Start)
} else {
upi = ci.Stop.Sub(ci.Start)
}
if cj.Stop == nil || cj.Stop.IsZero() {
upj = l.now.Sub(cj.Start)
} else {
upj = cj.Stop.Sub(cj.Start)
}
return upi < upj
}
// Stop
type byStop struct{ ConnInfos }
func (l byStop) Less(i, j int) bool {
ciStop := l.ConnInfos[i].Stop
cjStop := l.ConnInfos[j].Stop
return ciStop.Before(*cjStop)
}
// Reason
type byReason struct{ ConnInfos }
func (l byReason) Less(i, j int) bool {
return l.ConnInfos[i].Reason < l.ConnInfos[j].Reason
}
// IsValid determines if a sort option is valid
func (s SortOpt) IsValid() bool {
switch s {
case "", ByCid, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime:
case "", ByCid, ByStart, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime, ByStop, ByReason:
return true
default:
return false

View File

@@ -17,9 +17,11 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"runtime"
"sort"
"strings"
"sync"
"testing"
@@ -678,7 +680,30 @@ func TestConnzSortedByCid(t *testing.T) {
if c.Conns[0].Cid > c.Conns[1].Cid ||
c.Conns[1].Cid > c.Conns[2].Cid ||
c.Conns[2].Cid > c.Conns[3].Cid {
t.Fatalf("Expected conns sorted in ascending order by cid, got %v < %v\n", c.Conns[0].Cid, c.Conns[3].Cid)
t.Fatalf("Expected conns sorted in ascending order by cid, got [%v, %v, %v, %v]\n",
c.Conns[0].Cid, c.Conns[1].Cid, c.Conns[2].Cid, c.Conns[3].Cid)
}
}
}
func TestConnzSortedByStart(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
clients := make([]*nats.Conn, 4)
for i := range clients {
clients[i] = createClientConnSubscribeAndPublish(t, s)
defer clients[i].Close()
}
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?sort=start", &ConnzOptions{Sort: ByStart})
if c.Conns[0].Start.After(c.Conns[1].Start) ||
c.Conns[1].Start.After(c.Conns[2].Start) ||
c.Conns[2].Start.After(c.Conns[3].Start) {
t.Fatalf("Expected conns sorted in ascending order by startime, got [%v, %v, %v, %v]\n",
c.Conns[0].Start, c.Conns[1].Start, c.Conns[2].Start, c.Conns[3].Start)
}
}
}
@@ -821,23 +846,194 @@ func TestConnzSortedByUptime(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
clients := make([]*nats.Conn, 5)
for i := range clients {
clients[i] = createClientConnSubscribeAndPublish(t, s)
defer clients[i].Close()
time.Sleep(250 * time.Millisecond)
for i := 0; i < 4; i++ {
client := createClientConnSubscribeAndPublish(t, s)
defer client.Close()
// Since we check times (now-start) does not have to be big.
time.Sleep(50 * time.Millisecond)
}
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?sort=uptime", &ConnzOptions{Sort: ByUptime})
// uptime is generated by Conn.Start
if c.Conns[0].Start.UnixNano() > c.Conns[1].Start.UnixNano() ||
c.Conns[1].Start.UnixNano() > c.Conns[2].Start.UnixNano() ||
c.Conns[2].Start.UnixNano() > c.Conns[3].Start.UnixNano() {
t.Fatalf("Expected conns sorted in ascending order by start time, got %v > one of [%v, %v, %v]\n",
c.Conns[0].Start, c.Conns[1].Start, c.Conns[2].Start, c.Conns[3].Start)
now := time.Now()
ups := make([]int, 4)
for i := 0; i < 4; i++ {
ups[i] = int(now.Sub(c.Conns[i].Start))
}
if !sort.IntsAreSorted(ups) {
d := make([]time.Duration, 4)
for i := 0; i < 4; i++ {
d[i] = time.Duration(ups[i])
}
t.Fatalf("Expected conns sorted in ascending order by uptime (now-Start), got %+v\n", d)
}
}
}
func TestConnzSortedByUptimeClosedConn(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
for i := time.Duration(1); i <= 4; i++ {
c := createClientConnSubscribeAndPublish(t, s)
// Grab client and asjust start time such that
client := s.getClient(uint64(i))
if client == nil {
t.Fatalf("Could nopt retrieve client for %d\n", i)
}
client.mu.Lock()
client.start = client.start.Add(-10 * (4 - i) * time.Second)
client.mu.Unlock()
c.Close()
}
checkClosedConns(t, s, 4, time.Second)
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?state=closed&sort=uptime", &ConnzOptions{State: ConnClosed, Sort: ByUptime})
ups := make([]int, 4)
for i := 0; i < 4; i++ {
ups[i] = int(c.Conns[i].Stop.Sub(c.Conns[i].Start))
}
if !sort.IntsAreSorted(ups) {
d := make([]time.Duration, 4)
for i := 0; i < 4; i++ {
d[i] = time.Duration(ups[i])
}
t.Fatalf("Expected conns sorted in ascending order by uptime, got %+v\n", d)
}
}
}
func TestConnzSortedByStopOnOpen(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
opts := s.getOpts()
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
// 4 clients
for i := 0; i < 4; i++ {
c, err := nats.Connect(url)
if err != nil {
t.Fatalf("Could not create client: %v\n", err)
}
defer c.Close()
}
c, err := s.Connz(&ConnzOptions{Sort: ByStop})
if err == nil {
t.Fatalf("Expected err to be non-nil, got %+v\n", c)
}
}
func TestConnzSortedByStopTimeClosedConn(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
opts := s.getOpts()
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
// 4 clients
for i := 0; i < 4; i++ {
c, err := nats.Connect(url)
if err != nil {
t.Fatalf("Could not create client: %v\n", err)
}
c.Close()
}
checkClosedConns(t, s, 4, time.Second)
//Now adjust the Stop times for these with some random values.
s.mu.Lock()
now := time.Now()
ccs := s.closed.closedClients()
for _, cc := range ccs {
newStop := now.Add(time.Duration(rand.Int()%120) * -time.Minute)
cc.Stop = &newStop
}
s.mu.Unlock()
url = fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?state=closed&sort=stop", &ConnzOptions{State: ConnClosed, Sort: ByStop})
ups := make([]int, 4)
nowU := time.Now().UnixNano()
for i := 0; i < 4; i++ {
ups[i] = int(nowU - c.Conns[i].Stop.UnixNano())
}
if !sort.IntsAreSorted(ups) {
d := make([]time.Duration, 4)
for i := 0; i < 4; i++ {
d[i] = time.Duration(ups[i])
}
t.Fatalf("Expected conns sorted in ascending order by stop time, got %+v\n", d)
}
}
}
func TestConnzSortedByReason(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
opts := s.getOpts()
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
// 20 clients
for i := 0; i < 20; i++ {
c, err := nats.Connect(url)
if err != nil {
t.Fatalf("Could not create client: %v\n", err)
}
c.Close()
}
checkClosedConns(t, s, 20, time.Second)
//Now adjust the Reasons for these with some random values.
s.mu.Lock()
ccs := s.closed.closedClients()
max := int(ServerShutdown)
for _, cc := range ccs {
cc.Reason = ClosedState(rand.Int() % max).String()
}
s.mu.Unlock()
url = fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url+"connz?state=closed&sort=reason", &ConnzOptions{State: ConnClosed, Sort: ByReason})
rs := make([]string, 20)
for i := 0; i < 20; i++ {
rs[i] = c.Conns[i].Reason
}
if !sort.StringsAreSorted(rs) {
t.Fatalf("Expected conns sorted in order by stop reason, got %#v\n", rs)
}
}
}
func TestConnzSortedByReasonOnOpen(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
opts := s.getOpts()
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
// 4 clients
for i := 0; i < 4; i++ {
c, err := nats.Connect(url)
if err != nil {
t.Fatalf("Could not create client: %v\n", err)
}
defer c.Close()
}
c, err := s.Connz(&ConnzOptions{Sort: ByReason})
if err == nil {
t.Fatalf("Expected err to be non-nil, got %+v\n", c)
}
}
@@ -855,8 +1051,25 @@ func TestConnzSortedByIdle(t *testing.T) {
secondClient := createClientConnSubscribeAndPublish(t, s)
defer secondClient.Close()
secondClient.Subscribe("client.2", func(m *nats.Msg) {})
secondClient.Flush()
// Make it such that the second client started 10 secs ago. 10 is important since bug
// was strcmp, e.g. 1s vs 11s
var cid uint64
switch mode {
case 0:
cid = uint64(2)
case 1:
cid = uint64(4)
}
client := s.getClient(cid)
if client == nil {
t.Fatalf("Error looking up client %v\n", 2)
}
client.mu.Lock()
client.start = client.start.Add(-10 * time.Second)
client.last = client.start
client.mu.Unlock()
// The Idle granularity is a whole second
time.Sleep(time.Second)
@@ -882,9 +1095,9 @@ func TestConnzSortedByIdle(t *testing.T) {
t.Fatalf("Unable to parse duration %v, err=%v", c.Conns[0].Idle, err)
}
if idle1 < idle2 {
if idle2 < idle1 {
t.Fatalf("Expected conns sorted in descending order by Idle, got %v < %v\n",
idle1, idle2)
idle2, idle1)
}
}
for mode := 0; mode < 2; mode++ {

View File

@@ -1030,6 +1030,13 @@ func (s *Server) NumClients() int {
return len(s.clients)
}
// getClient will return the client associated with cid.
func (s *Server) getClient(cid uint64) *client {
s.mu.Lock()
defer s.mu.Unlock()
return s.clients[cid]
}
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()