diff --git a/gnatsd.go b/gnatsd.go index c2b0eb57..9f2a5fb9 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -4,6 +4,7 @@ package main import ( "flag" + "fmt" "log" "net/http" _ "net/http/pprof" @@ -33,20 +34,40 @@ func main() { flag.StringVar(&opts.Password, "pass", "", "Password required for connection.") flag.StringVar(&opts.Authorization, "auth", "", "Authorization token required for connection.") + flag.IntVar(&opts.HttpPort, "m", 0, "HTTP Port for /varz, /connz endpoints.") + flag.IntVar(&opts.HttpPort, "http_port", 0, "HTTP Port for /varz, /connz endpoints.") + flag.Parse() if debugAndTrace { opts.Trace, opts.Debug = true, true } + // TBD: Parse config if given + // Profiler go func() { log.Println(http.ListenAndServe("localhost:6062", nil)) }() - // TBD: Parse config if given - + // Create the server with appropriate options. s := server.New(&opts) + + // Start up the http server if needed. + if opts.HttpPort != 0 { + go func() { + // FIXME(dlc): port config + lm := fmt.Sprintf("Starting http monitor on port %d", opts.HttpPort) + server.Log(lm) + http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) { + s.HandleVarz(w, r) + }) + hp := fmt.Sprintf("%s:%d", opts.Host, opts.HttpPort) + log.Fatal(http.ListenAndServe(hp, nil)) + }() + } + + // Wait for clients. s.AcceptLoop() } diff --git a/server/client.go b/server/client.go index fca2219a..617a04ea 100644 --- a/server/client.go +++ b/server/client.go @@ -9,6 +9,7 @@ import ( "math/rand" "net" "sync" + "sync/atomic" "time" "github.com/apcera/gnatsd/hashmap" @@ -30,20 +31,14 @@ type client struct { atmr *time.Timer ptmr *time.Timer pout int - cstats parseState + stats } func (c *client) String() string { return fmt.Sprintf("cid:%d", c.cid) } -type cstats struct { - nr int - nb int - nm int -} - type subscription struct { client *client subject []byte @@ -107,7 +102,7 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - pm := fmt.Sprintf("Processing msg: %d", c.nm) + pm := fmt.Sprintf("Processing msg: %d", c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)} Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) } @@ -361,6 +356,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { return } + // Update statistics + client.outMsgs++ + client.outBytes += int64(len(msg)) + + atomic.AddInt64(&c.srv.outMsgs, 1) + atomic.AddInt64(&c.srv.outBytes, int64(len(msg))) + // Check to see if our writes will cause a flush // in the underlying bufio. If so limit time we // will wait for flush to complete. @@ -412,7 +414,12 @@ writeErr: } func (c *client) processMsg(msg []byte) { - c.nm++ + c.inMsgs++ + c.inBytes += int64(len(msg)) + + atomic.AddInt64(&c.srv.inMsgs, 1) + atomic.AddInt64(&c.srv.inBytes, int64(len(msg))) + if trace { c.traceMsg(msg) } @@ -570,13 +577,4 @@ func (c *client) closeConnection() { } } } - - /* - log.Printf("Sublist Stats: %+v\n", c.srv.sl.Stats()) - if c.nr > 0 { - log.Printf("stats: %d %d %d\n", c.nr, c.nb, c.nm) - log.Printf("bytes per read: %d\n", c.nb/c.nr) - log.Printf("msgs per read: %d\n", c.nm/c.nr) - } - */ } diff --git a/server/const.go b/server/const.go index fd3d8062..9f7590ad 100644 --- a/server/const.go +++ b/server/const.go @@ -7,7 +7,7 @@ import ( ) const ( - VERSION = "go 0.2.4.alpha.1" + VERSION = "go-0.2.6.alpha.1" DEFAULT_PORT = 4222 DEFAULT_HOST = "0.0.0.0" @@ -25,7 +25,7 @@ const ( DEFAULT_MAX_CONNECTIONS = (64 * 1024) // TLS/SSL wait time - SSL_TIMEOUT = 250 * time.Millisecond + SSL_TIMEOUT = 500 * time.Millisecond // Authorization wait time AUTH_TIMEOUT = 2 * SSL_TIMEOUT @@ -38,4 +38,6 @@ const ( // Write/Flush Deadlines DEFAULT_FLUSH_DEADLINE = 500 * time.Millisecond + + DEFAULT_HTTP_PORT = 8333 ) diff --git a/server/parser.go b/server/parser.go index 7de1e583..5793bba1 100644 --- a/server/parser.go +++ b/server/parser.go @@ -60,9 +60,6 @@ func (c *client) parse(buf []byte) error { var i int var b byte - c.nr++ - c.nb += len(buf) - for i, b = range buf { switch c.state { case OP_START: diff --git a/server/server.go b/server/server.go index eaba4527..79059986 100644 --- a/server/server.go +++ b/server/server.go @@ -17,19 +17,24 @@ import ( ) type Options struct { - Host string - Port int - Trace bool - Debug bool - NoLog bool - NoSigs bool - Logtime bool - MaxConn int - Username string - Password string - Authorization string - PingInterval time.Duration - MaxPingsOut int + Host string `json:"addr"` + Port int `json:"port"` + Trace bool `json:"-"` + Debug bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + Logtime bool `json:"-"` + MaxConn int `json:"max_connections"` + Username string `json:"user,omitempty"` + Password string `json:"-"` + Authorization string `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HttpPort int `json:"http_port"` + SslTimeout float64 `json:"ssl_timeout"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int `json:"max_control_line"` + MaxPayload int `json:"max_payload"` } type Info struct { @@ -54,6 +59,15 @@ type Server struct { listener net.Listener clients map[uint64]*client done chan bool + start time.Time + stats +} + +type stats struct { + inMsgs int64 + outMsgs int64 + inBytes int64 + outBytes int64 } func processOptions(opts *Options) { @@ -73,6 +87,18 @@ func processOptions(opts *Options) { if opts.MaxPingsOut == 0 { opts.MaxPingsOut = DEFAULT_PING_MAX_OUT } + if opts.SslTimeout == 0 { + opts.SslTimeout = float64(SSL_TIMEOUT) / float64(time.Second) + } + if opts.AuthTimeout == 0 { + opts.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second) + } + if opts.MaxControlLine == 0 { + opts.MaxControlLine = MAX_CONTROL_LINE_SIZE + } + if opts.MaxPayload == 0 { + opts.MaxPayload = MAX_PAYLOAD_SIZE + } } func New(opts *Options) *Server { @@ -97,6 +123,7 @@ func New(opts *Options) *Server { debug: opts.Debug, trace: opts.Trace, done: make(chan bool, 1), + start: time.Now(), } // Setup logging with flags s.LogInit() diff --git a/server/varz.go b/server/varz.go new file mode 100644 index 00000000..3bcf2806 --- /dev/null +++ b/server/varz.go @@ -0,0 +1,63 @@ +// Copyright 2012 Apcera Inc. All rights reserved. + +package server + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "runtime" + "time" +) + +type Varz struct { + Start time.Time `json:"start"` + Options *Options `json:"options"` + Mem int64 `json:"mem"` + Cores int `json:"cores"` + Cpu float64 `json:"cpu"` + Connections int `json:"connections"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + Uptime string `json:"uptime"` +} + +type usage struct { + Cpu float32 + Cores int + Mem int64 +} + +func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { + v := Varz{Start: s.start, Options: s.opts} + v.Uptime = time.Since(s.start).String() + + updateUsage(&v) + v.Connections = len(s.clients) + v.InMsgs = s.inMsgs + v.InBytes = s.inBytes + v.OutMsgs = s.outMsgs + v.OutBytes = s.outBytes + + b, err := json.MarshalIndent(v, "", " ") + if err != nil { + Log("Error marshalling response go /varz request: %v", err) + } + w.Write(b) +} + +// FIXME(dlc): This is a big hack, make real.. +func updateUsage(v *Varz) { + v.Cores = runtime.NumCPU() + pidStr := fmt.Sprintf("%d", os.Getpid()) + out, err := exec.Command("ps", "o", "pcpu=,rss=", "-p", pidStr).Output() + if err != nil { + // FIXME(dlc): Log? + return + } + fmt.Sscanf(string(out), "%f %d", &v.Cpu, &v.Mem) +} diff --git a/test/monitor_test.go b/test/monitor_test.go new file mode 100644 index 00000000..411d7127 --- /dev/null +++ b/test/monitor_test.go @@ -0,0 +1,67 @@ +// Copyright 2012 Apcera Inc. All rights reserved. + +package test + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/apcera/gnatsd/server" +) + +const MONITOR_PORT=11422 + + +// Make sure that we do not run the http server for monitoring unless asked. +func TestNoMonitorPort(t *testing.T) { + s := startServer(t, MONITOR_PORT, "") + defer s.stopServer() + + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT) + if resp, err := http.Get(url + "varz"); err == nil { + t.Fatalf("Expected error: Got %+v\n", resp) + } + if resp, err := http.Get(url + "healthz"); err == nil { + t.Fatalf("Expected error: Got %+v\n", resp) + } + if resp, err := http.Get(url + "connz"); err == nil { + t.Fatalf("Expected error: Got %+v\n", resp) + } +} + +func TestVarz(t *testing.T) { + args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT) + s := startServer(t, MONITOR_PORT, args) + defer s.stopServer() + + url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT) + resp, err := http.Get(url + "varz") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + + v := server.Varz{} + if err := json.Unmarshal(body, &v); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + // Do some sanity checks on values + if time.Since(v.Start) > 10*time.Second { + t.Fatal("Expected start time to be within 10 seconds.") + } + if v.Mem > 8192 { + t.Fatalf("Did not expect memory to be so high: %d\n", v.Mem) + } + // TODO(dlc): Add checks for connections, etc.. +}