Added on /connz endpoint for monitoring

This commit is contained in:
Derek Collison
2013-01-22 09:35:46 -08:00
parent d098037651
commit fd1f6faa59
5 changed files with 192 additions and 24 deletions

View File

@@ -4,7 +4,6 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
@@ -45,29 +44,19 @@ func main() {
// TBD: Parse config if given
// Profiler
go func() {
log.Println(http.ListenAndServe("localhost:6062", nil))
}()
// Create the server with appropriate options.
s := server.New(&opts)
// Start up the http server if needed.
if opts.HttpPort != 0 {
go func() {
// FIXME(dlc): port config
lm := fmt.Sprintf("Starting http monitor on port %d", opts.HttpPort)
server.Log(lm)
http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) {
s.HandleVarz(w, r)
})
hp := fmt.Sprintf("%s:%d", opts.Host, opts.HttpPort)
log.Fatal(http.ListenAndServe(hp, nil))
}()
s.StartHTTPMonitoring()
}
// Profiler
go func() {
log.Println(http.ListenAndServe("localhost:6062", nil))
}()
// Wait for clients.
s.AcceptLoop()
}

View File

@@ -64,6 +64,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
func clientConnStr(conn net.Conn) interface{} {
if ip, ok := conn.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)}
}
return "N/A"
}
func (c *client) readLoop() {
b := make([]byte, defaultBufSize)
for {

57
server/connz.go Normal file
View File

@@ -0,0 +1,57 @@
// Copyright 2013 Apcera Inc. All rights reserved.
package server
import (
"encoding/json"
"net"
"net/http"
)
type Connz struct {
NumConns int `json:"num_connections"`
Conns []*ConnInfo `json:"connections"`
}
type ConnInfo struct {
Cid uint64 `json:"cid"`
Ip string `json:"ip"`
Port int `json:"port"`
Subs uint32 `json:"subscriptions"`
Pending int `json:"pending_size"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
}
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
c := Connz{Conns: []*ConnInfo{}}
// Walk the list
s.mu.Lock()
for _, client := range s.clients {
ci := &ConnInfo{
Cid: client.cid,
Subs: client.subs.Count(),
InMsgs: client.inMsgs,
OutMsgs: client.outMsgs,
InBytes: client.inBytes,
OutBytes: client.outBytes,
}
if ip, ok := client.conn.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
ci.Port = addr.Port
ci.Ip = addr.IP.String()
}
c.Conns = append(c.Conns, ci)
}
s.mu.Unlock()
c.NumConns = len(c.Conns)
b, err := json.MarshalIndent(c, "", " ")
if err != nil {
Log("Error marshalling response go /connzz request: %v", err)
}
w.Write(b)
}

View File

@@ -7,8 +7,10 @@ import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
@@ -48,6 +50,7 @@ type Info struct {
}
type Server struct {
mu sync.Mutex
info Info
infoJson []byte
sl *sublist.Sublist
@@ -165,6 +168,7 @@ func (s *Server) handleSignals() {
func (s *Server) Shutdown() {
s.running = false
// Close client connections
// FIXME(dlc) lock? will call back into remove..
for _, c := range s.clients {
c.closeConnection()
}
@@ -204,12 +208,24 @@ func (s *Server) AcceptLoop() {
Log("Server Exiting..")
}
func clientConnStr(conn net.Conn) interface{} {
if ip, ok := conn.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)}
}
return "N/A"
func (s *Server) StartHTTPMonitoring() {
go func() {
// FIXME(dlc): port config
lm := fmt.Sprintf("Starting http monitor on port %d", s.opts.HttpPort)
Log(lm)
// Varz
http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) {
s.HandleVarz(w, r)
})
// Connz
http.HandleFunc("/connz", func(w http.ResponseWriter, r *http.Request) {
s.HandleConnz(w, r)
})
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HttpPort)
Fatal(http.ListenAndServe(hp, nil))
}()
}
func (s *Server) createClient(conn net.Conn) *client {
@@ -240,7 +256,10 @@ func (s *Server) createClient(conn net.Conn) *client {
c.setPingTimer()
// Register with the server.
s.mu.Lock()
s.clients[c.cid] = c
s.mu.Unlock()
return c
}
@@ -266,5 +285,7 @@ func (s *Server) checkAuth(c *client) bool {
}
func (s *Server) removeClient(c *client) {
s.mu.Lock()
delete(s.clients, c.cid)
s.mu.Unlock()
}

View File

@@ -15,7 +15,6 @@ import (
const MONITOR_PORT=11422
// Make sure that we do not run the http server for monitoring unless asked.
func TestNoMonitorPort(t *testing.T) {
s := startServer(t, MONITOR_PORT, "")
@@ -56,6 +55,7 @@ func TestVarz(t *testing.T) {
if err := json.Unmarshal(body, &v); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}
// Do some sanity checks on values
if time.Since(v.Start) > 10*time.Second {
t.Fatal("Expected start time to be within 10 seconds.")
@@ -65,3 +65,96 @@ func TestVarz(t *testing.T) {
}
// TODO(dlc): Add checks for connections, etc..
}
func TestConnz(t *testing.T) {
args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT)
s := startServer(t, MONITOR_PORT, args)
defer s.stopServer()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "connz")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}
c := server.Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}
// Test contents..
if c.NumConns != 0 {
t.Fatalf("Expected 0 connections, got %d\n", c.NumConns)
}
if c.Conns == nil || len(c.Conns) != 0 {
t.Fatalf("Expected 0 connections in array, got %+p\n", c.Conns)
}
// Create a connection to test ConnInfo
cl := createClientConn(t, "localhost", MONITOR_PORT)
send := sendCommand(t, cl)
send, expect := setupConn(t, cl)
expectMsgs := expectMsgsCommand(t, expect)
send("SUB foo 1\r\nPUB foo 5\r\nhello\r\n")
expectMsgs(1)
resp, err = http.Get(url + "connz")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}
if c.NumConns != 1 {
t.Fatalf("Expected 1 connections, got %d\n", c.NumConns)
}
if c.Conns == nil || len(c.Conns) != 1 {
t.Fatalf("Expected 1 connections in array, got %+p\n", c.Conns)
}
// Test inside details of each connection
ci := c.Conns[0]
if ci.Cid == 0 {
t.Fatalf("Expected non-zero cid, got %v\n", ci.Cid)
}
if ci.Ip != "127.0.0.1" {
t.Fatalf("Expected \"127.0.0.1\" for IP, got %v\n", ci.Ip)
}
if ci.Port == 0 {
t.Fatalf("Expected non-zero port, got %v\n", ci.Port)
}
if ci.Subs != 1 {
t.Fatalf("Expected subs of 1, got %v\n", ci.Subs)
}
if ci.InMsgs != 1 {
t.Fatalf("Expected subs of 1, got %v\n", ci.InMsgs)
}
if ci.OutMsgs != 1 {
t.Fatalf("Expected subs of 1, got %v\n", ci.OutMsgs)
}
if ci.InBytes != 5 {
t.Fatalf("Expected subs of 1, got %v\n", ci.InBytes)
}
if ci.OutBytes != 5 {
t.Fatalf("Expected subs of 1, got %v\n", ci.OutBytes)
}
}