diff --git a/README.md b/README.md index 185d759d..8272ed0b 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ Server options: Logging options: -l, --log FILE File to redirect log output -T, --logtime Timestamp log entries (default: true) + -s, --syslog Enable syslog as log method. + -r, --remote_syslog Syslog server addr (udp://localhost:514). -D, --debug Enable debugging output -V, --trace Trace the raw protocol diff --git a/gnatsd.go b/gnatsd.go index d7716b05..31f96fa4 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -6,13 +6,11 @@ import ( "flag" "strings" + "github.com/apcera/gnatsd/logger" "github.com/apcera/gnatsd/server" ) func main() { - // logging setup - server.LogSetup() - // Server Options opts := server.Options{} @@ -44,6 +42,10 @@ func main() { flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.") flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.") + flag.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.") + flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method..") + flag.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://localhost:514).") + flag.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://localhost:514).") flag.BoolVar(&showVersion, "version", false, "Print version information.") flag.BoolVar(&showVersion, "v", false, "Print version information.") flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") @@ -92,6 +94,25 @@ func main() { // Create the server with appropriate options. s := server.New(&opts) + // Builds and set the logger based on the flags + s.SetLogger(buildLogger(&opts), opts.Debug, opts.Trace) + // Start things up. Block here until done. s.Start() } + +func buildLogger(opts *server.Options) server.Logger { + if opts.LogFile != "" { + return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) + } + + if opts.RemoteSyslog != "" { + return logger.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace) + } + + if opts.Syslog { + return logger.NewSysLogger(opts.Debug, opts.Trace) + } + + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true) +} diff --git a/logger/log.go b/logger/log.go new file mode 100644 index 00000000..66d53313 --- /dev/null +++ b/logger/log.go @@ -0,0 +1,103 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "fmt" + "log" + "os" +) + +type Logger struct { + logger *log.Logger + debug bool + trace bool + infoLabel string + errorLabel string + fatalLabel string + debugLabel string + traceLabel string +} + +func NewStdLogger(time, debug, trace, colors bool) *Logger { + flags := 0 + if time { + flags = log.LstdFlags + } + + l := &Logger{ + logger: log.New(os.Stderr, "", flags), + debug: debug, + trace: trace, + } + + if colors { + setColoredLabelFormats(l) + } else { + setPlainLabelFormats(l) + } + + return l +} + +func NewFileLogger(filename string, time, debug, trace bool) *Logger { + fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE + f, err := os.OpenFile(filename, fileflags, 0660) + if err != nil { + log.Fatal("error opening file: %v", err) + } + + flags := 0 + if time { + flags = log.LstdFlags + } + + l := &Logger{ + logger: log.New(f, "", flags), + debug: debug, + trace: trace, + } + + setPlainLabelFormats(l) + return l +} + +func setPlainLabelFormats(l *Logger) { + l.infoLabel = "[INFO] " + l.debugLabel = "[DEBUG] " + l.errorLabel = "[ERROR] " + l.fatalLabel = "[FATAL] " + l.traceLabel = "[TRACE] " +} + +func setColoredLabelFormats(l *Logger) { + colorFormat := "[\x1b[%dm%s\x1b[0m] " + l.infoLabel = fmt.Sprintf(colorFormat, 32, "INFO") + l.debugLabel = fmt.Sprintf(colorFormat, 36, "DEBUG") + l.errorLabel = fmt.Sprintf(colorFormat, 31, "ERROR") + l.fatalLabel = fmt.Sprintf(colorFormat, 35, "FATAL") + l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRACE") +} + +func (l *Logger) Notice(format string, v ...interface{}) { + l.logger.Printf(l.infoLabel+format, v...) +} + +func (l *Logger) Error(format string, v ...interface{}) { + l.logger.Printf(l.errorLabel+format, v...) +} + +func (l *Logger) Fatal(format string, v ...interface{}) { + l.logger.Fatalf(l.fatalLabel+format, v) +} + +func (l *Logger) Debug(format string, v ...interface{}) { + if l.debug == true { + l.logger.Printf(l.debugLabel+format, v...) + } +} + +func (l *Logger) Trace(format string, v ...interface{}) { + if l.trace == true { + l.logger.Printf(l.traceLabel+format, v...) + } +} diff --git a/logger/log_test.go b/logger/log_test.go new file mode 100644 index 00000000..20904052 --- /dev/null +++ b/logger/log_test.go @@ -0,0 +1,135 @@ +package logger + +import ( + "bytes" + "io" + "io/ioutil" + "log" + "os" + "testing" +) + +func TestStdLogger(t *testing.T) { + logger := NewStdLogger(false, false, false, false) + + flags := logger.logger.Flags() + if flags != 0 { + t.Fatalf("Expected %q, received %q\n", 0, flags) + } + + if logger.debug { + t.Fatalf("Expected %t, received %t\n", false, logger.debug) + } + + if logger.trace { + t.Fatalf("Expected %t, received %t\n", false, logger.trace) + } +} + +func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { + logger := NewStdLogger(true, true, true, false) + + flags := logger.logger.Flags() + if flags != log.LstdFlags { + t.Fatalf("Expected %d, received %d\n", log.LstdFlags, flags) + } + + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) + } + + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) + } +} + +func TestStdLoggerNotice(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, false) + logger.Notice("foo") + }, "[INFO] foo\n") +} + +func TestStdLoggerNoticeWithColor(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, true) + logger.Notice("foo") + }, "[\x1b[32mINFO\x1b[0m] foo\n") +} + +func TestStdLoggerDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, true, false, false) + logger.Debug("foo %s", "bar") + }, "[DEBUG] foo bar\n") +} + +func TestStdLoggerDebugWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, false) + logger.Debug("foo") + }, "") +} + +func TestStdLoggerTrace(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, true, false) + logger.Trace("foo") + }, "[TRACE] foo\n") +} + +func TestStdLoggerTraceWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, false) + logger.Trace("foo") + }, "") +} + +func TestFileLogger(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "_gnatsd") + if err != nil { + t.Fatal("Could not create tmp dir") + } + defer os.RemoveAll(tmpDir) + + file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") + file.Close() + + logger := NewFileLogger(file.Name(), false, false, false) + logger.Notice("foo") + + buf, err := ioutil.ReadFile(file.Name()) + if err != nil { + t.Fatalf("Could not read logfile: %v", err) + } + if len(buf) <= 0 { + t.Fatal("Expected a non-zero length logfile") + } + + if string(buf) != "[INFO] foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "[INFO] foo", string(buf)) + } +} + +func expectOutput(t *testing.T, f func(), expected string) { + old := os.Stderr // keep backup of the real stdout + r, w, _ := os.Pipe() + os.Stderr = w + + f() + + outC := make(chan string) + // copy the output in a separate goroutine so printing can't block indefinitely + go func() { + var buf bytes.Buffer + io.Copy(&buf, r) + outC <- buf.String() + }() + + os.Stderr.Close() + os.Stderr = old // restoring the real stdout + out := <-outC + if out != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, out) + } +} diff --git a/logger/syslog.go b/logger/syslog.go new file mode 100644 index 00000000..9f04ca01 --- /dev/null +++ b/logger/syslog.go @@ -0,0 +1,84 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "fmt" + "log" + "log/syslog" + "net/url" +) + +type SysLogger struct { + writer *syslog.Writer + debug bool + trace bool +} + +func NewSysLogger(debug, trace bool) *SysLogger { + w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd") + if err != nil { + log.Fatalf("error connecting to syslog: %q", err.Error()) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func NewRemoteSysLogger(fqn string, debug, trace bool) *SysLogger { + network, addr := getNetworkAndAddr(fqn) + w, err := syslog.Dial(network, addr, syslog.LOG_DEBUG, "gnatsd") + if err != nil { + log.Fatalf("error connecting to syslog: %q", err.Error()) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func getNetworkAndAddr(fqn string) (network, addr string) { + u, err := url.Parse(fqn) + if err != nil { + log.Fatal(err) + } + + network = u.Scheme + if network == "udp" || network == "tcp" { + addr = u.Host + } else if network == "unix" { + addr = u.Path + } else { + log.Fatalf("error invalid network type: %q", u.Scheme) + } + + return +} + +func (l *SysLogger) Notice(format string, v ...interface{}) { + l.writer.Notice(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Fatal(format string, v ...interface{}) { + l.writer.Crit(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Error(format string, v ...interface{}) { + l.writer.Err(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Debug(format string, v ...interface{}) { + if l.debug { + l.writer.Debug(fmt.Sprintf(format, v...)) + } +} + +func (l *SysLogger) Trace(format string, v ...interface{}) { + if l.trace { + l.writer.Notice(fmt.Sprintf(format, v...)) + } +} diff --git a/logger/syslog_test.go b/logger/syslog_test.go new file mode 100644 index 00000000..3835f473 --- /dev/null +++ b/logger/syslog_test.go @@ -0,0 +1,171 @@ +package logger + +import ( + "fmt" + "log" + "net" + "strings" + "testing" + "time" +) + +var serverFQN string + +func TestSysLogger(t *testing.T) { + logger := NewSysLogger(false, false) + + if logger.debug { + t.Fatalf("Expected %t, received %t\n", false, logger.debug) + } + + if logger.trace { + t.Fatalf("Expected %t, received %t\n", false, logger.trace) + } +} + +func TestSysLoggerWithDebugAndTrace(t *testing.T) { + logger := NewSysLogger(true, true) + + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) + } + + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) + } +} + +func TestRemoteSysLogger(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, true, true) + + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) + } + + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) + } +} + +func TestRemoteSysLoggerNotice(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, true, true) + + logger.Notice("foo %s", "bar") + expectSyslogOutput(t, <-done, "foo bar\n") +} + +func TestRemoteSysLoggerDebug(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, true, true) + + logger.Debug("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerDebugDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, false, false) + + logger.Debug("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func TestRemoteSysLoggerTrace(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, true, true) + + logger.Trace("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerTraceDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger(serverFQN, true, false) + + logger.Trace("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func TestGetNetworkAndAddrUDP(t *testing.T) { + n, a := getNetworkAndAddr("udp://foo.com:1000") + + if n != "udp" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "foo.com:1000" { + t.Fatalf("Unexpected addr %s\n", a) + } +} + +func TestGetNetworkAndAddrTCP(t *testing.T) { + n, a := getNetworkAndAddr("tcp://foo.com:1000") + + if n != "tcp" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "foo.com:1000" { + t.Fatalf("Unexpected addr %s\n", a) + } +} + +func TestGetNetworkAndAddrUnix(t *testing.T) { + n, a := getNetworkAndAddr("unix:///foo.sock") + + if n != "unix" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "/foo.sock" { + t.Fatalf("Unexpected addr %s\n", a) + } +} +func expectSyslogOutput(t *testing.T, line string, expected string) { + data := strings.Split(line, "]: ") + if len(data) != 2 { + t.Fatalf("Unexpected syslog line %s\n", line) + } + + if data[1] != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, data[1]) + } +} + +func runSyslog(c net.PacketConn, done chan<- string) { + var buf [4096]byte + var rcvd string = "" + for { + n, _, err := c.ReadFrom(buf[:]) + if err != nil || n == 0 { + break + } + rcvd += string(buf[:n]) + } + done <- rcvd +} + +func startServer(done chan<- string) { + c, e := net.ListenPacket("udp", "127.0.0.1:0") + if e != nil { + log.Fatalf("net.ListenPacket failed udp :0 %v", e) + } + + serverFQN = fmt.Sprintf("udp://%s", c.LocalAddr().String()) + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + go runSyslog(c, done) +} diff --git a/server/client.go b/server/client.go index dfc79b66..95917064 100644 --- a/server/client.go +++ b/server/client.go @@ -53,12 +53,19 @@ type client struct { } func (c *client) String() (id string) { + conn := "-" + if ip, ok := c.nc.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port) + } + switch c.typ { case CLIENT: - id = fmt.Sprintf("cid:%d", c.cid) + id = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: - id = fmt.Sprintf("rid:%d", c.cid) + id = fmt.Sprintf("%s - rid:%d", conn, c.cid) } + return id } @@ -87,14 +94,6 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func clientConnStr(conn net.Conn) interface{} { - if ip, ok := conn.(*net.TCPConn); ok { - addr := ip.RemoteAddr().(*net.TCPAddr) - return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} - } - return "N/A" -} - // Lock should be held func (c *client) initClient() { s := c.srv @@ -145,7 +144,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - Log(err, clientConnStr(c.nc), c.cid) + Error("Error reading from client: %s", err.Error(), c) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -162,7 +161,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - Debugf("Error flushing: %v", err) + Debug("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -179,20 +178,25 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { + if trace == 0 { + return + } + pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + Trace("MSG: %s", opa, c) } func (c *client) traceOp(op string, arg []byte) { if trace == 0 { return } + opa := []interface{}{fmt.Sprintf("%s OP", op)} if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + Trace("OP: %s", opa, c) } // Process the info message if we are a route. @@ -203,17 +207,18 @@ func (c *client) processRouteInfo(info *Info) { return } c.route.remoteID = info.ID + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. s := c.srv c.mu.Unlock() if s.addRoute(c) { - Debug("Registering remote route", info.ID) + Debug("Registering remote route %q", info.ID, c) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) } else { - Debug("Detected duplicate remote route", info.ID, clientConnStr(c.nc), c.cid) + Debug("Detected duplicate remote route %q", info.ID, c) c.closeConnection() } } @@ -231,7 +236,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - Log(errStr, clientConnStr(c.nc), c.cid) + Error("Client error %s", errStr, c) c.closeConnection() } @@ -301,7 +306,7 @@ func (c *client) processPing() { err := c.bw.Flush() if err != nil { c.clearConnection() - Debug("Error on Flush", err, clientConnStr(c.nc), c.cid) + Debug("Error on Flush, error %s", err.Error(), c) } c.mu.Unlock() } @@ -314,7 +319,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if trace > 0 { + if trace == 0 { c.traceOp("MSG", arg) } @@ -361,7 +366,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if trace > 0 { + if trace == 0 { c.traceOp("PUB", arg) } @@ -478,8 +483,10 @@ func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm < sub.max { - Debugf("Deferring actual UNSUB(%s): %d max, %d received\n", - string(sub.subject), sub.max, sub.nm) + Debug( + "Deferring actual UNSUB(%s): %d max, %d received\n", + string(sub.subject), sub.max, sub.nm, c, + ) return } c.traceOp("DELSUB", sub.sid) @@ -621,10 +628,10 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - Log("Slow Consumer Detected", clientConnStr(client.nc), client.cid) + Notice("Slow Consumer Detected", c) client.closeConnection() } else { - Debugf("Error writing msg: %v", err) + Debug("Error writing msg: %v", err, c) } } @@ -732,12 +739,11 @@ func (c *client) processMsg(msg []byte) { } if sub.client == nil || sub.client.nc == nil || sub.client.route == nil || sub.client.route.remoteID == "" { - Debug("Bad or Missing ROUTER Identity, not processing msg", - clientConnStr(c.nc), c.cid) + Debug("Bad or Missing ROUTER Identity, not processing msg", c) continue } if _, ok := rmap[sub.client.route.remoteID]; ok { - Debug("Ignoring route, already processed", c.cid) + Debug("Ignoring route, already processed", c) continue } rmap[sub.client.route.remoteID] = routeSeen @@ -766,12 +772,12 @@ func (c *client) processPingTimer() { return } - Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) + Debug("Client Ping Timer", c) // Check for violation c.pout += 1 if c.pout > c.srv.opts.MaxPingsOut { - Debug("Stale Client Connection - Closing", clientConnStr(c.nc), c.cid) + Debug("Stale Client Connection - Closing", c) if c.bw != nil { c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")) c.bw.Flush() @@ -784,7 +790,7 @@ func (c *client) processPingTimer() { c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - Debug("Error on Client Ping Flush", err, clientConnStr(c.nc), c.cid) + Debug("Error on Client Ping Flush, error %s", err) c.clearConnection() } else { // Reset to fire again if all OK. @@ -856,9 +862,7 @@ func (c *client) closeConnection() { return } - // FIXME(dlc) - This creates garbage for no reason. - dbgString := fmt.Sprintf("%s connection closed", c.typeString()) - Debug(dbgString, clientConnStr(c.nc), c.cid) + Debug("%s connection closed", c.typeString(), c) c.clearAuthTimer() c.clearPingTimer() @@ -895,10 +899,10 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - Debug("Not attempting reconnect for solicited route, already connected.", rid) + Debug("Not attempting reconnect for solicited route, already connected. Try %d", rid, c) return } else { - Debug("Attempting reconnect for solicited route", c.cid) + Debug("Attempting reconnect for solicited route", c) go srv.reConnectToRoute(c.route.url) } } diff --git a/server/configs/test.conf b/server/configs/test.conf index 598af3d7..62e6819c 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -17,6 +17,8 @@ debug: false trace: true logtime: false log_file: "/tmp/gnatsd.log" +syslog: true +remote_syslog: "udp://foo.com:33" #pid file pid_file: "/tmp/gnatsd.pid" diff --git a/server/log.go b/server/log.go index 6f62746b..7bf758ef 100644 --- a/server/log.go +++ b/server/log.go @@ -1,119 +1,94 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2014 Apcera Inc. All rights reserved. package server import ( "fmt" - "log" - "os" - "strings" + "sync" "sync/atomic" ) -// logging functionality, compatible with the original nats-server. - var trace int32 var debug int32 -var nolog int32 +var log = struct { + logger Logger + sync.Mutex +}{} -// LogSetup will properly setup logging and the logging flags. -func LogSetup() { - log.SetFlags(0) - atomic.StoreInt32(&nolog, 0) - atomic.StoreInt32(&debug, 0) - atomic.StoreInt32(&trace, 0) +type Logger interface { + Notice(format string, v ...interface{}) + Fatal(format string, v ...interface{}) + Error(format string, v ...interface{}) + Debug(format string, v ...interface{}) + Trace(format string, v ...interface{}) } -// LogInit parses option flags and sets up logging. -func (s *Server) LogInit() { - // Reset - LogSetup() - - if s.opts.Logtime { - log.SetFlags(log.LstdFlags) - } - if s.opts.NoLog { - atomic.StoreInt32(&nolog, 1) - } - if s.opts.LogFile != "" { - flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE - file, err := os.OpenFile(s.opts.LogFile, flags, 0660) - if err != nil { - PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile)) - } - log.SetOutput(file) - } - if s.opts.Debug { - Log(s.opts) +func (s *Server) SetLogger(logger Logger, d, t bool) { + if d { atomic.StoreInt32(&debug, 1) - Log("DEBUG is on") } - if s.opts.Trace { + + if t { atomic.StoreInt32(&trace, 1) - Log("TRACE is on") } + + log.Lock() + defer log.Unlock() + log.logger = logger } -func alreadyFormatted(s string) bool { - return strings.HasPrefix(s, "[") +func Notice(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Notice(format, v...) + }, format, v...) } -func logStr(v []interface{}) string { - args := make([]string, 0, len(v)) - for _, vt := range v { - switch t := vt.(type) { - case string: - if alreadyFormatted(t) { - args = append(args, t) - } else { - t = strings.Replace(t, "\"", "\\\"", -1) - args = append(args, fmt.Sprintf("\"%s\"", t)) - } - default: - args = append(args, fmt.Sprintf("%+v", vt)) +func Error(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Error(format, v...) + }, format, v...) +} + +func Fatal(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Fatal(format, v...) + }, format, v...) +} + +func Debug(format string, v ...interface{}) { + if debug == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Debug(format, v...) + }, format, v...) +} + +func Trace(format string, v ...interface{}) { + if trace == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Trace(format, v...) + }, format, v...) +} + +func executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) { + log.Lock() + defer log.Unlock() + if log.logger == nil { + return + } + + argc := len(args) + if argc != 0 { + if client, ok := args[argc-1].(*client); ok { + args = args[:argc-1] + format = fmt.Sprintf("%s - %s", client, format) } } - return fmt.Sprintf("[%s]", strings.Join(args, ", ")) -} -func Log(v ...interface{}) { - if nolog == 0 { - log.Print(logStr(v)) - } -} - -func Logf(format string, v ...interface{}) { - Log(fmt.Sprintf(format, v...)) -} - -func Fatal(v ...interface{}) { - log.Fatalf(logStr(v)) -} - -func Fatalf(format string, v ...interface{}) { - Fatal(fmt.Sprintf(format, v...)) -} - -func Debug(v ...interface{}) { - if debug > 0 { - Log(v...) - } -} - -func Debugf(format string, v ...interface{}) { - if debug > 0 { - Debug(fmt.Sprintf(format, v...)) - } -} - -func Trace(v ...interface{}) { - if trace > 0 { - Log(v...) - } -} - -func Tracef(format string, v ...interface{}) { - if trace > 0 { - Trace(fmt.Sprintf(format, v...)) - } + f(log.logger, format, args...) } diff --git a/server/log_test.go b/server/log_test.go new file mode 100644 index 00000000..75d20291 --- /dev/null +++ b/server/log_test.go @@ -0,0 +1,31 @@ +// Copyright 2014 Apcera Inc. All rights reserved. + +package server + +import ( + "testing" +) + +func TestSetLogger(t *testing.T) { + server := &Server{} + server.SetLogger(&DummyLogger{}, true, true) + + // We assert that the logger has change to the DummyLogger + _ = log.logger.(*DummyLogger) + + if debug != 1 { + t.Fatalf("Expected debug 1, received value %d\n", debug) + } + + if trace != 1 { + t.Fatalf("Expected trace 1, received value %d\n", trace) + } +} + +type DummyLogger struct{} + +func (l *DummyLogger) Notice(format string, v ...interface{}) {} +func (l *DummyLogger) Error(format string, v ...interface{}) {} +func (l *DummyLogger) Fatal(format string, v ...interface{}) {} +func (l *DummyLogger) Debug(format string, v ...interface{}) {} +func (l *DummyLogger) Trace(format string, v ...interface{}) {} diff --git a/server/monitor.go b/server/monitor.go index 27ed78c4..225ae357 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - Logf("Error marshalling response to /connz request: %v", err) + Error("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - Logf("Error marshalling response to /subscriptionsz request: %v", err) + Error("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - Logf("Error marshalling response to /varz request: %v", err) + Error("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index bc21d874..626a2a26 100644 --- a/server/opts.go +++ b/server/opts.go @@ -43,6 +43,8 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` } type authorization struct { @@ -97,6 +99,10 @@ func ProcessConfigFile(configFile string) (*Options, error) { } case "logfile", "log_file": opts.LogFile = v.(string) + case "syslog": + opts.Syslog = v.(bool) + case "remote_syslog": + opts.RemoteSyslog = v.(string) case "pidfile", "pid_file": opts.PidFile = v.(string) case "prof_port": @@ -219,7 +225,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - Log("Self referencing IP found: ", r) + Notice("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -250,7 +256,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - Log("Error looking up host with route hostname: ", err) + Error("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -267,7 +273,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - Log("Error getting self referencing address: ", err) + Error("Error getting self referencing address: ", err) return localIPs } @@ -276,7 +282,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - Log("Error parsing self referencing address: ", err) + Error("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/opts_test.go b/server/opts_test.go index f02df7b2..05c70212 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -44,18 +44,20 @@ func TestOptions_RandomPort(t *testing.T) { func TestConfigFile(t *testing.T) { golden := &Options{ - Host: "apcera.me", - Port: 4242, - Username: "derek", - Password: "bella", - AuthTimeout: 1.0, - Debug: false, - Trace: true, - Logtime: false, - HTTPPort: 8222, - LogFile: "/tmp/gnatsd.log", - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6543, + Host: "apcera.me", + Port: 4242, + Username: "derek", + Password: "bella", + AuthTimeout: 1.0, + Debug: false, + Trace: true, + Logtime: false, + HTTPPort: 8222, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6543, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", } opts, err := ProcessConfigFile("./configs/test.conf") @@ -71,18 +73,20 @@ func TestConfigFile(t *testing.T) { func TestMergeOverrides(t *testing.T) { golden := &Options{ - Host: "apcera.me", - Port: 2222, - Username: "derek", - Password: "spooky", - AuthTimeout: 1.0, - Debug: true, - Trace: true, - Logtime: false, - HTTPPort: DEFAULT_HTTP_PORT, - LogFile: "/tmp/gnatsd.log", - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6789, + Host: "apcera.me", + Port: 2222, + Username: "derek", + Password: "spooky", + AuthTimeout: 1.0, + Debug: true, + Trace: true, + Logtime: false, + HTTPPort: DEFAULT_HTTP_PORT, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6789, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/route.go b/server/route.go index 8c73ebae..f9be793c 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - Logf("Error marshalling CONNECT to route: %v\n", err) + Error("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.bw.Write(b.Bytes()) route.bw.Flush() - Debug("Route sent local subscriptions", route.cid) + Debug("Route sent local subscriptions", route) } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() - Debug("Route connection created", clientConnStr(c.nc), c.cid) + Debug("Route connection created", c) // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rURL - Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) + Debug("Route connect msg sent", c) c.sendConnect() } @@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - Logf("Listening for route connections on %s", hp) + Notice("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on router port: %d - %v", s.opts.Port, e) + Fatal("Error listening on router port: %d - %v", s.opts.Port, e) return } @@ -263,7 +263,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + Notice("Accept error: %v", err) } continue } @@ -294,7 +294,7 @@ func (s *Server) StartRouting() { // Generate the info json b, err := json.Marshal(info) if err != nil { - Fatalf("Error marshalling Route INFO JSON: %+v\n", err) + Fatal("Error marshalling Route INFO JSON: %+v\n", err) } s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) { func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { - Debugf("Trying to connect to route on %s", rUrl.Host) + Debug("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { - Debugf("Error trying to connect to route: %v", err) + Debug("Error trying to connect to route: %v", err) select { case <-s.rcQuit: return diff --git a/server/server.go b/server/server.go index 0245b971..c940f4ed 100644 --- a/server/server.go +++ b/server/server.go @@ -82,6 +82,7 @@ func New(opts *Options) *Server { if opts.Username != "" || opts.Authorization != "" { info.AuthRequired = true } + s := &Server{ info: info, sl: sublist.New(), @@ -95,9 +96,6 @@ func New(opts *Options) *Server { s.mu.Lock() defer s.mu.Unlock() - // Setup logging with flags - s.LogInit() - // For tracking clients s.clients = make(map[uint64]*client) @@ -108,20 +106,16 @@ func New(opts *Options) *Server { // Used to kick out all of the route // connect Go routines. s.rcQuit = make(chan bool) + s.handleSignals() // Generate the info json b, err := json.Marshal(s.info) if err != nil { - Fatalf("Error marshalling INFO JSON: %+v\n", err) + Fatal("Error marshalling INFO JSON: %+v\n", err) } + s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) - s.handleSignals() - - Logf("Starting gnatsd version %s", VERSION) - - s.running = true - return s } @@ -146,9 +140,9 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - Debugf("Trapped Signal; %v", sig) + Debug("Trapped Signal; %v", sig) // FIXME, trip running? - Log("Server Exiting..") + Notice("Server Exiting..") os.Exit(0) } }() @@ -172,6 +166,8 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { + Notice("Starting gnatsd version %s", VERSION) + s.running = true // Log the pid to a file if s.opts.PidFile != _EMPTY_ { @@ -265,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - Logf("Listening for client connections on %s", hp) + Notice("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on port: %d - %v", s.opts.Port, e) + Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - Logf("gnatsd is ready") + Notice("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -282,12 +278,12 @@ func (s *Server) AcceptLoop() { // Write resolved port back to options. _, port, err := net.SplitHostPort(l.Addr().String()) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } portNum, err := strconv.Atoi(port) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } s.opts.Port = portNum @@ -306,36 +302,39 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + Notice("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - Log("Server Exiting..") + Notice("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - Logf("Starting profiling on http port %d", s.opts.ProfPort) + Notice("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { - Log(http.ListenAndServe(hp, nil)) + err := http.ListenAndServe(hp, nil) + if err != nil { + Fatal("error starting monitor server: %s", err) + } }() } // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - Logf("Starting http monitor on port %d", s.opts.HTTPPort) + Notice("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort) l, err := net.Listen("tcp", hp) if err != nil { - Fatalf("Can't listen to the monitor port: %v", err) + Fatal("Can't listen to the monitor port: %v", err) } mux := http.NewServeMux() @@ -375,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Initialize c.initClient() - Debug("Client connection created", clientConnStr(c.nc), c.cid) + Debug("Client connection created", c) // Send our information. s.sendInfo(c) diff --git a/server/usage.go b/server/usage.go index c3ed44a7..250f689f 100644 --- a/server/usage.go +++ b/server/usage.go @@ -18,6 +18,8 @@ Server options: Logging options: -l, --log FILE File to redirect log output -T, --logtime Timestamp log entries (default: true) + -s, --syslog Enable syslog as log method. + -r, --remote_syslog Syslog server addr (udp://localhost:514). -D, --debug Enable debugging output -V, --trace Trace the raw protocol diff --git a/test/log_test.go b/test/log_test.go deleted file mode 100644 index 1335deed..00000000 --- a/test/log_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. - -package test - -import ( - "io/ioutil" - "os" - "regexp" - "testing" -) - -var startRe = regexp.MustCompile(`\["Starting gnatsd version\s+([^\s]+)"\]\n`) - -func TestLogFile(t *testing.T) { - opts := DefaultTestOptions - opts.NoLog = false - opts.Logtime = false - - tmpDir, err := ioutil.TempDir("", "_gnatsd") - if err != nil { - t.Fatal("Could not create tmp dir") - } - defer os.RemoveAll(tmpDir) - - file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") - file.Close() - opts.LogFile = file.Name() - - s := RunServer(&opts) - s.Shutdown() - - buf, err := ioutil.ReadFile(opts.LogFile) - if err != nil { - t.Fatalf("Could not read logfile: %v", err) - } - if len(buf) <= 0 { - t.Fatal("Expected a non-zero length logfile") - } - if !startRe.Match(buf) { - t.Fatalf("Logfile did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, startRe) - } -}