From 96d044dce40d5ff00fc0641cfde897d882a574d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Cuadros=20Ortiz?= Date: Tue, 7 Oct 2014 02:17:39 +0200 Subject: [PATCH 1/4] login system abstraction --- gnatsd.go | 21 ++++++-- logger/log.go | 65 +++++++++++++++++++++++ logger/log_test.go | 128 +++++++++++++++++++++++++++++++++++++++++++++ logger/syslog.go | 60 +++++++++++++++++++++ server/client.go | 95 +++++++++++++++++++-------------- server/log.go | 124 +++++-------------------------------------- server/log_test.go | 25 +++++++++ server/monitor.go | 6 +-- server/opts.go | 9 ++-- server/route.go | 24 ++++----- server/server.go | 51 +++++++++--------- test/log_test.go | 42 --------------- 12 files changed, 410 insertions(+), 240 deletions(-) create mode 100644 logger/log.go create mode 100644 logger/log_test.go create mode 100644 logger/syslog.go create mode 100644 server/log_test.go delete mode 100644 test/log_test.go diff --git a/gnatsd.go b/gnatsd.go index d7716b05..ea7be517 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -6,13 +6,11 @@ import ( "flag" "strings" + "github.com/apcera/gnatsd/logger" "github.com/apcera/gnatsd/server" ) func main() { - // logging setup - server.LogSetup() - // Server Options opts := server.Options{} @@ -44,6 +42,7 @@ func main() { flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.") flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.") + flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.") flag.BoolVar(&showVersion, "version", false, "Print version information.") flag.BoolVar(&showVersion, "v", false, "Print version information.") flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") @@ -92,6 +91,22 @@ func main() { // Create the server with appropriate options. s := server.New(&opts) + // Builds and set the logger based on the flags + s.SetLogger(buildLogger(&opts)) + // Start things up. Block here until done. s.Start() } + +func buildLogger(opts *server.Options) server.Logger { + if opts.Syslog { + return logger.NewSysLogger(opts.Debug, opts.Trace) + } + + if opts.LogFile != "" { + return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) + } + + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace) + +} diff --git a/logger/log.go b/logger/log.go new file mode 100644 index 00000000..7d27a20d --- /dev/null +++ b/logger/log.go @@ -0,0 +1,65 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "log" + "os" +) + +type Logger struct { + logger *log.Logger + debug bool + trace bool +} + +func NewStdLogger(time, debug, trace bool) *Logger { + flags := 0 + if time { + flags = log.LstdFlags + } + + return &Logger{ + logger: log.New(os.Stderr, "", flags), + debug: debug, + trace: trace, + } +} + +func NewFileLogger(filename string, time, debug, trace bool) *Logger { + fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE + f, err := os.OpenFile(filename, fileflags, 0660) + if err != nil { + log.Fatal("error opening file: %v", err) + } + + flags := 0 + if time { + flags = log.LstdFlags + } + + return &Logger{ + logger: log.New(f, "", flags), + debug: debug, + trace: trace, + } +} + +func (l *Logger) Log(format string, v ...interface{}) { + l.logger.Printf(format, v...) +} + +func (l *Logger) Fatal(format string, v ...interface{}) { + l.logger.Fatalf(format, v) +} + +func (l *Logger) Debug(format string, v ...interface{}) { + if l.debug == true { + l.Log(format, v...) + } +} + +func (l *Logger) Trace(format string, v ...interface{}) { + if l.trace == true { + l.Log(format, v...) + } +} diff --git a/logger/log_test.go b/logger/log_test.go new file mode 100644 index 00000000..bdd5629d --- /dev/null +++ b/logger/log_test.go @@ -0,0 +1,128 @@ +package logger + +import ( + "bytes" + "io" + "io/ioutil" + "log" + "os" + "testing" +) + +func TestStdLogger(t *testing.T) { + logger := NewStdLogger(false, false, false) + + flags := logger.logger.Flags() + if flags != 0 { + t.Fatalf("Expected %q, received %q\n", 0, flags) + } + + if logger.debug != false { + t.Fatalf("Expected %b, received %b\n", false, logger.debug) + } + + if logger.trace != false { + t.Fatalf("Expected %b, received %b\n", false, logger.trace) + } +} + +func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { + logger := NewStdLogger(true, true, true) + + flags := logger.logger.Flags() + if flags != log.LstdFlags { + t.Fatalf("Expected %d, received %d\n", log.LstdFlags, flags) + } + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestStdLoggerLog(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Log("foo") + }, "foo\n") +} + +func TestStdLoggerDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, true, false) + logger.Debug("foo %s", "bar") + }, "foo bar\n") +} + +func TestStdLoggerDebugWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Debug("foo") + }, "") +} + +func TestStdLoggerTrace(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, true) + logger.Trace("foo") + }, "foo\n") +} + +func TestStdLoggerTraceWithOutDebug(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false) + logger.Trace("foo") + }, "") +} + +func TestFileLogger(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "_gnatsd") + if err != nil { + t.Fatal("Could not create tmp dir") + } + defer os.RemoveAll(tmpDir) + + file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") + file.Close() + + logger := NewFileLogger(file.Name(), false, false, false) + logger.Log("foo") + + buf, err := ioutil.ReadFile(file.Name()) + if err != nil { + t.Fatalf("Could not read logfile: %v", err) + } + if len(buf) <= 0 { + t.Fatal("Expected a non-zero length logfile") + } + + if string(buf) != "foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "foo", string(buf)) + } +} + +func expectOutput(t *testing.T, f func(), expect string) { + old := os.Stdout // keep backup of the real stdout + r, w, _ := os.Pipe() + os.Stderr = w + + f() + + outC := make(chan string) + // copy the output in a separate goroutine so printing can't block indefinitely + go func() { + var buf bytes.Buffer + io.Copy(&buf, r) + outC <- buf.String() + }() + + os.Stderr.Close() + os.Stderr = old // restoring the real stdout + out := <-outC + if out != expect { + t.Fatalf("Expected '%s', received '%s'\n", expect, out) + } +} diff --git a/logger/syslog.go b/logger/syslog.go new file mode 100644 index 00000000..f08d4a8e --- /dev/null +++ b/logger/syslog.go @@ -0,0 +1,60 @@ +// Copyright 2012-2014 Apcera Inc. All rights reserved. +package logger + +import ( + "fmt" + "log" + "log/syslog" +) + +type SysLogger struct { + writer *syslog.Writer + debug bool + trace bool +} + +func NewSysLogger(debug, trace bool) *SysLogger { + w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd") + if err != nil { + log.Fatal("error connecting to syslog: %v", err) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func NewRemoteSysLogger(network, raddr string, debug, trace bool) *SysLogger { + w, err := syslog.Dial(network, raddr, syslog.LOG_DEBUG, "gnatsd") + if err != nil { + log.Fatal("error connecting to syslog: %v", err) + } + + return &SysLogger{ + writer: w, + debug: debug, + trace: trace, + } +} + +func (l *SysLogger) Log(format string, v ...interface{}) { + l.writer.Notice(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Fatal(format string, v ...interface{}) { + l.writer.Crit(fmt.Sprintf(format, v...)) +} + +func (l *SysLogger) Debug(format string, v ...interface{}) { + if l.debug == true { + l.writer.Debug(fmt.Sprintf(format, v...)) + } +} + +func (l *SysLogger) Trace(format string, v ...interface{}) { + if l.trace == true { + l.writer.Info(fmt.Sprintf(format, v...)) + } +} diff --git a/server/client.go b/server/client.go index dfc79b66..6c37c3c9 100644 --- a/server/client.go +++ b/server/client.go @@ -33,19 +33,20 @@ const ( ) type client struct { - mu sync.Mutex - typ int - cid uint64 - opts clientOpts - nc net.Conn - bw *bufio.Writer - srv *Server - subs *hashmap.HashMap - pcd map[*client]struct{} - atmr *time.Timer - ptmr *time.Timer - pout int - msgb [msgScratchSize]byte + mu sync.Mutex + typ int + cid uint64 + opts clientOpts + nc net.Conn + bw *bufio.Writer + srv *Server + subs *hashmap.HashMap + pcd map[*client]struct{} + atmr *time.Timer + ptmr *time.Timer + trace bool + pout int + msgb [msgScratchSize]byte parseState stats @@ -145,7 +146,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - Log(err, clientConnStr(c.nc), c.cid) + log.Log(err.Error(), clientConnStr(c.nc), c.cid) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -162,7 +163,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - Debugf("Error flushing: %v", err) + log.Debug("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -179,20 +180,25 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { + if c.trace != true { + return + } + pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + log.Trace("[cid: %d] MSG: %s", c.cid, opa) } func (c *client) traceOp(op string, arg []byte) { - if trace == 0 { + if c.trace != true { return } + opa := []interface{}{fmt.Sprintf("%s OP", op)} if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } - Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) + log.Trace("[cid: %d] OP: %s", c.cid, opa) } // Process the info message if we are a route. @@ -203,17 +209,21 @@ func (c *client) processRouteInfo(info *Info) { return } c.route.remoteID = info.ID + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. s := c.srv c.mu.Unlock() if s.addRoute(c) { - Debug("Registering remote route", info.ID) + log.Debug("[cid: %d] Registering remote route '%s'", c.cid, info.ID) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) } else { - Debug("Detected duplicate remote route", info.ID, clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Detected duplicate remote route '%s', %s", + c.cid, info.ID, clientConnStr(c.nc), + ) c.closeConnection() } } @@ -231,7 +241,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - Log(errStr, clientConnStr(c.nc), c.cid) + log.Log(errStr, clientConnStr(c.nc), c.cid) c.closeConnection() } @@ -301,7 +311,10 @@ func (c *client) processPing() { err := c.bw.Flush() if err != nil { c.clearConnection() - Debug("Error on Flush", err, clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Error on Flush, error %s, %s", + c.cid, err.Error(), clientConnStr(c.nc), + ) } c.mu.Unlock() } @@ -314,7 +327,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if trace > 0 { + if c.trace == true { c.traceOp("MSG", arg) } @@ -361,7 +374,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if trace > 0 { + if c.trace == true { c.traceOp("PUB", arg) } @@ -478,8 +491,10 @@ func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm < sub.max { - Debugf("Deferring actual UNSUB(%s): %d max, %d received\n", - string(sub.subject), sub.max, sub.nm) + log.Debug( + "[cid: %d] Deferring actual UNSUB(%s): %d max, %d received\n", + c.cid, string(sub.subject), sub.max, sub.nm, + ) return } c.traceOp("DELSUB", sub.sid) @@ -621,10 +636,10 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - Log("Slow Consumer Detected", clientConnStr(client.nc), client.cid) + log.Log("[cid: %d] Slow Consumer Detected %s", client.cid, clientConnStr(client.nc)) client.closeConnection() } else { - Debugf("Error writing msg: %v", err) + log.Debug("[cid: %d] Error writing msg: %v", client.cid, err) } } @@ -647,7 +662,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if trace > 0 { + if c.trace == true { c.traceMsg(msg) } if srv == nil { @@ -732,12 +747,14 @@ func (c *client) processMsg(msg []byte) { } if sub.client == nil || sub.client.nc == nil || sub.client.route == nil || sub.client.route.remoteID == "" { - Debug("Bad or Missing ROUTER Identity, not processing msg", - clientConnStr(c.nc), c.cid) + log.Debug( + "[cid: %d] Bad or Missing ROUTER Identity, not processing msg, %s", + c.cid, clientConnStr(c.nc), + ) continue } if _, ok := rmap[sub.client.route.remoteID]; ok { - Debug("Ignoring route, already processed", c.cid) + log.Debug("[cid: %d] Ignoring route, already processed", c.cid) continue } rmap[sub.client.route.remoteID] = routeSeen @@ -766,12 +783,12 @@ func (c *client) processPingTimer() { return } - Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) + log.Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) // Check for violation c.pout += 1 if c.pout > c.srv.opts.MaxPingsOut { - Debug("Stale Client Connection - Closing", clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Stale Client Connection - Closing %s", c.cid, clientConnStr(c.nc)) if c.bw != nil { c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")) c.bw.Flush() @@ -784,7 +801,7 @@ func (c *client) processPingTimer() { c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - Debug("Error on Client Ping Flush", err, clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Error on Client Ping Flush, error %s %s", c.cid, err, clientConnStr(c.nc)) c.clearConnection() } else { // Reset to fire again if all OK. @@ -856,9 +873,7 @@ func (c *client) closeConnection() { return } - // FIXME(dlc) - This creates garbage for no reason. - dbgString := fmt.Sprintf("%s connection closed", c.typeString()) - Debug(dbgString, clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] %s connection closed: %s", c.cid, c.typeString(), clientConnStr(c.nc)) c.clearAuthTimer() c.clearPingTimer() @@ -895,10 +910,10 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - Debug("Not attempting reconnect for solicited route, already connected.", rid) + log.Debug("[cid: %d] Not attempting reconnect for solicited route, already connected. Try %d", c.cid, rid) return } else { - Debug("Attempting reconnect for solicited route", c.cid) + log.Debug("[cid: %d] Attempting reconnect for solicited route", c.cid) go srv.reConnectToRoute(c.route.url) } } diff --git a/server/log.go b/server/log.go index 6f62746b..2597056b 100644 --- a/server/log.go +++ b/server/log.go @@ -1,119 +1,23 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2014 Apcera Inc. All rights reserved. package server -import ( - "fmt" - "log" - "os" - "strings" - "sync/atomic" -) +var log Logger = &NilLogger{} -// logging functionality, compatible with the original nats-server. - -var trace int32 -var debug int32 -var nolog int32 - -// LogSetup will properly setup logging and the logging flags. -func LogSetup() { - log.SetFlags(0) - atomic.StoreInt32(&nolog, 0) - atomic.StoreInt32(&debug, 0) - atomic.StoreInt32(&trace, 0) +type Logger interface { + Log(format string, v ...interface{}) + Fatal(format string, v ...interface{}) + Debug(format string, v ...interface{}) + Trace(format string, v ...interface{}) } -// LogInit parses option flags and sets up logging. -func (s *Server) LogInit() { - // Reset - LogSetup() - - if s.opts.Logtime { - log.SetFlags(log.LstdFlags) - } - if s.opts.NoLog { - atomic.StoreInt32(&nolog, 1) - } - if s.opts.LogFile != "" { - flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE - file, err := os.OpenFile(s.opts.LogFile, flags, 0660) - if err != nil { - PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile)) - } - log.SetOutput(file) - } - if s.opts.Debug { - Log(s.opts) - atomic.StoreInt32(&debug, 1) - Log("DEBUG is on") - } - if s.opts.Trace { - atomic.StoreInt32(&trace, 1) - Log("TRACE is on") - } +func (s *Server) SetLogger(logger Logger) { + log = logger } -func alreadyFormatted(s string) bool { - return strings.HasPrefix(s, "[") -} +type NilLogger struct{} -func logStr(v []interface{}) string { - args := make([]string, 0, len(v)) - for _, vt := range v { - switch t := vt.(type) { - case string: - if alreadyFormatted(t) { - args = append(args, t) - } else { - t = strings.Replace(t, "\"", "\\\"", -1) - args = append(args, fmt.Sprintf("\"%s\"", t)) - } - default: - args = append(args, fmt.Sprintf("%+v", vt)) - } - } - return fmt.Sprintf("[%s]", strings.Join(args, ", ")) -} - -func Log(v ...interface{}) { - if nolog == 0 { - log.Print(logStr(v)) - } -} - -func Logf(format string, v ...interface{}) { - Log(fmt.Sprintf(format, v...)) -} - -func Fatal(v ...interface{}) { - log.Fatalf(logStr(v)) -} - -func Fatalf(format string, v ...interface{}) { - Fatal(fmt.Sprintf(format, v...)) -} - -func Debug(v ...interface{}) { - if debug > 0 { - Log(v...) - } -} - -func Debugf(format string, v ...interface{}) { - if debug > 0 { - Debug(fmt.Sprintf(format, v...)) - } -} - -func Trace(v ...interface{}) { - if trace > 0 { - Log(v...) - } -} - -func Tracef(format string, v ...interface{}) { - if trace > 0 { - Trace(fmt.Sprintf(format, v...)) - } -} +func (l *NilLogger) Log(format string, v ...interface{}) {} +func (l *NilLogger) Fatal(format string, v ...interface{}) {} +func (l *NilLogger) Debug(format string, v ...interface{}) {} +func (l *NilLogger) Trace(format string, v ...interface{}) {} diff --git a/server/log_test.go b/server/log_test.go new file mode 100644 index 00000000..273ad658 --- /dev/null +++ b/server/log_test.go @@ -0,0 +1,25 @@ +// Copyright 2014 Apcera Inc. All rights reserved. + +package server + +import ( + "testing" +) + +func TestSetLogger(t *testing.T) { + // We assert that the default logger is the NilLogger + _ = log.(*NilLogger) + + server := &Server{} + server.SetLogger(&DummyLogger{}) + + // We assert that the logger has change to the DummyLogger + _ = log.(*DummyLogger) +} + +type DummyLogger struct{} + +func (l *DummyLogger) Log(format string, v ...interface{}) {} +func (l *DummyLogger) Fatal(format string, v ...interface{}) {} +func (l *DummyLogger) Debug(format string, v ...interface{}) {} +func (l *DummyLogger) Trace(format string, v ...interface{}) {} diff --git a/server/monitor.go b/server/monitor.go index 27ed78c4..4da019bd 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - Logf("Error marshalling response to /connz request: %v", err) + log.Log("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - Logf("Error marshalling response to /subscriptionsz request: %v", err) + log.Log("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - Logf("Error marshalling response to /varz request: %v", err) + log.Log("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index bc21d874..6383e404 100644 --- a/server/opts.go +++ b/server/opts.go @@ -43,6 +43,7 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` + Syslog bool `json:"-"` } type authorization struct { @@ -219,7 +220,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - Log("Self referencing IP found: ", r) + log.Log("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -250,7 +251,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - Log("Error looking up host with route hostname: ", err) + log.Log("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -267,7 +268,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - Log("Error getting self referencing address: ", err) + log.Log("Error getting self referencing address: ", err) return localIPs } @@ -276,7 +277,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - Log("Error parsing self referencing address: ", err) + log.Log("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/route.go b/server/route.go index 8c73ebae..19e84eeb 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - Logf("Error marshalling CONNECT to route: %v\n", err) + log.Log("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.bw.Write(b.Bytes()) route.bw.Flush() - Debug("Route sent local subscriptions", route.cid) + log.Debug("Route sent local subscriptions", route.cid) } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() - Debug("Route connection created", clientConnStr(c.nc), c.cid) + log.Debug("Route connection created", clientConnStr(c.nc), c.cid) // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rURL - Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) + log.Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) c.sendConnect() } @@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - Logf("Listening for route connections on %s", hp) + log.Log("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on router port: %d - %v", s.opts.Port, e) + log.Fatal("Error listening on router port: %d - %v", s.opts.Port, e) return } @@ -255,7 +255,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - Debug("Temporary Route Accept Error(%v), sleeping %dms", + log.Debug("Temporary Route Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -263,14 +263,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + log.Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createRoute(conn, nil) } - Debug("Router accept loop exiting..") + log.Debug("Router accept loop exiting..") s.done <- true } @@ -294,7 +294,7 @@ func (s *Server) StartRouting() { // Generate the info json b, err := json.Marshal(info) if err != nil { - Fatalf("Error marshalling Route INFO JSON: %+v\n", err) + log.Fatal("Error marshalling Route INFO JSON: %+v\n", err) } s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) { func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { - Debugf("Trying to connect to route on %s", rUrl.Host) + log.Debug("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { - Debugf("Error trying to connect to route: %v", err) + log.Debug("Error trying to connect to route: %v", err) select { case <-s.rcQuit: return diff --git a/server/server.go b/server/server.go index 0245b971..ffef2aa4 100644 --- a/server/server.go +++ b/server/server.go @@ -82,6 +82,7 @@ func New(opts *Options) *Server { if opts.Username != "" || opts.Authorization != "" { info.AuthRequired = true } + s := &Server{ info: info, sl: sublist.New(), @@ -95,9 +96,6 @@ func New(opts *Options) *Server { s.mu.Lock() defer s.mu.Unlock() - // Setup logging with flags - s.LogInit() - // For tracking clients s.clients = make(map[uint64]*client) @@ -108,20 +106,16 @@ func New(opts *Options) *Server { // Used to kick out all of the route // connect Go routines. s.rcQuit = make(chan bool) + s.handleSignals() // Generate the info json b, err := json.Marshal(s.info) if err != nil { - Fatalf("Error marshalling INFO JSON: %+v\n", err) + log.Fatal("Error marshalling INFO JSON: %+v\n", err) } + s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) - s.handleSignals() - - Logf("Starting gnatsd version %s", VERSION) - - s.running = true - return s } @@ -146,9 +140,9 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - Debugf("Trapped Signal; %v", sig) + log.Debug("Trapped Signal; %v", sig) // FIXME, trip running? - Log("Server Exiting..") + log.Log("Server Exiting..") os.Exit(0) } }() @@ -172,6 +166,8 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { + log.Log("Starting gnatsd version %s", VERSION) + s.running = true // Log the pid to a file if s.opts.PidFile != _EMPTY_ { @@ -265,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - Logf("Listening for client connections on %s", hp) + log.Log("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - Fatalf("Error listening on port: %d - %v", s.opts.Port, e) + log.Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - Logf("gnatsd is ready") + log.Log("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -282,12 +278,12 @@ func (s *Server) AcceptLoop() { // Write resolved port back to options. _, port, err := net.SplitHostPort(l.Addr().String()) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } portNum, err := strconv.Atoi(port) if err != nil { - Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) + log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } s.opts.Port = portNum @@ -298,7 +294,7 @@ func (s *Server) AcceptLoop() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - Debug("Temporary Client Accept Error(%v), sleeping %dms", + log.Debug("Temporary Client Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -306,36 +302,39 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Logf("Accept error: %v", err) + log.Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - Log("Server Exiting..") + log.Log("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - Logf("Starting profiling on http port %d", s.opts.ProfPort) + log.Log("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { - Log(http.ListenAndServe(hp, nil)) + err := http.ListenAndServe(hp, nil) + if err != nil { + log.Fatal("error starting monitor server: %s", err) + } }() } // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - Logf("Starting http monitor on port %d", s.opts.HTTPPort) + log.Log("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort) l, err := net.Listen("tcp", hp) if err != nil { - Fatalf("Can't listen to the monitor port: %v", err) + log.Fatal("Can't listen to the monitor port: %v", err) } mux := http.NewServeMux() @@ -367,7 +366,7 @@ func (s *Server) StartHTTPMonitoring() { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, nc: conn, opts: defaultOpts} + c := &client{srv: s, nc: conn, trace: s.opts.Trace, opts: defaultOpts} // Grab lock c.mu.Lock() @@ -375,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Initialize c.initClient() - Debug("Client connection created", clientConnStr(c.nc), c.cid) + log.Debug("[cid: %d] Client connection created: %s", c.cid, clientConnStr(c.nc)) // Send our information. s.sendInfo(c) diff --git a/test/log_test.go b/test/log_test.go deleted file mode 100644 index 1335deed..00000000 --- a/test/log_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. - -package test - -import ( - "io/ioutil" - "os" - "regexp" - "testing" -) - -var startRe = regexp.MustCompile(`\["Starting gnatsd version\s+([^\s]+)"\]\n`) - -func TestLogFile(t *testing.T) { - opts := DefaultTestOptions - opts.NoLog = false - opts.Logtime = false - - tmpDir, err := ioutil.TempDir("", "_gnatsd") - if err != nil { - t.Fatal("Could not create tmp dir") - } - defer os.RemoveAll(tmpDir) - - file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") - file.Close() - opts.LogFile = file.Name() - - s := RunServer(&opts) - s.Shutdown() - - buf, err := ioutil.ReadFile(opts.LogFile) - if err != nil { - t.Fatalf("Could not read logfile: %v", err) - } - if len(buf) <= 0 { - t.Fatal("Expected a non-zero length logfile") - } - if !startRe.Match(buf) { - t.Fatalf("Logfile did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, startRe) - } -} From 6586ac4653b0fe53443f68f7df63217d679d5628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Cuadros=20Ortiz?= Date: Wed, 8 Oct 2014 02:19:20 +0200 Subject: [PATCH 2/4] better client identification at logs and some performances improves --- gnatsd.go | 4 +- logger/log.go | 51 ++++++++++++---- logger/log_test.go | 39 +++++++----- logger/syslog.go | 4 +- logger/syslog_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++ server/client.go | 107 +++++++++++++++------------------ server/log.go | 80 ++++++++++++++++++++++--- server/log_test.go | 15 +++-- server/monitor.go | 6 +- server/opts.go | 8 +-- server/route.go | 24 ++++---- server/server.go | 36 +++++------ 12 files changed, 370 insertions(+), 139 deletions(-) create mode 100644 logger/syslog_test.go diff --git a/gnatsd.go b/gnatsd.go index ea7be517..ff2f0cad 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -92,7 +92,7 @@ func main() { s := server.New(&opts) // Builds and set the logger based on the flags - s.SetLogger(buildLogger(&opts)) + s.SetLogger(buildLogger(&opts), opts.Debug, opts.Trace) // Start things up. Block here until done. s.Start() @@ -107,6 +107,6 @@ func buildLogger(opts *server.Options) server.Logger { return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) } - return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace) + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true) } diff --git a/logger/log.go b/logger/log.go index 7d27a20d..d73e2d93 100644 --- a/logger/log.go +++ b/logger/log.go @@ -2,27 +2,40 @@ package logger import ( + "fmt" "log" "os" ) type Logger struct { - logger *log.Logger - debug bool - trace bool + logger *log.Logger + debug bool + trace bool + logLabel string + fatalLabel string + debugLabel string + traceLabel string } -func NewStdLogger(time, debug, trace bool) *Logger { +func NewStdLogger(time, debug, trace, colors bool) *Logger { flags := 0 if time { flags = log.LstdFlags } - return &Logger{ + l := &Logger{ logger: log.New(os.Stderr, "", flags), debug: debug, trace: trace, } + + if colors { + setColoredLabelFormats(l) + } else { + setPlainLabelFormats(l) + } + + return l } func NewFileLogger(filename string, time, debug, trace bool) *Logger { @@ -37,29 +50,47 @@ func NewFileLogger(filename string, time, debug, trace bool) *Logger { flags = log.LstdFlags } - return &Logger{ + l := &Logger{ logger: log.New(f, "", flags), debug: debug, trace: trace, } + + setPlainLabelFormats(l) + return l +} + +func setPlainLabelFormats(l *Logger) { + l.logLabel = "[LOG] " + l.debugLabel = "[DBG] " + l.fatalLabel = "[ERR] " + l.traceLabel = "[TRA] " +} + +func setColoredLabelFormats(l *Logger) { + colorFormat := "[\x1b[%dm%s\x1b[0m] " + l.logLabel = fmt.Sprintf(colorFormat, 32, "LOG") + l.debugLabel = fmt.Sprintf(colorFormat, 36, "DBG") + l.fatalLabel = fmt.Sprintf(colorFormat, 31, "ERR") + l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRA") } func (l *Logger) Log(format string, v ...interface{}) { - l.logger.Printf(format, v...) + l.logger.Printf(l.logLabel+format, v...) } func (l *Logger) Fatal(format string, v ...interface{}) { - l.logger.Fatalf(format, v) + l.logger.Fatalf(l.fatalLabel+format, v) } func (l *Logger) Debug(format string, v ...interface{}) { if l.debug == true { - l.Log(format, v...) + l.logger.Printf(l.debugLabel+format, v...) } } func (l *Logger) Trace(format string, v ...interface{}) { if l.trace == true { - l.Log(format, v...) + l.logger.Printf(l.traceLabel+format, v...) } } diff --git a/logger/log_test.go b/logger/log_test.go index bdd5629d..7eb1eed3 100644 --- a/logger/log_test.go +++ b/logger/log_test.go @@ -10,7 +10,7 @@ import ( ) func TestStdLogger(t *testing.T) { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) flags := logger.logger.Flags() if flags != 0 { @@ -27,7 +27,7 @@ func TestStdLogger(t *testing.T) { } func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { - logger := NewStdLogger(true, true, true) + logger := NewStdLogger(true, true, true, false) flags := logger.logger.Flags() if flags != log.LstdFlags { @@ -45,35 +45,42 @@ func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { func TestStdLoggerLog(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Log("foo") - }, "foo\n") + }, "[LOG] foo\n") +} + +func TestStdLoggerLogWithColor(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, true) + logger.Log("foo") + }, "[\x1b[32mLOG\x1b[0m] foo\n") } func TestStdLoggerDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, true, false) + logger := NewStdLogger(false, true, false, false) logger.Debug("foo %s", "bar") - }, "foo bar\n") + }, "[DBG] foo bar\n") } func TestStdLoggerDebugWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Debug("foo") }, "") } func TestStdLoggerTrace(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, true) + logger := NewStdLogger(false, false, true, false) logger.Trace("foo") - }, "foo\n") + }, "[TRA] foo\n") } func TestStdLoggerTraceWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Trace("foo") }, "") } @@ -99,13 +106,13 @@ func TestFileLogger(t *testing.T) { t.Fatal("Expected a non-zero length logfile") } - if string(buf) != "foo\n" { - t.Fatalf("Expected '%s', received '%s'\n", "foo", string(buf)) + if string(buf) != "[LOG] foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "[LOG] foo", string(buf)) } } -func expectOutput(t *testing.T, f func(), expect string) { - old := os.Stdout // keep backup of the real stdout +func expectOutput(t *testing.T, f func(), expected string) { + old := os.Stderr // keep backup of the real stdout r, w, _ := os.Pipe() os.Stderr = w @@ -122,7 +129,7 @@ func expectOutput(t *testing.T, f func(), expect string) { os.Stderr.Close() os.Stderr = old // restoring the real stdout out := <-outC - if out != expect { - t.Fatalf("Expected '%s', received '%s'\n", expect, out) + if out != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, out) } } diff --git a/logger/syslog.go b/logger/syslog.go index f08d4a8e..217396ed 100644 --- a/logger/syslog.go +++ b/logger/syslog.go @@ -48,13 +48,13 @@ func (l *SysLogger) Fatal(format string, v ...interface{}) { } func (l *SysLogger) Debug(format string, v ...interface{}) { - if l.debug == true { + if l.debug { l.writer.Debug(fmt.Sprintf(format, v...)) } } func (l *SysLogger) Trace(format string, v ...interface{}) { - if l.trace == true { + if l.trace { l.writer.Info(fmt.Sprintf(format, v...)) } } diff --git a/logger/syslog_test.go b/logger/syslog_test.go new file mode 100644 index 00000000..db1f1775 --- /dev/null +++ b/logger/syslog_test.go @@ -0,0 +1,135 @@ +package logger + +import ( + "log" + "net" + "strings" + "testing" + "time" +) + +var serverAddr string + +func TestSysLogger(t *testing.T) { + logger := NewSysLogger(false, false) + + if logger.debug != false { + t.Fatalf("Expected %b, received %b\n", false, logger.debug) + } + + if logger.trace != false { + t.Fatalf("Expected %b, received %b\n", false, logger.trace) + } +} + +func TestSysLoggerWithDebugAndTrace(t *testing.T) { + logger := NewSysLogger(true, true) + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestRemoteSysLogger(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestRemoteSysLoggerLog(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Log("foo %s", "bar") + expectSyslogOutput(t, <-done, "foo bar\n") +} + +func TestRemoteSysLoggerDebug(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Debug("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerDebugDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, false, false) + + logger.Debug("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func TestRemoteSysLoggerTrace(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Trace("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerTraceDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, false) + + logger.Trace("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func expectSyslogOutput(t *testing.T, line string, expected string) { + data := strings.Split(line, "]: ") + if len(data) != 2 { + t.Fatalf("Unexpected syslog line %s\n", line) + } + + if data[1] != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, data[1]) + } +} + +func runSyslog(c net.PacketConn, done chan<- string) { + var buf [4096]byte + var rcvd string = "" + for { + n, _, err := c.ReadFrom(buf[:]) + if err != nil || n == 0 { + break + } + rcvd += string(buf[:n]) + } + done <- rcvd +} + +func startServer(done chan<- string) { + c, e := net.ListenPacket("udp", "127.0.0.1:0") + if e != nil { + log.Fatalf("net.ListenPacket failed udp :0 %v", e) + } + + serverAddr = c.LocalAddr().String() + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + go runSyslog(c, done) +} diff --git a/server/client.go b/server/client.go index 6c37c3c9..23f4267d 100644 --- a/server/client.go +++ b/server/client.go @@ -33,20 +33,19 @@ const ( ) type client struct { - mu sync.Mutex - typ int - cid uint64 - opts clientOpts - nc net.Conn - bw *bufio.Writer - srv *Server - subs *hashmap.HashMap - pcd map[*client]struct{} - atmr *time.Timer - ptmr *time.Timer - trace bool - pout int - msgb [msgScratchSize]byte + mu sync.Mutex + typ int + cid uint64 + opts clientOpts + nc net.Conn + bw *bufio.Writer + srv *Server + subs *hashmap.HashMap + pcd map[*client]struct{} + atmr *time.Timer + ptmr *time.Timer + pout int + msgb [msgScratchSize]byte parseState stats @@ -54,12 +53,19 @@ type client struct { } func (c *client) String() (id string) { + conn := "-" + if ip, ok := c.nc.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port) + } + switch c.typ { case CLIENT: - id = fmt.Sprintf("cid:%d", c.cid) + id = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: - id = fmt.Sprintf("rid:%d", c.cid) + id = fmt.Sprintf("%s - rid:%d", conn, c.cid) } + return id } @@ -88,14 +94,6 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func clientConnStr(conn net.Conn) interface{} { - if ip, ok := conn.(*net.TCPConn); ok { - addr := ip.RemoteAddr().(*net.TCPAddr) - return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} - } - return "N/A" -} - // Lock should be held func (c *client) initClient() { s := c.srv @@ -146,7 +144,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - log.Log(err.Error(), clientConnStr(c.nc), c.cid) + Log("Error reading from client: %s", err.Error(), c) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -163,7 +161,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - log.Debug("Error flushing: %v", err) + Debug("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -180,17 +178,17 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - if c.trace != true { + if trace == 0 { return } pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - log.Trace("[cid: %d] MSG: %s", c.cid, opa) + Trace("MSG: %s", opa, c) } func (c *client) traceOp(op string, arg []byte) { - if c.trace != true { + if trace == 0 { return } @@ -198,7 +196,7 @@ func (c *client) traceOp(op string, arg []byte) { if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } - log.Trace("[cid: %d] OP: %s", c.cid, opa) + Trace("OP: %s", opa, c) } // Process the info message if we are a route. @@ -216,14 +214,11 @@ func (c *client) processRouteInfo(info *Info) { c.mu.Unlock() if s.addRoute(c) { - log.Debug("[cid: %d] Registering remote route '%s'", c.cid, info.ID) + Debug("Registering remote route %q", info.ID, c) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) } else { - log.Debug( - "[cid: %d] Detected duplicate remote route '%s', %s", - c.cid, info.ID, clientConnStr(c.nc), - ) + Debug("Detected duplicate remote route %q", info.ID, c) c.closeConnection() } } @@ -241,7 +236,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - log.Log(errStr, clientConnStr(c.nc), c.cid) + Log("Client error %s", errStr, c) c.closeConnection() } @@ -311,10 +306,7 @@ func (c *client) processPing() { err := c.bw.Flush() if err != nil { c.clearConnection() - log.Debug( - "[cid: %d] Error on Flush, error %s, %s", - c.cid, err.Error(), clientConnStr(c.nc), - ) + Debug("Error on Flush, error %s", err.Error(), c) } c.mu.Unlock() } @@ -327,7 +319,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if c.trace == true { + if trace == 0 { c.traceOp("MSG", arg) } @@ -374,7 +366,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if c.trace == true { + if trace == 0 { c.traceOp("PUB", arg) } @@ -491,9 +483,9 @@ func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm < sub.max { - log.Debug( - "[cid: %d] Deferring actual UNSUB(%s): %d max, %d received\n", - c.cid, string(sub.subject), sub.max, sub.nm, + Debug( + "Deferring actual UNSUB(%s): %d max, %d received\n", + string(sub.subject), sub.max, sub.nm, c, ) return } @@ -636,10 +628,10 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - log.Log("[cid: %d] Slow Consumer Detected %s", client.cid, clientConnStr(client.nc)) + Log("Slow Consumer Detected", c) client.closeConnection() } else { - log.Debug("[cid: %d] Error writing msg: %v", client.cid, err) + Debug("Error writing msg: %v", err, c) } } @@ -662,7 +654,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if c.trace == true { + if trace > 0 { c.traceMsg(msg) } if srv == nil { @@ -747,14 +739,11 @@ func (c *client) processMsg(msg []byte) { } if sub.client == nil || sub.client.nc == nil || sub.client.route == nil || sub.client.route.remoteID == "" { - log.Debug( - "[cid: %d] Bad or Missing ROUTER Identity, not processing msg, %s", - c.cid, clientConnStr(c.nc), - ) + Debug("Bad or Missing ROUTER Identity, not processing msg", c) continue } if _, ok := rmap[sub.client.route.remoteID]; ok { - log.Debug("[cid: %d] Ignoring route, already processed", c.cid) + Debug("Ignoring route, already processed", c) continue } rmap[sub.client.route.remoteID] = routeSeen @@ -783,12 +772,12 @@ func (c *client) processPingTimer() { return } - log.Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) + Debug("Client Ping Timer", c) // Check for violation c.pout += 1 if c.pout > c.srv.opts.MaxPingsOut { - log.Debug("[cid: %d] Stale Client Connection - Closing %s", c.cid, clientConnStr(c.nc)) + Debug("Stale Client Connection - Closing", c) if c.bw != nil { c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")) c.bw.Flush() @@ -801,7 +790,7 @@ func (c *client) processPingTimer() { c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - log.Debug("[cid: %d] Error on Client Ping Flush, error %s %s", c.cid, err, clientConnStr(c.nc)) + Debug("Error on Client Ping Flush, error %s", err) c.clearConnection() } else { // Reset to fire again if all OK. @@ -873,7 +862,7 @@ func (c *client) closeConnection() { return } - log.Debug("[cid: %d] %s connection closed: %s", c.cid, c.typeString(), clientConnStr(c.nc)) + Debug("%s connection closed", c.typeString(), c) c.clearAuthTimer() c.clearPingTimer() @@ -910,10 +899,10 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - log.Debug("[cid: %d] Not attempting reconnect for solicited route, already connected. Try %d", c.cid, rid) + Debug("Not attempting reconnect for solicited route, already connected. Try %d", rid, c) return } else { - log.Debug("[cid: %d] Attempting reconnect for solicited route", c.cid) + Debug("Attempting reconnect for solicited route", c) go srv.reConnectToRoute(c.route.url) } } diff --git a/server/log.go b/server/log.go index 2597056b..fb3efee8 100644 --- a/server/log.go +++ b/server/log.go @@ -2,7 +2,18 @@ package server -var log Logger = &NilLogger{} +import ( + "fmt" + "sync" + "sync/atomic" +) + +var trace int32 +var debug int32 +var log = struct { + logger Logger + sync.Mutex +}{} type Logger interface { Log(format string, v ...interface{}) @@ -11,13 +22,66 @@ type Logger interface { Trace(format string, v ...interface{}) } -func (s *Server) SetLogger(logger Logger) { - log = logger +func (s *Server) SetLogger(logger Logger, d, t bool) { + if d { + atomic.StoreInt32(&debug, 1) + } + + if t { + atomic.StoreInt32(&trace, 1) + } + + log.Lock() + defer log.Unlock() + log.logger = logger } -type NilLogger struct{} +func Log(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Log(format, v...) + }, format, v...) +} -func (l *NilLogger) Log(format string, v ...interface{}) {} -func (l *NilLogger) Fatal(format string, v ...interface{}) {} -func (l *NilLogger) Debug(format string, v ...interface{}) {} -func (l *NilLogger) Trace(format string, v ...interface{}) {} +func Fatal(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Fatal(format, v...) + }, format, v...) +} + +func Debug(format string, v ...interface{}) { + if debug == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Debug(format, v...) + }, format, v...) +} + +func Trace(format string, v ...interface{}) { + if trace == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Trace(format, v...) + }, format, v...) +} + +func executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) { + log.Lock() + defer log.Unlock() + if log.logger == nil { + return + } + + argc := len(args) + if argc != 0 { + if client, ok := args[argc-1].(*client); ok { + args = args[:argc-1] + format = fmt.Sprintf("%s - %s", client, format) + } + } + + f(log.logger, format, args...) +} diff --git a/server/log_test.go b/server/log_test.go index 273ad658..3baf6870 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -7,14 +7,19 @@ import ( ) func TestSetLogger(t *testing.T) { - // We assert that the default logger is the NilLogger - _ = log.(*NilLogger) - server := &Server{} - server.SetLogger(&DummyLogger{}) + server.SetLogger(&DummyLogger{}, true, true) // We assert that the logger has change to the DummyLogger - _ = log.(*DummyLogger) + _ = log.logger.(*DummyLogger) + + if debug != 1 { + t.Fatalf("Expected debug 1, received value %d\n", debug) + } + + if trace != 1 { + t.Fatalf("Expected trace 1, received value %d\n", trace) + } } type DummyLogger struct{} diff --git a/server/monitor.go b/server/monitor.go index 4da019bd..d2c235bc 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - log.Log("Error marshalling response to /connz request: %v", err) + Log("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - log.Log("Error marshalling response to /subscriptionsz request: %v", err) + Log("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - log.Log("Error marshalling response to /varz request: %v", err) + Log("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index 6383e404..e7d7438d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -220,7 +220,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - log.Log("Self referencing IP found: ", r) + Log("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -251,7 +251,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - log.Log("Error looking up host with route hostname: ", err) + Log("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -268,7 +268,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - log.Log("Error getting self referencing address: ", err) + Log("Error getting self referencing address: ", err) return localIPs } @@ -277,7 +277,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - log.Log("Error parsing self referencing address: ", err) + Log("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/route.go b/server/route.go index 19e84eeb..ce474f70 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - log.Log("Error marshalling CONNECT to route: %v\n", err) + Log("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.bw.Write(b.Bytes()) route.bw.Flush() - log.Debug("Route sent local subscriptions", route.cid) + Debug("Route sent local subscriptions", route) } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() - log.Debug("Route connection created", clientConnStr(c.nc), c.cid) + Debug("Route connection created", c) // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rURL - log.Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) + Debug("Route connect msg sent", c) c.sendConnect() } @@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - log.Log("Listening for route connections on %s", hp) + Log("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - log.Fatal("Error listening on router port: %d - %v", s.opts.Port, e) + Fatal("Error listening on router port: %d - %v", s.opts.Port, e) return } @@ -255,7 +255,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Debug("Temporary Route Accept Error(%v), sleeping %dms", + Debug("Temporary Route Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -263,14 +263,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - log.Log("Accept error: %v", err) + Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createRoute(conn, nil) } - log.Debug("Router accept loop exiting..") + Debug("Router accept loop exiting..") s.done <- true } @@ -294,7 +294,7 @@ func (s *Server) StartRouting() { // Generate the info json b, err := json.Marshal(info) if err != nil { - log.Fatal("Error marshalling Route INFO JSON: %+v\n", err) + Fatal("Error marshalling Route INFO JSON: %+v\n", err) } s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) { func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { - log.Debug("Trying to connect to route on %s", rUrl.Host) + Debug("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { - log.Debug("Error trying to connect to route: %v", err) + Debug("Error trying to connect to route: %v", err) select { case <-s.rcQuit: return diff --git a/server/server.go b/server/server.go index ffef2aa4..e9115d1f 100644 --- a/server/server.go +++ b/server/server.go @@ -111,7 +111,7 @@ func New(opts *Options) *Server { // Generate the info json b, err := json.Marshal(s.info) if err != nil { - log.Fatal("Error marshalling INFO JSON: %+v\n", err) + Fatal("Error marshalling INFO JSON: %+v\n", err) } s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -140,9 +140,9 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - log.Debug("Trapped Signal; %v", sig) + Debug("Trapped Signal; %v", sig) // FIXME, trip running? - log.Log("Server Exiting..") + Log("Server Exiting..") os.Exit(0) } }() @@ -166,7 +166,7 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { - log.Log("Starting gnatsd version %s", VERSION) + Log("Starting gnatsd version %s", VERSION) s.running = true // Log the pid to a file @@ -261,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - log.Log("Listening for client connections on %s", hp) + Log("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - log.Fatal("Error listening on port: %d - %v", s.opts.Port, e) + Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - log.Log("gnatsd is ready") + Log("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -278,12 +278,12 @@ func (s *Server) AcceptLoop() { // Write resolved port back to options. _, port, err := net.SplitHostPort(l.Addr().String()) if err != nil { - log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } portNum, err := strconv.Atoi(port) if err != nil { - log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } s.opts.Port = portNum @@ -294,7 +294,7 @@ func (s *Server) AcceptLoop() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Debug("Temporary Client Accept Error(%v), sleeping %dms", + Debug("Temporary Client Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -302,39 +302,39 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - log.Log("Accept error: %v", err) + Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - log.Log("Server Exiting..") + Log("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - log.Log("Starting profiling on http port %d", s.opts.ProfPort) + Log("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { err := http.ListenAndServe(hp, nil) if err != nil { - log.Fatal("error starting monitor server: %s", err) + Fatal("error starting monitor server: %s", err) } }() } // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - log.Log("Starting http monitor on port %d", s.opts.HTTPPort) + Log("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort) l, err := net.Listen("tcp", hp) if err != nil { - log.Fatal("Can't listen to the monitor port: %v", err) + Fatal("Can't listen to the monitor port: %v", err) } mux := http.NewServeMux() @@ -366,7 +366,7 @@ func (s *Server) StartHTTPMonitoring() { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, nc: conn, trace: s.opts.Trace, opts: defaultOpts} + c := &client{srv: s, nc: conn, opts: defaultOpts} // Grab lock c.mu.Lock() @@ -374,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Initialize c.initClient() - log.Debug("[cid: %d] Client connection created: %s", c.cid, clientConnStr(c.nc)) + Debug("Client connection created", c) // Send our information. s.sendInfo(c) From d99c6aeead5e1fa70ee1a387dbdb98fb8c99eb33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Cuadros=20Ortiz?= Date: Sat, 11 Oct 2014 00:54:28 +0200 Subject: [PATCH 3/4] remote syslog support --- README.md | 2 ++ gnatsd.go | 18 ++++++---- logger/log_test.go | 16 ++++----- logger/syslog.go | 28 ++++++++++++--- logger/syslog_test.go | 76 +++++++++++++++++++++++++++++----------- server/configs/test.conf | 2 ++ server/opts.go | 5 +++ server/opts_test.go | 52 ++++++++++++++------------- server/usage.go | 2 ++ 9 files changed, 139 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 185d759d..8272ed0b 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ Server options: Logging options: -l, --log FILE File to redirect log output -T, --logtime Timestamp log entries (default: true) + -s, --syslog Enable syslog as log method. + -r, --remote_syslog Syslog server addr (udp://localhost:514). -D, --debug Enable debugging output -V, --trace Trace the raw protocol diff --git a/gnatsd.go b/gnatsd.go index ff2f0cad..31f96fa4 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -42,7 +42,10 @@ func main() { flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.") flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.") - flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.") + flag.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.") + flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method..") + flag.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://localhost:514).") + flag.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://localhost:514).") flag.BoolVar(&showVersion, "version", false, "Print version information.") flag.BoolVar(&showVersion, "v", false, "Print version information.") flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") @@ -99,14 +102,17 @@ func main() { } func buildLogger(opts *server.Options) server.Logger { - if opts.Syslog { - return logger.NewSysLogger(opts.Debug, opts.Trace) - } - if opts.LogFile != "" { return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) } - return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true) + if opts.RemoteSyslog != "" { + return logger.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace) + } + if opts.Syslog { + return logger.NewSysLogger(opts.Debug, opts.Trace) + } + + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true) } diff --git a/logger/log_test.go b/logger/log_test.go index 7eb1eed3..c497946a 100644 --- a/logger/log_test.go +++ b/logger/log_test.go @@ -17,12 +17,12 @@ func TestStdLogger(t *testing.T) { t.Fatalf("Expected %q, received %q\n", 0, flags) } - if logger.debug != false { - t.Fatalf("Expected %b, received %b\n", false, logger.debug) + if logger.debug { + t.Fatalf("Expected %t, received %t\n", false, logger.debug) } - if logger.trace != false { - t.Fatalf("Expected %b, received %b\n", false, logger.trace) + if logger.trace { + t.Fatalf("Expected %t, received %t\n", false, logger.trace) } } @@ -34,12 +34,12 @@ func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { t.Fatalf("Expected %d, received %d\n", log.LstdFlags, flags) } - if logger.debug != true { - t.Fatalf("Expected %b, received %b\n", true, logger.debug) + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) } - if logger.trace != true { - t.Fatalf("Expected %b, received %b\n", true, logger.trace) + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) } } diff --git a/logger/syslog.go b/logger/syslog.go index 217396ed..b29365eb 100644 --- a/logger/syslog.go +++ b/logger/syslog.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "log/syslog" + "net/url" ) type SysLogger struct { @@ -16,7 +17,7 @@ type SysLogger struct { func NewSysLogger(debug, trace bool) *SysLogger { w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd") if err != nil { - log.Fatal("error connecting to syslog: %v", err) + log.Fatalf("error connecting to syslog: %q", err.Error()) } return &SysLogger{ @@ -26,10 +27,11 @@ func NewSysLogger(debug, trace bool) *SysLogger { } } -func NewRemoteSysLogger(network, raddr string, debug, trace bool) *SysLogger { - w, err := syslog.Dial(network, raddr, syslog.LOG_DEBUG, "gnatsd") +func NewRemoteSysLogger(fqn string, debug, trace bool) *SysLogger { + network, addr := getNetworkAndAddr(fqn) + w, err := syslog.Dial(network, addr, syslog.LOG_DEBUG, "gnatsd") if err != nil { - log.Fatal("error connecting to syslog: %v", err) + log.Fatalf("error connecting to syslog: %q", err.Error()) } return &SysLogger{ @@ -39,6 +41,24 @@ func NewRemoteSysLogger(network, raddr string, debug, trace bool) *SysLogger { } } +func getNetworkAndAddr(fqn string) (network, addr string) { + u, err := url.Parse(fqn) + if err != nil { + log.Fatal(err) + } + + network = u.Scheme + if network == "udp" || network == "tcp" { + addr = u.Host + } else if network == "unix" { + addr = u.Path + } else { + log.Fatalf("error invalid network type: %q", u.Scheme) + } + + return +} + func (l *SysLogger) Log(format string, v ...interface{}) { l.writer.Notice(fmt.Sprintf(format, v...)) } diff --git a/logger/syslog_test.go b/logger/syslog_test.go index db1f1775..06d2a41d 100644 --- a/logger/syslog_test.go +++ b/logger/syslog_test.go @@ -1,6 +1,7 @@ package logger import ( + "fmt" "log" "net" "strings" @@ -8,50 +9,50 @@ import ( "time" ) -var serverAddr string +var serverFQN string func TestSysLogger(t *testing.T) { logger := NewSysLogger(false, false) - if logger.debug != false { - t.Fatalf("Expected %b, received %b\n", false, logger.debug) + if logger.debug { + t.Fatalf("Expected %t, received %t\n", false, logger.debug) } - if logger.trace != false { - t.Fatalf("Expected %b, received %b\n", false, logger.trace) + if logger.trace { + t.Fatalf("Expected %t, received %t\n", false, logger.trace) } } func TestSysLoggerWithDebugAndTrace(t *testing.T) { logger := NewSysLogger(true, true) - if logger.debug != true { - t.Fatalf("Expected %b, received %b\n", true, logger.debug) + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) } - if logger.trace != true { - t.Fatalf("Expected %b, received %b\n", true, logger.trace) + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) } } func TestRemoteSysLogger(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, true, true) + logger := NewRemoteSysLogger(serverFQN, true, true) - if logger.debug != true { - t.Fatalf("Expected %b, received %b\n", true, logger.debug) + if !logger.debug { + t.Fatalf("Expected %t, received %t\n", true, logger.debug) } - if logger.trace != true { - t.Fatalf("Expected %b, received %b\n", true, logger.trace) + if !logger.trace { + t.Fatalf("Expected %t, received %t\n", true, logger.trace) } } func TestRemoteSysLoggerLog(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, true, true) + logger := NewRemoteSysLogger(serverFQN, true, true) logger.Log("foo %s", "bar") expectSyslogOutput(t, <-done, "foo bar\n") @@ -60,7 +61,7 @@ func TestRemoteSysLoggerLog(t *testing.T) { func TestRemoteSysLoggerDebug(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, true, true) + logger := NewRemoteSysLogger(serverFQN, true, true) logger.Debug("foo %s", "qux") expectSyslogOutput(t, <-done, "foo qux\n") @@ -69,7 +70,7 @@ func TestRemoteSysLoggerDebug(t *testing.T) { func TestRemoteSysLoggerDebugDisabled(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, false, false) + logger := NewRemoteSysLogger(serverFQN, false, false) logger.Debug("foo %s", "qux") rcvd := <-done @@ -81,7 +82,7 @@ func TestRemoteSysLoggerDebugDisabled(t *testing.T) { func TestRemoteSysLoggerTrace(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, true, true) + logger := NewRemoteSysLogger(serverFQN, true, true) logger.Trace("foo %s", "qux") expectSyslogOutput(t, <-done, "foo qux\n") @@ -90,7 +91,7 @@ func TestRemoteSysLoggerTrace(t *testing.T) { func TestRemoteSysLoggerTraceDisabled(t *testing.T) { done := make(chan string) startServer(done) - logger := NewRemoteSysLogger("udp", serverAddr, true, false) + logger := NewRemoteSysLogger(serverFQN, true, false) logger.Trace("foo %s", "qux") rcvd := <-done @@ -99,6 +100,41 @@ func TestRemoteSysLoggerTraceDisabled(t *testing.T) { } } +func TestGetNetworkAndAddrUDP(t *testing.T) { + n, a := getNetworkAndAddr("udp://foo.com:1000") + + if n != "udp" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "foo.com:1000" { + t.Fatalf("Unexpected addr %s\n", a) + } +} + +func TestGetNetworkAndAddrTCP(t *testing.T) { + n, a := getNetworkAndAddr("tcp://foo.com:1000") + + if n != "tcp" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "foo.com:1000" { + t.Fatalf("Unexpected addr %s\n", a) + } +} + +func TestGetNetworkAndAddrUnix(t *testing.T) { + n, a := getNetworkAndAddr("unix:///foo.sock") + + if n != "unix" { + t.Fatalf("Unexpected network %s\n", n) + } + + if a != "/foo.sock" { + t.Fatalf("Unexpected addr %s\n", a) + } +} func expectSyslogOutput(t *testing.T, line string, expected string) { data := strings.Split(line, "]: ") if len(data) != 2 { @@ -129,7 +165,7 @@ func startServer(done chan<- string) { log.Fatalf("net.ListenPacket failed udp :0 %v", e) } - serverAddr = c.LocalAddr().String() + serverFQN = fmt.Sprintf("udp://%s", c.LocalAddr().String()) c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) go runSyslog(c, done) } diff --git a/server/configs/test.conf b/server/configs/test.conf index 598af3d7..62e6819c 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -17,6 +17,8 @@ debug: false trace: true logtime: false log_file: "/tmp/gnatsd.log" +syslog: true +remote_syslog: "udp://foo.com:33" #pid file pid_file: "/tmp/gnatsd.pid" diff --git a/server/opts.go b/server/opts.go index e7d7438d..72b636ea 100644 --- a/server/opts.go +++ b/server/opts.go @@ -44,6 +44,7 @@ type Options struct { PidFile string `json:"-"` LogFile string `json:"-"` Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` } type authorization struct { @@ -98,6 +99,10 @@ func ProcessConfigFile(configFile string) (*Options, error) { } case "logfile", "log_file": opts.LogFile = v.(string) + case "syslog": + opts.Syslog = v.(bool) + case "remote_syslog": + opts.RemoteSyslog = v.(string) case "pidfile", "pid_file": opts.PidFile = v.(string) case "prof_port": diff --git a/server/opts_test.go b/server/opts_test.go index f02df7b2..05c70212 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -44,18 +44,20 @@ func TestOptions_RandomPort(t *testing.T) { func TestConfigFile(t *testing.T) { golden := &Options{ - Host: "apcera.me", - Port: 4242, - Username: "derek", - Password: "bella", - AuthTimeout: 1.0, - Debug: false, - Trace: true, - Logtime: false, - HTTPPort: 8222, - LogFile: "/tmp/gnatsd.log", - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6543, + Host: "apcera.me", + Port: 4242, + Username: "derek", + Password: "bella", + AuthTimeout: 1.0, + Debug: false, + Trace: true, + Logtime: false, + HTTPPort: 8222, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6543, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", } opts, err := ProcessConfigFile("./configs/test.conf") @@ -71,18 +73,20 @@ func TestConfigFile(t *testing.T) { func TestMergeOverrides(t *testing.T) { golden := &Options{ - Host: "apcera.me", - Port: 2222, - Username: "derek", - Password: "spooky", - AuthTimeout: 1.0, - Debug: true, - Trace: true, - Logtime: false, - HTTPPort: DEFAULT_HTTP_PORT, - LogFile: "/tmp/gnatsd.log", - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6789, + Host: "apcera.me", + Port: 2222, + Username: "derek", + Password: "spooky", + AuthTimeout: 1.0, + Debug: true, + Trace: true, + Logtime: false, + HTTPPort: DEFAULT_HTTP_PORT, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6789, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/usage.go b/server/usage.go index c3ed44a7..250f689f 100644 --- a/server/usage.go +++ b/server/usage.go @@ -18,6 +18,8 @@ Server options: Logging options: -l, --log FILE File to redirect log output -T, --logtime Timestamp log entries (default: true) + -s, --syslog Enable syslog as log method. + -r, --remote_syslog Syslog server addr (udp://localhost:514). -D, --debug Enable debugging output -V, --trace Trace the raw protocol From 7c7578ae38c33f0550bea17e4214db80fee014f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Cuadros=20Ortiz?= Date: Thu, 16 Oct 2014 01:16:21 +0200 Subject: [PATCH 4/4] Notice and Logger messages --- logger/log.go | 29 ++++++++++++++++++----------- logger/log_test.go | 22 +++++++++++----------- logger/syslog.go | 8 ++++++-- logger/syslog_test.go | 4 ++-- server/client.go | 6 +++--- server/log.go | 13 ++++++++++--- server/log_test.go | 9 +++++---- server/monitor.go | 6 +++--- server/opts.go | 8 ++++---- server/route.go | 6 +++--- server/server.go | 16 ++++++++-------- 11 files changed, 73 insertions(+), 54 deletions(-) diff --git a/logger/log.go b/logger/log.go index d73e2d93..66d53313 100644 --- a/logger/log.go +++ b/logger/log.go @@ -11,7 +11,8 @@ type Logger struct { logger *log.Logger debug bool trace bool - logLabel string + infoLabel string + errorLabel string fatalLabel string debugLabel string traceLabel string @@ -61,22 +62,28 @@ func NewFileLogger(filename string, time, debug, trace bool) *Logger { } func setPlainLabelFormats(l *Logger) { - l.logLabel = "[LOG] " - l.debugLabel = "[DBG] " - l.fatalLabel = "[ERR] " - l.traceLabel = "[TRA] " + l.infoLabel = "[INFO] " + l.debugLabel = "[DEBUG] " + l.errorLabel = "[ERROR] " + l.fatalLabel = "[FATAL] " + l.traceLabel = "[TRACE] " } func setColoredLabelFormats(l *Logger) { colorFormat := "[\x1b[%dm%s\x1b[0m] " - l.logLabel = fmt.Sprintf(colorFormat, 32, "LOG") - l.debugLabel = fmt.Sprintf(colorFormat, 36, "DBG") - l.fatalLabel = fmt.Sprintf(colorFormat, 31, "ERR") - l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRA") + l.infoLabel = fmt.Sprintf(colorFormat, 32, "INFO") + l.debugLabel = fmt.Sprintf(colorFormat, 36, "DEBUG") + l.errorLabel = fmt.Sprintf(colorFormat, 31, "ERROR") + l.fatalLabel = fmt.Sprintf(colorFormat, 35, "FATAL") + l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRACE") } -func (l *Logger) Log(format string, v ...interface{}) { - l.logger.Printf(l.logLabel+format, v...) +func (l *Logger) Notice(format string, v ...interface{}) { + l.logger.Printf(l.infoLabel+format, v...) +} + +func (l *Logger) Error(format string, v ...interface{}) { + l.logger.Printf(l.errorLabel+format, v...) } func (l *Logger) Fatal(format string, v ...interface{}) { diff --git a/logger/log_test.go b/logger/log_test.go index c497946a..20904052 100644 --- a/logger/log_test.go +++ b/logger/log_test.go @@ -43,25 +43,25 @@ func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { } } -func TestStdLoggerLog(t *testing.T) { +func TestStdLoggerNotice(t *testing.T) { expectOutput(t, func() { logger := NewStdLogger(false, false, false, false) - logger.Log("foo") - }, "[LOG] foo\n") + logger.Notice("foo") + }, "[INFO] foo\n") } -func TestStdLoggerLogWithColor(t *testing.T) { +func TestStdLoggerNoticeWithColor(t *testing.T) { expectOutput(t, func() { logger := NewStdLogger(false, false, false, true) - logger.Log("foo") - }, "[\x1b[32mLOG\x1b[0m] foo\n") + logger.Notice("foo") + }, "[\x1b[32mINFO\x1b[0m] foo\n") } func TestStdLoggerDebug(t *testing.T) { expectOutput(t, func() { logger := NewStdLogger(false, true, false, false) logger.Debug("foo %s", "bar") - }, "[DBG] foo bar\n") + }, "[DEBUG] foo bar\n") } func TestStdLoggerDebugWithOutDebug(t *testing.T) { @@ -75,7 +75,7 @@ func TestStdLoggerTrace(t *testing.T) { expectOutput(t, func() { logger := NewStdLogger(false, false, true, false) logger.Trace("foo") - }, "[TRA] foo\n") + }, "[TRACE] foo\n") } func TestStdLoggerTraceWithOutDebug(t *testing.T) { @@ -96,7 +96,7 @@ func TestFileLogger(t *testing.T) { file.Close() logger := NewFileLogger(file.Name(), false, false, false) - logger.Log("foo") + logger.Notice("foo") buf, err := ioutil.ReadFile(file.Name()) if err != nil { @@ -106,8 +106,8 @@ func TestFileLogger(t *testing.T) { t.Fatal("Expected a non-zero length logfile") } - if string(buf) != "[LOG] foo\n" { - t.Fatalf("Expected '%s', received '%s'\n", "[LOG] foo", string(buf)) + if string(buf) != "[INFO] foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "[INFO] foo", string(buf)) } } diff --git a/logger/syslog.go b/logger/syslog.go index b29365eb..9f04ca01 100644 --- a/logger/syslog.go +++ b/logger/syslog.go @@ -59,7 +59,7 @@ func getNetworkAndAddr(fqn string) (network, addr string) { return } -func (l *SysLogger) Log(format string, v ...interface{}) { +func (l *SysLogger) Notice(format string, v ...interface{}) { l.writer.Notice(fmt.Sprintf(format, v...)) } @@ -67,6 +67,10 @@ func (l *SysLogger) Fatal(format string, v ...interface{}) { l.writer.Crit(fmt.Sprintf(format, v...)) } +func (l *SysLogger) Error(format string, v ...interface{}) { + l.writer.Err(fmt.Sprintf(format, v...)) +} + func (l *SysLogger) Debug(format string, v ...interface{}) { if l.debug { l.writer.Debug(fmt.Sprintf(format, v...)) @@ -75,6 +79,6 @@ func (l *SysLogger) Debug(format string, v ...interface{}) { func (l *SysLogger) Trace(format string, v ...interface{}) { if l.trace { - l.writer.Info(fmt.Sprintf(format, v...)) + l.writer.Notice(fmt.Sprintf(format, v...)) } } diff --git a/logger/syslog_test.go b/logger/syslog_test.go index 06d2a41d..3835f473 100644 --- a/logger/syslog_test.go +++ b/logger/syslog_test.go @@ -49,12 +49,12 @@ func TestRemoteSysLogger(t *testing.T) { } } -func TestRemoteSysLoggerLog(t *testing.T) { +func TestRemoteSysLoggerNotice(t *testing.T) { done := make(chan string) startServer(done) logger := NewRemoteSysLogger(serverFQN, true, true) - logger.Log("foo %s", "bar") + logger.Notice("foo %s", "bar") expectSyslogOutput(t, <-done, "foo bar\n") } diff --git a/server/client.go b/server/client.go index 23f4267d..95917064 100644 --- a/server/client.go +++ b/server/client.go @@ -144,7 +144,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - Log("Error reading from client: %s", err.Error(), c) + Error("Error reading from client: %s", err.Error(), c) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -236,7 +236,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - Log("Client error %s", errStr, c) + Error("Client error %s", errStr, c) c.closeConnection() } @@ -628,7 +628,7 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - Log("Slow Consumer Detected", c) + Notice("Slow Consumer Detected", c) client.closeConnection() } else { Debug("Error writing msg: %v", err, c) diff --git a/server/log.go b/server/log.go index fb3efee8..7bf758ef 100644 --- a/server/log.go +++ b/server/log.go @@ -16,8 +16,9 @@ var log = struct { }{} type Logger interface { - Log(format string, v ...interface{}) + Notice(format string, v ...interface{}) Fatal(format string, v ...interface{}) + Error(format string, v ...interface{}) Debug(format string, v ...interface{}) Trace(format string, v ...interface{}) } @@ -36,9 +37,15 @@ func (s *Server) SetLogger(logger Logger, d, t bool) { log.logger = logger } -func Log(format string, v ...interface{}) { +func Notice(format string, v ...interface{}) { executeLogCall(func(logger Logger, format string, v ...interface{}) { - logger.Log(format, v...) + logger.Notice(format, v...) + }, format, v...) +} + +func Error(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Error(format, v...) }, format, v...) } diff --git a/server/log_test.go b/server/log_test.go index 3baf6870..75d20291 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -24,7 +24,8 @@ func TestSetLogger(t *testing.T) { type DummyLogger struct{} -func (l *DummyLogger) Log(format string, v ...interface{}) {} -func (l *DummyLogger) Fatal(format string, v ...interface{}) {} -func (l *DummyLogger) Debug(format string, v ...interface{}) {} -func (l *DummyLogger) Trace(format string, v ...interface{}) {} +func (l *DummyLogger) Notice(format string, v ...interface{}) {} +func (l *DummyLogger) Error(format string, v ...interface{}) {} +func (l *DummyLogger) Fatal(format string, v ...interface{}) {} +func (l *DummyLogger) Debug(format string, v ...interface{}) {} +func (l *DummyLogger) Trace(format string, v ...interface{}) {} diff --git a/server/monitor.go b/server/monitor.go index d2c235bc..225ae357 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - Log("Error marshalling response to /connz request: %v", err) + Error("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - Log("Error marshalling response to /subscriptionsz request: %v", err) + Error("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - Log("Error marshalling response to /varz request: %v", err) + Error("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index 72b636ea..626a2a26 100644 --- a/server/opts.go +++ b/server/opts.go @@ -225,7 +225,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - Log("Self referencing IP found: ", r) + Notice("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -256,7 +256,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - Log("Error looking up host with route hostname: ", err) + Error("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -273,7 +273,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - Log("Error getting self referencing address: ", err) + Error("Error getting self referencing address: ", err) return localIPs } @@ -282,7 +282,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - Log("Error parsing self referencing address: ", err) + Error("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/route.go b/server/route.go index ce474f70..f9be793c 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - Log("Error marshalling CONNECT to route: %v\n", err) + Error("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -234,7 +234,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - Log("Listening for route connections on %s", hp) + Notice("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { Fatal("Error listening on router port: %d - %v", s.opts.Port, e) @@ -263,7 +263,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Log("Accept error: %v", err) + Notice("Accept error: %v", err) } continue } diff --git a/server/server.go b/server/server.go index e9115d1f..c940f4ed 100644 --- a/server/server.go +++ b/server/server.go @@ -142,7 +142,7 @@ func (s *Server) handleSignals() { for sig := range c { Debug("Trapped Signal; %v", sig) // FIXME, trip running? - Log("Server Exiting..") + Notice("Server Exiting..") os.Exit(0) } }() @@ -166,7 +166,7 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { - Log("Starting gnatsd version %s", VERSION) + Notice("Starting gnatsd version %s", VERSION) s.running = true // Log the pid to a file @@ -261,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - Log("Listening for client connections on %s", hp) + Notice("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - Log("gnatsd is ready") + Notice("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -302,20 +302,20 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - Log("Accept error: %v", err) + Notice("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - Log("Server Exiting..") + Notice("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - Log("Starting profiling on http port %d", s.opts.ProfPort) + Notice("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { @@ -328,7 +328,7 @@ func (s *Server) StartProfiler() { // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - Log("Starting http monitor on port %d", s.opts.HTTPPort) + Notice("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort)