Added first pass http monitoring (e.g. /varz)

This commit is contained in:
Derek Collison
2013-01-02 18:11:01 -06:00
parent b763c36157
commit 3977043780
7 changed files with 213 additions and 38 deletions

View File

@@ -4,6 +4,7 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
@@ -33,20 +34,40 @@ func main() {
flag.StringVar(&opts.Password, "pass", "", "Password required for connection.")
flag.StringVar(&opts.Authorization, "auth", "", "Authorization token required for connection.")
flag.IntVar(&opts.HttpPort, "m", 0, "HTTP Port for /varz, /connz endpoints.")
flag.IntVar(&opts.HttpPort, "http_port", 0, "HTTP Port for /varz, /connz endpoints.")
flag.Parse()
if debugAndTrace {
opts.Trace, opts.Debug = true, true
}
// TBD: Parse config if given
// Profiler
go func() {
log.Println(http.ListenAndServe("localhost:6062", nil))
}()
// TBD: Parse config if given
// Create the server with appropriate options.
s := server.New(&opts)
// Start up the http server if needed.
if opts.HttpPort != 0 {
go func() {
// FIXME(dlc): port config
lm := fmt.Sprintf("Starting http monitor on port %d", opts.HttpPort)
server.Log(lm)
http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) {
s.HandleVarz(w, r)
})
hp := fmt.Sprintf("%s:%d", opts.Host, opts.HttpPort)
log.Fatal(http.ListenAndServe(hp, nil))
}()
}
// Wait for clients.
s.AcceptLoop()
}

View File

@@ -9,6 +9,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/apcera/gnatsd/hashmap"
@@ -30,20 +31,14 @@ type client struct {
atmr *time.Timer
ptmr *time.Timer
pout int
cstats
parseState
stats
}
func (c *client) String() string {
return fmt.Sprintf("cid:%d", c.cid)
}
type cstats struct {
nr int
nb int
nm int
}
type subscription struct {
client *client
subject []byte
@@ -107,7 +102,7 @@ func (c *client) readLoop() {
}
func (c *client) traceMsg(msg []byte) {
pm := fmt.Sprintf("Processing msg: %d", c.nm)
pm := fmt.Sprintf("Processing msg: %d", c.inMsgs)
opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)}
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
}
@@ -361,6 +356,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
return
}
// Update statistics
client.outMsgs++
client.outBytes += int64(len(msg))
atomic.AddInt64(&c.srv.outMsgs, 1)
atomic.AddInt64(&c.srv.outBytes, int64(len(msg)))
// Check to see if our writes will cause a flush
// in the underlying bufio. If so limit time we
// will wait for flush to complete.
@@ -412,7 +414,12 @@ writeErr:
}
func (c *client) processMsg(msg []byte) {
c.nm++
c.inMsgs++
c.inBytes += int64(len(msg))
atomic.AddInt64(&c.srv.inMsgs, 1)
atomic.AddInt64(&c.srv.inBytes, int64(len(msg)))
if trace {
c.traceMsg(msg)
}
@@ -570,13 +577,4 @@ func (c *client) closeConnection() {
}
}
}
/*
log.Printf("Sublist Stats: %+v\n", c.srv.sl.Stats())
if c.nr > 0 {
log.Printf("stats: %d %d %d\n", c.nr, c.nb, c.nm)
log.Printf("bytes per read: %d\n", c.nb/c.nr)
log.Printf("msgs per read: %d\n", c.nm/c.nr)
}
*/
}

View File

@@ -7,7 +7,7 @@ import (
)
const (
VERSION = "go 0.2.4.alpha.1"
VERSION = "go-0.2.6.alpha.1"
DEFAULT_PORT = 4222
DEFAULT_HOST = "0.0.0.0"
@@ -25,7 +25,7 @@ const (
DEFAULT_MAX_CONNECTIONS = (64 * 1024)
// TLS/SSL wait time
SSL_TIMEOUT = 250 * time.Millisecond
SSL_TIMEOUT = 500 * time.Millisecond
// Authorization wait time
AUTH_TIMEOUT = 2 * SSL_TIMEOUT
@@ -38,4 +38,6 @@ const (
// Write/Flush Deadlines
DEFAULT_FLUSH_DEADLINE = 500 * time.Millisecond
DEFAULT_HTTP_PORT = 8333
)

View File

@@ -60,9 +60,6 @@ func (c *client) parse(buf []byte) error {
var i int
var b byte
c.nr++
c.nb += len(buf)
for i, b = range buf {
switch c.state {
case OP_START:

View File

@@ -17,19 +17,24 @@ import (
)
type Options struct {
Host string
Port int
Trace bool
Debug bool
NoLog bool
NoSigs bool
Logtime bool
MaxConn int
Username string
Password string
Authorization string
PingInterval time.Duration
MaxPingsOut int
Host string `json:"addr"`
Port int `json:"port"`
Trace bool `json:"-"`
Debug bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
Logtime bool `json:"-"`
MaxConn int `json:"max_connections"`
Username string `json:"user,omitempty"`
Password string `json:"-"`
Authorization string `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HttpPort int `json:"http_port"`
SslTimeout float64 `json:"ssl_timeout"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
}
type Info struct {
@@ -54,6 +59,15 @@ type Server struct {
listener net.Listener
clients map[uint64]*client
done chan bool
start time.Time
stats
}
type stats struct {
inMsgs int64
outMsgs int64
inBytes int64
outBytes int64
}
func processOptions(opts *Options) {
@@ -73,6 +87,18 @@ func processOptions(opts *Options) {
if opts.MaxPingsOut == 0 {
opts.MaxPingsOut = DEFAULT_PING_MAX_OUT
}
if opts.SslTimeout == 0 {
opts.SslTimeout = float64(SSL_TIMEOUT) / float64(time.Second)
}
if opts.AuthTimeout == 0 {
opts.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
}
if opts.MaxControlLine == 0 {
opts.MaxControlLine = MAX_CONTROL_LINE_SIZE
}
if opts.MaxPayload == 0 {
opts.MaxPayload = MAX_PAYLOAD_SIZE
}
}
func New(opts *Options) *Server {
@@ -97,6 +123,7 @@ func New(opts *Options) *Server {
debug: opts.Debug,
trace: opts.Trace,
done: make(chan bool, 1),
start: time.Now(),
}
// Setup logging with flags
s.LogInit()

63
server/varz.go Normal file
View File

@@ -0,0 +1,63 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"runtime"
"time"
)
type Varz struct {
Start time.Time `json:"start"`
Options *Options `json:"options"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
Cpu float64 `json:"cpu"`
Connections int `json:"connections"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
Uptime string `json:"uptime"`
}
type usage struct {
Cpu float32
Cores int
Mem int64
}
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
v := Varz{Start: s.start, Options: s.opts}
v.Uptime = time.Since(s.start).String()
updateUsage(&v)
v.Connections = len(s.clients)
v.InMsgs = s.inMsgs
v.InBytes = s.inBytes
v.OutMsgs = s.outMsgs
v.OutBytes = s.outBytes
b, err := json.MarshalIndent(v, "", " ")
if err != nil {
Log("Error marshalling response go /varz request: %v", err)
}
w.Write(b)
}
// FIXME(dlc): This is a big hack, make real..
func updateUsage(v *Varz) {
v.Cores = runtime.NumCPU()
pidStr := fmt.Sprintf("%d", os.Getpid())
out, err := exec.Command("ps", "o", "pcpu=,rss=", "-p", pidStr).Output()
if err != nil {
// FIXME(dlc): Log?
return
}
fmt.Sscanf(string(out), "%f %d", &v.Cpu, &v.Mem)
}

67
test/monitor_test.go Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package test
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/apcera/gnatsd/server"
)
const MONITOR_PORT=11422
// Make sure that we do not run the http server for monitoring unless asked.
func TestNoMonitorPort(t *testing.T) {
s := startServer(t, MONITOR_PORT, "")
defer s.stopServer()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
if resp, err := http.Get(url + "varz"); err == nil {
t.Fatalf("Expected error: Got %+v\n", resp)
}
if resp, err := http.Get(url + "healthz"); err == nil {
t.Fatalf("Expected error: Got %+v\n", resp)
}
if resp, err := http.Get(url + "connz"); err == nil {
t.Fatalf("Expected error: Got %+v\n", resp)
}
}
func TestVarz(t *testing.T) {
args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT)
s := startServer(t, MONITOR_PORT, args)
defer s.stopServer()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "varz")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}
v := server.Varz{}
if err := json.Unmarshal(body, &v); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}
// Do some sanity checks on values
if time.Since(v.Start) > 10*time.Second {
t.Fatal("Expected start time to be within 10 seconds.")
}
if v.Mem > 8192 {
t.Fatalf("Did not expect memory to be so high: %d\n", v.Mem)
}
// TODO(dlc): Add checks for connections, etc..
}