Fixed msg payload accounting, made http monitoring work in Go routines

This commit is contained in:
Derek Collison
2013-08-19 15:09:26 -07:00
parent 1d4e02fe32
commit 6ab5d6a337
4 changed files with 73 additions and 34 deletions

View File

@@ -96,7 +96,6 @@ func clientConnStr(conn net.Conn) interface{} {
func (c *client) initClient() {
s := c.srv
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.nc, defaultBufSize)
c.subs = hashmap.New()
@@ -509,11 +508,15 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
}
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
msgSize := int64(len(msg) - LEN_CR_LF)
client.outMsgs++
client.outBytes += int64(len(msg))
client.outBytes += msgSize
atomic.AddInt64(&c.srv.outMsgs, 1)
atomic.AddInt64(&c.srv.outBytes, int64(len(msg)))
atomic.AddInt64(&c.srv.outBytes, msgSize)
// Check to see if our writes will cause a flush
// in the underlying bufio. If so limit time we
@@ -559,22 +562,29 @@ writeErr:
}
}
// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
msgSize := int64(len(msg) - LEN_CR_LF)
c.inMsgs++
c.inBytes += int64(len(msg))
c.inBytes += msgSize
// Snapshot server.
srv := c.srv
if srv != nil {
atomic.AddInt64(&srv.inMsgs, 1)
atomic.AddInt64(&srv.inBytes, int64(len(msg)))
atomic.AddInt64(&srv.inBytes, msgSize)
}
if trace > 0 {
c.traceMsg(msg)
}
if c.srv == nil {
if srv == nil {
return
}
if c.opts.Verbose {
@@ -584,7 +594,7 @@ func (c *client) processMsg(msg []byte) {
// Scratch buffer..
msgh := c.msgb[:len(msgHeadProto)]
r := c.srv.sl.Match(c.pa.subject)
r := srv.sl.Match(c.pa.subject)
if len(r) <= 0 {
return
}
@@ -603,7 +613,7 @@ func (c *client) processMsg(msg []byte) {
// If we are a route and we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics.
if isRoute {
if sub := c.srv.routeSidQueueSubscriber(c.pa.sid); sub != nil {
if sub := srv.routeSidQueueSubscriber(c.pa.sid); sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
return

View File

@@ -44,6 +44,7 @@ type Server struct {
routes map[uint64]*client
done chan bool
start time.Time
http net.Listener
stats
routeListener net.Listener
@@ -217,6 +218,13 @@ func (s *Server) Shutdown() {
s.routeListener = nil
}
// Kick HTTP monitoring if its running
if s.http != nil {
doneExpected++
s.http.Close()
s.http = nil
}
// Release the solicited routes connect go routines.
close(s.rcQuit)
@@ -285,22 +293,38 @@ func (s *Server) StartProfiler() {
}
func (s *Server) StartHTTPMonitoring() {
Logf("Starting http monitor on port %d", s.opts.HttpPort)
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HttpPort)
l, err := net.Listen("tcp", hp)
if err != nil {
Fatalf("Can't listen to the monitor port: %v", err)
}
mux := http.NewServeMux()
// Varz
mux.HandleFunc("/varz", s.HandleVarz)
// Connz
mux.HandleFunc("/connz", s.HandleConnz)
srv := &http.Server{
Addr: hp,
Handler: mux,
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
MaxHeaderBytes: 1 << 20,
}
s.http = l
go func() {
Logf("Starting http monitor on port %d", s.opts.HttpPort)
// Varz
http.HandleFunc("/varz", func(w http.ResponseWriter, r *http.Request) {
s.HandleVarz(w, r)
})
// Connz
http.HandleFunc("/connz", func(w http.ResponseWriter, r *http.Request) {
s.HandleConnz(w, r)
})
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HttpPort)
Fatal(http.ListenAndServe(hp, nil))
srv.Serve(s.http)
srv.Handler = nil
s.done <- true
}()
}
func (s *Server) createClient(conn net.Conn) *client {

View File

@@ -37,11 +37,14 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
v.Uptime = time.Since(s.start).String()
updateUsage(&v)
s.mu.Lock()
v.Connections = len(s.clients)
v.InMsgs = s.inMsgs
v.InBytes = s.inBytes
v.OutMsgs = s.outMsgs
v.OutBytes = s.outBytes
s.mu.Unlock()
b, err := json.MarshalIndent(v, "", " ")
if err != nil {

View File

@@ -15,10 +15,17 @@ import (
const MONITOR_PORT = 11422
func runMonitorServer(monitorPort int) *server.Server {
opts := DefaultTestOptions
opts.Port = MONITOR_PORT
opts.HttpPort = monitorPort
return RunServer(&opts)
}
// Make sure that we do not run the http server for monitoring unless asked.
func TestNoMonitorPort(t *testing.T) {
s := startServer(t, MONITOR_PORT, "")
defer s.stopServer()
s := runMonitorServer(0)
defer s.Shutdown()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
if resp, err := http.Get(url + "varz"); err == nil {
@@ -33,9 +40,8 @@ func TestNoMonitorPort(t *testing.T) {
}
func TestVarz(t *testing.T) {
args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT)
s := startServer(t, MONITOR_PORT, args)
defer s.stopServer()
s := runMonitorServer(server.DEFAULT_HTTP_PORT)
defer s.Shutdown()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "varz")
@@ -60,9 +66,6 @@ func TestVarz(t *testing.T) {
if time.Since(v.Start) > 10*time.Second {
t.Fatal("Expected start time to be within 10 seconds.")
}
if v.Mem > 8192 {
t.Fatalf("Did not expect memory to be so high: %d\n", v.Mem)
}
// Create a connection to test ConnInfo
cl := createClientConn(t, "localhost", MONITOR_PORT)
@@ -109,11 +112,10 @@ func TestVarz(t *testing.T) {
}
func TestConnz(t *testing.T) {
args := fmt.Sprintf("-m %d", server.DEFAULT_HTTP_PORT)
s := startServer(t, MONITOR_PORT, args)
defer s.stopServer()
s := runMonitorServer(server.DEFAULT_HTTP_PORT+1)
defer s.Shutdown()
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT)
url := fmt.Sprintf("http://localhost:%d/", server.DEFAULT_HTTP_PORT+1)
resp, err := http.Get(url + "connz")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)