From c3eba763675596ceebcc6f8d9ee72ee4aad2646f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 13 Nov 2012 11:26:53 -0800 Subject: [PATCH] Added base logging/tracing --- gnatsd.go | 39 ++++++++++++------- server/client.go | 43 ++++++++++++++------- server/const.go | 2 +- server/log.go | 98 ++++++++++++++++++++++++++++++++++++++++++++++++ server/server.go | 88 ++++++++++++++++++++++++++++++------------- 5 files changed, 216 insertions(+), 54 deletions(-) create mode 100644 server/log.go diff --git a/gnatsd.go b/gnatsd.go index e167c135..a34f58ee 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -10,25 +10,38 @@ import ( "github.com/apcera/gnatsd/server" ) -var port = server.DEFAULT_PORT -var host = server.DEFAULT_HOST - func main() { - // Parse flags + // logging setup + server.LogSetup() - flag.IntVar(&port, "port", server.DEFAULT_PORT, "Port to listen on.") - flag.IntVar(&port, "p", server.DEFAULT_PORT, "Port to listen on.") - flag.StringVar(&host, "host", server.DEFAULT_HOST, "Network host to listen on.") - flag.StringVar(&host, "h", server.DEFAULT_HOST, "Network host to listen on.") + opts := server.Options{} + + var debugAndTrace bool + + // Parse flags + flag.IntVar(&opts.Port, "port", server.DEFAULT_PORT, "Port to listen on.") + flag.IntVar(&opts.Port, "p", server.DEFAULT_PORT, "Port to listen on.") + flag.StringVar(&opts.Host, "host", server.DEFAULT_HOST, "Network host to listen on.") + flag.StringVar(&opts.Host, "h", server.DEFAULT_HOST, "Network host to listen on.") + flag.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.") + flag.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.") + flag.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.") + flag.BoolVar(&opts.Trace, "trace", false, "Enable Trace logging.") + flag.BoolVar(&debugAndTrace, "DV", false, "Enable Debug and Trace logging.") flag.Parse() + if debugAndTrace { + opts.Trace, opts.Debug = true, true + } + // Profiler go func() { log.Println(http.ListenAndServe("localhost:6062", nil)) }() - // Parse config if given - log.Println("starting up!") - s := server.New() - s.AcceptLoop(host, port) -} \ No newline at end of file + // Parse config if given + + s := server.New(opts) + s.AcceptLoop() +} + diff --git a/server/client.go b/server/client.go index 5259dc73..7ab19a92 100644 --- a/server/client.go +++ b/server/client.go @@ -17,7 +17,7 @@ import ( // The size of the bufio reader/writer on top of the socket. //const defaultBufSize = 32768 -const defaultBufSize = 65536 +const defaultBufSize = 32768 type client struct { mu sync.Mutex @@ -79,8 +79,24 @@ func (c *client) readLoop() { } } +func (c *client) traceMsg(msg []byte) { + opa := []interface{}{"Processing msg", string(c.pa.subject), string(c.pa.reply), string(msg)} + Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) +} + +func (c *client) traceOp(op string, arg []byte) { + if !trace { + return + } + opa := []interface{}{fmt.Sprintf("%s OP", op)} + if arg != nil { + opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) + } + Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) +} + func (c *client) processConnect(arg []byte) error { - // log.Printf("Got connect arg: '%s'\n", arg) + c.traceOp("CONNECT", arg) // FIXME, check err return json.Unmarshal(arg, &c.opts) } @@ -88,7 +104,7 @@ func (c *client) processConnect(arg []byte) error { var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF)) func (c *client) processPing() { -// log.Printf("Process ping\n") + c.traceOp("PING", nil) if c.conn == nil { return } @@ -99,7 +115,7 @@ func (c *client) processPing() { const argsLenMax = 3 func (c *client) processPub(arg []byte) error { - // log.Printf("Got pub arg: '%s'\n", arg) + c.traceOp("PUB", arg) args := splitArg(arg) switch len(args) { case 2: @@ -118,7 +134,6 @@ func (c *client) processPub(arg []byte) error { if c.pa.size < 0 { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } -// log.Printf("Parsed pubArg: %+v\n", c.pa) return nil } @@ -146,10 +161,10 @@ func splitArg(arg []byte) [][]byte { } func (c *client) processSub(argo []byte) error { + c.traceOp("SUB", argo) // Copy so we do not reference a potentially large buffer arg := make([]byte, len(argo)) copy(arg, argo) -// log.Printf("Got sub arg for client[%v]: '%s'\n", c, arg) args := splitArg(arg) sub := &subscription{client: c} switch len(args) { @@ -189,9 +204,8 @@ func (c *client) unsubscribe(sub *subscription) { } func (c *client) processUnsub(arg []byte) error { -// log.Printf("Got unsub arg for client[%v]: '%s'\n", c, arg) + c.traceOp("UNSUB", arg) args := splitArg(arg) - var sid []byte max := -1 @@ -204,11 +218,12 @@ func (c *client) processUnsub(arg []byte) error { default: return fmt.Errorf("processUnsub Parse Error: '%s'", arg) } - sub := (c.subs.Get(sid)).(*subscription) - if max > 0 { - sub.max = int64(max) + if sub, ok := (c.subs.Get(sid)).(*subscription); ok { + if max > 0 { + sub.max = int64(max) + } + c.unsubscribe(sub) } - c.unsubscribe(sub) return nil } @@ -248,6 +263,7 @@ func (sub *subscription) deliverMsg(mh, msg []byte) { // go flusher routine. Single for all connections? func (c *client) processMsg(msg []byte) { + c.traceMsg(msg) c.nm++ if c.srv == nil { return @@ -290,7 +306,8 @@ func (c *client) closeConnection() { if c.conn == nil { return } - // log.Printf("Closing Connection: %v\n", c) + Debug("Client connection closed", clientConnStr(c.conn), c.cid) + // c.bw.Flush() c.conn.Close() c.conn = nil diff --git a/server/const.go b/server/const.go index 7bb63cfd..37ebf307 100644 --- a/server/const.go +++ b/server/const.go @@ -7,7 +7,7 @@ import ( ) const ( - VERSION = "0.1.0.alpha.1" + VERSION = "go 0.1.0.alpha.1" DEFAULT_PORT = 4222 DEFAULT_HOST = "0.0.0.0" diff --git a/server/log.go b/server/log.go new file mode 100644 index 00000000..15dc98c9 --- /dev/null +++ b/server/log.go @@ -0,0 +1,98 @@ +// Copyright 2012 Apcera Inc. All rights reserved. + +package server + +import ( + "fmt" + "log" + "strings" +) + +// logging functionality, compatible with original nats-server + +var trace bool +var debug bool + +func LogSetup() { + log.SetFlags(0) +} + +func (s *Server) LogInit() { + if s.opts.Logtime { + log.SetFlags(log.LstdFlags) + } + if s.opts.Trace { + Log(s.opts) + } + if s.opts.Debug { + debug = true + Log("DEBUG is on") + } + if s.opts.Trace { + trace = true + Log("TRACE is on") + } +} + +func alreadyFormatted(s string) bool { + return strings.HasPrefix(s, "[") +} + +func logStr(v []interface{}) string { + args := make([]string, 0, len(v)) + for _, vt := range v { + switch t := vt.(type) { + case string: + if alreadyFormatted(t) { + args = append(args, t) + } else { + t = strings.Replace(t, "\"", "\\\"", -1) + args = append(args, fmt.Sprintf("\"%s\"", t)) + } + default: + args = append(args, fmt.Sprintf("%+v", vt)) + } + } + return fmt.Sprintf("[%s]", strings.Join(args,", ")) +} + +func Log(v ...interface{}) { + log.Print(logStr(v)) +} + +func Logf(format string, v ...interface{}) { + Log(fmt.Sprintf(format, v...)) +} + +func Fatal(v ...interface{}) { + log.Fatalf(logStr(v)) +} + +func Fatalf(format string, v ...interface{}) { + Fatal(fmt.Sprintf(format, v...)) +} + +func Debug(v ...interface{}) { + if debug { + Log(v...) + } +} + +func DebugF(format string, v ...interface{}) { + if debug { + Debug(fmt.Sprintf(format, v...)) + } +} + +func Trace(v ...interface{}) { + if trace { + Log(v...) + } +} + +func TraceF(format string, v ...interface{}) { + if trace { + Trace(fmt.Sprintf(format, v...)) + } +} + diff --git a/server/server.go b/server/server.go index 87f2ea74..740b48e9 100644 --- a/server/server.go +++ b/server/server.go @@ -6,7 +6,6 @@ import ( "bufio" "encoding/json" "fmt" - "log" "net" "sync/atomic" @@ -14,11 +13,20 @@ import ( "github.com/apcera/gnatsd/sublist" ) +type Options struct { + Host string + Port int + Trace bool + Debug bool + Logtime bool + MaxConn int +} + type info struct { Id string `json:"server_id"` Version string `json:"version"` Host string `json:"host"` - Port uint `json:"port"` + Port int `json:"port"` AuthRequired bool `json:"auth_required"` SslRequired bool `json:"ssl_required"` MaxPayload int `json:"max_payload"` @@ -29,44 +37,61 @@ type Server struct { infoJson []byte sl *sublist.Sublist gcid uint64 + opts Options + trace bool + debug bool } -func New() *Server { - s := &Server{ - info: info{ - Id: genId(), - Version: VERSION, - Host: DEFAULT_HOST, - Port: DEFAULT_PORT, - AuthRequired: false, - SslRequired: false, - MaxPayload: MAX_PAYLOAD_SIZE, - }, - sl: sublist.New(), +func optionDefaults(opt *Options) { + if opt.MaxConn == 0 { + opt.MaxConn = DEFAULT_MAX_CONNECTIONS } +} + +func New(opts Options) *Server { + optionDefaults(&opts) + inf := info{ + Id: genId(), + Version: VERSION, + Host: opts.Host, + Port: opts.Port, + AuthRequired: false, + SslRequired: false, + MaxPayload: MAX_PAYLOAD_SIZE, + } + s := &Server{ + info: inf, + sl: sublist.New(), + opts: opts, + debug: opts.Debug, + trace: opts.Trace, + } + // Setup logging with flags + s.LogInit() + // Generate the info json b, err := json.Marshal(s.info) if err != nil { - log.Fatalf("Err marshalling INFO JSON: %+v\n", err) + Fatalf("Err marshalling INFO JSON: %+v\n", err) } s.infoJson = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) - return s } -func (s *Server) AcceptLoop(host string, port int) { - hp := fmt.Sprintf("%s:%d", host, port) +func (s *Server) AcceptLoop() { + Logf("Starting nats-server version %s on port %d", VERSION, s.opts.Port) + + hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) l, e := net.Listen("tcp", hp) if e != nil { - println(e) + Fatalf("Error listening on port: %d - %v", s.opts.Port, e) return } - log.Println("Listening on ", l.Addr()) for { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Printf("Accept error: %v", err) + Logf("Accept error: %v", err) } continue } @@ -74,21 +99,30 @@ func (s *Server) AcceptLoop(host string, port int) { } } +func clientConnStr(conn net.Conn) interface{} { + if ip, ok := conn.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} + } + return "N/A" +} + func (s *Server) createClient(conn net.Conn) *client { c := &client{srv: s, conn: conn} c.cid = atomic.AddUint64(&s.gcid, 1) -// log.Printf("Creating Client: %+v\n", c) c.bw = bufio.NewWriterSize(c.conn, defaultBufSize) c.br = bufio.NewReaderSize(c.conn, defaultBufSize) c.subs = hashmap.New() -/* - if ipc := conn.(*net.TCPConn) ; ipc != nil { - ipc.SetReadBuffer(65536) - } -*/ + + if ip, ok := conn.(*net.TCPConn); ok { + ip.SetReadBuffer(32768) + } s.sendInfo(c) go c.readLoop() + + Debug("Client connection created", clientConnStr(conn), c.cid) + return c }