mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #54 from mcuadros/logger
logging system, syslog and abstraction improvements
This commit is contained in:
@@ -16,6 +16,8 @@ Server options:
|
||||
Logging options:
|
||||
-l, --log FILE File to redirect log output
|
||||
-T, --logtime Timestamp log entries (default: true)
|
||||
-s, --syslog Enable syslog as log method.
|
||||
-r, --remote_syslog Syslog server addr (udp://localhost:514).
|
||||
-D, --debug Enable debugging output
|
||||
-V, --trace Trace the raw protocol
|
||||
|
||||
|
||||
27
gnatsd.go
27
gnatsd.go
@@ -6,13 +6,11 @@ import (
|
||||
"flag"
|
||||
"strings"
|
||||
|
||||
"github.com/apcera/gnatsd/logger"
|
||||
"github.com/apcera/gnatsd/server"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// logging setup
|
||||
server.LogSetup()
|
||||
|
||||
// Server Options
|
||||
opts := server.Options{}
|
||||
|
||||
@@ -44,6 +42,10 @@ func main() {
|
||||
flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.")
|
||||
flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.")
|
||||
flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.")
|
||||
flag.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.")
|
||||
flag.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method..")
|
||||
flag.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://localhost:514).")
|
||||
flag.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://localhost:514).")
|
||||
flag.BoolVar(&showVersion, "version", false, "Print version information.")
|
||||
flag.BoolVar(&showVersion, "v", false, "Print version information.")
|
||||
flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
|
||||
@@ -92,6 +94,25 @@ func main() {
|
||||
// Create the server with appropriate options.
|
||||
s := server.New(&opts)
|
||||
|
||||
// Builds and set the logger based on the flags
|
||||
s.SetLogger(buildLogger(&opts), opts.Debug, opts.Trace)
|
||||
|
||||
// Start things up. Block here until done.
|
||||
s.Start()
|
||||
}
|
||||
|
||||
func buildLogger(opts *server.Options) server.Logger {
|
||||
if opts.LogFile != "" {
|
||||
return logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace)
|
||||
}
|
||||
|
||||
if opts.RemoteSyslog != "" {
|
||||
return logger.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace)
|
||||
}
|
||||
|
||||
if opts.Syslog {
|
||||
return logger.NewSysLogger(opts.Debug, opts.Trace)
|
||||
}
|
||||
|
||||
return logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, true)
|
||||
}
|
||||
|
||||
103
logger/log.go
Normal file
103
logger/log.go
Normal file
@@ -0,0 +1,103 @@
|
||||
// Copyright 2012-2014 Apcera Inc. All rights reserved.
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
logger *log.Logger
|
||||
debug bool
|
||||
trace bool
|
||||
infoLabel string
|
||||
errorLabel string
|
||||
fatalLabel string
|
||||
debugLabel string
|
||||
traceLabel string
|
||||
}
|
||||
|
||||
func NewStdLogger(time, debug, trace, colors bool) *Logger {
|
||||
flags := 0
|
||||
if time {
|
||||
flags = log.LstdFlags
|
||||
}
|
||||
|
||||
l := &Logger{
|
||||
logger: log.New(os.Stderr, "", flags),
|
||||
debug: debug,
|
||||
trace: trace,
|
||||
}
|
||||
|
||||
if colors {
|
||||
setColoredLabelFormats(l)
|
||||
} else {
|
||||
setPlainLabelFormats(l)
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
func NewFileLogger(filename string, time, debug, trace bool) *Logger {
|
||||
fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
|
||||
f, err := os.OpenFile(filename, fileflags, 0660)
|
||||
if err != nil {
|
||||
log.Fatal("error opening file: %v", err)
|
||||
}
|
||||
|
||||
flags := 0
|
||||
if time {
|
||||
flags = log.LstdFlags
|
||||
}
|
||||
|
||||
l := &Logger{
|
||||
logger: log.New(f, "", flags),
|
||||
debug: debug,
|
||||
trace: trace,
|
||||
}
|
||||
|
||||
setPlainLabelFormats(l)
|
||||
return l
|
||||
}
|
||||
|
||||
func setPlainLabelFormats(l *Logger) {
|
||||
l.infoLabel = "[INFO] "
|
||||
l.debugLabel = "[DEBUG] "
|
||||
l.errorLabel = "[ERROR] "
|
||||
l.fatalLabel = "[FATAL] "
|
||||
l.traceLabel = "[TRACE] "
|
||||
}
|
||||
|
||||
func setColoredLabelFormats(l *Logger) {
|
||||
colorFormat := "[\x1b[%dm%s\x1b[0m] "
|
||||
l.infoLabel = fmt.Sprintf(colorFormat, 32, "INFO")
|
||||
l.debugLabel = fmt.Sprintf(colorFormat, 36, "DEBUG")
|
||||
l.errorLabel = fmt.Sprintf(colorFormat, 31, "ERROR")
|
||||
l.fatalLabel = fmt.Sprintf(colorFormat, 35, "FATAL")
|
||||
l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRACE")
|
||||
}
|
||||
|
||||
func (l *Logger) Notice(format string, v ...interface{}) {
|
||||
l.logger.Printf(l.infoLabel+format, v...)
|
||||
}
|
||||
|
||||
func (l *Logger) Error(format string, v ...interface{}) {
|
||||
l.logger.Printf(l.errorLabel+format, v...)
|
||||
}
|
||||
|
||||
func (l *Logger) Fatal(format string, v ...interface{}) {
|
||||
l.logger.Fatalf(l.fatalLabel+format, v)
|
||||
}
|
||||
|
||||
func (l *Logger) Debug(format string, v ...interface{}) {
|
||||
if l.debug == true {
|
||||
l.logger.Printf(l.debugLabel+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Logger) Trace(format string, v ...interface{}) {
|
||||
if l.trace == true {
|
||||
l.logger.Printf(l.traceLabel+format, v...)
|
||||
}
|
||||
}
|
||||
135
logger/log_test.go
Normal file
135
logger/log_test.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStdLogger(t *testing.T) {
|
||||
logger := NewStdLogger(false, false, false, false)
|
||||
|
||||
flags := logger.logger.Flags()
|
||||
if flags != 0 {
|
||||
t.Fatalf("Expected %q, received %q\n", 0, flags)
|
||||
}
|
||||
|
||||
if logger.debug {
|
||||
t.Fatalf("Expected %t, received %t\n", false, logger.debug)
|
||||
}
|
||||
|
||||
if logger.trace {
|
||||
t.Fatalf("Expected %t, received %t\n", false, logger.trace)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStdLoggerWithDebugTraceAndTime(t *testing.T) {
|
||||
logger := NewStdLogger(true, true, true, false)
|
||||
|
||||
flags := logger.logger.Flags()
|
||||
if flags != log.LstdFlags {
|
||||
t.Fatalf("Expected %d, received %d\n", log.LstdFlags, flags)
|
||||
}
|
||||
|
||||
if !logger.debug {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.debug)
|
||||
}
|
||||
|
||||
if !logger.trace {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.trace)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStdLoggerNotice(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, false, false, false)
|
||||
logger.Notice("foo")
|
||||
}, "[INFO] foo\n")
|
||||
}
|
||||
|
||||
func TestStdLoggerNoticeWithColor(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, false, false, true)
|
||||
logger.Notice("foo")
|
||||
}, "[\x1b[32mINFO\x1b[0m] foo\n")
|
||||
}
|
||||
|
||||
func TestStdLoggerDebug(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, true, false, false)
|
||||
logger.Debug("foo %s", "bar")
|
||||
}, "[DEBUG] foo bar\n")
|
||||
}
|
||||
|
||||
func TestStdLoggerDebugWithOutDebug(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, false, false, false)
|
||||
logger.Debug("foo")
|
||||
}, "")
|
||||
}
|
||||
|
||||
func TestStdLoggerTrace(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, false, true, false)
|
||||
logger.Trace("foo")
|
||||
}, "[TRACE] foo\n")
|
||||
}
|
||||
|
||||
func TestStdLoggerTraceWithOutDebug(t *testing.T) {
|
||||
expectOutput(t, func() {
|
||||
logger := NewStdLogger(false, false, false, false)
|
||||
logger.Trace("foo")
|
||||
}, "")
|
||||
}
|
||||
|
||||
func TestFileLogger(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "_gnatsd")
|
||||
if err != nil {
|
||||
t.Fatal("Could not create tmp dir")
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
file, err := ioutil.TempFile(tmpDir, "gnatsd:log_")
|
||||
file.Close()
|
||||
|
||||
logger := NewFileLogger(file.Name(), false, false, false)
|
||||
logger.Notice("foo")
|
||||
|
||||
buf, err := ioutil.ReadFile(file.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("Could not read logfile: %v", err)
|
||||
}
|
||||
if len(buf) <= 0 {
|
||||
t.Fatal("Expected a non-zero length logfile")
|
||||
}
|
||||
|
||||
if string(buf) != "[INFO] foo\n" {
|
||||
t.Fatalf("Expected '%s', received '%s'\n", "[INFO] foo", string(buf))
|
||||
}
|
||||
}
|
||||
|
||||
func expectOutput(t *testing.T, f func(), expected string) {
|
||||
old := os.Stderr // keep backup of the real stdout
|
||||
r, w, _ := os.Pipe()
|
||||
os.Stderr = w
|
||||
|
||||
f()
|
||||
|
||||
outC := make(chan string)
|
||||
// copy the output in a separate goroutine so printing can't block indefinitely
|
||||
go func() {
|
||||
var buf bytes.Buffer
|
||||
io.Copy(&buf, r)
|
||||
outC <- buf.String()
|
||||
}()
|
||||
|
||||
os.Stderr.Close()
|
||||
os.Stderr = old // restoring the real stdout
|
||||
out := <-outC
|
||||
if out != expected {
|
||||
t.Fatalf("Expected '%s', received '%s'\n", expected, out)
|
||||
}
|
||||
}
|
||||
84
logger/syslog.go
Normal file
84
logger/syslog.go
Normal file
@@ -0,0 +1,84 @@
|
||||
// Copyright 2012-2014 Apcera Inc. All rights reserved.
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"log/syslog"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type SysLogger struct {
|
||||
writer *syslog.Writer
|
||||
debug bool
|
||||
trace bool
|
||||
}
|
||||
|
||||
func NewSysLogger(debug, trace bool) *SysLogger {
|
||||
w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd")
|
||||
if err != nil {
|
||||
log.Fatalf("error connecting to syslog: %q", err.Error())
|
||||
}
|
||||
|
||||
return &SysLogger{
|
||||
writer: w,
|
||||
debug: debug,
|
||||
trace: trace,
|
||||
}
|
||||
}
|
||||
|
||||
func NewRemoteSysLogger(fqn string, debug, trace bool) *SysLogger {
|
||||
network, addr := getNetworkAndAddr(fqn)
|
||||
w, err := syslog.Dial(network, addr, syslog.LOG_DEBUG, "gnatsd")
|
||||
if err != nil {
|
||||
log.Fatalf("error connecting to syslog: %q", err.Error())
|
||||
}
|
||||
|
||||
return &SysLogger{
|
||||
writer: w,
|
||||
debug: debug,
|
||||
trace: trace,
|
||||
}
|
||||
}
|
||||
|
||||
func getNetworkAndAddr(fqn string) (network, addr string) {
|
||||
u, err := url.Parse(fqn)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
network = u.Scheme
|
||||
if network == "udp" || network == "tcp" {
|
||||
addr = u.Host
|
||||
} else if network == "unix" {
|
||||
addr = u.Path
|
||||
} else {
|
||||
log.Fatalf("error invalid network type: %q", u.Scheme)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (l *SysLogger) Notice(format string, v ...interface{}) {
|
||||
l.writer.Notice(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func (l *SysLogger) Fatal(format string, v ...interface{}) {
|
||||
l.writer.Crit(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func (l *SysLogger) Error(format string, v ...interface{}) {
|
||||
l.writer.Err(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func (l *SysLogger) Debug(format string, v ...interface{}) {
|
||||
if l.debug {
|
||||
l.writer.Debug(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
func (l *SysLogger) Trace(format string, v ...interface{}) {
|
||||
if l.trace {
|
||||
l.writer.Notice(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
171
logger/syslog_test.go
Normal file
171
logger/syslog_test.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var serverFQN string
|
||||
|
||||
func TestSysLogger(t *testing.T) {
|
||||
logger := NewSysLogger(false, false)
|
||||
|
||||
if logger.debug {
|
||||
t.Fatalf("Expected %t, received %t\n", false, logger.debug)
|
||||
}
|
||||
|
||||
if logger.trace {
|
||||
t.Fatalf("Expected %t, received %t\n", false, logger.trace)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSysLoggerWithDebugAndTrace(t *testing.T) {
|
||||
logger := NewSysLogger(true, true)
|
||||
|
||||
if !logger.debug {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.debug)
|
||||
}
|
||||
|
||||
if !logger.trace {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.trace)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteSysLogger(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, true, true)
|
||||
|
||||
if !logger.debug {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.debug)
|
||||
}
|
||||
|
||||
if !logger.trace {
|
||||
t.Fatalf("Expected %t, received %t\n", true, logger.trace)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteSysLoggerNotice(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, true, true)
|
||||
|
||||
logger.Notice("foo %s", "bar")
|
||||
expectSyslogOutput(t, <-done, "foo bar\n")
|
||||
}
|
||||
|
||||
func TestRemoteSysLoggerDebug(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, true, true)
|
||||
|
||||
logger.Debug("foo %s", "qux")
|
||||
expectSyslogOutput(t, <-done, "foo qux\n")
|
||||
}
|
||||
|
||||
func TestRemoteSysLoggerDebugDisabled(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, false, false)
|
||||
|
||||
logger.Debug("foo %s", "qux")
|
||||
rcvd := <-done
|
||||
if rcvd != "" {
|
||||
t.Fatalf("Unexpected syslog response %s\n", rcvd)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteSysLoggerTrace(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, true, true)
|
||||
|
||||
logger.Trace("foo %s", "qux")
|
||||
expectSyslogOutput(t, <-done, "foo qux\n")
|
||||
}
|
||||
|
||||
func TestRemoteSysLoggerTraceDisabled(t *testing.T) {
|
||||
done := make(chan string)
|
||||
startServer(done)
|
||||
logger := NewRemoteSysLogger(serverFQN, true, false)
|
||||
|
||||
logger.Trace("foo %s", "qux")
|
||||
rcvd := <-done
|
||||
if rcvd != "" {
|
||||
t.Fatalf("Unexpected syslog response %s\n", rcvd)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNetworkAndAddrUDP(t *testing.T) {
|
||||
n, a := getNetworkAndAddr("udp://foo.com:1000")
|
||||
|
||||
if n != "udp" {
|
||||
t.Fatalf("Unexpected network %s\n", n)
|
||||
}
|
||||
|
||||
if a != "foo.com:1000" {
|
||||
t.Fatalf("Unexpected addr %s\n", a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNetworkAndAddrTCP(t *testing.T) {
|
||||
n, a := getNetworkAndAddr("tcp://foo.com:1000")
|
||||
|
||||
if n != "tcp" {
|
||||
t.Fatalf("Unexpected network %s\n", n)
|
||||
}
|
||||
|
||||
if a != "foo.com:1000" {
|
||||
t.Fatalf("Unexpected addr %s\n", a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNetworkAndAddrUnix(t *testing.T) {
|
||||
n, a := getNetworkAndAddr("unix:///foo.sock")
|
||||
|
||||
if n != "unix" {
|
||||
t.Fatalf("Unexpected network %s\n", n)
|
||||
}
|
||||
|
||||
if a != "/foo.sock" {
|
||||
t.Fatalf("Unexpected addr %s\n", a)
|
||||
}
|
||||
}
|
||||
func expectSyslogOutput(t *testing.T, line string, expected string) {
|
||||
data := strings.Split(line, "]: ")
|
||||
if len(data) != 2 {
|
||||
t.Fatalf("Unexpected syslog line %s\n", line)
|
||||
}
|
||||
|
||||
if data[1] != expected {
|
||||
t.Fatalf("Expected '%s', received '%s'\n", expected, data[1])
|
||||
}
|
||||
}
|
||||
|
||||
func runSyslog(c net.PacketConn, done chan<- string) {
|
||||
var buf [4096]byte
|
||||
var rcvd string = ""
|
||||
for {
|
||||
n, _, err := c.ReadFrom(buf[:])
|
||||
if err != nil || n == 0 {
|
||||
break
|
||||
}
|
||||
rcvd += string(buf[:n])
|
||||
}
|
||||
done <- rcvd
|
||||
}
|
||||
|
||||
func startServer(done chan<- string) {
|
||||
c, e := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
if e != nil {
|
||||
log.Fatalf("net.ListenPacket failed udp :0 %v", e)
|
||||
}
|
||||
|
||||
serverFQN = fmt.Sprintf("udp://%s", c.LocalAddr().String())
|
||||
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
go runSyslog(c, done)
|
||||
}
|
||||
@@ -53,12 +53,19 @@ type client struct {
|
||||
}
|
||||
|
||||
func (c *client) String() (id string) {
|
||||
conn := "-"
|
||||
if ip, ok := c.nc.(*net.TCPConn); ok {
|
||||
addr := ip.RemoteAddr().(*net.TCPAddr)
|
||||
conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port)
|
||||
}
|
||||
|
||||
switch c.typ {
|
||||
case CLIENT:
|
||||
id = fmt.Sprintf("cid:%d", c.cid)
|
||||
id = fmt.Sprintf("%s - cid:%d", conn, c.cid)
|
||||
case ROUTER:
|
||||
id = fmt.Sprintf("rid:%d", c.cid)
|
||||
id = fmt.Sprintf("%s - rid:%d", conn, c.cid)
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
@@ -87,14 +94,6 @@ func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func clientConnStr(conn net.Conn) interface{} {
|
||||
if ip, ok := conn.(*net.TCPConn); ok {
|
||||
addr := ip.RemoteAddr().(*net.TCPAddr)
|
||||
return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)}
|
||||
}
|
||||
return "N/A"
|
||||
}
|
||||
|
||||
// Lock should be held
|
||||
func (c *client) initClient() {
|
||||
s := c.srv
|
||||
@@ -145,7 +144,7 @@ func (c *client) readLoop() {
|
||||
return
|
||||
}
|
||||
if err := c.parse(b[:n]); err != nil {
|
||||
Log(err, clientConnStr(c.nc), c.cid)
|
||||
Error("Error reading from client: %s", err.Error(), c)
|
||||
// Auth was handled inline
|
||||
if err != ErrAuthorization {
|
||||
c.sendErr("Parser Error")
|
||||
@@ -162,7 +161,7 @@ func (c *client) readLoop() {
|
||||
err := cp.bw.Flush()
|
||||
cp.nc.SetWriteDeadline(time.Time{})
|
||||
if err != nil {
|
||||
Debugf("Error flushing: %v", err)
|
||||
Debug("Error flushing: %v", err)
|
||||
cp.mu.Unlock()
|
||||
cp.closeConnection()
|
||||
cp.mu.Lock()
|
||||
@@ -179,20 +178,25 @@ func (c *client) readLoop() {
|
||||
}
|
||||
|
||||
func (c *client) traceMsg(msg []byte) {
|
||||
if trace == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs)
|
||||
opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])}
|
||||
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
|
||||
Trace("MSG: %s", opa, c)
|
||||
}
|
||||
|
||||
func (c *client) traceOp(op string, arg []byte) {
|
||||
if trace == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
opa := []interface{}{fmt.Sprintf("%s OP", op)}
|
||||
if arg != nil {
|
||||
opa = append(opa, fmt.Sprintf("%s %s", op, string(arg)))
|
||||
}
|
||||
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
|
||||
Trace("OP: %s", opa, c)
|
||||
}
|
||||
|
||||
// Process the info message if we are a route.
|
||||
@@ -203,17 +207,18 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
return
|
||||
}
|
||||
c.route.remoteID = info.ID
|
||||
|
||||
// Check to see if we have this remote already registered.
|
||||
// This can happen when both servers have routes to each other.
|
||||
s := c.srv
|
||||
c.mu.Unlock()
|
||||
|
||||
if s.addRoute(c) {
|
||||
Debug("Registering remote route", info.ID)
|
||||
Debug("Registering remote route %q", info.ID, c)
|
||||
// Send our local subscriptions to this route.
|
||||
s.sendLocalSubsToRoute(c)
|
||||
} else {
|
||||
Debug("Detected duplicate remote route", info.ID, clientConnStr(c.nc), c.cid)
|
||||
Debug("Detected duplicate remote route %q", info.ID, c)
|
||||
c.closeConnection()
|
||||
}
|
||||
}
|
||||
@@ -231,7 +236,7 @@ func (c *client) processInfo(arg []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) processErr(errStr string) {
|
||||
Log(errStr, clientConnStr(c.nc), c.cid)
|
||||
Error("Client error %s", errStr, c)
|
||||
c.closeConnection()
|
||||
}
|
||||
|
||||
@@ -301,7 +306,7 @@ func (c *client) processPing() {
|
||||
err := c.bw.Flush()
|
||||
if err != nil {
|
||||
c.clearConnection()
|
||||
Debug("Error on Flush", err, clientConnStr(c.nc), c.cid)
|
||||
Debug("Error on Flush, error %s", err.Error(), c)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
@@ -314,7 +319,7 @@ func (c *client) processPong() {
|
||||
}
|
||||
|
||||
func (c *client) processMsgArgs(arg []byte) error {
|
||||
if trace > 0 {
|
||||
if trace == 0 {
|
||||
c.traceOp("MSG", arg)
|
||||
}
|
||||
|
||||
@@ -361,7 +366,7 @@ func (c *client) processMsgArgs(arg []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) processPub(arg []byte) error {
|
||||
if trace > 0 {
|
||||
if trace == 0 {
|
||||
c.traceOp("PUB", arg)
|
||||
}
|
||||
|
||||
@@ -478,8 +483,10 @@ func (c *client) unsubscribe(sub *subscription) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if sub.max > 0 && sub.nm < sub.max {
|
||||
Debugf("Deferring actual UNSUB(%s): %d max, %d received\n",
|
||||
string(sub.subject), sub.max, sub.nm)
|
||||
Debug(
|
||||
"Deferring actual UNSUB(%s): %d max, %d received\n",
|
||||
string(sub.subject), sub.max, sub.nm, c,
|
||||
)
|
||||
return
|
||||
}
|
||||
c.traceOp("DELSUB", sub.sid)
|
||||
@@ -621,10 +628,10 @@ writeErr:
|
||||
client.mu.Unlock()
|
||||
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
Log("Slow Consumer Detected", clientConnStr(client.nc), client.cid)
|
||||
Notice("Slow Consumer Detected", c)
|
||||
client.closeConnection()
|
||||
} else {
|
||||
Debugf("Error writing msg: %v", err)
|
||||
Debug("Error writing msg: %v", err, c)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -732,12 +739,11 @@ func (c *client) processMsg(msg []byte) {
|
||||
}
|
||||
if sub.client == nil || sub.client.nc == nil || sub.client.route == nil ||
|
||||
sub.client.route.remoteID == "" {
|
||||
Debug("Bad or Missing ROUTER Identity, not processing msg",
|
||||
clientConnStr(c.nc), c.cid)
|
||||
Debug("Bad or Missing ROUTER Identity, not processing msg", c)
|
||||
continue
|
||||
}
|
||||
if _, ok := rmap[sub.client.route.remoteID]; ok {
|
||||
Debug("Ignoring route, already processed", c.cid)
|
||||
Debug("Ignoring route, already processed", c)
|
||||
continue
|
||||
}
|
||||
rmap[sub.client.route.remoteID] = routeSeen
|
||||
@@ -766,12 +772,12 @@ func (c *client) processPingTimer() {
|
||||
return
|
||||
}
|
||||
|
||||
Debug("Client Ping Timer", clientConnStr(c.nc), c.cid)
|
||||
Debug("Client Ping Timer", c)
|
||||
|
||||
// Check for violation
|
||||
c.pout += 1
|
||||
if c.pout > c.srv.opts.MaxPingsOut {
|
||||
Debug("Stale Client Connection - Closing", clientConnStr(c.nc), c.cid)
|
||||
Debug("Stale Client Connection - Closing", c)
|
||||
if c.bw != nil {
|
||||
c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection"))
|
||||
c.bw.Flush()
|
||||
@@ -784,7 +790,7 @@ func (c *client) processPingTimer() {
|
||||
c.bw.WriteString("PING\r\n")
|
||||
err := c.bw.Flush()
|
||||
if err != nil {
|
||||
Debug("Error on Client Ping Flush", err, clientConnStr(c.nc), c.cid)
|
||||
Debug("Error on Client Ping Flush, error %s", err)
|
||||
c.clearConnection()
|
||||
} else {
|
||||
// Reset to fire again if all OK.
|
||||
@@ -856,9 +862,7 @@ func (c *client) closeConnection() {
|
||||
return
|
||||
}
|
||||
|
||||
// FIXME(dlc) - This creates garbage for no reason.
|
||||
dbgString := fmt.Sprintf("%s connection closed", c.typeString())
|
||||
Debug(dbgString, clientConnStr(c.nc), c.cid)
|
||||
Debug("%s connection closed", c.typeString(), c)
|
||||
|
||||
c.clearAuthTimer()
|
||||
c.clearPingTimer()
|
||||
@@ -895,10 +899,10 @@ func (c *client) closeConnection() {
|
||||
defer srv.mu.Unlock()
|
||||
rid := c.route.remoteID
|
||||
if rid != "" && srv.remotes[rid] != nil {
|
||||
Debug("Not attempting reconnect for solicited route, already connected.", rid)
|
||||
Debug("Not attempting reconnect for solicited route, already connected. Try %d", rid, c)
|
||||
return
|
||||
} else {
|
||||
Debug("Attempting reconnect for solicited route", c.cid)
|
||||
Debug("Attempting reconnect for solicited route", c)
|
||||
go srv.reConnectToRoute(c.route.url)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ debug: false
|
||||
trace: true
|
||||
logtime: false
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
syslog: true
|
||||
remote_syslog: "udp://foo.com:33"
|
||||
|
||||
#pid file
|
||||
pid_file: "/tmp/gnatsd.pid"
|
||||
|
||||
163
server/log.go
163
server/log.go
@@ -1,119 +1,94 @@
|
||||
// Copyright 2012-2013 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2014 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// logging functionality, compatible with the original nats-server.
|
||||
|
||||
var trace int32
|
||||
var debug int32
|
||||
var nolog int32
|
||||
var log = struct {
|
||||
logger Logger
|
||||
sync.Mutex
|
||||
}{}
|
||||
|
||||
// LogSetup will properly setup logging and the logging flags.
|
||||
func LogSetup() {
|
||||
log.SetFlags(0)
|
||||
atomic.StoreInt32(&nolog, 0)
|
||||
atomic.StoreInt32(&debug, 0)
|
||||
atomic.StoreInt32(&trace, 0)
|
||||
type Logger interface {
|
||||
Notice(format string, v ...interface{})
|
||||
Fatal(format string, v ...interface{})
|
||||
Error(format string, v ...interface{})
|
||||
Debug(format string, v ...interface{})
|
||||
Trace(format string, v ...interface{})
|
||||
}
|
||||
|
||||
// LogInit parses option flags and sets up logging.
|
||||
func (s *Server) LogInit() {
|
||||
// Reset
|
||||
LogSetup()
|
||||
|
||||
if s.opts.Logtime {
|
||||
log.SetFlags(log.LstdFlags)
|
||||
}
|
||||
if s.opts.NoLog {
|
||||
atomic.StoreInt32(&nolog, 1)
|
||||
}
|
||||
if s.opts.LogFile != "" {
|
||||
flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
|
||||
file, err := os.OpenFile(s.opts.LogFile, flags, 0660)
|
||||
if err != nil {
|
||||
PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile))
|
||||
}
|
||||
log.SetOutput(file)
|
||||
}
|
||||
if s.opts.Debug {
|
||||
Log(s.opts)
|
||||
func (s *Server) SetLogger(logger Logger, d, t bool) {
|
||||
if d {
|
||||
atomic.StoreInt32(&debug, 1)
|
||||
Log("DEBUG is on")
|
||||
}
|
||||
if s.opts.Trace {
|
||||
|
||||
if t {
|
||||
atomic.StoreInt32(&trace, 1)
|
||||
Log("TRACE is on")
|
||||
}
|
||||
|
||||
log.Lock()
|
||||
defer log.Unlock()
|
||||
log.logger = logger
|
||||
}
|
||||
|
||||
func alreadyFormatted(s string) bool {
|
||||
return strings.HasPrefix(s, "[")
|
||||
func Notice(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Notice(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func logStr(v []interface{}) string {
|
||||
args := make([]string, 0, len(v))
|
||||
for _, vt := range v {
|
||||
switch t := vt.(type) {
|
||||
case string:
|
||||
if alreadyFormatted(t) {
|
||||
args = append(args, t)
|
||||
} else {
|
||||
t = strings.Replace(t, "\"", "\\\"", -1)
|
||||
args = append(args, fmt.Sprintf("\"%s\"", t))
|
||||
}
|
||||
default:
|
||||
args = append(args, fmt.Sprintf("%+v", vt))
|
||||
func Error(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Error(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func Fatal(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Fatal(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func Debug(format string, v ...interface{}) {
|
||||
if debug == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Debug(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func Trace(format string, v ...interface{}) {
|
||||
if trace == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Trace(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) {
|
||||
log.Lock()
|
||||
defer log.Unlock()
|
||||
if log.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
argc := len(args)
|
||||
if argc != 0 {
|
||||
if client, ok := args[argc-1].(*client); ok {
|
||||
args = args[:argc-1]
|
||||
format = fmt.Sprintf("%s - %s", client, format)
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("[%s]", strings.Join(args, ", "))
|
||||
}
|
||||
|
||||
func Log(v ...interface{}) {
|
||||
if nolog == 0 {
|
||||
log.Print(logStr(v))
|
||||
}
|
||||
}
|
||||
|
||||
func Logf(format string, v ...interface{}) {
|
||||
Log(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func Fatal(v ...interface{}) {
|
||||
log.Fatalf(logStr(v))
|
||||
}
|
||||
|
||||
func Fatalf(format string, v ...interface{}) {
|
||||
Fatal(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func Debug(v ...interface{}) {
|
||||
if debug > 0 {
|
||||
Log(v...)
|
||||
}
|
||||
}
|
||||
|
||||
func Debugf(format string, v ...interface{}) {
|
||||
if debug > 0 {
|
||||
Debug(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
func Trace(v ...interface{}) {
|
||||
if trace > 0 {
|
||||
Log(v...)
|
||||
}
|
||||
}
|
||||
|
||||
func Tracef(format string, v ...interface{}) {
|
||||
if trace > 0 {
|
||||
Trace(fmt.Sprintf(format, v...))
|
||||
}
|
||||
f(log.logger, format, args...)
|
||||
}
|
||||
|
||||
31
server/log_test.go
Normal file
31
server/log_test.go
Normal file
@@ -0,0 +1,31 @@
|
||||
// Copyright 2014 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSetLogger(t *testing.T) {
|
||||
server := &Server{}
|
||||
server.SetLogger(&DummyLogger{}, true, true)
|
||||
|
||||
// We assert that the logger has change to the DummyLogger
|
||||
_ = log.logger.(*DummyLogger)
|
||||
|
||||
if debug != 1 {
|
||||
t.Fatalf("Expected debug 1, received value %d\n", debug)
|
||||
}
|
||||
|
||||
if trace != 1 {
|
||||
t.Fatalf("Expected trace 1, received value %d\n", trace)
|
||||
}
|
||||
}
|
||||
|
||||
type DummyLogger struct{}
|
||||
|
||||
func (l *DummyLogger) Notice(format string, v ...interface{}) {}
|
||||
func (l *DummyLogger) Error(format string, v ...interface{}) {}
|
||||
func (l *DummyLogger) Fatal(format string, v ...interface{}) {}
|
||||
func (l *DummyLogger) Debug(format string, v ...interface{}) {}
|
||||
func (l *DummyLogger) Trace(format string, v ...interface{}) {}
|
||||
@@ -88,7 +88,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(c, "", " ")
|
||||
if err != nil {
|
||||
Logf("Error marshalling response to /connz request: %v", err)
|
||||
Error("Error marshalling response to /connz request: %v", err)
|
||||
}
|
||||
w.Write(b)
|
||||
}
|
||||
@@ -114,7 +114,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(st, "", " ")
|
||||
if err != nil {
|
||||
Logf("Error marshalling response to /subscriptionsz request: %v", err)
|
||||
Error("Error marshalling response to /subscriptionsz request: %v", err)
|
||||
}
|
||||
w.Write(b)
|
||||
}
|
||||
@@ -161,7 +161,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(v, "", " ")
|
||||
if err != nil {
|
||||
Logf("Error marshalling response to /varz request: %v", err)
|
||||
Error("Error marshalling response to /varz request: %v", err)
|
||||
}
|
||||
w.Write(b)
|
||||
}
|
||||
|
||||
@@ -43,6 +43,8 @@ type Options struct {
|
||||
ProfPort int `json:"-"`
|
||||
PidFile string `json:"-"`
|
||||
LogFile string `json:"-"`
|
||||
Syslog bool `json:"-"`
|
||||
RemoteSyslog string `json:"-"`
|
||||
}
|
||||
|
||||
type authorization struct {
|
||||
@@ -97,6 +99,10 @@ func ProcessConfigFile(configFile string) (*Options, error) {
|
||||
}
|
||||
case "logfile", "log_file":
|
||||
opts.LogFile = v.(string)
|
||||
case "syslog":
|
||||
opts.Syslog = v.(bool)
|
||||
case "remote_syslog":
|
||||
opts.RemoteSyslog = v.(string)
|
||||
case "pidfile", "pid_file":
|
||||
opts.PidFile = v.(string)
|
||||
case "prof_port":
|
||||
@@ -219,7 +225,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)
|
||||
}
|
||||
|
||||
if cport == port && isIpInList(selfIPs, getUrlIp(host)) {
|
||||
Log("Self referencing IP found: ", r)
|
||||
Notice("Self referencing IP found: ", r)
|
||||
continue
|
||||
}
|
||||
cleanRoutes = append(cleanRoutes, r)
|
||||
@@ -250,7 +256,7 @@ func getUrlIp(ipStr string) []net.IP {
|
||||
|
||||
hostAddr, err := net.LookupHost(ipStr)
|
||||
if err != nil {
|
||||
Log("Error looking up host with route hostname: ", err)
|
||||
Error("Error looking up host with route hostname: ", err)
|
||||
return ipList
|
||||
}
|
||||
for _, addr := range hostAddr {
|
||||
@@ -267,7 +273,7 @@ func getInterfaceIPs() []net.IP {
|
||||
|
||||
interfaceAddr, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
Log("Error getting self referencing address: ", err)
|
||||
Error("Error getting self referencing address: ", err)
|
||||
return localIPs
|
||||
}
|
||||
|
||||
@@ -276,7 +282,7 @@ func getInterfaceIPs() []net.IP {
|
||||
if net.ParseIP(interfaceIP.String()) != nil {
|
||||
localIPs = append(localIPs, interfaceIP)
|
||||
} else {
|
||||
Log("Error parsing self referencing address: ", err)
|
||||
Error("Error parsing self referencing address: ", err)
|
||||
}
|
||||
}
|
||||
return localIPs
|
||||
|
||||
@@ -44,18 +44,20 @@ func TestOptions_RandomPort(t *testing.T) {
|
||||
|
||||
func TestConfigFile(t *testing.T) {
|
||||
golden := &Options{
|
||||
Host: "apcera.me",
|
||||
Port: 4242,
|
||||
Username: "derek",
|
||||
Password: "bella",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: false,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: 8222,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6543,
|
||||
Host: "apcera.me",
|
||||
Port: 4242,
|
||||
Username: "derek",
|
||||
Password: "bella",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: false,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: 8222,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6543,
|
||||
Syslog: true,
|
||||
RemoteSyslog: "udp://foo.com:33",
|
||||
}
|
||||
|
||||
opts, err := ProcessConfigFile("./configs/test.conf")
|
||||
@@ -71,18 +73,20 @@ func TestConfigFile(t *testing.T) {
|
||||
|
||||
func TestMergeOverrides(t *testing.T) {
|
||||
golden := &Options{
|
||||
Host: "apcera.me",
|
||||
Port: 2222,
|
||||
Username: "derek",
|
||||
Password: "spooky",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6789,
|
||||
Host: "apcera.me",
|
||||
Port: 2222,
|
||||
Username: "derek",
|
||||
Password: "spooky",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6789,
|
||||
Syslog: true,
|
||||
RemoteSyslog: "udp://foo.com:33",
|
||||
}
|
||||
fopts, err := ProcessConfigFile("./configs/test.conf")
|
||||
if err != nil {
|
||||
|
||||
@@ -46,7 +46,7 @@ func (c *client) sendConnect() {
|
||||
}
|
||||
b, err := json.Marshal(cinfo)
|
||||
if err != nil {
|
||||
Logf("Error marshalling CONNECT to route: %v\n", err)
|
||||
Error("Error marshalling CONNECT to route: %v\n", err)
|
||||
c.closeConnection()
|
||||
}
|
||||
c.bw.WriteString(fmt.Sprintf(conProto, b))
|
||||
@@ -79,7 +79,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
route.bw.Write(b.Bytes())
|
||||
route.bw.Flush()
|
||||
|
||||
Debug("Route sent local subscriptions", route.cid)
|
||||
Debug("Route sent local subscriptions", route)
|
||||
}
|
||||
|
||||
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
@@ -93,12 +93,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// Initialize
|
||||
c.initClient()
|
||||
|
||||
Debug("Route connection created", clientConnStr(c.nc), c.cid)
|
||||
Debug("Route connection created", c)
|
||||
|
||||
// Queue Connect proto if we solicited the connection.
|
||||
if didSolicit {
|
||||
r.url = rURL
|
||||
Debug("Route connect msg sent", clientConnStr(c.nc), c.cid)
|
||||
Debug("Route connect msg sent", c)
|
||||
c.sendConnect()
|
||||
}
|
||||
|
||||
@@ -234,10 +234,10 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
|
||||
func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort)
|
||||
Logf("Listening for route connections on %s", hp)
|
||||
Notice("Listening for route connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
Fatalf("Error listening on router port: %d - %v", s.opts.Port, e)
|
||||
Fatal("Error listening on router port: %d - %v", s.opts.Port, e)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -263,7 +263,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else if s.isRunning() {
|
||||
Logf("Accept error: %v", err)
|
||||
Notice("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -294,7 +294,7 @@ func (s *Server) StartRouting() {
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
Fatalf("Error marshalling Route INFO JSON: %+v\n", err)
|
||||
Fatal("Error marshalling Route INFO JSON: %+v\n", err)
|
||||
}
|
||||
s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
|
||||
|
||||
@@ -314,10 +314,10 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) {
|
||||
|
||||
func (s *Server) connectToRoute(rUrl *url.URL) {
|
||||
for s.isRunning() {
|
||||
Debugf("Trying to connect to route on %s", rUrl.Host)
|
||||
Debug("Trying to connect to route on %s", rUrl.Host)
|
||||
conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL)
|
||||
if err != nil {
|
||||
Debugf("Error trying to connect to route: %v", err)
|
||||
Debug("Error trying to connect to route: %v", err)
|
||||
select {
|
||||
case <-s.rcQuit:
|
||||
return
|
||||
|
||||
@@ -82,6 +82,7 @@ func New(opts *Options) *Server {
|
||||
if opts.Username != "" || opts.Authorization != "" {
|
||||
info.AuthRequired = true
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
info: info,
|
||||
sl: sublist.New(),
|
||||
@@ -95,9 +96,6 @@ func New(opts *Options) *Server {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Setup logging with flags
|
||||
s.LogInit()
|
||||
|
||||
// For tracking clients
|
||||
s.clients = make(map[uint64]*client)
|
||||
|
||||
@@ -108,20 +106,16 @@ func New(opts *Options) *Server {
|
||||
// Used to kick out all of the route
|
||||
// connect Go routines.
|
||||
s.rcQuit = make(chan bool)
|
||||
s.handleSignals()
|
||||
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(s.info)
|
||||
if err != nil {
|
||||
Fatalf("Error marshalling INFO JSON: %+v\n", err)
|
||||
Fatal("Error marshalling INFO JSON: %+v\n", err)
|
||||
}
|
||||
|
||||
s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
|
||||
|
||||
s.handleSignals()
|
||||
|
||||
Logf("Starting gnatsd version %s", VERSION)
|
||||
|
||||
s.running = true
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -146,9 +140,9 @@ func (s *Server) handleSignals() {
|
||||
signal.Notify(c, os.Interrupt)
|
||||
go func() {
|
||||
for sig := range c {
|
||||
Debugf("Trapped Signal; %v", sig)
|
||||
Debug("Trapped Signal; %v", sig)
|
||||
// FIXME, trip running?
|
||||
Log("Server Exiting..")
|
||||
Notice("Server Exiting..")
|
||||
os.Exit(0)
|
||||
}
|
||||
}()
|
||||
@@ -172,6 +166,8 @@ func (s *Server) logPid() {
|
||||
// Start up the server, this will block.
|
||||
// Start via a Go routine if needed.
|
||||
func (s *Server) Start() {
|
||||
Notice("Starting gnatsd version %s", VERSION)
|
||||
s.running = true
|
||||
|
||||
// Log the pid to a file
|
||||
if s.opts.PidFile != _EMPTY_ {
|
||||
@@ -265,14 +261,14 @@ func (s *Server) Shutdown() {
|
||||
// AcceptLoop is exported for easier testing.
|
||||
func (s *Server) AcceptLoop() {
|
||||
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port)
|
||||
Logf("Listening for client connections on %s", hp)
|
||||
Notice("Listening for client connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
Fatalf("Error listening on port: %d - %v", s.opts.Port, e)
|
||||
Fatal("Error listening on port: %d - %v", s.opts.Port, e)
|
||||
return
|
||||
}
|
||||
|
||||
Logf("gnatsd is ready")
|
||||
Notice("gnatsd is ready")
|
||||
|
||||
// Setup state that can enable shutdown
|
||||
s.mu.Lock()
|
||||
@@ -282,12 +278,12 @@ func (s *Server) AcceptLoop() {
|
||||
// Write resolved port back to options.
|
||||
_, port, err := net.SplitHostPort(l.Addr().String())
|
||||
if err != nil {
|
||||
Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
Fatal("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
return
|
||||
}
|
||||
portNum, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
Fatal("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
return
|
||||
}
|
||||
s.opts.Port = portNum
|
||||
@@ -306,36 +302,39 @@ func (s *Server) AcceptLoop() {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else if s.isRunning() {
|
||||
Logf("Accept error: %v", err)
|
||||
Notice("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
s.createClient(conn)
|
||||
}
|
||||
Log("Server Exiting..")
|
||||
Notice("Server Exiting..")
|
||||
s.done <- true
|
||||
}
|
||||
|
||||
// StartProfiler is called to enable dynamic profiling.
|
||||
func (s *Server) StartProfiler() {
|
||||
Logf("Starting profiling on http port %d", s.opts.ProfPort)
|
||||
Notice("Starting profiling on http port %d", s.opts.ProfPort)
|
||||
|
||||
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.ProfPort)
|
||||
go func() {
|
||||
Log(http.ListenAndServe(hp, nil))
|
||||
err := http.ListenAndServe(hp, nil)
|
||||
if err != nil {
|
||||
Fatal("error starting monitor server: %s", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// StartHTTPMonitoring will enable the HTTP monitoring port.
|
||||
func (s *Server) StartHTTPMonitoring() {
|
||||
Logf("Starting http monitor on port %d", s.opts.HTTPPort)
|
||||
Notice("Starting http monitor on port %d", s.opts.HTTPPort)
|
||||
|
||||
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.HTTPPort)
|
||||
|
||||
l, err := net.Listen("tcp", hp)
|
||||
if err != nil {
|
||||
Fatalf("Can't listen to the monitor port: %v", err)
|
||||
Fatal("Can't listen to the monitor port: %v", err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -375,7 +374,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
// Initialize
|
||||
c.initClient()
|
||||
|
||||
Debug("Client connection created", clientConnStr(c.nc), c.cid)
|
||||
Debug("Client connection created", c)
|
||||
|
||||
// Send our information.
|
||||
s.sendInfo(c)
|
||||
|
||||
@@ -18,6 +18,8 @@ Server options:
|
||||
Logging options:
|
||||
-l, --log FILE File to redirect log output
|
||||
-T, --logtime Timestamp log entries (default: true)
|
||||
-s, --syslog Enable syslog as log method.
|
||||
-r, --remote_syslog Syslog server addr (udp://localhost:514).
|
||||
-D, --debug Enable debugging output
|
||||
-V, --trace Trace the raw protocol
|
||||
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
// Copyright 2012-2013 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var startRe = regexp.MustCompile(`\["Starting gnatsd version\s+([^\s]+)"\]\n`)
|
||||
|
||||
func TestLogFile(t *testing.T) {
|
||||
opts := DefaultTestOptions
|
||||
opts.NoLog = false
|
||||
opts.Logtime = false
|
||||
|
||||
tmpDir, err := ioutil.TempDir("", "_gnatsd")
|
||||
if err != nil {
|
||||
t.Fatal("Could not create tmp dir")
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
file, err := ioutil.TempFile(tmpDir, "gnatsd:log_")
|
||||
file.Close()
|
||||
opts.LogFile = file.Name()
|
||||
|
||||
s := RunServer(&opts)
|
||||
s.Shutdown()
|
||||
|
||||
buf, err := ioutil.ReadFile(opts.LogFile)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not read logfile: %v", err)
|
||||
}
|
||||
if len(buf) <= 0 {
|
||||
t.Fatal("Expected a non-zero length logfile")
|
||||
}
|
||||
if !startRe.Match(buf) {
|
||||
t.Fatalf("Logfile did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, startRe)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user