diff --git a/gnatsd.go b/gnatsd.go index ea7be517..ff2f0cad 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -92,7 +92,7 @@ func main() { s := server.New(&opts) // Builds and set the logger based on the flags - s.SetLogger(buildLogger(&opts)) + s.SetLogger(buildLogger(&opts), opts.Debug, opts.Trace) // Start things up. Block here until done. s.Start() @@ -107,6 +107,6 @@ func buildLogger(opts *server.Options) server.Logger { return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace) } - return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace) + return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true) } diff --git a/logger/log.go b/logger/log.go index 7d27a20d..d73e2d93 100644 --- a/logger/log.go +++ b/logger/log.go @@ -2,27 +2,40 @@ package logger import ( + "fmt" "log" "os" ) type Logger struct { - logger *log.Logger - debug bool - trace bool + logger *log.Logger + debug bool + trace bool + logLabel string + fatalLabel string + debugLabel string + traceLabel string } -func NewStdLogger(time, debug, trace bool) *Logger { +func NewStdLogger(time, debug, trace, colors bool) *Logger { flags := 0 if time { flags = log.LstdFlags } - return &Logger{ + l := &Logger{ logger: log.New(os.Stderr, "", flags), debug: debug, trace: trace, } + + if colors { + setColoredLabelFormats(l) + } else { + setPlainLabelFormats(l) + } + + return l } func NewFileLogger(filename string, time, debug, trace bool) *Logger { @@ -37,29 +50,47 @@ func NewFileLogger(filename string, time, debug, trace bool) *Logger { flags = log.LstdFlags } - return &Logger{ + l := &Logger{ logger: log.New(f, "", flags), debug: debug, trace: trace, } + + setPlainLabelFormats(l) + return l +} + +func setPlainLabelFormats(l *Logger) { + l.logLabel = "[LOG] " + l.debugLabel = "[DBG] " + l.fatalLabel = "[ERR] " + l.traceLabel = "[TRA] " +} + +func setColoredLabelFormats(l *Logger) { + colorFormat := "[\x1b[%dm%s\x1b[0m] " + l.logLabel = fmt.Sprintf(colorFormat, 32, "LOG") + l.debugLabel = fmt.Sprintf(colorFormat, 36, "DBG") + l.fatalLabel = fmt.Sprintf(colorFormat, 31, "ERR") + l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRA") } func (l *Logger) Log(format string, v ...interface{}) { - l.logger.Printf(format, v...) + l.logger.Printf(l.logLabel+format, v...) } func (l *Logger) Fatal(format string, v ...interface{}) { - l.logger.Fatalf(format, v) + l.logger.Fatalf(l.fatalLabel+format, v) } func (l *Logger) Debug(format string, v ...interface{}) { if l.debug == true { - l.Log(format, v...) + l.logger.Printf(l.debugLabel+format, v...) } } func (l *Logger) Trace(format string, v ...interface{}) { if l.trace == true { - l.Log(format, v...) + l.logger.Printf(l.traceLabel+format, v...) } } diff --git a/logger/log_test.go b/logger/log_test.go index bdd5629d..7eb1eed3 100644 --- a/logger/log_test.go +++ b/logger/log_test.go @@ -10,7 +10,7 @@ import ( ) func TestStdLogger(t *testing.T) { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) flags := logger.logger.Flags() if flags != 0 { @@ -27,7 +27,7 @@ func TestStdLogger(t *testing.T) { } func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { - logger := NewStdLogger(true, true, true) + logger := NewStdLogger(true, true, true, false) flags := logger.logger.Flags() if flags != log.LstdFlags { @@ -45,35 +45,42 @@ func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { func TestStdLoggerLog(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Log("foo") - }, "foo\n") + }, "[LOG] foo\n") +} + +func TestStdLoggerLogWithColor(t *testing.T) { + expectOutput(t, func() { + logger := NewStdLogger(false, false, false, true) + logger.Log("foo") + }, "[\x1b[32mLOG\x1b[0m] foo\n") } func TestStdLoggerDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, true, false) + logger := NewStdLogger(false, true, false, false) logger.Debug("foo %s", "bar") - }, "foo bar\n") + }, "[DBG] foo bar\n") } func TestStdLoggerDebugWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Debug("foo") }, "") } func TestStdLoggerTrace(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, true) + logger := NewStdLogger(false, false, true, false) logger.Trace("foo") - }, "foo\n") + }, "[TRA] foo\n") } func TestStdLoggerTraceWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false) + logger := NewStdLogger(false, false, false, false) logger.Trace("foo") }, "") } @@ -99,13 +106,13 @@ func TestFileLogger(t *testing.T) { t.Fatal("Expected a non-zero length logfile") } - if string(buf) != "foo\n" { - t.Fatalf("Expected '%s', received '%s'\n", "foo", string(buf)) + if string(buf) != "[LOG] foo\n" { + t.Fatalf("Expected '%s', received '%s'\n", "[LOG] foo", string(buf)) } } -func expectOutput(t *testing.T, f func(), expect string) { - old := os.Stdout // keep backup of the real stdout +func expectOutput(t *testing.T, f func(), expected string) { + old := os.Stderr // keep backup of the real stdout r, w, _ := os.Pipe() os.Stderr = w @@ -122,7 +129,7 @@ func expectOutput(t *testing.T, f func(), expect string) { os.Stderr.Close() os.Stderr = old // restoring the real stdout out := <-outC - if out != expect { - t.Fatalf("Expected '%s', received '%s'\n", expect, out) + if out != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, out) } } diff --git a/logger/syslog.go b/logger/syslog.go index f08d4a8e..217396ed 100644 --- a/logger/syslog.go +++ b/logger/syslog.go @@ -48,13 +48,13 @@ func (l *SysLogger) Fatal(format string, v ...interface{}) { } func (l *SysLogger) Debug(format string, v ...interface{}) { - if l.debug == true { + if l.debug { l.writer.Debug(fmt.Sprintf(format, v...)) } } func (l *SysLogger) Trace(format string, v ...interface{}) { - if l.trace == true { + if l.trace { l.writer.Info(fmt.Sprintf(format, v...)) } } diff --git a/logger/syslog_test.go b/logger/syslog_test.go new file mode 100644 index 00000000..db1f1775 --- /dev/null +++ b/logger/syslog_test.go @@ -0,0 +1,135 @@ +package logger + +import ( + "log" + "net" + "strings" + "testing" + "time" +) + +var serverAddr string + +func TestSysLogger(t *testing.T) { + logger := NewSysLogger(false, false) + + if logger.debug != false { + t.Fatalf("Expected %b, received %b\n", false, logger.debug) + } + + if logger.trace != false { + t.Fatalf("Expected %b, received %b\n", false, logger.trace) + } +} + +func TestSysLoggerWithDebugAndTrace(t *testing.T) { + logger := NewSysLogger(true, true) + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestRemoteSysLogger(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + if logger.debug != true { + t.Fatalf("Expected %b, received %b\n", true, logger.debug) + } + + if logger.trace != true { + t.Fatalf("Expected %b, received %b\n", true, logger.trace) + } +} + +func TestRemoteSysLoggerLog(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Log("foo %s", "bar") + expectSyslogOutput(t, <-done, "foo bar\n") +} + +func TestRemoteSysLoggerDebug(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Debug("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerDebugDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, false, false) + + logger.Debug("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func TestRemoteSysLoggerTrace(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, true) + + logger.Trace("foo %s", "qux") + expectSyslogOutput(t, <-done, "foo qux\n") +} + +func TestRemoteSysLoggerTraceDisabled(t *testing.T) { + done := make(chan string) + startServer(done) + logger := NewRemoteSysLogger("udp", serverAddr, true, false) + + logger.Trace("foo %s", "qux") + rcvd := <-done + if rcvd != "" { + t.Fatalf("Unexpected syslog response %s\n", rcvd) + } +} + +func expectSyslogOutput(t *testing.T, line string, expected string) { + data := strings.Split(line, "]: ") + if len(data) != 2 { + t.Fatalf("Unexpected syslog line %s\n", line) + } + + if data[1] != expected { + t.Fatalf("Expected '%s', received '%s'\n", expected, data[1]) + } +} + +func runSyslog(c net.PacketConn, done chan<- string) { + var buf [4096]byte + var rcvd string = "" + for { + n, _, err := c.ReadFrom(buf[:]) + if err != nil || n == 0 { + break + } + rcvd += string(buf[:n]) + } + done <- rcvd +} + +func startServer(done chan<- string) { + c, e := net.ListenPacket("udp", "127.0.0.1:0") + if e != nil { + log.Fatalf("net.ListenPacket failed udp :0 %v", e) + } + + serverAddr = c.LocalAddr().String() + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + go runSyslog(c, done) +} diff --git a/server/client.go b/server/client.go index 6c37c3c9..23f4267d 100644 --- a/server/client.go +++ b/server/client.go @@ -33,20 +33,19 @@ const ( ) type client struct { - mu sync.Mutex - typ int - cid uint64 - opts clientOpts - nc net.Conn - bw *bufio.Writer - srv *Server - subs *hashmap.HashMap - pcd map[*client]struct{} - atmr *time.Timer - ptmr *time.Timer - trace bool - pout int - msgb [msgScratchSize]byte + mu sync.Mutex + typ int + cid uint64 + opts clientOpts + nc net.Conn + bw *bufio.Writer + srv *Server + subs *hashmap.HashMap + pcd map[*client]struct{} + atmr *time.Timer + ptmr *time.Timer + pout int + msgb [msgScratchSize]byte parseState stats @@ -54,12 +53,19 @@ type client struct { } func (c *client) String() (id string) { + conn := "-" + if ip, ok := c.nc.(*net.TCPConn); ok { + addr := ip.RemoteAddr().(*net.TCPAddr) + conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port) + } + switch c.typ { case CLIENT: - id = fmt.Sprintf("cid:%d", c.cid) + id = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: - id = fmt.Sprintf("rid:%d", c.cid) + id = fmt.Sprintf("%s - rid:%d", conn, c.cid) } + return id } @@ -88,14 +94,6 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func clientConnStr(conn net.Conn) interface{} { - if ip, ok := conn.(*net.TCPConn); ok { - addr := ip.RemoteAddr().(*net.TCPAddr) - return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} - } - return "N/A" -} - // Lock should be held func (c *client) initClient() { s := c.srv @@ -146,7 +144,7 @@ func (c *client) readLoop() { return } if err := c.parse(b[:n]); err != nil { - log.Log(err.Error(), clientConnStr(c.nc), c.cid) + Log("Error reading from client: %s", err.Error(), c) // Auth was handled inline if err != ErrAuthorization { c.sendErr("Parser Error") @@ -163,7 +161,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - log.Debug("Error flushing: %v", err) + Debug("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -180,17 +178,17 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - if c.trace != true { + if trace == 0 { return } pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - log.Trace("[cid: %d] MSG: %s", c.cid, opa) + Trace("MSG: %s", opa, c) } func (c *client) traceOp(op string, arg []byte) { - if c.trace != true { + if trace == 0 { return } @@ -198,7 +196,7 @@ func (c *client) traceOp(op string, arg []byte) { if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } - log.Trace("[cid: %d] OP: %s", c.cid, opa) + Trace("OP: %s", opa, c) } // Process the info message if we are a route. @@ -216,14 +214,11 @@ func (c *client) processRouteInfo(info *Info) { c.mu.Unlock() if s.addRoute(c) { - log.Debug("[cid: %d] Registering remote route '%s'", c.cid, info.ID) + Debug("Registering remote route %q", info.ID, c) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) } else { - log.Debug( - "[cid: %d] Detected duplicate remote route '%s', %s", - c.cid, info.ID, clientConnStr(c.nc), - ) + Debug("Detected duplicate remote route %q", info.ID, c) c.closeConnection() } } @@ -241,7 +236,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - log.Log(errStr, clientConnStr(c.nc), c.cid) + Log("Client error %s", errStr, c) c.closeConnection() } @@ -311,10 +306,7 @@ func (c *client) processPing() { err := c.bw.Flush() if err != nil { c.clearConnection() - log.Debug( - "[cid: %d] Error on Flush, error %s, %s", - c.cid, err.Error(), clientConnStr(c.nc), - ) + Debug("Error on Flush, error %s", err.Error(), c) } c.mu.Unlock() } @@ -327,7 +319,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if c.trace == true { + if trace == 0 { c.traceOp("MSG", arg) } @@ -374,7 +366,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if c.trace == true { + if trace == 0 { c.traceOp("PUB", arg) } @@ -491,9 +483,9 @@ func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm < sub.max { - log.Debug( - "[cid: %d] Deferring actual UNSUB(%s): %d max, %d received\n", - c.cid, string(sub.subject), sub.max, sub.nm, + Debug( + "Deferring actual UNSUB(%s): %d max, %d received\n", + string(sub.subject), sub.max, sub.nm, c, ) return } @@ -636,10 +628,10 @@ writeErr: client.mu.Unlock() if ne, ok := err.(net.Error); ok && ne.Timeout() { - log.Log("[cid: %d] Slow Consumer Detected %s", client.cid, clientConnStr(client.nc)) + Log("Slow Consumer Detected", c) client.closeConnection() } else { - log.Debug("[cid: %d] Error writing msg: %v", client.cid, err) + Debug("Error writing msg: %v", err, c) } } @@ -662,7 +654,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if c.trace == true { + if trace > 0 { c.traceMsg(msg) } if srv == nil { @@ -747,14 +739,11 @@ func (c *client) processMsg(msg []byte) { } if sub.client == nil || sub.client.nc == nil || sub.client.route == nil || sub.client.route.remoteID == "" { - log.Debug( - "[cid: %d] Bad or Missing ROUTER Identity, not processing msg, %s", - c.cid, clientConnStr(c.nc), - ) + Debug("Bad or Missing ROUTER Identity, not processing msg", c) continue } if _, ok := rmap[sub.client.route.remoteID]; ok { - log.Debug("[cid: %d] Ignoring route, already processed", c.cid) + Debug("Ignoring route, already processed", c) continue } rmap[sub.client.route.remoteID] = routeSeen @@ -783,12 +772,12 @@ func (c *client) processPingTimer() { return } - log.Debug("Client Ping Timer", clientConnStr(c.nc), c.cid) + Debug("Client Ping Timer", c) // Check for violation c.pout += 1 if c.pout > c.srv.opts.MaxPingsOut { - log.Debug("[cid: %d] Stale Client Connection - Closing %s", c.cid, clientConnStr(c.nc)) + Debug("Stale Client Connection - Closing", c) if c.bw != nil { c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")) c.bw.Flush() @@ -801,7 +790,7 @@ func (c *client) processPingTimer() { c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - log.Debug("[cid: %d] Error on Client Ping Flush, error %s %s", c.cid, err, clientConnStr(c.nc)) + Debug("Error on Client Ping Flush, error %s", err) c.clearConnection() } else { // Reset to fire again if all OK. @@ -873,7 +862,7 @@ func (c *client) closeConnection() { return } - log.Debug("[cid: %d] %s connection closed: %s", c.cid, c.typeString(), clientConnStr(c.nc)) + Debug("%s connection closed", c.typeString(), c) c.clearAuthTimer() c.clearPingTimer() @@ -910,10 +899,10 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - log.Debug("[cid: %d] Not attempting reconnect for solicited route, already connected. Try %d", c.cid, rid) + Debug("Not attempting reconnect for solicited route, already connected. Try %d", rid, c) return } else { - log.Debug("[cid: %d] Attempting reconnect for solicited route", c.cid) + Debug("Attempting reconnect for solicited route", c) go srv.reConnectToRoute(c.route.url) } } diff --git a/server/log.go b/server/log.go index 2597056b..fb3efee8 100644 --- a/server/log.go +++ b/server/log.go @@ -2,7 +2,18 @@ package server -var log Logger = &NilLogger{} +import ( + "fmt" + "sync" + "sync/atomic" +) + +var trace int32 +var debug int32 +var log = struct { + logger Logger + sync.Mutex +}{} type Logger interface { Log(format string, v ...interface{}) @@ -11,13 +22,66 @@ type Logger interface { Trace(format string, v ...interface{}) } -func (s *Server) SetLogger(logger Logger) { - log = logger +func (s *Server) SetLogger(logger Logger, d, t bool) { + if d { + atomic.StoreInt32(&debug, 1) + } + + if t { + atomic.StoreInt32(&trace, 1) + } + + log.Lock() + defer log.Unlock() + log.logger = logger } -type NilLogger struct{} +func Log(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Log(format, v...) + }, format, v...) +} -func (l *NilLogger) Log(format string, v ...interface{}) {} -func (l *NilLogger) Fatal(format string, v ...interface{}) {} -func (l *NilLogger) Debug(format string, v ...interface{}) {} -func (l *NilLogger) Trace(format string, v ...interface{}) {} +func Fatal(format string, v ...interface{}) { + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Fatal(format, v...) + }, format, v...) +} + +func Debug(format string, v ...interface{}) { + if debug == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Debug(format, v...) + }, format, v...) +} + +func Trace(format string, v ...interface{}) { + if trace == 0 { + return + } + + executeLogCall(func(logger Logger, format string, v ...interface{}) { + logger.Trace(format, v...) + }, format, v...) +} + +func executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) { + log.Lock() + defer log.Unlock() + if log.logger == nil { + return + } + + argc := len(args) + if argc != 0 { + if client, ok := args[argc-1].(*client); ok { + args = args[:argc-1] + format = fmt.Sprintf("%s - %s", client, format) + } + } + + f(log.logger, format, args...) +} diff --git a/server/log_test.go b/server/log_test.go index 273ad658..3baf6870 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -7,14 +7,19 @@ import ( ) func TestSetLogger(t *testing.T) { - // We assert that the default logger is the NilLogger - _ = log.(*NilLogger) - server := &Server{} - server.SetLogger(&DummyLogger{}) + server.SetLogger(&DummyLogger{}, true, true) // We assert that the logger has change to the DummyLogger - _ = log.(*DummyLogger) + _ = log.logger.(*DummyLogger) + + if debug != 1 { + t.Fatalf("Expected debug 1, received value %d\n", debug) + } + + if trace != 1 { + t.Fatalf("Expected trace 1, received value %d\n", trace) + } } type DummyLogger struct{} diff --git a/server/monitor.go b/server/monitor.go index 4da019bd..d2c235bc 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(c, "", " ") if err != nil { - log.Log("Error marshalling response to /connz request: %v", err) + Log("Error marshalling response to /connz request: %v", err) } w.Write(b) } @@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(st, "", " ") if err != nil { - log.Log("Error marshalling response to /subscriptionsz request: %v", err) + Log("Error marshalling response to /subscriptionsz request: %v", err) } w.Write(b) } @@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { b, err := json.MarshalIndent(v, "", " ") if err != nil { - log.Log("Error marshalling response to /varz request: %v", err) + Log("Error marshalling response to /varz request: %v", err) } w.Write(b) } diff --git a/server/opts.go b/server/opts.go index 6383e404..e7d7438d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -220,7 +220,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) } if cport == port && isIpInList(selfIPs, getUrlIp(host)) { - log.Log("Self referencing IP found: ", r) + Log("Self referencing IP found: ", r) continue } cleanRoutes = append(cleanRoutes, r) @@ -251,7 +251,7 @@ func getUrlIp(ipStr string) []net.IP { hostAddr, err := net.LookupHost(ipStr) if err != nil { - log.Log("Error looking up host with route hostname: ", err) + Log("Error looking up host with route hostname: ", err) return ipList } for _, addr := range hostAddr { @@ -268,7 +268,7 @@ func getInterfaceIPs() []net.IP { interfaceAddr, err := net.InterfaceAddrs() if err != nil { - log.Log("Error getting self referencing address: ", err) + Log("Error getting self referencing address: ", err) return localIPs } @@ -277,7 +277,7 @@ func getInterfaceIPs() []net.IP { if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { - log.Log("Error parsing self referencing address: ", err) + Log("Error parsing self referencing address: ", err) } } return localIPs diff --git a/server/route.go b/server/route.go index 19e84eeb..ce474f70 100644 --- a/server/route.go +++ b/server/route.go @@ -46,7 +46,7 @@ func (c *client) sendConnect() { } b, err := json.Marshal(cinfo) if err != nil { - log.Log("Error marshalling CONNECT to route: %v\n", err) + Log("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() } c.bw.WriteString(fmt.Sprintf(conProto, b)) @@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.bw.Write(b.Bytes()) route.bw.Flush() - log.Debug("Route sent local subscriptions", route.cid) + Debug("Route sent local subscriptions", route) } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() - log.Debug("Route connection created", clientConnStr(c.nc), c.cid) + Debug("Route connection created", c) // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rURL - log.Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) + Debug("Route connect msg sent", c) c.sendConnect() } @@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { func (s *Server) routeAcceptLoop(ch chan struct{}) { hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) - log.Log("Listening for route connections on %s", hp) + Log("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - log.Fatal("Error listening on router port: %d - %v", s.opts.Port, e) + Fatal("Error listening on router port: %d - %v", s.opts.Port, e) return } @@ -255,7 +255,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Debug("Temporary Route Accept Error(%v), sleeping %dms", + Debug("Temporary Route Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -263,14 +263,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - log.Log("Accept error: %v", err) + Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createRoute(conn, nil) } - log.Debug("Router accept loop exiting..") + Debug("Router accept loop exiting..") s.done <- true } @@ -294,7 +294,7 @@ func (s *Server) StartRouting() { // Generate the info json b, err := json.Marshal(info) if err != nil { - log.Fatal("Error marshalling Route INFO JSON: %+v\n", err) + Fatal("Error marshalling Route INFO JSON: %+v\n", err) } s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) { func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { - log.Debug("Trying to connect to route on %s", rUrl.Host) + Debug("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { - log.Debug("Error trying to connect to route: %v", err) + Debug("Error trying to connect to route: %v", err) select { case <-s.rcQuit: return diff --git a/server/server.go b/server/server.go index ffef2aa4..e9115d1f 100644 --- a/server/server.go +++ b/server/server.go @@ -111,7 +111,7 @@ func New(opts *Options) *Server { // Generate the info json b, err := json.Marshal(s.info) if err != nil { - log.Fatal("Error marshalling INFO JSON: %+v\n", err) + Fatal("Error marshalling INFO JSON: %+v\n", err) } s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) @@ -140,9 +140,9 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - log.Debug("Trapped Signal; %v", sig) + Debug("Trapped Signal; %v", sig) // FIXME, trip running? - log.Log("Server Exiting..") + Log("Server Exiting..") os.Exit(0) } }() @@ -166,7 +166,7 @@ func (s *Server) logPid() { // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { - log.Log("Starting gnatsd version %s", VERSION) + Log("Starting gnatsd version %s", VERSION) s.running = true // Log the pid to a file @@ -261,14 +261,14 @@ func (s *Server) Shutdown() { // AcceptLoop is exported for easier testing. func (s *Server) AcceptLoop() { hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) - log.Log("Listening for client connections on %s", hp) + Log("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - log.Fatal("Error listening on port: %d - %v", s.opts.Port, e) + Fatal("Error listening on port: %d - %v", s.opts.Port, e) return } - log.Log("gnatsd is ready") + Log("gnatsd is ready") // Setup state that can enable shutdown s.mu.Lock() @@ -278,12 +278,12 @@ func (s *Server) AcceptLoop() { // Write resolved port back to options. _, port, err := net.SplitHostPort(l.Addr().String()) if err != nil { - log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } portNum, err := strconv.Atoi(port) if err != nil { - log.Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) + Fatal("Error parsing server address (%s): %s", l.Addr().String(), e) return } s.opts.Port = portNum @@ -294,7 +294,7 @@ func (s *Server) AcceptLoop() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Debug("Temporary Client Accept Error(%v), sleeping %dms", + Debug("Temporary Client Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -302,39 +302,39 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MAX_SLEEP } } else if s.isRunning() { - log.Log("Accept error: %v", err) + Log("Accept error: %v", err) } continue } tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - log.Log("Server Exiting..") + Log("Server Exiting..") s.done <- true } // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { - log.Log("Starting profiling on http port %d", s.opts.ProfPort) + Log("Starting profiling on http port %d", s.opts.ProfPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort) go func() { err := http.ListenAndServe(hp, nil) if err != nil { - log.Fatal("error starting monitor server: %s", err) + Fatal("error starting monitor server: %s", err) } }() } // StartHTTPMonitoring will enable the HTTP monitoring port. func (s *Server) StartHTTPMonitoring() { - log.Log("Starting http monitor on port %d", s.opts.HTTPPort) + Log("Starting http monitor on port %d", s.opts.HTTPPort) hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort) l, err := net.Listen("tcp", hp) if err != nil { - log.Fatal("Can't listen to the monitor port: %v", err) + Fatal("Can't listen to the monitor port: %v", err) } mux := http.NewServeMux() @@ -366,7 +366,7 @@ func (s *Server) StartHTTPMonitoring() { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, nc: conn, trace: s.opts.Trace, opts: defaultOpts} + c := &client{srv: s, nc: conn, opts: defaultOpts} // Grab lock c.mu.Lock() @@ -374,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Initialize c.initClient() - log.Debug("[cid: %d] Client connection created: %s", c.cid, clientConnStr(c.nc)) + Debug("Client connection created", c) // Send our information. s.sendInfo(c)