diff --git a/gnatsd.go b/gnatsd.go index d7716b05..ea7be517 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -6,13 +6,11 @@ import ( "flag" "strings" + "github.com/apcera/gnatsd/logger" "github.com/apcera/gnatsd/server" ) func main() { - // logging setup - server.LogSetup() - // Server Options opts := server.Options{} @@ -44,6 +42,7 @@ func main() { flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.") flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.") + flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.") flag.BoolVar(&showVersion, "version", false, "Print version information.") flag.BoolVar(&showVersion, "v", false, "Print version information.") flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") @@ -92,6 +91,22 @@ func main() { // Create the server with appropriate options. s := server.New(&opts) + // Builds and set the logger based on the flags + s.SetLogger(buildLogger(&opts)) + // Start things up. Block here until done. s.Start() } + +func buildLogger(opts *server.Options) server.Logger { + if opts.Syslog { + return logger.NewSysLogger(opts.Debug, opts.Trace) + } + + if opts.LogFile != "" { + return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) + } + + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace) + +} diff --git a/logger/log.go b/logger/log.go new file mode 100644 index 00000000..7d27a20d --- /dev/null +++ b/logger/log.go @@ -0,0 +1,65 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "log" + "os" +) + +type Logger struct { + logger *log.Logger + debug bool + trace bool +} + +func NewStdLogger(time, debug, trace bool) *Logger { + flags := 0 + if time { + flags = log.LstdFlags + } + + return &Logger{ + logger: log.New(os.Stderr, "", flags), + debug: debug, + trace: trace, + } +} + +func NewFileLogger(filename string, time, debug, trace bool) *Logger { + fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE + f, err := os.OpenFile(filename, fileflags, 0660) + if err != nil { + log.Fatal("error opening file: %v", err) + } + + flags := 0 + if time { + flags = log.LstdFlags + } + + return &Logger{ + logger: log.New(f, "", flags), + debug: debug, + trace: trace, + } +} + +func (l *Logger) Log(format string, v ...interface{}) { + l.logger.Printf(format, v...) +} + +func (l *Logger) Fatal(format string, v ...interface{}) { + l.logger.Fatalf(format, v) +} + +func (l *Logger) Debug(format string, v ...interface{}) { + if l.debug == true { + l.Log(format, v...) + } +} + +func (l *Logger) Trace(format string, v ...interface{}) { + if l.trace == true { + l.Log(format, v...) + } +} diff --git a/logger/log_test.go b/logger/log_test.go new file mode 100644 index 00000000..bdd5629d --- /dev/null +++ b/logger/log_test.go @@ -0,0 +1,128 @@ +package logger + +import ( + "bytes" + "io" + "io/ioutil" + "log" + "os" + "testing" +) + +func TestStdLogger(t *testing.T) { + logger := NewStdLogger(false, false, false) + + flags := logger.logger.Flags() + if flags != 0 { + t.Fatalf("Expected %q, received %q\n", 0, flags) + } + + if logger.debug != false { + t.Fatalf("Expected %b, received %b\n", false, logger.debug) + } + + if logger.trace != false { + t.Fatalf("Expected %b, received %b\n", false, logger.trace) + } +} + +func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { + logger := NewStdLogger(true, true, true) + + flags := logger.logger.Flags() + if flags != log.LstdFlags { + t.Fatalf("Expected %d, received %d\n", log.LstdFlags, flags) + } + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestStdLoggerLog(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Log("foo") + }, "foo\n") +} + +func TestStdLoggerDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, true, false) + logger.Debug("foo %s", "bar") + }, "foo bar\n") +} + +func TestStdLoggerDebugWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Debug("foo") + }, "") +} + +func TestStdLoggerTrace(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, true) + logger.Trace("foo") + }, "foo\n") +} + +func TestStdLoggerTraceWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Trace("foo") + }, "") +} + +func TestFileLogger(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "_gnatsd") + if err != nil { + t.Fatal("Could not create tmp dir") + } + defer os.RemoveAll(tmpDir) + + file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") + file.Close() + + logger := NewFileLogger(file.Name(), false, false, false) + logger.Log("foo") + + buf, err := ioutil.ReadFile(file.Name()) + if err != nil { + t.Fatalf("Could not read logfile: %v", err) + } + if len(buf) <= 0 { + t.Fatal("Expected a non-zero length logfile") + } + + if string(buf) != "foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "foo", string(buf)) + } +} + +func expectOutput(t *testing.T, f func(), expect string) { + old := os.Stdout // keep backup of the real stdout + r, w, _ := os.Pipe() + os.Stderr = w + + f() + + outC := make(chan string) + // copy the output in a separate goroutine so printing can't block indefinitely + go func() { + var buf bytes.Buffer + io.Copy(&buf, r) + outC <- buf.String() + }() + + os.Stderr.Close() + os.Stderr = old // restoring the real stdout + out := <-outC + if out != expect { + t.Fatalf("Expected '%s', received '%s'\n", expect, out) + } +} diff --git a/logger/syslog.go b/logger/syslog.go new file mode 100644 index 00000000..f08d4a8e --- /dev/null +++ b/logger/syslog.go @@ -0,0 +1,60 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "fmt" + "log" + "log/syslog" +) + +type SysLogger struct { + writer *syslog.Writer + debug bool + trace bool +} + +func NewSysLogger(debug, trace bool) *SysLogger { + w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd") + if err != nil { + log.Fatal("error connecting to syslog: %v", err) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func NewRemoteSysLogger(network, raddr string, debug, trace bool) *SysLogger { + w, err := syslog.Dial(network, raddr, syslog.LOG_DEBUG, "gnatsd") + if err != nil { + log.Fatal("error connecting to syslog: %v", err) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func (l *SysLogger) Log(format string, v ...interface{}) { + l.writer.Notice(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Fatal(format string, v ...interface{}) { + l.writer.Crit(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Debug(format string, v ...interface{}) { + if l.debug == true { + l.writer.Debug(fmt.Sprintf(format, v...)) + } +} + +func (l *SysLogger) Trace(format string, v ...interface{}) { + if l.trace == true { + l.writer.Info(fmt.Sprintf(format, v...)) + } +} diff --git a/server/client.go b/server/client.go index dfc79b66..6c37c3c9 100644 --- a/server/client.go +++ b/server/client.go @@ -33,19 +33,20 @@ const ( ) type client struct { - mu sync.Mutex - typ int - cid uint64 - opts clientOpts - nc net.Conn - bw *bufio.Writer - srv *Server - subs *hashmap.HashMap - pcd map[*client]struct{} - atmr *time.Timer - ptmr *time.Timer - pout int - msgb [msgScratchSize]byte + mu sync.Mutex + typ int + cid uint64 + opts clientOpts + nc net.Conn + bw *bufio.Writer + srv *Server + subs *hashmap.HashMap + pcd map[*client]struct{} + atmr *time.Timer + ptmr *time.Timer + trace bool + pout int + msgb [msgScratchSize]byte parseState stats @@ -145,7 +146,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - Log(err, clientConnStr(c.nc), c.cid) + log.Log(err.Error(), clientConnStr(c.nc), c.cid) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -162,7 +163,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - Debugf("Error flushing: %v", err) + log.Debug("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -179,20 +180,25 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { + if c.trace != true { + return + } + pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + log.Trace("[cid: %d] MSG: %s", c.cid, opa) } func (c *client) traceOp(op string, arg []byte) { - if trace == 0 { + if c.trace != true { return } + opa := []interface{}{fmt.Sprintf("%s OP", op)} if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + log.Trace("[cid: %d] OP: %s", c.cid, opa) } // Process the info message if we are a route. @@ -203,17 +209,21 @@ func (c *client) processRouteInfo(info *Info) { return } c.route.remoteID = info.ID + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. s := c.srv c.mu.Unlock() if s.addRoute(c) { - Debug("Registering remote route", info.ID) + log.Debug("[cid: %d] Registering remote route '%s'", c.cid, info.ID) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) } else { - Debug("Detected duplicate remote route", info.ID, clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Detected duplicate remote route '%s', %s", + c.cid, info.ID, clientConnStr(c.nc), + ) c.closeConnection() } } @@ -231,7 +241,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - Log(errStr, clientConnStr(c.nc), c.cid) + log.Log(errStr, clientConnStr(c.nc), c.cid) c.closeConnection() } @@ -301,7 +311,10 @@ func (c *client) processPing() { err := c.bw.Flush() if err != nil { c.clearConnection() - Debug("Error on Flush", err, clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Error on Flush, error %s, %s", + c.cid, err.Error(), clientConnStr(c.nc), + ) } c.mu.Unlock() } @@ -314,7 +327,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if trace > 0 { + if c.trace == true { c.traceOp("MSG", arg) } @@ -361,7 +374,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if trace > 0 { + if c.trace == true { c.traceOp("PUB", arg) } @@ -478,8 +491,10 @@ func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm < sub.max { - Debugf("Deferring actual UNSUB(%s): %d max, %d received\n", - string(sub.subject), sub.max, sub.nm) + log.Debug( + "[cid: %d] Deferring actual UNSUB(%s): %d max, %d received\n", + c.cid, string(sub.subject), sub.max, sub.nm, + ) return } c.traceOp("DELSUB", sub.sid) @@ -621,10 +636,10 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - Log("Slow Consumer Detected", clientConnStr(client.nc), client.cid) + log.Log("[cid: %d] Slow Consumer Detected %s", client.cid, clientConnStr(client.nc)) client.closeConnection() } else { - Debugf("Error writing msg: %v", err) + log.Debug("[cid: %d] Error writing msg: %v", client.cid, err) } } @@ -647,7 +662,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if trace > 0 { + if c.trace == true { c.traceMsg(msg) } if srv == nil { @@ -732,12 +747,14 @@ func (c *client) processMsg(msg []byte) { } if sub.client == nil || sub.client.nc == nil || sub.client.route == nil || sub.client.route.remoteID == "" { - Debug("Bad or Missing ROUTER Identity, not processing msg", - clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Bad or Missing ROUTER Identity, not processing msg, %s", + c.cid, clientConnStr(c.nc), + ) continue } if _, ok := rmap[sub.client.route.remoteID]; ok { - Debug("Ignoring route, already processed", c.cid) + log.Debug("[cid: %d] Ignoring route, already processed", c.cid) continue } rmap[sub.client.route.remoteID] = routeSeen @@ -766,12 +783,12 @@ func (c *client) processPingTimer() { return } - Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) + log.Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) // Check for violation c.pout += 1 if c.pout > c.srv.opts.MaxPingsOut { - Debug("Stale Client Connection - Closing", clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Stale Client Connection - Closing %s", c.cid, clientConnStr(c.nc)) if c.bw != nil { c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")) c.bw.Flush() @@ -784,7 +801,7 @@ func (c *client) processPingTimer() { c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - Debug("Error on Client Ping Flush", err, clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Error on Client Ping Flush, error %s %s", c.cid, err, clientConnStr(c.nc)) c.clearConnection() } else { // Reset to fire again if all OK. @@ -856,9 +873,7 @@ func (c *client) closeConnection() { return } - // FIXME(dlc) - This creates garbage for no reason. - dbgString := fmt.Sprintf("%s connection closed", c.typeString()) - Debug(dbgString, clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] %s connection closed: %s", c.cid, c.typeString(), clientConnStr(c.nc)) c.clearAuthTimer() c.clearPingTimer() @@ -895,10 +910,10 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - Debug("Not attempting reconnect for solicited route, already connected.", rid) + log.Debug("[cid: %d] Not attempting reconnect for solicited route, already connected. Try %d", c.cid, rid) return } else { - Debug("Attempting reconnect for solicited route", c.cid) + log.Debug("[cid: %d] Attempting reconnect for solicited route", c.cid) go srv.reConnectToRoute(c.route.url) } } diff --git a/server/log.go b/server/log.go index 6f62746b..2597056b 100644 --- a/server/log.go +++ b/server/log.go @@ -1,119 +1,23 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2014 Apcera Inc. All rights reserved. package server -import ( - "fmt" - "log" - "os" - "strings" - "sync/atomic" -) +var log Logger = &NilLogger{} -// logging functionality, compatible with the original nats-server. - -var trace int32 -var debug int32 -var nolog int32 - -// LogSetup will properly setup logging and the logging flags. -func LogSetup() { - log.SetFlags(0) - atomic.StoreInt32(&nolog, 0) - atomic.StoreInt32(&debug, 0) - atomic.StoreInt32(&trace, 0) +type Logger interface { + Log(format string, v ...interface{}) + Fatal(format string, v ...interface{}) + Debug(format string, v ...interface{}) + Trace(format string, v ...interface{}) } -// LogInit parses option flags and sets up logging. -func (s *Server) LogInit() { - // Reset - LogSetup() - - if s.opts.Logtime { - log.SetFlags(log.LstdFlags) - } - if s.opts.NoLog { - atomic.StoreInt32(&nolog, 1) - } - if s.opts.LogFile != "" { - flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE - file, err := os.OpenFile(s.opts.LogFile, flags, 0660) - if err != nil { - PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile)) - } - log.SetOutput(file) - } - if s.opts.Debug { - Log(s.opts) - atomic.StoreInt32(&debug, 1) - Log("DEBUG is on") - } - if s.opts.Trace { - atomic.StoreInt32(&trace, 1) - Log("TRACE is on") - } +func (s *Server) SetLogger(logger Logger) { + log = logger } -func alreadyFormatted(s string) bool { - return strings.HasPrefix(s, "[") -} +type NilLogger struct{} -func logStr(v []interface{}) string { - args := make([]string, 0, len(v)) - for _, vt := range v { - switch t := vt.(type) { - case string: - if alreadyFormatted(t) { - args = append(args, t) - } else { - t = strings.Replace(t, "\"", "\\\"", -1) - args = append(args, fmt.Sprintf("\"%s\"", t)) - } - default: - args = append(args, fmt.Sprintf("%+v", vt)) - } - } - return fmt.Sprintf("[%s]", strings.Join(args, ", ")) -} - -func Log(v ...interface{}) { - if nolog == 0 { - log.Print(logStr(v)) - } -} - -func Logf(format string, v ...interface{}) { - Log(fmt.Sprintf(format, v...)) -} - -func Fatal(v ...interface{}) { - log.Fatalf(logStr(v)) -} - -func Fatalf(format string, v ...interface{}) { - Fatal(fmt.Sprintf(format, v...)) -} - -func Debug(v ...interface{}) { - if debug > 0 { - Log(v...) - } -} - -func Debugf(format string, v ...interface{}) { - if debug > 0 { - Debug(fmt.Sprintf(format, v...)) - } -} - -func Trace(v ...interface{}) { - if trace > 0 { - Log(v...) - } -} - -func Tracef(format string, v ...interface{}) { - if trace > 0 { - Trace(fmt.Sprintf(format, v...)) - } -} +func (l *NilLogger) Log(format string, v ...interface{}) {} +func (l *NilLogger) Fatal(format string, v ...interface{}) {} +func (l *NilLogger) Debug(format string, v ...interface{}) {} +func (l *NilLogger) Trace(format string, v ...interface{}) {} diff --git a/server/log_test.go b/server/log_test.go new file mode 100644 index 00000000..273ad658 --- /dev/null +++ b/server/log_test.go @@ -0,0 +1,25 @@ +// Copyright 2014 Apcera Inc. All rights reserved. + +package server + +import ( + "testing" +) + +func TestSetLogger(t *testing.T) { + // We assert that the default logger is the NilLogger + _ = log.(*NilLogger) + + server := &Server{} + server.SetLogger(&DummyLogger{}) + + // We assert that the logger has change to the DummyLogger + _ = log.(*DummyLogger) +} + +type DummyLogger struct{} + +func (l *DummyLogger) Log(format string, v ...interface{}) {} +func (l *DummyLogger) Fatal(format string, v ...interface{}) {} +func (l *DummyLogger) Debug(format string, v ...interface{}) {} +func (l *DummyLogger) Trace(format string, v ...interface{}) {} diff --git a/server/monitor.go b/server/monitor.go index 27ed78c4..4da019bd 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - Logf("Error marshalling response to /connz request: %v", err) + log.Log("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - Logf("Error marshalling response to /subscriptionsz request: %v", err) + log.Log("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - Logf("Error marshalling response to /varz request: %v", err) + log.Log("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index bc21d874..6383e404 100644 --- a/server/opts.go +++ b/server/opts.go @@ -43,6 +43,7 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` + Syslog bool `json:"-"` } type authorization struct { @@ -219,7 +220,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - Log("Self referencing IP found: ", r) + log.Log("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -250,7 +251,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - Log("Error looking up host with route hostname: ", err) + log.Log("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -267,7 +268,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - Log("Error getting self referencing address: ", err) + log.Log("Error getting self referencing address: ", err) return localIPs } @@ -276,7 +277,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - Log("Error parsing self referencing address: ", err) + log.Log("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/route.go b/server/route.go index 8c73ebae..19e84eeb 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - Logf("Error marshalling CONNECT to route: %v\n", err) + log.Log("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.bw.Write(b.Bytes()) route.bw.Flush() - Debug("Route sent local subscriptions", route.cid) + log.Debug("Route sent local subscriptions", route.cid) } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() - Debug("Route connection created", clientConnStr(c.nc), c.cid) + log.Debug("Route connection created", clientConnStr(c.nc), c.cid) // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rURL - Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) + log.Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) c.sendConnect() } @@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - Logf("Listening for route connections on %s", hp) + log.Log("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on router port: %d - %v", s.opts.Port, e) + log.Fatal("Error listening on router port: %d - %v", s.opts.Port, e) return } @@ -255,7 +255,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - Debug("Temporary Route Accept Error(%v), sleeping %dms", + log.Debug("Temporary Route Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -263,14 +263,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + log.Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createRoute(conn, nil) } - Debug("Router accept loop exiting..") + log.Debug("Router accept loop exiting..") s.done <- true } @@ -294,7 +294,7 @@ func (s *Server) StartRouting() { // Generate the info json b, err := json.Marshal(info) if err != nil { - Fatalf("Error marshalling Route INFO JSON: %+v\n", err) + log.Fatal("Error marshalling Route INFO JSON: %+v\n", err) } s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) { func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { - Debugf("Trying to connect to route on %s", rUrl.Host) + log.Debug("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { - Debugf("Error trying to connect to route: %v", err) + log.Debug("Error trying to connect to route: %v", err) select { case <-s.rcQuit: return diff --git a/server/server.go b/server/server.go index 0245b971..ffef2aa4 100644 --- a/server/server.go +++ b/server/server.go @@ -82,6 +82,7 @@ func New(opts *Options) *Server { if opts.Username != "" || opts.Authorization != "" { info.AuthRequired = true } + s := &Server{ info: info, sl: sublist.New(), @@ -95,9 +96,6 @@ func New(opts *Options) *Server { s.mu.Lock() defer s.mu.Unlock() - // Setup logging with flags - s.LogInit() - // For tracking clients s.clients = make(map[uint64]*client) @@ -108,20 +106,16 @@ func New(opts *Options) *Server { // Used to kick out all of the route // connect Go routines. s.rcQuit = make(chan bool) + s.handleSignals() // Generate the info json b, err := json.Marshal(s.info) if err != nil { - Fatalf("Error marshalling INFO JSON: %+v\n", err) + log.Fatal("Error marshalling INFO JSON: %+v\n", err) } + s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) - s.handleSignals() - - Logf("Starting gnatsd version %s", VERSION) - - s.running = true - return s } @@ -146,9 +140,9 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - Debugf("Trapped Signal; %v", sig) + log.Debug("Trapped Signal; %v", sig) // FIXME, trip running? - Log("Server Exiting..") + log.Log("Server Exiting..") os.Exit(0) } }() @@ -172,6 +166,8 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { + log.Log("Starting gnatsd version %s", VERSION) + s.running = true // Log the pid to a file if s.opts.PidFile != _EMPTY_ { @@ -265,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - Logf("Listening for client connections on %s", hp) + log.Log("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on port: %d - %v", s.opts.Port, e) + log.Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - Logf("gnatsd is ready") + log.Log("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -282,12 +278,12 @@ func (s *Server) AcceptLoop() { // Write resolved port back to options. _, port, err := net.SplitHostPort(l.Addr().String()) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } portNum, err := strconv.Atoi(port) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } s.opts.Port = portNum @@ -298,7 +294,7 @@ func (s *Server) AcceptLoop() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - Debug("Temporary Client Accept Error(%v), sleeping %dms", + log.Debug("Temporary Client Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -306,36 +302,39 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + log.Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - Log("Server Exiting..") + log.Log("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - Logf("Starting profiling on http port %d", s.opts.ProfPort) + log.Log("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { - Log(http.ListenAndServe(hp, nil)) + err := http.ListenAndServe(hp, nil) + if err != nil { + log.Fatal("error starting monitor server: %s", err) + } }() } // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - Logf("Starting http monitor on port %d", s.opts.HTTPPort) + log.Log("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort) l, err := net.Listen("tcp", hp) if err != nil { - Fatalf("Can't listen to the monitor port: %v", err) + log.Fatal("Can't listen to the monitor port: %v", err) } mux := http.NewServeMux() @@ -367,7 +366,7 @@ func (s *Server) StartHTTPMonitoring() { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, nc: conn, opts: defaultOpts} + c := &client{srv: s, nc: conn, trace: s.opts.Trace, opts: defaultOpts} // Grab lock c.mu.Lock() @@ -375,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Initialize c.initClient() - Debug("Client connection created", clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Client connection created: %s", c.cid, clientConnStr(c.nc)) // Send our information. s.sendInfo(c) diff --git a/test/log_test.go b/test/log_test.go deleted file mode 100644 index 1335deed..00000000 --- a/test/log_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. - -package test - -import ( - "io/ioutil" - "os" - "regexp" - "testing" -) - -var startRe = regexp.MustCompile(`\["Starting gnatsd version\s+([^\s]+)"\]\n`) - -func TestLogFile(t *testing.T) { - opts := DefaultTestOptions - opts.NoLog = false - opts.Logtime = false - - tmpDir, err := ioutil.TempDir("", "_gnatsd") - if err != nil { - t.Fatal("Could not create tmp dir") - } - defer os.RemoveAll(tmpDir) - - file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") - file.Close() - opts.LogFile = file.Name() - - s := RunServer(&opts) - s.Shutdown() - - buf, err := ioutil.ReadFile(opts.LogFile) - if err != nil { - t.Fatalf("Could not read logfile: %v", err) - } - if len(buf) <= 0 { - t.Fatal("Expected a non-zero length logfile") - } - if !startRe.Match(buf) { - t.Fatalf("Logfile did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, startRe) - } -}