diff --git a/auth/plain.go b/auth/plain.go index ca0d1517..feec87a8 100644 --- a/auth/plain.go +++ b/auth/plain.go @@ -9,17 +9,19 @@ import ( "golang.org/x/crypto/bcrypt" ) -const BcryptPrefix = "$2a$" +const bcryptPrefix = "$2a$" func isBcrypt(password string) bool { - return strings.HasPrefix(password, BcryptPrefix) + return strings.HasPrefix(password, bcryptPrefix) } +// Plain authentication is a basic username and password type Plain struct { Username string Password string } +// Check authenticates the client using a username and password func (p *Plain) Check(c server.ClientAuth) bool { opts := c.GetOpts() if p.Username != opts.Username { diff --git a/auth/token.go b/auth/token.go index 66815575..1e0486b4 100644 --- a/auth/token.go +++ b/auth/token.go @@ -5,10 +5,12 @@ import ( "golang.org/x/crypto/bcrypt" ) +// Token holds a string token used for authentication type Token struct { Token string } +// Check authenticates a client from a token func (p *Token) Check(c server.ClientAuth) bool { opts := c.GetOpts() // Check to see if the token is a bcrypt hash diff --git a/conf/lex.go b/conf/lex.go index cec332b7..ba34a854 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -709,9 +709,8 @@ func lexNumberOrDateStart(lx *lexer) stateFn { if !isDigit(r) { if r == '.' { return lx.errorf("Floats must start with a digit, not '.'.") - } else { - return lx.errorf("Expected a digit but got '%v'.", r) } + return lx.errorf("Expected a digit but got '%v'.", r) } return lexNumberOrDate } @@ -772,9 +771,8 @@ func lexNumberStart(lx *lexer) stateFn { if !isDigit(r) { if r == '.' { return lx.errorf("Floats must start with a digit, not '.'.") - } else { - return lx.errorf("Expected a digit but got '%v'.", r) } + return lx.errorf("Expected a digit but got '%v'.", r) } return lexNumber } diff --git a/conf/parse.go b/conf/parse.go index a2edc99a..2d2a4fd6 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -1,6 +1,6 @@ // Copyright 2013 Apcera Inc. All rights reserved. -// Conf is a configuration file format used by gnatsd. It is +// Package conf supports a configuration file format used by gnatsd. It is // a flexible format that combines the best of traditional // configuration formats and newer styles such as JSON and YAML. package conf @@ -20,9 +20,6 @@ import ( "time" ) -// Parser will return a map of keys to interface{}, although concrete types -// underly them. The values supported are string, bool, int64, float64, DateTime. -// Arrays and nested Maps are also supported. type parser struct { mapping map[string]interface{} lx *lexer @@ -37,6 +34,9 @@ type parser struct { keys []string } +// Parse will return a map of keys to interface{}, although concrete types +// underly them. The values supported are string, bool, int64, float64, DateTime. +// Arrays and nested Maps are also supported. func Parse(data string) (map[string]interface{}, error) { p, err := parse(data) if err != nil { @@ -121,9 +121,8 @@ func (p *parser) processItem(it item) error { if e, ok := err.(*strconv.NumError); ok && e.Err == strconv.ErrRange { return fmt.Errorf("Integer '%s' is out of the range.", it.val) - } else { - return fmt.Errorf("Expected integer, but got '%s'.", it.val) } + return fmt.Errorf("Expected integer, but got '%s'.", it.val) } p.setValue(num) case itemFloat: @@ -132,9 +131,8 @@ func (p *parser) processItem(it item) error { if e, ok := err.(*strconv.NumError); ok && e.Err == strconv.ErrRange { return fmt.Errorf("Float '%s' is out of the range.", it.val) - } else { - return fmt.Errorf("Expected float, but got '%s'.", it.val) } + return fmt.Errorf("Expected float, but got '%s'.", it.val) } p.setValue(num) case itemBool: @@ -154,7 +152,7 @@ func (p *parser) processItem(it item) error { } p.setValue(dt) case itemArrayStart: - array := make([]interface{}, 0) + var array = make([]interface{}, 0) p.pushContext(array) case itemArrayEnd: array := p.ctx diff --git a/gnatsd.go b/gnatsd.go index 3a313608..d16a653b 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -81,7 +81,7 @@ func main() { } if showTLSHelp { - server.PrintTlsHelpAndDie() + server.PrintTLSHelpAndDie() } // One flag can set multiple options. diff --git a/hash/hash.go b/hash/hash.go index fdc2814e..1eb39563 100644 --- a/hash/hash.go +++ b/hash/hash.go @@ -1,13 +1,13 @@ // Copyright 2012 Apcera Inc. All rights reserved. -// Collection of high performance 32-bit hash functions. +// Package hash is a collection of high performance 32-bit hash functions. package hash import ( "unsafe" ) -// Generates a Bernstein Hash. +// Bernstein Hash. func Bernstein(data []byte) uint32 { hash := uint32(5381) for _, b := range data { @@ -23,7 +23,7 @@ const ( _YP32 = 709607 ) -// Generates an FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function] +// FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function] func FNV1A(data []byte) uint32 { var hash uint32 = _OFF32 for _, c := range data { @@ -152,10 +152,10 @@ const ( _F2 = uint32(0xc2b2ae35) ) -// A default seed for Murmur3 +// M3Seed is a default seed for Murmur3 const M3Seed = uint32(0x9747b28c) -// Generates a Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3] +// Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3] // Does not generate intermediate objects. func Murmur3(data []byte, seed uint32) uint32 { h1 := seed diff --git a/hashmap/hashmap.go b/hashmap/hashmap.go index 767dbeed..0edc6aec 100644 --- a/hashmap/hashmap.go +++ b/hashmap/hashmap.go @@ -1,6 +1,6 @@ // Copyright 2012-2015 Apcera Inc. All rights reserved. -// HashMap defines a high performance hashmap based on +// Package hashmap defines a high performance hashmap based on // fast hashing and fast key comparison. Simple chaining // is used, relying on the hashing algorithms for good // distribution @@ -93,7 +93,7 @@ func (h *HashMap) Set(key []byte, data interface{}) { ne := &Entry{hk: hk, key: key, data: data} ne.next = h.bkts[hk&h.msk] h.bkts[hk&h.msk] = ne - h.used += 1 + h.used++ // Check for resizing if h.rsz && (h.used > uint32(len(h.bkts))) { h.grow() @@ -152,7 +152,7 @@ func (h *HashMap) Remove(key []byte) { if len(key) == len((*e).key) && bytes.Equal(key, (*e).key) { // Success *e = (*e).next - h.used -= 1 + h.used-- // Check for resizing lbkts := uint32(len(h.bkts)) if h.rsz && lbkts > _BSZ && (h.used < lbkts/4) { @@ -235,11 +235,11 @@ func (h *HashMap) Stats() *Stats { lc, totalc, slots := 0, 0, 0 for _, e := range h.bkts { if e != nil { - slots += 1 + slots++ } i := 0 for ; e != nil; e = e.next { - i += 1 + i++ if i > lc { lc = i } diff --git a/hashmap/rand_evict.go b/hashmap/rand_evict.go index 6371f694..81c4ce6d 100644 --- a/hashmap/rand_evict.go +++ b/hashmap/rand_evict.go @@ -25,7 +25,7 @@ func (h *HashMap) RemoveRandom() { e := &h.bkts[i] if *e != nil { *e = (*e).next - h.used -= 1 + h.used-- return } } @@ -35,7 +35,7 @@ func (h *HashMap) RemoveRandom() { e := &h.bkts[i] if *e != nil { *e = (*e).next - h.used -= 1 + h.used-- return } } diff --git a/logger/log.go b/logger/log.go index 3c800227..da72adbc 100644 --- a/logger/log.go +++ b/logger/log.go @@ -1,4 +1,6 @@ // Copyright 2012-2015 Apcera Inc. All rights reserved. + +//Package logger provides logging facilities for the NATS server package logger import ( @@ -7,6 +9,7 @@ import ( "os" ) +// Logger is the server logger type Logger struct { logger *log.Logger debug bool @@ -18,6 +21,7 @@ type Logger struct { traceLabel string } +// NewStdLogger creates a logger with output directed to Stderr func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { flags := 0 if time { @@ -44,6 +48,7 @@ func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { return l } +// NewFileLogger creates a logger with output directed to a file func NewFileLogger(filename string, time, debug, trace, pid bool) *Logger { fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE f, err := os.OpenFile(filename, fileflags, 0660) @@ -93,24 +98,29 @@ func setColoredLabelFormats(l *Logger) { l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRC") } +// Noticef logs a notice statement func (l *Logger) Noticef(format string, v ...interface{}) { l.logger.Printf(l.infoLabel+format, v...) } +// Errorf logs an error statement func (l *Logger) Errorf(format string, v ...interface{}) { l.logger.Printf(l.errorLabel+format, v...) } +// Fatalf logs a fatal error func (l *Logger) Fatalf(format string, v ...interface{}) { l.logger.Fatalf(l.fatalLabel+format, v...) } +// Debugf logs a debug statement func (l *Logger) Debugf(format string, v ...interface{}) { if l.debug == true { l.logger.Printf(l.debugLabel+format, v...) } } +// Tracef logs a trace statement func (l *Logger) Tracef(format string, v ...interface{}) { if l.trace == true { l.logger.Printf(l.traceLabel+format, v...) diff --git a/logger/syslog.go b/logger/syslog.go index 4ce69c95..99e2cffb 100644 --- a/logger/syslog.go +++ b/logger/syslog.go @@ -11,12 +11,14 @@ import ( "net/url" ) +// SysLogger provides a system logger facility type SysLogger struct { writer *syslog.Writer debug bool trace bool } +// NewSysLogger creates a new system logger func NewSysLogger(debug, trace bool) *SysLogger { w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd") if err != nil { @@ -30,6 +32,7 @@ func NewSysLogger(debug, trace bool) *SysLogger { } } +// NewRemoteSysLogger creates a new remote system logger func NewRemoteSysLogger(fqn string, debug, trace bool) *SysLogger { network, addr := getNetworkAndAddr(fqn) w, err := syslog.Dial(network, addr, syslog.LOG_DEBUG, "gnatsd") @@ -62,24 +65,29 @@ func getNetworkAndAddr(fqn string) (network, addr string) { return } +// Noticef logs a notice statement func (l *SysLogger) Noticef(format string, v ...interface{}) { l.writer.Notice(fmt.Sprintf(format, v...)) } +// Fatalf logs a fatal error func (l *SysLogger) Fatalf(format string, v ...interface{}) { l.writer.Crit(fmt.Sprintf(format, v...)) } +// Errorf logs an error statement func (l *SysLogger) Errorf(format string, v ...interface{}) { l.writer.Err(fmt.Sprintf(format, v...)) } +// Debugf logs a debug statement func (l *SysLogger) Debugf(format string, v ...interface{}) { if l.debug { l.writer.Debug(fmt.Sprintf(format, v...)) } } +// Tracef logs a trace statement func (l *SysLogger) Tracef(format string, v ...interface{}) { if l.trace { l.writer.Notice(fmt.Sprintf(format, v...)) diff --git a/logger/syslog_test.go b/logger/syslog_test.go index 84924e67..16452e80 100644 --- a/logger/syslog_test.go +++ b/logger/syslog_test.go @@ -150,7 +150,7 @@ func expectSyslogOutput(t *testing.T, line string, expected string) { func runSyslog(c net.PacketConn, done chan<- string) { var buf [4096]byte - var rcvd string = "" + var rcvd string for { n, _, err := c.ReadFrom(buf[:]) if err != nil || n == 0 { diff --git a/server/auth.go b/server/auth.go index f80f5319..92a2b8f9 100644 --- a/server/auth.go +++ b/server/auth.go @@ -2,10 +2,14 @@ package server +// Auth is an interface for implementing authentication type Auth interface { + // Check if a client is authorized to connect Check(c ClientAuth) bool } +// ClientAuth is an interface for client authentication type ClientAuth interface { + // Get options associated with a client GetOpts() *clientOpts } diff --git a/server/client.go b/server/client.go index 1d2d9fa1..f3d9eefb 100644 --- a/server/client.go +++ b/server/client.go @@ -356,7 +356,7 @@ func (c *client) processPing() { func (c *client) processPong() { c.traceInOp("PONG", nil) c.mu.Lock() - c.pout -= 1 + c.pout-- c.mu.Unlock() } @@ -861,7 +861,7 @@ func (c *client) processPingTimer() { c.Debugf("%s Ping Timer", c.typeString()) // Check for violation - c.pout += 1 + c.pout++ if c.pout > c.srv.opts.MaxPingsOut { c.Debugf("Stale Client Connection - Closing") if c.bw != nil { diff --git a/server/client_test.go b/server/client_test.go index f9fc270b..21dba130 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -304,7 +304,7 @@ func TestClientPubWithQueueSub(t *testing.T) { }() var n1, n2, received int - for ; ; received += 1 { + for ; ; received++ { l, err := cr.ReadString('\n') if err != nil { break @@ -353,7 +353,7 @@ func TestClientUnSub(t *testing.T) { }() var received int - for ; ; received += 1 { + for ; ; received++ { l, err := cr.ReadString('\n') if err != nil { break @@ -396,7 +396,7 @@ func TestClientUnSubMax(t *testing.T) { }() var received int - for ; ; received += 1 { + for ; ; received++ { l, err := cr.ReadString('\n') if err != nil { break diff --git a/server/const.go b/server/const.go index 5aa65b4c..db999477 100644 --- a/server/const.go +++ b/server/const.go @@ -47,7 +47,7 @@ const ( // DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect. DEFAULT_PING_MAX_OUT = 2 - // CRLF string + // CR_LF string CR_LF = "\r\n" // LEN_CR_LF hold onto the computed size. @@ -83,7 +83,7 @@ const ( // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. MAX_PUB_ARGS = 3 - // Default Buffer size for reads and writes per connection. Will be replaced by dynamic - // system in the long run. + // DEFAULT_BUF_SIZE is the default buffer size for reads and writes per connection. + // It will be replaced by a dynamic system in the long run. DEFAULT_BUF_SIZE = 32768 ) diff --git a/server/log.go b/server/log.go index 8239dd23..a7a29966 100644 --- a/server/log.go +++ b/server/log.go @@ -16,14 +16,26 @@ var log = struct { logger Logger }{} +// Logger interface of the NATS Server type Logger interface { + + // Log a notice statement Noticef(format string, v ...interface{}) + + // Log a fatal error Fatalf(format string, v ...interface{}) + + // Log an error Errorf(format string, v ...interface{}) + + // Log a debug statement Debugf(format string, v ...interface{}) + + // Log a trace statement Tracef(format string, v ...interface{}) } +// SetLogger sets the logger of the server func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { if debugFlag { atomic.StoreInt32(&debug, 1) @@ -38,24 +50,28 @@ func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { log.Unlock() } +// Noticef logs a notice statement func Noticef(format string, v ...interface{}) { executeLogCall(func(logger Logger, format string, v ...interface{}) { logger.Noticef(format, v...) }, format, v...) } +// Errorf logs an error func Errorf(format string, v ...interface{}) { executeLogCall(func(logger Logger, format string, v ...interface{}) { logger.Errorf(format, v...) }, format, v...) } +// Fatalf logs a fatal error func Fatalf(format string, v ...interface{}) { executeLogCall(func(logger Logger, format string, v ...interface{}) { logger.Fatalf(format, v...) }, format, v...) } +// Debugf logs a debug statement func Debugf(format string, v ...interface{}) { if atomic.LoadInt32(&debug) == 0 { return @@ -66,6 +82,7 @@ func Debugf(format string, v ...interface{}) { }, format, v...) } +// Tracef logs a trace statement func Tracef(format string, v ...interface{}) { if atomic.LoadInt32(&trace) == 0 { return diff --git a/server/monitor.go b/server/monitor.go index 02c19ed9..a873b07e 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -56,6 +56,7 @@ type ConnInfo struct { Subs []string `json:"subscriptions_list,omitempty"` } +// DefaultConnListSize is the default size of the connection list. const DefaultConnListSize = 1024 // HandleConnz process HTTP requests for connection information. @@ -317,7 +318,7 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } -// HandleStats process HTTP requests for subjects stats. +// HandleSubsz processes HTTP requests for subjects stats. func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { s.mu.Lock() s.httpReqStats[SubszPath]++ diff --git a/server/monitor_sort_opts.go b/server/monitor_sort_opts.go index 043133b2..91bce567 100644 --- a/server/monitor_sort_opts.go +++ b/server/monitor_sort_opts.go @@ -2,7 +2,7 @@ package server -// Helper types to sort by ConnInfo values +// SortOpt is a helper type to sort by ConnInfo values type SortOpt string const ( @@ -17,6 +17,7 @@ const ( byIdle = "idle" ) +// IsValid determines if a sort option is valid func (s SortOpt) IsValid() bool { switch s { case "", byCid, bySubs, byPending, byOutMsgs, byInMsgs, byOutBytes, byInBytes, byLast, byIdle: @@ -26,11 +27,13 @@ func (s SortOpt) IsValid() bool { } } +// Pair type is internally used. type Pair struct { Key *client Val int64 } +// Pairs type is internally used. type Pairs []Pair func (d Pairs) Len() int { diff --git a/server/monitor_test.go b/server/monitor_test.go index 6f555981..bdfed9b5 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -36,7 +36,7 @@ func runMonitorServer() *Server { return RunServer(&opts) } -func runMonitorServerNoHttpPort() *Server { +func runMonitorServerNoHTTPPort() *Server { resetPreviousHTTPConnections() opts := DefaultMonitorOptions opts.HTTPPort = 0 @@ -81,7 +81,7 @@ func TestMyUptime(t *testing.T) { // Make sure that we do not run the http server for monitoring unless asked. func TestNoMonitorPort(t *testing.T) { - s := runMonitorServerNoHttpPort() + s := runMonitorServerNoHTTPPort() defer s.Shutdown() url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) @@ -949,8 +949,8 @@ func TestConnzWithRoutes(t *testing.T) { NoLog: true, NoSigs: true, } - routeUrl, _ := url.Parse(fmt.Sprintf("nats-route://localhost:%d", CLUSTER_PORT)) - opts.Routes = []*url.URL{routeUrl} + routeURL, _ := url.Parse(fmt.Sprintf("nats-route://localhost:%d", CLUSTER_PORT)) + opts.Routes = []*url.URL{routeURL} sc := RunServer(&opts) defer sc.Shutdown() @@ -1168,10 +1168,10 @@ func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn { } func createClientConnWithName(t *testing.T, name string) *nats.Conn { - natsUri := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT) + natsURI := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT) client := nats.DefaultOptions - client.Servers = []string{natsUri} + client.Servers = []string{natsURI} client.Name = name nc, err := client.Connect() if err != nil { diff --git a/server/opts.go b/server/opts.go index 31f9a4e5..6a4b3616 100644 --- a/server/opts.go +++ b/server/opts.go @@ -69,8 +69,8 @@ type authorization struct { timeout float64 } -// This struct holds the parsed tls config information. -// It's public so we can use it for flag parsing +// TLSConfigOpts holds the parsed tls config information, +// used with flag parsing type TLSConfigOpts struct { CertFile string KeyFile string @@ -227,8 +227,8 @@ func parseAuthorization(am map[string]interface{}) authorization { return auth } -// For Usage... -func PrintTlsHelpAndDie() { +// PrintTLSHelpAndDie prints TLS usage and exits. +func PrintTLSHelpAndDie() { var tlsUsage = ` TLS configuration is specified in the tls section of a configuration file: @@ -301,7 +301,7 @@ func parseTLS(tlsm map[string]interface{}) (*TLSConfigOpts, error) { case "cipher_suites": ra := mv.([]interface{}) if len(ra) == 0 { - return nil, fmt.Errorf("error parsing tls config, 'cipher_suites' cannot be empty.") + return nil, fmt.Errorf("error parsing tls config, 'cipher_suites' cannot be empty") } tc.Ciphers = make([]uint16, 0, len(ra)) for _, r := range ra { @@ -333,6 +333,7 @@ func parseTLS(tlsm map[string]interface{}) (*TLSConfigOpts, error) { return &tc, nil } +// GenTLSConfig loads TLS related configuration parameters. func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) { // Now load in cert and private key @@ -429,6 +430,7 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { return &opts } +// RoutesFromStr parses route URLs from a string func RoutesFromStr(routesStr string) []*url.URL { routes := strings.Split(routesStr, ",") if len(routes) == 0 { @@ -453,6 +455,7 @@ func mergeRoutes(opts, flagOpts *Options) { opts.RoutesStr = flagOpts.RoutesStr } +// RemoveSelfReference removes this server from an array of routes func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) { var cleanRoutes []*url.URL cport := strconv.Itoa(clusterPort) @@ -464,7 +467,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) return nil, err } - if cport == port && isIpInList(selfIPs, getUrlIp(host)) { + if cport == port && isIPInList(selfIPs, getURLIP(host)) { Noticef("Self referencing IP found: ", r) continue } @@ -474,7 +477,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) return cleanRoutes, nil } -func isIpInList(list1 []net.IP, list2 []net.IP) bool { +func isIPInList(list1 []net.IP, list2 []net.IP) bool { for _, ip1 := range list1 { for _, ip2 := range list2 { if ip1.Equal(ip2) { @@ -485,7 +488,7 @@ func isIpInList(list1 []net.IP, list2 []net.IP) bool { return false } -func getUrlIp(ipStr string) []net.IP { +func getURLIP(ipStr string) []net.IP { ipList := []net.IP{} ip := net.ParseIP(ipStr) diff --git a/server/parser.go b/server/parser.go index 0dcc520f..bd1870e5 100644 --- a/server/parser.go +++ b/server/parser.go @@ -24,6 +24,7 @@ type parseState struct { scratch [MAX_CONTROL_LINE_SIZE]byte } +// Parser constants const ( OP_START = iota OP_PLUS diff --git a/server/route.go b/server/route.go index e5180421..54cbcadd 100644 --- a/server/route.go +++ b/server/route.go @@ -17,7 +17,7 @@ import ( "time" ) -// Designate the router type +// RouteType designates the router type type RouteType int // Type of Route @@ -47,6 +47,7 @@ type connectInfo struct { Name string `json:"name"` } +// Route protocol constants const ( ConProto = "CONNECT %s" + _CRLF_ InfoProto = "INFO %s" + _CRLF_ @@ -378,7 +379,7 @@ const ( // FIXME(dlc) - Make these reserved and reject if they come in as a sid // from a client connection. - +// Route constants const ( RSID = "RSID" QRSID = "QRSID" @@ -588,18 +589,18 @@ func (s *Server) StartRouting() { s.solicitRoutes() } -func (s *Server) reConnectToRoute(rUrl *url.URL, rtype RouteType) { +func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) { tryForEver := rtype == Explicit if tryForEver { time.Sleep(DEFAULT_ROUTE_RECONNECT) } - s.connectToRoute(rUrl, tryForEver) + s.connectToRoute(rURL, tryForEver) } -func (s *Server) connectToRoute(rUrl *url.URL, tryForEver bool) { - for s.isRunning() && rUrl != nil { - Debugf("Trying to connect to route on %s", rUrl.Host) - conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) +func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { + for s.isRunning() && rURL != nil { + Debugf("Trying to connect to route on %s", rURL.Host) + conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) if err != nil { Debugf("Error trying to connect to route: %v", err) select { @@ -614,7 +615,7 @@ func (s *Server) connectToRoute(rUrl *url.URL, tryForEver bool) { } // We have a route connection here. // Go ahead and create it and exit this func. - s.createRoute(conn, rUrl) + s.createRoute(conn, rURL) return } } diff --git a/server/routes_test.go b/server/routes_test.go index 329a855a..d49fe16c 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -161,9 +161,9 @@ func checkClusterFormed(t *testing.T, servers ...*Server) { // Helper function to generate next opts to make sure no port conflicts etc. func nextServerOpts(opts *Options) *Options { nopts := *opts - nopts.Port += 1 - nopts.ClusterPort += 1 - nopts.HTTPPort += 1 + nopts.Port++ + nopts.ClusterPort++ + nopts.HTTPPort++ return &nopts } diff --git a/server/server.go b/server/server.go index f8b3c98b..90c565c0 100644 --- a/server/server.go +++ b/server/server.go @@ -128,7 +128,7 @@ func New(opts *Options) *Server { return s } -// Sets the authentication method for clients. +// SetClientAuthMethod sets the authentication method for clients. func (s *Server) SetClientAuthMethod(authMethod Auth) { s.mu.Lock() defer s.mu.Unlock() @@ -139,7 +139,7 @@ func (s *Server) SetClientAuthMethod(authMethod Auth) { s.generateServerInfoJSON() } -// Sets the authentication method for routes. +// SetRouteAuthMethod sets the authentication method for routes. func (s *Server) SetRouteAuthMethod(authMethod Auth) { s.mu.Lock() defer s.mu.Unlock() @@ -388,7 +388,7 @@ func (s *Server) StartHTTPMonitoring() { s.startMonitoring(false) } -// StartHTTPMonitoring will enable the HTTPS monitoring port. +// StartHTTPSMonitoring will enable the HTTPS monitoring port. func (s *Server) StartHTTPSMonitoring() { s.startMonitoring(true) } @@ -757,7 +757,7 @@ func (s *Server) GetRouteListenEndpoint() string { return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort)) } -// Server's ID +// Id returns the server's ID func (s *Server) Id() string { s.mu.Lock() defer s.mu.Unlock() diff --git a/server/split_test.go b/server/split_test.go index ccc868bd..8e940bfb 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -247,7 +247,7 @@ func TestSplitConnectArg(t *testing.T) { connectAll := []byte("CONNECT {\"verbose\":false,\"ssl_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") - argJson := connectAll[8:] + argJSON := connectAll[8:] c1 := connectAll[:5] c2 := connectAll[5:22] @@ -267,8 +267,8 @@ func TestSplitConnectArg(t *testing.T) { if c.argBuf == nil { t.Fatalf("Expected argBug to not be nil.\n") } - if !bytes.Equal(c.argBuf, argJson[:14]) { - t.Fatalf("argBuf not correct, received %q, wanted %q\n", argJson[:14], c.argBuf) + if !bytes.Equal(c.argBuf, argJSON[:14]) { + t.Fatalf("argBuf not correct, received %q, wanted %q\n", argJSON[:14], c.argBuf) } if err := c.parse(c3); err != nil { @@ -277,9 +277,9 @@ func TestSplitConnectArg(t *testing.T) { if c.argBuf == nil { t.Fatalf("Expected argBug to not be nil.\n") } - if !bytes.Equal(c.argBuf, argJson[:len(argJson)-2]) { + if !bytes.Equal(c.argBuf, argJSON[:len(argJSON)-2]) { t.Fatalf("argBuf not correct, received %q, wanted %q\n", - argJson[:len(argJson)-2], c.argBuf) + argJSON[:len(argJSON)-2], c.argBuf) } if err := c.parse(c4); err != nil { diff --git a/server/util.go b/server/util.go index 728b0cea..af9b572c 100644 --- a/server/util.go +++ b/server/util.go @@ -15,8 +15,8 @@ func genID() string { // Ascii numbers 0-9 const ( - ascii_0 = 48 - ascii_9 = 57 + asciiZero = 48 + asciiNine = 57 ) // parseSize expects decimal positive numbers. We @@ -26,10 +26,10 @@ func parseSize(d []byte) (n int) { return -1 } for _, dec := range d { - if dec < ascii_0 || dec > ascii_9 { + if dec < asciiZero || dec > asciiNine { return -1 } - n = n*10 + (int(dec) - ascii_0) + n = n*10 + (int(dec) - asciiZero) } return n } @@ -41,10 +41,10 @@ func parseInt64(d []byte) (n int64) { return -1 } for _, dec := range d { - if dec < ascii_0 || dec > ascii_9 { + if dec < asciiZero || dec > asciiNine { return -1 } - n = n*10 + (int64(dec) - ascii_0) + n = n*10 + (int64(dec) - asciiZero) } return n } diff --git a/sublist/sublist.go b/sublist/sublist.go index da423699..c66a1a09 100644 --- a/sublist/sublist.go +++ b/sublist/sublist.go @@ -1,8 +1,8 @@ // Copyright 2012-2016 Apcera Inc. All rights reserved. -// Sublist is a subject distribution data structure that can match subjects to -// interested subscribers. Subscribers can have wildcard subjects to match -// multiple published subjects. +// Package sublist handles subject distribution and provides a facility +// match subjects to interested subscribers. Subscribers can have wildcard +// subjects to match multiple published subjects. package sublist import ( @@ -85,6 +85,7 @@ const ( _SEP = byte('.') ) +// Sublist related errors var ( ErrInvalidSubject = errors.New("Invalid Subject") ErrNotFound = errors.New("No Matches Found") @@ -102,6 +103,7 @@ func split(subject []byte, tokens [][]byte) [][]byte { return append(tokens, subject[start:]) } +// Insert adds a subject into the sublist func (s *Sublist) Insert(subject []byte, sub interface{}) error { tsa := [16][]byte{} toks := split(subject, tsa[:0]) @@ -375,10 +377,10 @@ func (n *node) isEmpty() bool { func (l *level) numNodes() uint32 { num := l.nodes.Count() if l.pwc != nil { - num += 1 + num++ } if l.fwc != nil { - num += 1 + num++ } return num } @@ -415,10 +417,10 @@ func matchLiteral(literal, subject []byte) bool { ll := len(literal) for { if li >= ll || literal[li] == _SEP { - li -= 1 + li-- break } - li += 1 + li++ } case _FWC: return true @@ -427,7 +429,7 @@ func matchLiteral(literal, subject []byte) bool { return false } } - li += 1 + li++ } // Make sure we have processed all of the literal's chars.. if li < ll { @@ -436,6 +438,7 @@ func matchLiteral(literal, subject []byte) bool { return true } +// IsValidLiteralSubject returns true if a subject is valid, false otherwise func IsValidLiteralSubject(subject []byte) bool { tsa := [16][]byte{} toks := split(subject, tsa[:0]) @@ -523,7 +526,7 @@ func visitLevel(l *level, depth int) int { return depth } - depth += 1 + depth++ maxDepth := depth all := l.nodes.All() diff --git a/test/cluster_test.go b/test/cluster_test.go index 7b93ce98..0d20ac9f 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -137,17 +137,17 @@ func TestClusterQueueSubs(t *testing.T) { expectMsgsB := expectMsgsCommand(t, expectB) // Capture sids for checking later. - qg1Sids_a := []string{"1", "2", "3"} + qg1SidsA := []string{"1", "2", "3"} // Three queue subscribers - for _, sid := range qg1Sids_a { + for _, sid := range qg1SidsA { sendA(fmt.Sprintf("SUB foo qg1 %s\r\n", sid)) } sendA("PING\r\n") expectA(pongRe) // Make sure the subs have propagated to srvB before continuing - if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil { + if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil { t.Fatalf("%v", err) } @@ -174,7 +174,7 @@ func TestClusterQueueSubs(t *testing.T) { expectA(pongRe) // Make sure the subs have propagated to srvB before continuing - if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids), srvB); err != nil { + if err := checkExpectedSubs(len(qg1SidsA)+len(pSids), srvB); err != nil { t.Fatalf("%v", err) } @@ -185,7 +185,7 @@ func TestClusterQueueSubs(t *testing.T) { // Should receive 5. matches = expectMsgsA(5) - checkForQueueSid(t, matches, qg1Sids_a) + checkForQueueSid(t, matches, qg1SidsA) checkForPubSids(t, matches, pSids) // Send to A @@ -193,19 +193,19 @@ func TestClusterQueueSubs(t *testing.T) { // Should receive 5. matches = expectMsgsA(5) - checkForQueueSid(t, matches, qg1Sids_a) + checkForQueueSid(t, matches, qg1SidsA) checkForPubSids(t, matches, pSids) // Now add queue subscribers to B - qg2Sids_b := []string{"1", "2", "3"} - for _, sid := range qg2Sids_b { + qg2SidsB := []string{"1", "2", "3"} + for _, sid := range qg2SidsB { sendB(fmt.Sprintf("SUB foo qg2 %s\r\n", sid)) } sendB("PING\r\n") expectB(pongRe) // Make sure the subs have propagated to srvA before continuing - if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids)+len(qg2Sids_b), srvA); err != nil { + if err := checkExpectedSubs(len(qg1SidsA)+len(pSids)+len(qg2SidsB), srvA); err != nil { t.Fatalf("%v", err) } @@ -214,22 +214,22 @@ func TestClusterQueueSubs(t *testing.T) { // Should receive 1 from B. matches = expectMsgsB(1) - checkForQueueSid(t, matches, qg2Sids_b) + checkForQueueSid(t, matches, qg2SidsB) // Should receive 5 still from A. matches = expectMsgsA(5) - checkForQueueSid(t, matches, qg1Sids_a) + checkForQueueSid(t, matches, qg1SidsA) checkForPubSids(t, matches, pSids) // Now drop queue subscribers from A - for _, sid := range qg1Sids_a { + for _, sid := range qg1SidsA { sendA(fmt.Sprintf("UNSUB %s\r\n", sid)) } sendA("PING\r\n") expectA(pongRe) // Make sure the subs have propagated to srvB before continuing - if err := checkExpectedSubs(len(pSids)+len(qg2Sids_b), srvB); err != nil { + if err := checkExpectedSubs(len(pSids)+len(qg2SidsB), srvB); err != nil { t.Fatalf("%v", err) } @@ -238,7 +238,7 @@ func TestClusterQueueSubs(t *testing.T) { // Should receive 1 from B. matches = expectMsgsB(1) - checkForQueueSid(t, matches, qg2Sids_b) + checkForQueueSid(t, matches, qg2SidsB) sendB("PING\r\n") expectB(pongRe) @@ -278,17 +278,17 @@ func TestClusterDoubleMsgs(t *testing.T) { expectMsgsA2 := expectMsgsCommand(t, expectA2) // Capture sids for checking later. - qg1Sids_a := []string{"1", "2", "3"} + qg1SidsA := []string{"1", "2", "3"} // Three queue subscribers - for _, sid := range qg1Sids_a { + for _, sid := range qg1SidsA { sendA1(fmt.Sprintf("SUB foo qg1 %s\r\n", sid)) } sendA1("PING\r\n") expectA1(pongRe) // Make sure the subs have propagated to srvB before continuing - if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil { + if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil { t.Fatalf("%v", err) } @@ -299,7 +299,7 @@ func TestClusterDoubleMsgs(t *testing.T) { // Make sure we get only 1. matches := expectMsgsA1(1) checkMsg(t, matches[0], "foo", "", "", "2", "ok") - checkForQueueSid(t, matches, qg1Sids_a) + checkForQueueSid(t, matches, qg1SidsA) // Add a FWC subscriber on A2 sendA2("SUB > 1\r\n") @@ -309,7 +309,7 @@ func TestClusterDoubleMsgs(t *testing.T) { pSids := []string{"1", "2"} // Make sure the subs have propagated to srvB before continuing - if err := checkExpectedSubs(len(qg1Sids_a)+2, srvB); err != nil { + if err := checkExpectedSubs(len(qg1SidsA)+2, srvB); err != nil { t.Fatalf("%v", err) } @@ -319,7 +319,7 @@ func TestClusterDoubleMsgs(t *testing.T) { matches = expectMsgsA1(1) checkMsg(t, matches[0], "foo", "", "", "2", "ok") - checkForQueueSid(t, matches, qg1Sids_a) + checkForQueueSid(t, matches, qg1SidsA) matches = expectMsgsA2(2) checkMsg(t, matches[0], "foo", "", "", "2", "ok") diff --git a/test/monitor_test.go b/test/monitor_test.go index c4abdc44..b6d11fc6 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -29,7 +29,7 @@ func runMonitorServer() *server.Server { return RunServer(&opts) } -func runMonitorServerNoHttpPort() *server.Server { +func runMonitorServerNoHTTPPort() *server.Server { resetPreviousHTTPConnections() opts := DefaultTestOptions opts.Port = CLIENT_PORT @@ -44,7 +44,7 @@ func resetPreviousHTTPConnections() { // Make sure that we do not run the http server for monitoring unless asked. func TestNoMonitorPort(t *testing.T) { - s := runMonitorServerNoHttpPort() + s := runMonitorServerNoHTTPPort() defer s.Shutdown() url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) diff --git a/test/proto_test.go b/test/proto_test.go index abe259f3..26950e35 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -103,7 +103,7 @@ func TestQueueSub(t *testing.T) { matches := expectMsgs(sent) sids := make(map[string]int) for _, m := range matches { - sids[string(m[SID_INDEX])]++ + sids[string(m[sidIndex])]++ } if len(sids) != 2 { t.Fatalf("Expected only 2 sids, got %d\n", len(sids)) @@ -140,7 +140,7 @@ func TestMultipleQueueSub(t *testing.T) { matches := expectMsgs(sent * 2) sids := make(map[string]int) for _, m := range matches { - sids[string(m[SID_INDEX])]++ + sids[string(m[sidIndex])]++ } if len(sids) != 4 { t.Fatalf("Expected 4 sids, got %d\n", len(sids)) diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index b26aef92..411344a1 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -4,7 +4,6 @@ package test import ( "encoding/json" - "errors" "fmt" "io/ioutil" "net" @@ -174,7 +173,7 @@ func TestSeedSolicitWorks(t *testing.T) { // Grab Routez from monitor ports, make sure we are fully connected url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) - rz := readHttpRoutez(t, url) + rz := readHTTPRoutez(t, url) ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) if ris[s2.Id()].IsConfigured == true { t.Fatalf("Expected server not to be configured\n") @@ -184,7 +183,7 @@ func TestSeedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) if ris[s1.Id()].IsConfigured != true { t.Fatalf("Expected seed server to be configured\n") @@ -194,7 +193,7 @@ func TestSeedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) if ris[s1.Id()].IsConfigured != true { t.Fatalf("Expected seed server to be configured\n") @@ -214,7 +213,7 @@ func checkConnected(t *testing.T, servers []serverInfo, current int, oneSeed boo // Grab Routez from monitor ports, make sure we are fully connected url := fmt.Sprintf("http://%s:%d/", s.opts.Host, s.opts.HTTPPort) - rz := readHttpRoutez(t, url) + rz := readHTTPRoutez(t, url) total := len(servers) var ids []string for i := 0; i < total; i++ { @@ -234,11 +233,11 @@ func checkConnected(t *testing.T, servers []serverInfo, current int, oneSeed boo s := servers[i] if current == 0 || ((oneSeed && i > 0) || (!oneSeed && (i != current-1))) { if ris[s.server.Id()].IsConfigured != false { - return errors.New(fmt.Sprintf("Expected server %s:%d not to be configured", s.opts.Host, s.opts.Port)) + return fmt.Errorf("Expected server %s:%d not to be configured", s.opts.Host, s.opts.Port) } } else if oneSeed || (i == current-1) { if ris[s.server.Id()].IsConfigured != true { - return errors.New(fmt.Sprintf("Expected server %s:%d to be configured", s.opts.Host, s.opts.Port)) + return fmt.Errorf("Expected server %s:%d to be configured", s.opts.Host, s.opts.Port) } } } @@ -350,7 +349,7 @@ func TestChainedSolicitWorks(t *testing.T) { // Grab Routez from monitor ports, make sure we are fully connected url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) - rz := readHttpRoutez(t, url) + rz := readHTTPRoutez(t, url) ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) if ris[s2.Id()].IsConfigured == true { t.Fatalf("Expected server not to be configured\n") @@ -360,7 +359,7 @@ func TestChainedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) if ris[s1.Id()].IsConfigured != true { t.Fatalf("Expected seed server to be configured\n") @@ -370,7 +369,7 @@ func TestChainedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) if ris[s2.Id()].IsConfigured != true { t.Fatalf("Expected s2 server to be configured\n") @@ -488,7 +487,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) { // Grab Routez from monitor ports, make sure we are fully connected url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) - rz := readHttpRoutez(t, url) + rz := readHTTPRoutez(t, url) ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) if ris[s2.Id()].IsConfigured == true { t.Fatalf("Expected server not to be configured\n") @@ -498,7 +497,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) if ris[s1.Id()].IsConfigured != true { t.Fatalf("Expected seed server to be configured\n") @@ -508,7 +507,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) { } url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) - rz = readHttpRoutez(t, url) + rz = readHTTPRoutez(t, url) ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) if ris[s1.Id()].IsConfigured != true { t.Fatalf("Expected seed server to be configured\n") @@ -534,7 +533,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri } if len(rids) != rz.NumRoutes { _, fn, line, _ := runtime.Caller(caller) - return nil, errors.New(fmt.Sprintf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes)) + return nil, fmt.Errorf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes) } set := make(map[string]bool) for _, v := range rids { @@ -545,7 +544,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri for _, r := range rz.Routes { if set[r.RemoteId] != true { _, fn, line, _ := runtime.Caller(caller) - return nil, errors.New(fmt.Sprintf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids)) + return nil, fmt.Errorf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids) } ri[r.RemoteId] = r } @@ -553,7 +552,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri } // Helper to easily grab routez info. -func readHttpRoutez(t *testing.T, url string) *server.Routez { +func readHTTPRoutez(t *testing.T, url string) *server.Routez { resp, err := http.Get(url + "routez") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) diff --git a/test/routes_test.go b/test/routes_test.go index ad6e6a13..b159a3e8 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -106,8 +106,8 @@ func TestSendRouteInfoOnConnect(t *testing.T) { if err != nil { t.Fatalf("Could not marshal test route info: %v", err) } - infoJson := fmt.Sprintf("INFO %s\r\n", b) - routeSend(infoJson) + infoJSON := fmt.Sprintf("INFO %s\r\n", b) + routeSend(infoJSON) routeSend("PING\r\n") routeExpect(pongRe) } @@ -200,9 +200,9 @@ func TestSendRouteSolicit(t *testing.T) { if len(opts.Routes) <= 0 { t.Fatalf("Need an outbound solicted route for this test") } - rUrl := opts.Routes[0] + rURL := opts.Routes[0] - conn := acceptRouteConn(t, rUrl.Host, server.DEFAULT_ROUTE_CONNECT) + conn := acceptRouteConn(t, rURL.Host, server.DEFAULT_ROUTE_CONNECT) defer conn.Close() // We should receive a connect message right away due to auth. @@ -391,8 +391,8 @@ func TestRouteQueueSemantics(t *testing.T) { matches = expectMsgs(2) // Expect first to be the normal subscriber, next will be the queue one. - if string(matches[0][SID_INDEX]) != "RSID:2:4" && - string(matches[1][SID_INDEX]) != "RSID:2:4" { + if string(matches[0][sidIndex]) != "RSID:2:4" && + string(matches[1][sidIndex]) != "RSID:2:4" { t.Fatalf("Did not received routed sid\n") } checkMsg(t, matches[0], "foo", "", "", "2", "ok") @@ -400,10 +400,10 @@ func TestRouteQueueSemantics(t *testing.T) { // Check the rsid to verify it is one of the queue group subscribers. var rsid string - if matches[0][SID_INDEX][0] == 'Q' { - rsid = string(matches[0][SID_INDEX]) + if matches[0][sidIndex][0] == 'Q' { + rsid = string(matches[0][sidIndex]) } else { - rsid = string(matches[1][SID_INDEX]) + rsid = string(matches[1][sidIndex]) } if rsid != qrsid1 && rsid != qrsid2 { t.Fatalf("Expected a queue group rsid, got %s\n", rsid) @@ -443,15 +443,15 @@ func TestSolicitRouteReconnect(t *testing.T) { s, opts := runRouteServer(t) defer s.Shutdown() - rUrl := opts.Routes[0] + rURL := opts.Routes[0] - route := acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT) + route := acceptRouteConn(t, rURL.Host, 2*server.DEFAULT_ROUTE_CONNECT) // Go ahead and close the Route. route.Close() // We expect to get called back.. - route = acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT) + route = acceptRouteConn(t, rURL.Host, 2*server.DEFAULT_ROUTE_CONNECT) route.Close() } @@ -546,10 +546,10 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { if err != nil { t.Fatalf("Could not marshal test route info: %v", err) } - infoJson := fmt.Sprintf("INFO %s\r\n", b) + infoJSON := fmt.Sprintf("INFO %s\r\n", b) // Trigger the send of local subs. - routeSend(infoJson) + routeSend(infoJSON) routeExpect(subRe) @@ -563,7 +563,7 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { routeExpect(infoRe) - routeSend(infoJson) + routeSend(infoJSON) routeExpect(subRe) } diff --git a/test/test.go b/test/test.go index 22d96fb4..94630da6 100644 --- a/test/test.go +++ b/test/test.go @@ -32,6 +32,7 @@ type tLogger interface { Errorf(format string, args ...interface{}) } +// DefaultTestOptions are default options for the unit tests. var DefaultTestOptions = server.Options{ Host: "localhost", Port: 4222, @@ -39,15 +40,17 @@ var DefaultTestOptions = server.Options{ NoSigs: true, } +// RunDefaultServer starts a new Go routine based server using the default options func RunDefaultServer() *server.Server { return RunServer(&DefaultTestOptions) } -// New Go Routine based server +// RunServer starts a new Go routine based server func RunServer(opts *server.Options) *server.Server { return RunServerWithAuth(opts, nil) } +// LoadConfig loads a configuration from a filename func LoadConfig(configFile string) (opts *server.Options) { opts, err := server.ProcessConfigFile(configFile) if err != nil { @@ -57,6 +60,7 @@ func LoadConfig(configFile string) (opts *server.Options) { return } +// RunServerWithConfig starts a new Go routine based server with a configuration file. func RunServerWithConfig(configFile string) (srv *server.Server, opts *server.Options) { opts = LoadConfig(configFile) @@ -72,7 +76,7 @@ func RunServerWithConfig(configFile string) (srv *server.Server, opts *server.Op return } -// New Go Routine based server with auth +// RunServerWithAuth starts a new Go routine based server with auth func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server { if opts == nil { opts = &DefaultTestOptions @@ -251,10 +255,10 @@ func doDefaultConnect(t tLogger, c net.Conn) { doConnect(t, c, false, false, false) } -const CONNECT_F = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n" +const connectProto = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n" func doRouteAuthConnect(t tLogger, c net.Conn, user, pass, id string) { - cs := fmt.Sprintf(CONNECT_F, user, pass, id) + cs := fmt.Sprintf(connectProto, user, pass, id) sendProto(t, c, cs) } @@ -320,11 +324,11 @@ var ( ) const ( - SUB_INDEX = 1 - SID_INDEX = 2 - REPLY_INDEX = 4 - LEN_INDEX = 5 - MSG_INDEX = 6 + subIndex = 1 + sidIndex = 2 + replyIndex = 4 + lenIndex = 5 + msgIndex = 6 ) // Reuse expect buffer @@ -361,20 +365,20 @@ func expectNothing(t tLogger, c net.Conn) { // This will check that we got what we expected. func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) { - if string(m[SUB_INDEX]) != subject { - stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX]) + if string(m[subIndex]) != subject { + stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[subIndex]) } - if sid != "" && string(m[SID_INDEX]) != sid { - stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[SID_INDEX]) + if sid != "" && string(m[sidIndex]) != sid { + stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[sidIndex]) } - if string(m[REPLY_INDEX]) != reply { - stackFatalf(t, "Did not get correct reply: expected '%s' got '%s'\n", reply, m[REPLY_INDEX]) + if string(m[replyIndex]) != reply { + stackFatalf(t, "Did not get correct reply: expected '%s' got '%s'\n", reply, m[replyIndex]) } - if string(m[LEN_INDEX]) != len { - stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[LEN_INDEX]) + if string(m[lenIndex]) != len { + stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[lenIndex]) } - if string(m[MSG_INDEX]) != msg { - stackFatalf(t, "Did not get correct msg: expected '%s' got '%s'\n", msg, m[MSG_INDEX]) + if string(m[msgIndex]) != msg { + stackFatalf(t, "Did not get correct msg: expected '%s' got '%s'\n", msg, m[msgIndex]) } } @@ -398,9 +402,9 @@ func checkForQueueSid(t tLogger, matches [][][]byte, sids []string) { seen[sid] = 0 } for _, m := range matches { - sid := string(m[SID_INDEX]) + sid := string(m[sidIndex]) if _, ok := seen[sid]; ok { - seen[sid] += 1 + seen[sid]++ } } // Make sure we only see one and exactly one. @@ -421,9 +425,9 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) { seen[sid] = 0 } for _, m := range matches { - sid := string(m[SID_INDEX]) + sid := string(m[sidIndex]) if _, ok := seen[sid]; ok { - seen[sid] += 1 + seen[sid]++ } } // Make sure we only see one and exactly one for each sid. @@ -438,8 +442,8 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) { // Helper function to generate next opts to make sure no port conflicts etc. func nextServerOpts(opts *server.Options) *server.Options { nopts := *opts - nopts.Port += 1 - nopts.ClusterPort += 1 - nopts.HTTPPort += 1 + nopts.Port++ + nopts.ClusterPort++ + nopts.HTTPPort++ return &nopts }