From fd1f6faa59a2758f8bc90d59dc3857c567ab020e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 22 Jan 2013 09:35:46 -0800 Subject: [PATCH] Added on /connz endpoint for monitoring --- gnatsd.go | 23 +++-------- server/client.go | 8 ++++ server/connz.go | 57 ++++++++++++++++++++++++++ server/server.go | 33 ++++++++++++--- test/monitor_test.go | 95 +++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 192 insertions(+), 24 deletions(-) create mode 100644 server/connz.go diff --git a/gnatsd.go b/gnatsd.go index 9f2a5fb9..6d071494 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -4,7 +4,6 @@ package main import ( "flag" - "fmt" "log" "net/http" _ "net/http/pprof" @@ -45,29 +44,19 @@ func main() { // TBD: Parse config if given - // Profiler - go func() { - log.Println(http.ListenAndServe("localhost:6062", nil)) - }() - // Create the server with appropriate options. s := server.New(&opts) // Start up the http server if needed. if opts.HttpPort != 0 { - go func() { - // FIXME(dlc): port config - lm := fmt.Sprintf("Starting http monitor on port %d", opts.HttpPort) - server.Log(lm) - http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) { - s.HandleVarz(w, r) - }) - hp := fmt.Sprintf("%s:%d", opts.Host, opts.HttpPort) - log.Fatal(http.ListenAndServe(hp, nil)) - }() + s.StartHTTPMonitoring() } + // Profiler + go func() { + log.Println(http.ListenAndServe("localhost:6062", nil)) + }() + // Wait for clients. s.AcceptLoop() } - diff --git a/server/client.go b/server/client.go index 52df154f..28e07344 100644 --- a/server/client.go +++ b/server/client.go @@ -64,6 +64,14 @@ func init() { rand.Seed(time.Now().UnixNano()) } +func clientConnStr(conn net.Conn) interface{} { + if ip, ok := conn.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} + } + return "N/A" +} + func (c *client) readLoop() { b := make([]byte, defaultBufSize) for { diff --git a/server/connz.go b/server/connz.go new file mode 100644 index 00000000..65a567df --- /dev/null +++ b/server/connz.go @@ -0,0 +1,57 @@ +// Copyright 2013 Apcera Inc. All rights reserved. + +package server + +import ( + "encoding/json" + "net" + "net/http" +) + +type Connz struct { + NumConns int `json:"num_connections"` + Conns []*ConnInfo `json:"connections"` +} + +type ConnInfo struct { + Cid uint64 `json:"cid"` + Ip string `json:"ip"` + Port int `json:"port"` + Subs uint32 `json:"subscriptions"` + Pending int `json:"pending_size"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` +} + +func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { + c := Connz{Conns: []*ConnInfo{}} + + // Walk the list + s.mu.Lock() + for _, client := range s.clients { + ci := &ConnInfo{ + Cid: client.cid, + Subs: client.subs.Count(), + InMsgs: client.inMsgs, + OutMsgs: client.outMsgs, + InBytes: client.inBytes, + OutBytes: client.outBytes, + } + if ip, ok := client.conn.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + ci.Port = addr.Port + ci.Ip = addr.IP.String() + } + c.Conns = append(c.Conns, ci) + } + s.mu.Unlock() + + c.NumConns = len(c.Conns) + b, err := json.MarshalIndent(c, "", " ") + if err != nil { + Log("Error marshalling response go /connzz request: %v", err) + } + w.Write(b) +} diff --git a/server/server.go b/server/server.go index 79059986..74e6ee92 100644 --- a/server/server.go +++ b/server/server.go @@ -7,8 +7,10 @@ import ( "encoding/json" "fmt" "net" + "net/http" "os" "os/signal" + "sync" "sync/atomic" "time" @@ -48,6 +50,7 @@ type Info struct { } type Server struct { + mu sync.Mutex info Info infoJson []byte sl *sublist.Sublist @@ -165,6 +168,7 @@ func (s *Server) handleSignals() { func (s *Server) Shutdown() { s.running = false // Close client connections + // FIXME(dlc) lock? will call back into remove.. for _, c := range s.clients { c.closeConnection() } @@ -204,12 +208,24 @@ func (s *Server) AcceptLoop() { Log("Server Exiting..") } -func clientConnStr(conn net.Conn) interface{} { - if ip, ok := conn.(*net.TCPConn); ok { - addr := ip.RemoteAddr().(*net.TCPAddr) - return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} - } - return "N/A" +func (s *Server) StartHTTPMonitoring() { + go func() { + // FIXME(dlc): port config + lm := fmt.Sprintf("Starting http monitor on port %d", s.opts.HttpPort) + Log(lm) + // Varz + http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) { + s.HandleVarz(w, r) + }) + // Connz + http.HandleFunc("/connz", func(w http.ResponseWriter, r *http.Request) { + s.HandleConnz(w, r) + }) + + hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HttpPort) + Fatal(http.ListenAndServe(hp, nil)) + }() + } func (s *Server) createClient(conn net.Conn) *client { @@ -240,7 +256,10 @@ func (s *Server) createClient(conn net.Conn) *client { c.setPingTimer() // Register with the server. + s.mu.Lock() s.clients[c.cid] = c + s.mu.Unlock() + return c } @@ -266,5 +285,7 @@ func (s *Server) checkAuth(c *client) bool { } func (s *Server) removeClient(c *client) { + s.mu.Lock() delete(s.clients, c.cid) + s.mu.Unlock() } \ No newline at end of file diff --git a/test/monitor_test.go b/test/monitor_test.go index 411d7127..c45ed82f 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -15,7 +15,6 @@ import ( const MONITOR_PORT=11422 - // Make sure that we do not run the http server for monitoring unless asked. func TestNoMonitorPort(t *testing.T) { s := startServer(t, MONITOR_PORT, "") @@ -56,6 +55,7 @@ func TestVarz(t *testing.T) { if err := json.Unmarshal(body, &v); err != nil { t.Fatalf("Got an error unmarshalling the body: %v\n", err) } + // Do some sanity checks on values if time.Since(v.Start) > 10*time.Second { t.Fatal("Expected start time to be within 10 seconds.") @@ -65,3 +65,96 @@ func TestVarz(t *testing.T) { } // TODO(dlc): Add checks for connections, etc.. } + +func TestConnz(t *testing.T) { + args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT) + s := startServer(t, MONITOR_PORT, args) + defer s.stopServer() + + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT) + resp, err := http.Get(url + "connz") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + + c := server.Connz{} + if err := json.Unmarshal(body, &c); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + + // Test contents.. + if c.NumConns != 0 { + t.Fatalf("Expected 0 connections, got %d\n", c.NumConns) + } + if c.Conns == nil || len(c.Conns) != 0 { + t.Fatalf("Expected 0 connections in array, got %+p\n", c.Conns) + } + + // Create a connection to test ConnInfo + cl := createClientConn(t, "localhost", MONITOR_PORT) + send := sendCommand(t, cl) + send, expect := setupConn(t, cl) + expectMsgs := expectMsgsCommand(t, expect) + + send("SUB foo 1\r\nPUB foo 5\r\nhello\r\n") + expectMsgs(1) + + resp, err = http.Get(url + "connz") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + if err := json.Unmarshal(body, &c); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + + if c.NumConns != 1 { + t.Fatalf("Expected 1 connections, got %d\n", c.NumConns) + } + if c.Conns == nil || len(c.Conns) != 1 { + t.Fatalf("Expected 1 connections in array, got %+p\n", c.Conns) + } + + // Test inside details of each connection + ci := c.Conns[0] + + if ci.Cid == 0 { + t.Fatalf("Expected non-zero cid, got %v\n", ci.Cid) + } + if ci.Ip != "127.0.0.1" { + t.Fatalf("Expected \"127.0.0.1\" for IP, got %v\n", ci.Ip) + } + if ci.Port == 0 { + t.Fatalf("Expected non-zero port, got %v\n", ci.Port) + } + if ci.Subs != 1 { + t.Fatalf("Expected subs of 1, got %v\n", ci.Subs) + } + if ci.InMsgs != 1 { + t.Fatalf("Expected subs of 1, got %v\n", ci.InMsgs) + } + if ci.OutMsgs != 1 { + t.Fatalf("Expected subs of 1, got %v\n", ci.OutMsgs) + } + if ci.InBytes != 5 { + t.Fatalf("Expected subs of 1, got %v\n", ci.InBytes) + } + if ci.OutBytes != 5 { + t.Fatalf("Expected subs of 1, got %v\n", ci.OutBytes) + } +}