Support for connection last activity tracking

This commit is contained in:
Derek Collison
2016-02-04 17:33:40 -08:00
parent 6b4f6f20ee
commit 26185a9722
5 changed files with 108 additions and 33 deletions

View File

@@ -20,7 +20,8 @@
- [ ] Info updates contain other implicit route servers
- [ ] Pedantic state
- [ ] Multi-tenant accounts with isolation of subject space
- [ ] Track last activity time per connection?
- [ ] Default sort by cid on connz
- [X] Track last activity time per connection?
- [X] Add total connections to varz so we won't miss spikes, etc.
- [X] Add starttime and uptime to connz list.
- [X] Gossip Protocol for discovery for clustering

View File

@@ -50,7 +50,7 @@ type client struct {
ptmr *time.Timer
pout int
msgb [msgScratchSize]byte
last time.Time
parseState
route *route
@@ -254,6 +254,7 @@ func (c *client) processConnect(arg []byte) error {
// so we can just clear it here.
c.mu.Lock()
c.clearAuthTimer()
c.last = time.Now()
c.mu.Unlock()
if err := json.Unmarshal(arg, &c.opts); err != nil {
@@ -439,6 +440,8 @@ func (c *client) processPub(arg []byte) error {
if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) {
c.sendErr("Invalid Subject")
}
// Update last activity.
c.last = time.Now()
return nil
}
@@ -492,6 +495,8 @@ func (c *client) processSub(argo []byte) (err error) {
c.mu.Unlock()
return nil
}
// Update last activity.
c.last = time.Now()
// We can have two SUB protocols coming from a route due to some
// race conditions. We should make sure that we process only one.

View File

@@ -34,23 +34,24 @@ type Connz struct {
// ConnInfo has detailed information on a per connection basis.
type ConnInfo struct {
Cid uint64 `json:"cid"`
IP string `json:"ip"`
Port int `json:"port"`
Start time.Time `json:"start"`
Uptime string `json:"uptime"`
Pending int `json:"pending_bytes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
NumSubs uint32 `json:"subscriptions"`
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"version,omitempty"`
TLSVersion string `json:"tls_version,omitempty"`
TLSCipher string `json:"tls_cipher_suite,omitempty"`
Subs []string `json:"subscriptions_list,omitempty"`
Cid uint64 `json:"cid"`
IP string `json:"ip"`
Port int `json:"port"`
Start time.Time `json:"start"`
Uptime string `json:"uptime"`
LastActivity time.Time `json:"last_activity"`
Pending int `json:"pending_bytes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
NumSubs uint32 `json:"subscriptions"`
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"version,omitempty"`
TLSVersion string `json:"tls_version,omitempty"`
TLSCipher string `json:"tls_cipher_suite,omitempty"`
Subs []string `json:"subscriptions_list,omitempty"`
}
const DefaultConnListSize = 1024
@@ -125,18 +126,19 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
// not 'client', because otherwise we may be off compared to the
// previous sort.
ci := &ConnInfo{
Cid: client.cid,
Start: client.start,
Uptime: myUptime(c.Now.Sub(client.start)),
InMsgs: cli.inMsgs,
OutMsgs: cli.outMsgs,
InBytes: cli.inBytes,
OutBytes: cli.outBytes,
NumSubs: cli.subCount,
Pending: cli.buffered,
Name: client.opts.Name,
Lang: client.opts.Lang,
Version: client.opts.Version,
Cid: client.cid,
Start: client.start,
Uptime: myUptime(c.Now.Sub(client.start)),
LastActivity: cli.last,
InMsgs: cli.inMsgs,
OutMsgs: cli.outMsgs,
InBytes: cli.inBytes,
OutBytes: cli.outBytes,
NumSubs: cli.subCount,
Pending: cli.buffered,
Name: client.opts.Name,
Lang: client.opts.Lang,
Version: client.opts.Version,
}
if tlsRequired {

View File

@@ -1,8 +1,11 @@
// Copyright 2013-2015 Apcera Inc. All rights reserved.
// Copyright 2013-2016 Apcera Inc. All rights reserved.
package server
import "sync/atomic"
import (
"sync/atomic"
"time"
)
// Helper types to sort by ConnInfo values
type SortOpt string
@@ -26,6 +29,7 @@ type ClientInfo struct {
inMsgs int64
outBytes int64
inBytes int64
last time.Time
}
func NewClientInfo(c *client) *ClientInfo {
@@ -37,6 +41,7 @@ func NewClientInfo(c *client) *ClientInfo {
o.buffered = c.bw.Buffered()
o.outMsgs = c.outMsgs
o.outBytes = c.outBytes
o.last = c.last
// inMsgs and inBytes are updated outside of client's lock.
// So use atomic here to read (and updated in processMsg)
o.inMsgs = atomic.LoadInt64(&c.inMsgs)

View File

@@ -280,6 +280,12 @@ func TestConnz(t *testing.T) {
if ci.Uptime == "" {
t.Fatalf("Expected Uptime to be valid\n")
}
if ci.LastActivity.IsZero() {
t.Fatalf("Expected LastActivity to be valid\n")
}
if ci.LastActivity.UnixNano() < ci.Start.UnixNano() {
t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start)
}
// Test JSONP
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT) + "connz?callback=callback")
@@ -326,6 +332,62 @@ func TestConnzWithSubs(t *testing.T) {
}
}
func TestConnzLastActivity(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()
nc := createClientConnSubscribeAndPublish(t)
defer nc.Close()
pollConz := func() *Connz {
url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "connz?subs=1")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}
c := Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}
return &c
}
// Test inside details of each connection
ci := pollConz().Conns[0]
if len(ci.Subs) != 1 {
t.Fatalf("Expected subs of 1, got %v\n", ci.Subs)
}
firstLast := ci.LastActivity
if firstLast.IsZero() {
t.Fatalf("Expected LastActivity to be valid\n")
}
// Sub should trigger update.
nc.Subscribe("hello.world", func(m *nats.Msg) {})
nc.Flush()
ci = pollConz().Conns[0]
subLast := ci.LastActivity
if firstLast.Equal(subLast) {
t.Fatalf("Subscribe should have triggered update to LastActivity\n")
}
// Pub should trigger as well
nc.Publish("foo", []byte("Hello"))
nc.Flush()
ci = pollConz().Conns[0]
pubLast := ci.LastActivity
if subLast.Equal(pubLast) {
t.Fatalf("Publish should have triggered update to LastActivity\n")
}
}
func TestConnzWithOffsetAndLimit(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()