From cf0f30200f73f7995c1fd8e7d7c03cb45ad57178 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 2 Aug 2013 16:52:54 -0700 Subject: [PATCH] log file support, data race fixes --- gnatsd.go | 35 ++++++++++++++++++-------------- hash/hash.go | 6 +++--- hashmap/rand_evict.go | 6 +++--- server/client.go | 2 +- server/client_test.go | 16 +++++++-------- server/const.go | 2 +- server/log.go | 18 +++++++++++++++-- server/opts.go | 2 +- server/opts_test.go | 24 +++++++++++++--------- server/parser_test.go | 41 ++++++++++++++++++------------------- server/route.go | 24 +++++++++++++++++++--- server/server.go | 5 +++++ server/split_test.go | 18 ++++++++--------- server/usage.go | 36 +++++++++++++++++++++++++++++++++ server/util.go | 2 +- sublist/sublist_test.go | 10 ++++----- test/auth_test.go | 2 +- test/bench_test.go | 2 +- test/gosrv_test.go | 5 ++--- test/log_test.go | 45 +++++++++++++++++++++++++++++++++++++++++ test/monitor_test.go | 2 +- test/ping_test.go | 4 ++-- test/proto_test.go | 4 ++-- test/routes_test.go | 1 - test/test.go | 18 ++++++++--------- 25 files changed, 227 insertions(+), 103 deletions(-) create mode 100644 server/usage.go create mode 100644 test/log_test.go diff --git a/gnatsd.go b/gnatsd.go index afa47ae6..ae659965 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -4,8 +4,6 @@ package main import ( "flag" - "log" - "os" "strings" "github.com/apcera/gnatsd/server" @@ -15,6 +13,7 @@ func main() { // logging setup server.LogSetup() + // Server Options opts := server.Options{} var showVersion bool @@ -24,14 +23,16 @@ func main() { // Parse flags flag.IntVar(&opts.Port, "port", server.DEFAULT_PORT, "Port to listen on.") flag.IntVar(&opts.Port, "p", server.DEFAULT_PORT, "Port to listen on.") - flag.StringVar(&opts.Host, "host", server.DEFAULT_HOST, "Network host to listen on.") - flag.StringVar(&opts.Host, "h", server.DEFAULT_HOST, "Network host to listen on.") + flag.StringVar(&opts.Host, "addr", server.DEFAULT_HOST, "Network host to listen on.") + flag.StringVar(&opts.Host, "a", server.DEFAULT_HOST, "Network host to listen on.") flag.StringVar(&opts.Host, "net", server.DEFAULT_HOST, "Network host to listen on.") flag.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.") flag.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.") flag.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.") flag.BoolVar(&opts.Trace, "trace", false, "Enable Trace logging.") flag.BoolVar(&debugAndTrace, "DV", false, "Enable Debug and Trace logging.") + flag.BoolVar(&opts.Logtime, "T", true, "Timestamp log entries.") + flag.BoolVar(&opts.Logtime, "logtime", true, "Timestamp log entries.") flag.StringVar(&opts.Username, "user", "", "Username required for connection.") flag.StringVar(&opts.Password, "pass", "", "Password required for connection.") flag.StringVar(&opts.Authorization, "auth", "", "Authorization token required for connection.") @@ -39,13 +40,15 @@ func main() { flag.IntVar(&opts.HttpPort, "http_port", 0, "HTTP Port for /varz, /connz endpoints.") flag.StringVar(&configFile, "c", "", "Configuration file.") flag.StringVar(&configFile, "config", "", "Configuration file.") - flag.StringVar(&configFile, "P", "", "File to store process pid.") - flag.StringVar(&configFile, "pid", "", "File to store process pid.") - flag.StringVar(&configFile, "l", "", "File to store logging output.") - flag.StringVar(&configFile, "log", "", "File to store logging output.") + flag.StringVar(&opts.PidFile, "P", "", "File to store process pid.") + flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") + flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.") + flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.") flag.BoolVar(&showVersion, "version", false, "Print version information.") flag.BoolVar(&showVersion, "v", false, "Print version information.") + flag.Usage = server.Usage + flag.Parse() // Show version and exit @@ -53,15 +56,19 @@ func main() { server.PrintServerAndExit() } + // One flag can set multiple options. if debugAndTrace { opts.Trace, opts.Debug = true, true } - // Process args, version only for now + // Process args looking for non-flaf options, + // 'version' and 'help' only for now for _, arg := range flag.Args() { - arg = strings.ToLower(arg) - if arg == "version" { + switch strings.ToLower(arg) { + case "version": server.PrintServerAndExit() + case "help": + server.Usage() } } @@ -72,14 +79,12 @@ func main() { if configFile != "" { fileOpts, err = server.ProcessConfigFile(configFile) if err != nil { - log.Printf("%v\n", err) - os.Exit(1) + server.PrintAndDie(err.Error()) } } // Create the server with appropriate options. - mOpts := server.MergeOptions(fileOpts, &opts) - s := server.New(mOpts) + s := server.New(server.MergeOptions(fileOpts, &opts)) // Start things up. Block here til done. s.Start() diff --git a/hash/hash.go b/hash/hash.go index 5deb3946..6f78475a 100644 --- a/hash/hash.go +++ b/hash/hash.go @@ -56,7 +56,7 @@ func Jesteress(data []byte) uint32 { // Cases: 0,1,2,3,4,5,6,7 if (dlen & _DWSZ) > 0 { k1 := *(*uint64)(unsafe.Pointer(&data[i])) - h32 = uint32(uint64(h32) ^ k1) * _YP32 + h32 = uint32(uint64(h32)^k1) * _YP32 i += _DWSZ } if (dlen & _WSZ) > 0 { @@ -111,10 +111,10 @@ func Yorikke(data []byte) uint32 { for ; dlen >= _DDDWSZ; dlen -= _DDDWSZ { k1 := *(*uint64)(unsafe.Pointer(&data[i])) k2 := *(*uint64)(unsafe.Pointer(&data[i+4])) - h32 = uint32((uint64(h32) ^ (((k1<<5 | k1>>27)) ^ k2)) * _YP32) + h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) k1 = *(*uint64)(unsafe.Pointer(&data[i+8])) k2 = *(*uint64)(unsafe.Pointer(&data[i+12])) - h32b = uint32((uint64(h32b) ^ (((k1<<5 | k1>>27)) ^ k2)) * _YP32) + h32b = uint32((uint64(h32b) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) i += _DDDWSZ } if (dlen & _DDWSZ) > 0 { diff --git a/hashmap/rand_evict.go b/hashmap/rand_evict.go index 0b12ae8d..6371f694 100644 --- a/hashmap/rand_evict.go +++ b/hashmap/rand_evict.go @@ -19,9 +19,9 @@ func (h *HashMap) RemoveRandom() { if h.used == 0 { return } - index := (rand.Int())&int(h.msk) + index := (rand.Int()) & int(h.msk) // Walk forward til we find an entry - for i := index ; i < len(h.bkts) ; i++ { + for i := index; i < len(h.bkts); i++ { e := &h.bkts[i] if *e != nil { *e = (*e).next @@ -31,7 +31,7 @@ func (h *HashMap) RemoveRandom() { } // If we are here we hit end and did not remove anything, // use the index and walk backwards. - for i := index ; i >= 0 ; i-- { + for i := index; i >= 0; i-- { e := &h.bkts[i] if *e != nil { *e = (*e).next diff --git a/server/client.go b/server/client.go index 07d56814..649f2736 100644 --- a/server/client.go +++ b/server/client.go @@ -649,7 +649,7 @@ func (c *client) processMsg(msg []byte) { } // Check to see if we have already sent it here. if rmap == nil { - rmap = make(map[string]struct{}, len(srv.routes)) + rmap = make(map[string]struct{}, srv.numRoutes()) } if sub.client == nil || sub.client.route == nil || sub.client.route.remoteId == "" { diff --git a/server/client_test.go b/server/client_test.go index 65b52684..0ecc1c9c 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -102,7 +102,7 @@ func TestClientConnect(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true}) { + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -116,7 +116,7 @@ func TestClientConnect(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -131,7 +131,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo", Name:"router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -146,7 +146,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Authorization:"YZZ222", Name:"router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } } @@ -289,12 +289,12 @@ func TestClientPubWithQueueSub(t *testing.T) { var n1, n2, received int for ; ; received += 1 { - time.Sleep(10*time.Millisecond) + time.Sleep(10 * time.Millisecond) l, err := cr.ReadString('\n') if err != nil { break } - matches := msgPat.FindAllStringSubmatch(l,-1)[0] + matches := msgPat.FindAllStringSubmatch(l, -1)[0] // Count which sub switch matches[SID_INDEX] { @@ -336,7 +336,7 @@ func TestClientUnSub(t *testing.T) { var received int for ; ; received += 1 { - time.Sleep(10*time.Millisecond) + time.Sleep(10 * time.Millisecond) l, err := cr.ReadString('\n') if err != nil { break @@ -377,7 +377,7 @@ func TestClientUnSubMax(t *testing.T) { var received int for ; ; received += 1 { - time.Sleep(10*time.Millisecond) + time.Sleep(10 * time.Millisecond) l, err := cr.ReadString('\n') if err != nil { break diff --git a/server/const.go b/server/const.go index 7554c386..346a7bbc 100644 --- a/server/const.go +++ b/server/const.go @@ -7,7 +7,7 @@ import ( ) const ( - VERSION = "go-0.3.1" + VERSION = "go-0.3.2" DEFAULT_PORT = 4222 DEFAULT_HOST = "0.0.0.0" diff --git a/server/log.go b/server/log.go index 9f0bc406..66c51a6c 100644 --- a/server/log.go +++ b/server/log.go @@ -5,6 +5,7 @@ package server import ( "fmt" "log" + "os" "strings" "sync/atomic" ) @@ -17,15 +18,29 @@ var nolog int32 func LogSetup() { log.SetFlags(0) + atomic.StoreInt32(&nolog, 0) + atomic.StoreInt32(&debug, 0) + atomic.StoreInt32(&trace, 0) } func (s *Server) LogInit() { + // Reset + LogSetup() + if s.opts.Logtime { log.SetFlags(log.LstdFlags) } if s.opts.NoLog { atomic.StoreInt32(&nolog, 1) } + if s.opts.LogFile != "" { + flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE + file, err := os.OpenFile(s.opts.LogFile, flags, 0660) + if err != nil { + PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile)) + } + log.SetOutput(file) + } if s.opts.Debug { Log(s.opts) atomic.StoreInt32(&debug, 1) @@ -56,7 +71,7 @@ func logStr(v []interface{}) string { args = append(args, fmt.Sprintf("%+v", vt)) } } - return fmt.Sprintf("[%s]", strings.Join(args,", ")) + return fmt.Sprintf("[%s]", strings.Join(args, ", ")) } func Log(v ...interface{}) { @@ -100,4 +115,3 @@ func Tracef(format string, v ...interface{}) { Trace(fmt.Sprintf(format, v...)) } } - diff --git a/server/opts.go b/server/opts.go index 4fca4e0f..dfb20626 100644 --- a/server/opts.go +++ b/server/opts.go @@ -59,7 +59,7 @@ func ProcessConfigFile(configFile string) (*Options, error) { data, err := ioutil.ReadFile(configFile) if err != nil { - return nil, err + return nil, fmt.Errorf("Error opening config file: %v", err) } m, err := conf.Parse(string(data)) diff --git a/server/opts_test.go b/server/opts_test.go index 61ec5dd9..d4830de1 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -10,22 +10,24 @@ import ( func TestDefaultOptions(t *testing.T) { golden := &Options{ - Host: DEFAULT_HOST, - Port: DEFAULT_PORT, - MaxConn: DEFAULT_MAX_CONNECTIONS, - PingInterval: DEFAULT_PING_INTERVAL, - MaxPingsOut: DEFAULT_PING_MAX_OUT, - SslTimeout: float64(SSL_TIMEOUT) / float64(time.Second), - AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), - MaxControlLine: MAX_CONTROL_LINE_SIZE, - MaxPayload: MAX_PAYLOAD_SIZE, + Host: DEFAULT_HOST, + Port: DEFAULT_PORT, + MaxConn: DEFAULT_MAX_CONNECTIONS, + PingInterval: DEFAULT_PING_INTERVAL, + MaxPingsOut: DEFAULT_PING_MAX_OUT, + SslTimeout: float64(SSL_TIMEOUT) / float64(time.Second), + AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), + MaxControlLine: MAX_CONTROL_LINE_SIZE, + MaxPayload: MAX_PAYLOAD_SIZE, + ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), } opts := &Options{} processOptions(opts) if !reflect.DeepEqual(golden, opts) { - t.Fatal("Default options are incorrect") + t.Fatalf("Default Options are incorrect.\nexpected: %+v\ngot: %+v", + golden, opts) } } @@ -66,6 +68,8 @@ func TestMergeOverrides(t *testing.T) { Trace: true, Logtime: false, HttpPort: DEFAULT_HTTP_PORT, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/parser_test.go b/server/parser_test.go index abfdda97..86023cd4 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -1,4 +1,3 @@ - // Copyright 2012-2013 Apcera Inc. All rights reserved. package server @@ -202,22 +201,22 @@ func testPubArg(c *client, t *testing.T) { func TestParsePubArg(t *testing.T) { c := dummyClient() - if err := c.processPub([]byte("foo 22")) ; err != nil { + if err := c.processPub([]byte("foo 22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testPubArg(c, t) - if err := c.processPub([]byte(" foo 22")) ; err != nil { + if err := c.processPub([]byte(" foo 22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testPubArg(c, t) - if err := c.processPub([]byte(" foo 22 ")) ; err != nil { + if err := c.processPub([]byte(" foo 22 ")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testPubArg(c, t) - if err := c.processPub([]byte("foo 22")) ; err != nil { + if err := c.processPub([]byte("foo 22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } - if err := c.processPub([]byte("foo 22\r")) ; err != nil { + if err := c.processPub([]byte("foo 22\r")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testPubArg(c, t) @@ -279,22 +278,22 @@ func testMsgArg(c *client, t *testing.T) { func TestParseMsgArg(t *testing.T) { c := dummyClient() - if err := c.processMsgArgs([]byte("foobar RSID:22:1 22")) ; err != nil { + if err := c.processMsgArgs([]byte("foobar RSID:22:1 22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testMsgArg(c, t) - if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22")) ; err != nil { + if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testMsgArg(c, t) - if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22 ")) ; err != nil { + if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22 ")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testMsgArg(c, t) - if err := c.processMsgArgs([]byte("foobar RSID:22:1 \t22")) ; err != nil { + if err := c.processMsgArgs([]byte("foobar RSID:22:1 \t22")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } - if err := c.processMsgArgs([]byte("foobar\t\tRSID:22:1\t22\r")) ; err != nil { + if err := c.processMsgArgs([]byte("foobar\t\tRSID:22:1\t22\r")); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } testMsgArg(c, t) @@ -303,43 +302,43 @@ func TestParseMsgArg(t *testing.T) { func TestShouldFail(t *testing.T) { c := dummyClient() - if err := c.parse([]byte(" PING")) ; err == nil { + if err := c.parse([]byte(" PING")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("CONNECT \r\n")) ; err == nil { + if err := c.parse([]byte("CONNECT \r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("POO")) ; err == nil { + if err := c.parse([]byte("POO")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("PUB foo\r\n")) ; err == nil { + if err := c.parse([]byte("PUB foo\r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("PUB \r\n")) ; err == nil { + if err := c.parse([]byte("PUB \r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("PUB foo bar \r\n")) ; err == nil { + if err := c.parse([]byte("PUB foo bar \r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("SUB\r\n")) ; err == nil { + if err := c.parse([]byte("SUB\r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("SUB \r\n")) ; err == nil { + if err := c.parse([]byte("SUB \r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("SUB foo\r\n")) ; err == nil { + if err := c.parse([]byte("SUB foo\r\n")); err == nil { t.Fatal("Should have received a parse error") } c.state = OP_START - if err := c.parse([]byte("SUB foo bar baz 22\r\n")) ; err == nil { + if err := c.parse([]byte("SUB foo bar baz 22\r\n")); err == nil { t.Fatal("Should have received a parse error") } } diff --git a/server/route.go b/server/route.go index cc6bb0e8..d9859ba1 100644 --- a/server/route.go +++ b/server/route.go @@ -53,17 +53,27 @@ func (c *client) sendConnect() { } func (s *Server) sendLocalSubsToRoute(route *client) { + b := bytes.Buffer{} + for _, client := range s.clients { - for _, s := range client.subs.All() { + client.mu.Lock() + subs := client.subs.All() + client.mu.Unlock() + for _, s := range subs { if sub, ok := s.(*subscription); ok { rsid := routeSid(sub) proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) - route.bw.WriteString(proto) + b.WriteString(proto) } } } + + route.mu.Lock() + defer route.mu.Unlock() + route.bw.Write(b.Bytes()) route.bw.Flush() - Debug("Route sent local subscriptions", clientConnStr(route.nc), route.cid) + + Debug("Route sent local subscriptions", route.cid) } func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { @@ -163,8 +173,10 @@ func routeSid(sub *subscription) string { func (s *Server) broadcastToRoutes(proto string) { for _, route := range s.routes { // FIXME(dlc) - Make same logic as deliverMsg + route.mu.Lock() route.bw.WriteString(proto) route.bw.Flush() + route.mu.Unlock() } } @@ -294,3 +306,9 @@ func (s *Server) solicitRoutes() { go s.connectToRoute(r) } } + +func (s *Server) numRoutes() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.routes) +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index aaf841da..50d4ee6f 100644 --- a/server/server.go +++ b/server/server.go @@ -115,6 +115,11 @@ func New(opts *Options) *Server { return s } +func PrintAndDie(msg string) { + fmt.Fprintf(os.Stderr, "%s\n", msg) + os.Exit(1) +} + // Print our version and exit func PrintServerAndExit() { fmt.Printf("%s\n", VERSION) diff --git a/server/split_test.go b/server/split_test.go index b0eb043d..f7610d85 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -11,20 +11,20 @@ import ( ) func TestSplitBufferSubOp(t *testing.T) { - s := &Server{ sl: sublist.New() } - c := &client{srv:s, subs: hashmap.New()} + s := &Server{sl: sublist.New()} + c := &client{srv: s, subs: hashmap.New()} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] subop2 := subop[6:] - if err := c.parse(subop1) ; err != nil { + if err := c.parse(subop1); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if c.state != SUB_ARG { t.Fatalf("Expected SUB_ARG state vs %d\n", c.state) } - if err := c.parse(subop2) ; err != nil { + if err := c.parse(subop2); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if c.state != OP_START { @@ -47,11 +47,11 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{ sl: sublist.New() } - c := &client{srv:s, subs: hashmap.New()} + s := &Server{sl: sublist.New()} + c := &client{srv: s, subs: hashmap.New()} subop := []byte("SUB foo 1024\r\n") - if err := c.parse(subop) ; err != nil { + if err := c.parse(subop); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if c.state != OP_START { @@ -62,13 +62,13 @@ func TestSplitBufferUnsubOp(t *testing.T) { unsubop1 := unsubop[:8] unsubop2 := unsubop[8:] - if err := c.parse(unsubop1) ; err != nil { + if err := c.parse(unsubop1); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if c.state != UNSUB_ARG { t.Fatalf("Expected UNSUB_ARG state vs %d\n", c.state) } - if err := c.parse(unsubop2) ; err != nil { + if err := c.parse(unsubop2); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if c.state != OP_START { diff --git a/server/usage.go b/server/usage.go new file mode 100644 index 00000000..57103748 --- /dev/null +++ b/server/usage.go @@ -0,0 +1,36 @@ +// Copyright 2013 Apcera Inc. All rights reserved. + +package server + +import ( + "fmt" + "os" +) + +var usageStr = ` +Server options: + -a, --addr HOST Bind to HOST address (default: 0.0.0.0) + -p, --port PORT Use PORT (default: 4222) + -P, --pid FILE File to store PID + -m, --http_port PORT Use HTTP PORT + -c, --config FILE Configuration File + +Logging options: + -l, --log FILE File to redirect log output + -T, --logtime Timestamp log entries (default: true) + -D, --debug Enable debugging output + -V, --trace Trace the raw protocol + +Authorization options: + --user user User required for connections + --pass password Password required for connections + +Common options: + -h, --help Show this message + -v, --version Show version +` + +func Usage() { + fmt.Printf("%s\n", usageStr) + os.Exit(0) +} diff --git a/server/util.go b/server/util.go index 840359b2..2a99ecaf 100644 --- a/server/util.go +++ b/server/util.go @@ -54,4 +54,4 @@ func parseInt64(d []byte) (n int64) { func secondsToDuration(seconds float64) time.Duration { ttl := seconds * float64(time.Second) return time.Duration(ttl) -} \ No newline at end of file +} diff --git a/sublist/sublist_test.go b/sublist/sublist_test.go index 7682f509..40a6ca82 100644 --- a/sublist/sublist_test.go +++ b/sublist/sublist_test.go @@ -273,9 +273,9 @@ func TestStats(t *testing.T) { s.Insert([]byte("stats.>"), "fwc") tmpl := "stats.test.%d" loop := 255 - total := uint32(loop+1) + total := uint32(loop + 1) - for i := 0; i < loop ; i++ { + for i := 0; i < loop; i++ { sub := []byte(fmt.Sprintf(tmpl, i)) s.Insert(sub, "l") } @@ -297,7 +297,7 @@ func TestStats(t *testing.T) { t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0) } - for i := 0; i < loop ; i++ { + for i := 0; i < loop; i++ { s.Match([]byte("stats.test.22")) } s.Insert([]byte("stats.*.*"), "pwc") @@ -307,7 +307,7 @@ func TestStats(t *testing.T) { if stats.NumMatches != uint64(loop+1) { t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, loop+1) } - expectedCacheHitRate := 255.0/256.0 + expectedCacheHitRate := 255.0 / 256.0 if stats.CacheHitRate != expectedCacheHitRate { t.Fatalf("Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, expectedCacheHitRate) } @@ -371,7 +371,7 @@ func init() { sl.Insert(subs[i], subs[i]) } addWildcards() -// println("Sublist holding ", sl.Count(), " subscriptions") + // println("Sublist holding ", sl.Count(), " subscriptions") } func subsInit(pre string) { diff --git a/test/auth_test.go b/test/auth_test.go index 5bbe5869..8473c357 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -38,7 +38,7 @@ func expectAuthRequired(t tLogger, c net.Conn) { // The authorization token version //////////////////////////////////////////////////////////// -const AUTH_PORT=10422 +const AUTH_PORT = 10422 const AUTH_TOKEN = "_YZZ22_" func runAuthServerWithToken() *server.Server { diff --git a/test/bench_test.go b/test/bench_test.go index 61fdb91a..d7264447 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -95,7 +95,7 @@ func Benchmark___Pub1K_Payload(b *testing.B) { func Benchmark___Pub4K_Payload(b *testing.B) { b.StopTimer() - s := sizedString(4*1024) + s := sizedString(4 * 1024) benchPub(b, "a", s) } diff --git a/test/gosrv_test.go b/test/gosrv_test.go index 5419745f..edc71f7c 100644 --- a/test/gosrv_test.go +++ b/test/gosrv_test.go @@ -3,8 +3,8 @@ package test import ( - "testing" "runtime" + "testing" "time" ) @@ -22,7 +22,7 @@ func TestSimpleGoServerShutdown(t *testing.T) { func TestGoServerShutdownWithClients(t *testing.T) { base := runtime.NumGoroutine() s := runDefaultServer() - for i := 0 ; i < 10 ; i++ { + for i := 0; i < 10; i++ { createClientConn(t, "localhost", 4222) } s.Shutdown() @@ -33,4 +33,3 @@ func TestGoServerShutdownWithClients(t *testing.T) { t.Fatalf("%d Go routines still exist post Shutdown()", delta) } } - diff --git a/test/log_test.go b/test/log_test.go new file mode 100644 index 00000000..0435a695 --- /dev/null +++ b/test/log_test.go @@ -0,0 +1,45 @@ +// Copyright 2013 Apcera Inc. All rights reserved. + +package test + +import ( + "io/ioutil" + "os" + "regexp" + "testing" +// "time" + +// "github.com/apcera/gnatsd/server" +) + +var startRe = regexp.MustCompile(`\["Starting nats-server version\s+([^\s]+)"\]\n`) + +func TestLogFile(t *testing.T) { + opts := DefaultTestOptions + opts.NoLog = false + opts.Logtime = false + + tmpDir, err := ioutil.TempDir("", "_gnatsd") + if err != nil { + t.Fatal("Could not create tmp dir") + } + defer os.RemoveAll(tmpDir) + + file, err := ioutil.TempFile(tmpDir, "gnatsd:log_") + file.Close() + opts.LogFile = file.Name() + + s := RunServer(&opts) + s.Shutdown() + + buf, err := ioutil.ReadFile(opts.LogFile) + if err != nil { + t.Fatalf("Could not read logfile: %v", err) + } + if len(buf) <= 0 { + t.Fatal("Expected a non-zero length logfile") + } + if !startRe.Match(buf) { + t.Fatalf("Logfile did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, startRe) + } +} diff --git a/test/monitor_test.go b/test/monitor_test.go index 55dc5f4a..10cdffcd 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -13,7 +13,7 @@ import ( "github.com/apcera/gnatsd/server" ) -const MONITOR_PORT=11422 +const MONITOR_PORT = 11422 // Make sure that we do not run the http server for monitoring unless asked. func TestNoMonitorPort(t *testing.T) { diff --git a/test/ping_test.go b/test/ping_test.go index 1a610385..f800d2a4 100644 --- a/test/ping_test.go +++ b/test/ping_test.go @@ -34,8 +34,8 @@ func TestPingInterval(t *testing.T) { expect := expectCommand(t, c) // Expect the max to be delivered correctly.. - for i := 0 ; i < PING_MAX; i++ { - time.Sleep(PING_INTERVAL/2) + for i := 0; i < PING_MAX; i++ { + time.Sleep(PING_INTERVAL / 2) expect(pingRe) } diff --git a/test/proto_test.go b/test/proto_test.go index 6252f4b3..2fc3127e 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -86,7 +86,7 @@ func TestQueueSub(t *testing.T) { send("PUB foo 2\r\nok\r\n") } // Wait for responses - time.Sleep(250*time.Millisecond) + time.Sleep(250 * time.Millisecond) matches := expectMsgs(sent) sids := make(map[string]int) @@ -121,7 +121,7 @@ func TestMultipleQueueSub(t *testing.T) { send("PUB foo 2\r\nok\r\n") } // Wait for responses - time.Sleep(250*time.Millisecond) + time.Sleep(250 * time.Millisecond) matches := expectMsgs(sent * 2) sids := make(map[string]int) diff --git a/test/routes_test.go b/test/routes_test.go index 5bef7066..35d2231a 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -45,7 +45,6 @@ func TestRouteGoServerShutdown(t *testing.T) { time.Sleep(10 * time.Millisecond) delta := (runtime.NumGoroutine() - base) if delta > 1 { - panic("foo") t.Fatalf("%d Go routines still exist post Shutdown()", delta) } } diff --git a/test/test.go b/test/test.go index 642f242d..a2b4593e 100644 --- a/test/test.go +++ b/test/test.go @@ -255,15 +255,15 @@ func sendProto(t tLogger, c net.Conn, op string) { } var ( - infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) - pingRe = regexp.MustCompile(`PING\r\n`) - pongRe = regexp.MustCompile(`PONG\r\n`) - msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) - okRe = regexp.MustCompile(`\A\+OK\r\n`) - errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) - subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) - unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) - connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`) + infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) + pingRe = regexp.MustCompile(`PING\r\n`) + pongRe = regexp.MustCompile(`PONG\r\n`) + msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) + okRe = regexp.MustCompile(`\A\+OK\r\n`) + errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) + subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) + unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) + connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`) ) const (