diff --git a/TODO.md b/TODO.md index 9e9ffefd..41f293ab 100644 --- a/TODO.md +++ b/TODO.md @@ -20,7 +20,8 @@ - [ ] Info updates contain other implicit route servers - [ ] Pedantic state - [ ] Multi-tenant accounts with isolation of subject space -- [ ] Track last activity time per connection? +- [ ] Default sort by cid on connz +- [X] Track last activity time per connection? - [X] Add total connections to varz so we won't miss spikes, etc. - [X] Add starttime and uptime to connz list. - [X] Gossip Protocol for discovery for clustering diff --git a/server/client.go b/server/client.go index b2116f23..832c869d 100644 --- a/server/client.go +++ b/server/client.go @@ -50,7 +50,7 @@ type client struct { ptmr *time.Timer pout int msgb [msgScratchSize]byte - + last time.Time parseState route *route @@ -254,6 +254,7 @@ func (c *client) processConnect(arg []byte) error { // so we can just clear it here. c.mu.Lock() c.clearAuthTimer() + c.last = time.Now() c.mu.Unlock() if err := json.Unmarshal(arg, &c.opts); err != nil { @@ -439,6 +440,8 @@ func (c *client) processPub(arg []byte) error { if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) { c.sendErr("Invalid Subject") } + // Update last activity. + c.last = time.Now() return nil } @@ -492,6 +495,8 @@ func (c *client) processSub(argo []byte) (err error) { c.mu.Unlock() return nil } + // Update last activity. + c.last = time.Now() // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. diff --git a/server/monitor.go b/server/monitor.go index 3cc1261d..9470781d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -34,23 +34,24 @@ type Connz struct { // ConnInfo has detailed information on a per connection basis. type ConnInfo struct { - Cid uint64 `json:"cid"` - IP string `json:"ip"` - Port int `json:"port"` - Start time.Time `json:"start"` - Uptime string `json:"uptime"` - Pending int `json:"pending_bytes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Name string `json:"name,omitempty"` - Lang string `json:"lang,omitempty"` - Version string `json:"version,omitempty"` - TLSVersion string `json:"tls_version,omitempty"` - TLSCipher string `json:"tls_cipher_suite,omitempty"` - Subs []string `json:"subscriptions_list,omitempty"` + Cid uint64 `json:"cid"` + IP string `json:"ip"` + Port int `json:"port"` + Start time.Time `json:"start"` + Uptime string `json:"uptime"` + LastActivity time.Time `json:"last_activity"` + Pending int `json:"pending_bytes"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Name string `json:"name,omitempty"` + Lang string `json:"lang,omitempty"` + Version string `json:"version,omitempty"` + TLSVersion string `json:"tls_version,omitempty"` + TLSCipher string `json:"tls_cipher_suite,omitempty"` + Subs []string `json:"subscriptions_list,omitempty"` } const DefaultConnListSize = 1024 @@ -125,18 +126,19 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { // not 'client', because otherwise we may be off compared to the // previous sort. ci := &ConnInfo{ - Cid: client.cid, - Start: client.start, - Uptime: myUptime(c.Now.Sub(client.start)), - InMsgs: cli.inMsgs, - OutMsgs: cli.outMsgs, - InBytes: cli.inBytes, - OutBytes: cli.outBytes, - NumSubs: cli.subCount, - Pending: cli.buffered, - Name: client.opts.Name, - Lang: client.opts.Lang, - Version: client.opts.Version, + Cid: client.cid, + Start: client.start, + Uptime: myUptime(c.Now.Sub(client.start)), + LastActivity: cli.last, + InMsgs: cli.inMsgs, + OutMsgs: cli.outMsgs, + InBytes: cli.inBytes, + OutBytes: cli.outBytes, + NumSubs: cli.subCount, + Pending: cli.buffered, + Name: client.opts.Name, + Lang: client.opts.Lang, + Version: client.opts.Version, } if tlsRequired { diff --git a/server/monitor_sort_opts.go b/server/monitor_sort_opts.go index a220a7d2..83ce963f 100644 --- a/server/monitor_sort_opts.go +++ b/server/monitor_sort_opts.go @@ -1,8 +1,11 @@ -// Copyright 2013-2015 Apcera Inc. All rights reserved. +// Copyright 2013-2016 Apcera Inc. All rights reserved. package server -import "sync/atomic" +import ( + "sync/atomic" + "time" +) // Helper types to sort by ConnInfo values type SortOpt string @@ -26,6 +29,7 @@ type ClientInfo struct { inMsgs int64 outBytes int64 inBytes int64 + last time.Time } func NewClientInfo(c *client) *ClientInfo { @@ -37,6 +41,7 @@ func NewClientInfo(c *client) *ClientInfo { o.buffered = c.bw.Buffered() o.outMsgs = c.outMsgs o.outBytes = c.outBytes + o.last = c.last // inMsgs and inBytes are updated outside of client's lock. // So use atomic here to read (and updated in processMsg) o.inMsgs = atomic.LoadInt64(&c.inMsgs) diff --git a/server/monitor_test.go b/server/monitor_test.go index 5f4b9a30..18ab21e2 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -280,6 +280,12 @@ func TestConnz(t *testing.T) { if ci.Uptime == "" { t.Fatalf("Expected Uptime to be valid\n") } + if ci.LastActivity.IsZero() { + t.Fatalf("Expected LastActivity to be valid\n") + } + if ci.LastActivity.UnixNano() < ci.Start.UnixNano() { + t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start) + } // Test JSONP respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT) + "connz?callback=callback") @@ -326,6 +332,62 @@ func TestConnzWithSubs(t *testing.T) { } } +func TestConnzLastActivity(t *testing.T) { + s := runMonitorServer(DEFAULT_HTTP_PORT) + defer s.Shutdown() + + nc := createClientConnSubscribeAndPublish(t) + defer nc.Close() + + pollConz := func() *Connz { + url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT) + resp, err := http.Get(url + "connz?subs=1") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + c := Connz{} + if err := json.Unmarshal(body, &c); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + return &c + } + + // Test inside details of each connection + ci := pollConz().Conns[0] + if len(ci.Subs) != 1 { + t.Fatalf("Expected subs of 1, got %v\n", ci.Subs) + } + firstLast := ci.LastActivity + if firstLast.IsZero() { + t.Fatalf("Expected LastActivity to be valid\n") + } + + // Sub should trigger update. + nc.Subscribe("hello.world", func(m *nats.Msg) {}) + nc.Flush() + ci = pollConz().Conns[0] + subLast := ci.LastActivity + if firstLast.Equal(subLast) { + t.Fatalf("Subscribe should have triggered update to LastActivity\n") + } + // Pub should trigger as well + nc.Publish("foo", []byte("Hello")) + nc.Flush() + ci = pollConz().Conns[0] + pubLast := ci.LastActivity + if subLast.Equal(pubLast) { + t.Fatalf("Publish should have triggered update to LastActivity\n") + } +} + func TestConnzWithOffsetAndLimit(t *testing.T) { s := runMonitorServer(DEFAULT_HTTP_PORT) defer s.Shutdown()