From 4dd23e0f2d727e02a8ec7c3085edeeffcc59fdd7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 14 Jul 2015 14:55:36 -0700 Subject: [PATCH] Added slow consumers to varz, fixed reporting bug --- server/client.go | 3 ++- server/monitor.go | 32 +++++++++++++++++--------------- server/server.go | 9 +++++---- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/server/client.go b/server/client.go index a29e7371..62d8235a 100644 --- a/server/client.go +++ b/server/client.go @@ -655,7 +655,8 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - c.Noticef("Slow Consumer Detected") + atomic.AddInt64(&client.srv.slowConsumers, 1) + client.Noticef("Slow Consumer Detected") client.closeConnection() } else { c.Debugf("Error writing msg: %v", err) diff --git a/server/monitor.go b/server/monitor.go index b986cda9..8cc70f9a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -198,21 +198,22 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { // Varz will output server information on the monitoring port at /varz. type Varz struct { - Information *Info `json:"info"` - Options *Options `json:"options"` - Start time.Time `json:"start"` - Now time.Time `json:"now"` - Uptime string `json:"uptime"` - Mem int64 `json:"mem"` - Cores int `json:"cores"` - CPU float64 `json:"cpu"` - Connections int `json:"connections"` - Routes int `json:"routes"` - Remotes int `json:"remotes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` + Information *Info `json:"info"` + Options *Options `json:"options"` + Start time.Time `json:"start"` + Now time.Time `json:"now"` + Uptime string `json:"uptime"` + Mem int64 `json:"mem"` + Cores int `json:"cores"` + CPU float64 `json:"cpu"` + Connections int `json:"connections"` + Routes int `json:"routes"` + Remotes int `json:"remotes"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + SlowConsumers int64 `json:"slow_consumers"` } type usage struct { @@ -260,6 +261,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { v.InBytes = s.inBytes v.OutMsgs = s.outMsgs v.OutBytes = s.outBytes + v.SlowConsumers = s.slowConsumers s.mu.Unlock() b, err := json.MarshalIndent(v, "", " ") diff --git a/server/server.go b/server/server.go index 7034b31f..500fd017 100644 --- a/server/server.go +++ b/server/server.go @@ -63,10 +63,11 @@ type Server struct { } type stats struct { - inMsgs int64 - outMsgs int64 - inBytes int64 - outBytes int64 + inMsgs int64 + outMsgs int64 + inBytes int64 + outBytes int64 + slowConsumers int64 } // New will setup a new server struct after parsing the options.