mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #224 from nats-io/fix-lint-issues
Address issues found by golint.
This commit is contained in:
@@ -9,17 +9,19 @@ import (
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
const BcryptPrefix = "$2a$"
|
||||
const bcryptPrefix = "$2a$"
|
||||
|
||||
func isBcrypt(password string) bool {
|
||||
return strings.HasPrefix(password, BcryptPrefix)
|
||||
return strings.HasPrefix(password, bcryptPrefix)
|
||||
}
|
||||
|
||||
// Plain authentication is a basic username and password
|
||||
type Plain struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
// Check authenticates the client using a username and password
|
||||
func (p *Plain) Check(c server.ClientAuth) bool {
|
||||
opts := c.GetOpts()
|
||||
if p.Username != opts.Username {
|
||||
|
||||
@@ -5,10 +5,12 @@ import (
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
// Token holds a string token used for authentication
|
||||
type Token struct {
|
||||
Token string
|
||||
}
|
||||
|
||||
// Check authenticates a client from a token
|
||||
func (p *Token) Check(c server.ClientAuth) bool {
|
||||
opts := c.GetOpts()
|
||||
// Check to see if the token is a bcrypt hash
|
||||
|
||||
@@ -709,9 +709,8 @@ func lexNumberOrDateStart(lx *lexer) stateFn {
|
||||
if !isDigit(r) {
|
||||
if r == '.' {
|
||||
return lx.errorf("Floats must start with a digit, not '.'.")
|
||||
} else {
|
||||
return lx.errorf("Expected a digit but got '%v'.", r)
|
||||
}
|
||||
return lx.errorf("Expected a digit but got '%v'.", r)
|
||||
}
|
||||
return lexNumberOrDate
|
||||
}
|
||||
@@ -772,9 +771,8 @@ func lexNumberStart(lx *lexer) stateFn {
|
||||
if !isDigit(r) {
|
||||
if r == '.' {
|
||||
return lx.errorf("Floats must start with a digit, not '.'.")
|
||||
} else {
|
||||
return lx.errorf("Expected a digit but got '%v'.", r)
|
||||
}
|
||||
return lx.errorf("Expected a digit but got '%v'.", r)
|
||||
}
|
||||
return lexNumber
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2013 Apcera Inc. All rights reserved.
|
||||
|
||||
// Conf is a configuration file format used by gnatsd. It is
|
||||
// Package conf supports a configuration file format used by gnatsd. It is
|
||||
// a flexible format that combines the best of traditional
|
||||
// configuration formats and newer styles such as JSON and YAML.
|
||||
package conf
|
||||
@@ -20,9 +20,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Parser will return a map of keys to interface{}, although concrete types
|
||||
// underly them. The values supported are string, bool, int64, float64, DateTime.
|
||||
// Arrays and nested Maps are also supported.
|
||||
type parser struct {
|
||||
mapping map[string]interface{}
|
||||
lx *lexer
|
||||
@@ -37,6 +34,9 @@ type parser struct {
|
||||
keys []string
|
||||
}
|
||||
|
||||
// Parse will return a map of keys to interface{}, although concrete types
|
||||
// underly them. The values supported are string, bool, int64, float64, DateTime.
|
||||
// Arrays and nested Maps are also supported.
|
||||
func Parse(data string) (map[string]interface{}, error) {
|
||||
p, err := parse(data)
|
||||
if err != nil {
|
||||
@@ -121,9 +121,8 @@ func (p *parser) processItem(it item) error {
|
||||
if e, ok := err.(*strconv.NumError); ok &&
|
||||
e.Err == strconv.ErrRange {
|
||||
return fmt.Errorf("Integer '%s' is out of the range.", it.val)
|
||||
} else {
|
||||
return fmt.Errorf("Expected integer, but got '%s'.", it.val)
|
||||
}
|
||||
return fmt.Errorf("Expected integer, but got '%s'.", it.val)
|
||||
}
|
||||
p.setValue(num)
|
||||
case itemFloat:
|
||||
@@ -132,9 +131,8 @@ func (p *parser) processItem(it item) error {
|
||||
if e, ok := err.(*strconv.NumError); ok &&
|
||||
e.Err == strconv.ErrRange {
|
||||
return fmt.Errorf("Float '%s' is out of the range.", it.val)
|
||||
} else {
|
||||
return fmt.Errorf("Expected float, but got '%s'.", it.val)
|
||||
}
|
||||
return fmt.Errorf("Expected float, but got '%s'.", it.val)
|
||||
}
|
||||
p.setValue(num)
|
||||
case itemBool:
|
||||
@@ -154,7 +152,7 @@ func (p *parser) processItem(it item) error {
|
||||
}
|
||||
p.setValue(dt)
|
||||
case itemArrayStart:
|
||||
array := make([]interface{}, 0)
|
||||
var array = make([]interface{}, 0)
|
||||
p.pushContext(array)
|
||||
case itemArrayEnd:
|
||||
array := p.ctx
|
||||
|
||||
@@ -81,7 +81,7 @@ func main() {
|
||||
}
|
||||
|
||||
if showTLSHelp {
|
||||
server.PrintTlsHelpAndDie()
|
||||
server.PrintTLSHelpAndDie()
|
||||
}
|
||||
|
||||
// One flag can set multiple options.
|
||||
|
||||
10
hash/hash.go
10
hash/hash.go
@@ -1,13 +1,13 @@
|
||||
// Copyright 2012 Apcera Inc. All rights reserved.
|
||||
|
||||
// Collection of high performance 32-bit hash functions.
|
||||
// Package hash is a collection of high performance 32-bit hash functions.
|
||||
package hash
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Generates a Bernstein Hash.
|
||||
// Bernstein Hash.
|
||||
func Bernstein(data []byte) uint32 {
|
||||
hash := uint32(5381)
|
||||
for _, b := range data {
|
||||
@@ -23,7 +23,7 @@ const (
|
||||
_YP32 = 709607
|
||||
)
|
||||
|
||||
// Generates an FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function]
|
||||
// FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function]
|
||||
func FNV1A(data []byte) uint32 {
|
||||
var hash uint32 = _OFF32
|
||||
for _, c := range data {
|
||||
@@ -152,10 +152,10 @@ const (
|
||||
_F2 = uint32(0xc2b2ae35)
|
||||
)
|
||||
|
||||
// A default seed for Murmur3
|
||||
// M3Seed is a default seed for Murmur3
|
||||
const M3Seed = uint32(0x9747b28c)
|
||||
|
||||
// Generates a Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3]
|
||||
// Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3]
|
||||
// Does not generate intermediate objects.
|
||||
func Murmur3(data []byte, seed uint32) uint32 {
|
||||
h1 := seed
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
// HashMap defines a high performance hashmap based on
|
||||
// Package hashmap defines a high performance hashmap based on
|
||||
// fast hashing and fast key comparison. Simple chaining
|
||||
// is used, relying on the hashing algorithms for good
|
||||
// distribution
|
||||
@@ -93,7 +93,7 @@ func (h *HashMap) Set(key []byte, data interface{}) {
|
||||
ne := &Entry{hk: hk, key: key, data: data}
|
||||
ne.next = h.bkts[hk&h.msk]
|
||||
h.bkts[hk&h.msk] = ne
|
||||
h.used += 1
|
||||
h.used++
|
||||
// Check for resizing
|
||||
if h.rsz && (h.used > uint32(len(h.bkts))) {
|
||||
h.grow()
|
||||
@@ -152,7 +152,7 @@ func (h *HashMap) Remove(key []byte) {
|
||||
if len(key) == len((*e).key) && bytes.Equal(key, (*e).key) {
|
||||
// Success
|
||||
*e = (*e).next
|
||||
h.used -= 1
|
||||
h.used--
|
||||
// Check for resizing
|
||||
lbkts := uint32(len(h.bkts))
|
||||
if h.rsz && lbkts > _BSZ && (h.used < lbkts/4) {
|
||||
@@ -235,11 +235,11 @@ func (h *HashMap) Stats() *Stats {
|
||||
lc, totalc, slots := 0, 0, 0
|
||||
for _, e := range h.bkts {
|
||||
if e != nil {
|
||||
slots += 1
|
||||
slots++
|
||||
}
|
||||
i := 0
|
||||
for ; e != nil; e = e.next {
|
||||
i += 1
|
||||
i++
|
||||
if i > lc {
|
||||
lc = i
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ func (h *HashMap) RemoveRandom() {
|
||||
e := &h.bkts[i]
|
||||
if *e != nil {
|
||||
*e = (*e).next
|
||||
h.used -= 1
|
||||
h.used--
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -35,7 +35,7 @@ func (h *HashMap) RemoveRandom() {
|
||||
e := &h.bkts[i]
|
||||
if *e != nil {
|
||||
*e = (*e).next
|
||||
h.used -= 1
|
||||
h.used--
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
//Package logger provides logging facilities for the NATS server
|
||||
package logger
|
||||
|
||||
import (
|
||||
@@ -7,6 +9,7 @@ import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// Logger is the server logger
|
||||
type Logger struct {
|
||||
logger *log.Logger
|
||||
debug bool
|
||||
@@ -18,6 +21,7 @@ type Logger struct {
|
||||
traceLabel string
|
||||
}
|
||||
|
||||
// NewStdLogger creates a logger with output directed to Stderr
|
||||
func NewStdLogger(time, debug, trace, colors, pid bool) *Logger {
|
||||
flags := 0
|
||||
if time {
|
||||
@@ -44,6 +48,7 @@ func NewStdLogger(time, debug, trace, colors, pid bool) *Logger {
|
||||
return l
|
||||
}
|
||||
|
||||
// NewFileLogger creates a logger with output directed to a file
|
||||
func NewFileLogger(filename string, time, debug, trace, pid bool) *Logger {
|
||||
fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
|
||||
f, err := os.OpenFile(filename, fileflags, 0660)
|
||||
@@ -93,24 +98,29 @@ func setColoredLabelFormats(l *Logger) {
|
||||
l.traceLabel = fmt.Sprintf(colorFormat, 33, "TRC")
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func (l *Logger) Noticef(format string, v ...interface{}) {
|
||||
l.logger.Printf(l.infoLabel+format, v...)
|
||||
}
|
||||
|
||||
// Errorf logs an error statement
|
||||
func (l *Logger) Errorf(format string, v ...interface{}) {
|
||||
l.logger.Printf(l.errorLabel+format, v...)
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal error
|
||||
func (l *Logger) Fatalf(format string, v ...interface{}) {
|
||||
l.logger.Fatalf(l.fatalLabel+format, v...)
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func (l *Logger) Debugf(format string, v ...interface{}) {
|
||||
if l.debug == true {
|
||||
l.logger.Printf(l.debugLabel+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func (l *Logger) Tracef(format string, v ...interface{}) {
|
||||
if l.trace == true {
|
||||
l.logger.Printf(l.traceLabel+format, v...)
|
||||
|
||||
@@ -11,12 +11,14 @@ import (
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// SysLogger provides a system logger facility
|
||||
type SysLogger struct {
|
||||
writer *syslog.Writer
|
||||
debug bool
|
||||
trace bool
|
||||
}
|
||||
|
||||
// NewSysLogger creates a new system logger
|
||||
func NewSysLogger(debug, trace bool) *SysLogger {
|
||||
w, err := syslog.New(syslog.LOG_DAEMON|syslog.LOG_NOTICE, "gnatsd")
|
||||
if err != nil {
|
||||
@@ -30,6 +32,7 @@ func NewSysLogger(debug, trace bool) *SysLogger {
|
||||
}
|
||||
}
|
||||
|
||||
// NewRemoteSysLogger creates a new remote system logger
|
||||
func NewRemoteSysLogger(fqn string, debug, trace bool) *SysLogger {
|
||||
network, addr := getNetworkAndAddr(fqn)
|
||||
w, err := syslog.Dial(network, addr, syslog.LOG_DEBUG, "gnatsd")
|
||||
@@ -62,24 +65,29 @@ func getNetworkAndAddr(fqn string) (network, addr string) {
|
||||
return
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func (l *SysLogger) Noticef(format string, v ...interface{}) {
|
||||
l.writer.Notice(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal error
|
||||
func (l *SysLogger) Fatalf(format string, v ...interface{}) {
|
||||
l.writer.Crit(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
// Errorf logs an error statement
|
||||
func (l *SysLogger) Errorf(format string, v ...interface{}) {
|
||||
l.writer.Err(fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func (l *SysLogger) Debugf(format string, v ...interface{}) {
|
||||
if l.debug {
|
||||
l.writer.Debug(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func (l *SysLogger) Tracef(format string, v ...interface{}) {
|
||||
if l.trace {
|
||||
l.writer.Notice(fmt.Sprintf(format, v...))
|
||||
|
||||
@@ -150,7 +150,7 @@ func expectSyslogOutput(t *testing.T, line string, expected string) {
|
||||
|
||||
func runSyslog(c net.PacketConn, done chan<- string) {
|
||||
var buf [4096]byte
|
||||
var rcvd string = ""
|
||||
var rcvd string
|
||||
for {
|
||||
n, _, err := c.ReadFrom(buf[:])
|
||||
if err != nil || n == 0 {
|
||||
|
||||
@@ -2,10 +2,14 @@
|
||||
|
||||
package server
|
||||
|
||||
// Auth is an interface for implementing authentication
|
||||
type Auth interface {
|
||||
// Check if a client is authorized to connect
|
||||
Check(c ClientAuth) bool
|
||||
}
|
||||
|
||||
// ClientAuth is an interface for client authentication
|
||||
type ClientAuth interface {
|
||||
// Get options associated with a client
|
||||
GetOpts() *clientOpts
|
||||
}
|
||||
|
||||
@@ -356,7 +356,7 @@ func (c *client) processPing() {
|
||||
func (c *client) processPong() {
|
||||
c.traceInOp("PONG", nil)
|
||||
c.mu.Lock()
|
||||
c.pout -= 1
|
||||
c.pout--
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -861,7 +861,7 @@ func (c *client) processPingTimer() {
|
||||
c.Debugf("%s Ping Timer", c.typeString())
|
||||
|
||||
// Check for violation
|
||||
c.pout += 1
|
||||
c.pout++
|
||||
if c.pout > c.srv.opts.MaxPingsOut {
|
||||
c.Debugf("Stale Client Connection - Closing")
|
||||
if c.bw != nil {
|
||||
|
||||
@@ -304,7 +304,7 @@ func TestClientPubWithQueueSub(t *testing.T) {
|
||||
}()
|
||||
|
||||
var n1, n2, received int
|
||||
for ; ; received += 1 {
|
||||
for ; ; received++ {
|
||||
l, err := cr.ReadString('\n')
|
||||
if err != nil {
|
||||
break
|
||||
@@ -353,7 +353,7 @@ func TestClientUnSub(t *testing.T) {
|
||||
}()
|
||||
|
||||
var received int
|
||||
for ; ; received += 1 {
|
||||
for ; ; received++ {
|
||||
l, err := cr.ReadString('\n')
|
||||
if err != nil {
|
||||
break
|
||||
@@ -396,7 +396,7 @@ func TestClientUnSubMax(t *testing.T) {
|
||||
}()
|
||||
|
||||
var received int
|
||||
for ; ; received += 1 {
|
||||
for ; ; received++ {
|
||||
l, err := cr.ReadString('\n')
|
||||
if err != nil {
|
||||
break
|
||||
|
||||
@@ -47,7 +47,7 @@ const (
|
||||
// DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect.
|
||||
DEFAULT_PING_MAX_OUT = 2
|
||||
|
||||
// CRLF string
|
||||
// CR_LF string
|
||||
CR_LF = "\r\n"
|
||||
|
||||
// LEN_CR_LF hold onto the computed size.
|
||||
@@ -83,7 +83,7 @@ const (
|
||||
// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
|
||||
MAX_PUB_ARGS = 3
|
||||
|
||||
// Default Buffer size for reads and writes per connection. Will be replaced by dynamic
|
||||
// system in the long run.
|
||||
// DEFAULT_BUF_SIZE is the default buffer size for reads and writes per connection.
|
||||
// It will be replaced by a dynamic system in the long run.
|
||||
DEFAULT_BUF_SIZE = 32768
|
||||
)
|
||||
|
||||
@@ -16,14 +16,26 @@ var log = struct {
|
||||
logger Logger
|
||||
}{}
|
||||
|
||||
// Logger interface of the NATS Server
|
||||
type Logger interface {
|
||||
|
||||
// Log a notice statement
|
||||
Noticef(format string, v ...interface{})
|
||||
|
||||
// Log a fatal error
|
||||
Fatalf(format string, v ...interface{})
|
||||
|
||||
// Log an error
|
||||
Errorf(format string, v ...interface{})
|
||||
|
||||
// Log a debug statement
|
||||
Debugf(format string, v ...interface{})
|
||||
|
||||
// Log a trace statement
|
||||
Tracef(format string, v ...interface{})
|
||||
}
|
||||
|
||||
// SetLogger sets the logger of the server
|
||||
func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) {
|
||||
if debugFlag {
|
||||
atomic.StoreInt32(&debug, 1)
|
||||
@@ -38,24 +50,28 @@ func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) {
|
||||
log.Unlock()
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func Noticef(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Noticef(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Errorf logs an error
|
||||
func Errorf(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Errorf(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal error
|
||||
func Fatalf(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Fatalf(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func Debugf(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&debug) == 0 {
|
||||
return
|
||||
@@ -66,6 +82,7 @@ func Debugf(format string, v ...interface{}) {
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func Tracef(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&trace) == 0 {
|
||||
return
|
||||
|
||||
@@ -56,6 +56,7 @@ type ConnInfo struct {
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
}
|
||||
|
||||
// DefaultConnListSize is the default size of the connection list.
|
||||
const DefaultConnListSize = 1024
|
||||
|
||||
// HandleConnz process HTTP requests for connection information.
|
||||
@@ -317,7 +318,7 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
|
||||
ResponseHandler(w, r, b)
|
||||
}
|
||||
|
||||
// HandleStats process HTTP requests for subjects stats.
|
||||
// HandleSubsz processes HTTP requests for subjects stats.
|
||||
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
s.httpReqStats[SubszPath]++
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
package server
|
||||
|
||||
// Helper types to sort by ConnInfo values
|
||||
// SortOpt is a helper type to sort by ConnInfo values
|
||||
type SortOpt string
|
||||
|
||||
const (
|
||||
@@ -17,6 +17,7 @@ const (
|
||||
byIdle = "idle"
|
||||
)
|
||||
|
||||
// IsValid determines if a sort option is valid
|
||||
func (s SortOpt) IsValid() bool {
|
||||
switch s {
|
||||
case "", byCid, bySubs, byPending, byOutMsgs, byInMsgs, byOutBytes, byInBytes, byLast, byIdle:
|
||||
@@ -26,11 +27,13 @@ func (s SortOpt) IsValid() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// Pair type is internally used.
|
||||
type Pair struct {
|
||||
Key *client
|
||||
Val int64
|
||||
}
|
||||
|
||||
// Pairs type is internally used.
|
||||
type Pairs []Pair
|
||||
|
||||
func (d Pairs) Len() int {
|
||||
|
||||
@@ -36,7 +36,7 @@ func runMonitorServer() *Server {
|
||||
return RunServer(&opts)
|
||||
}
|
||||
|
||||
func runMonitorServerNoHttpPort() *Server {
|
||||
func runMonitorServerNoHTTPPort() *Server {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultMonitorOptions
|
||||
opts.HTTPPort = 0
|
||||
@@ -81,7 +81,7 @@ func TestMyUptime(t *testing.T) {
|
||||
|
||||
// Make sure that we do not run the http server for monitoring unless asked.
|
||||
func TestNoMonitorPort(t *testing.T) {
|
||||
s := runMonitorServerNoHttpPort()
|
||||
s := runMonitorServerNoHTTPPort()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
@@ -949,8 +949,8 @@ func TestConnzWithRoutes(t *testing.T) {
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
}
|
||||
routeUrl, _ := url.Parse(fmt.Sprintf("nats-route://localhost:%d", CLUSTER_PORT))
|
||||
opts.Routes = []*url.URL{routeUrl}
|
||||
routeURL, _ := url.Parse(fmt.Sprintf("nats-route://localhost:%d", CLUSTER_PORT))
|
||||
opts.Routes = []*url.URL{routeURL}
|
||||
|
||||
sc := RunServer(&opts)
|
||||
defer sc.Shutdown()
|
||||
@@ -1168,10 +1168,10 @@ func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn {
|
||||
}
|
||||
|
||||
func createClientConnWithName(t *testing.T, name string) *nats.Conn {
|
||||
natsUri := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT)
|
||||
natsURI := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT)
|
||||
|
||||
client := nats.DefaultOptions
|
||||
client.Servers = []string{natsUri}
|
||||
client.Servers = []string{natsURI}
|
||||
client.Name = name
|
||||
nc, err := client.Connect()
|
||||
if err != nil {
|
||||
|
||||
@@ -69,8 +69,8 @@ type authorization struct {
|
||||
timeout float64
|
||||
}
|
||||
|
||||
// This struct holds the parsed tls config information.
|
||||
// It's public so we can use it for flag parsing
|
||||
// TLSConfigOpts holds the parsed tls config information,
|
||||
// used with flag parsing
|
||||
type TLSConfigOpts struct {
|
||||
CertFile string
|
||||
KeyFile string
|
||||
@@ -227,8 +227,8 @@ func parseAuthorization(am map[string]interface{}) authorization {
|
||||
return auth
|
||||
}
|
||||
|
||||
// For Usage...
|
||||
func PrintTlsHelpAndDie() {
|
||||
// PrintTLSHelpAndDie prints TLS usage and exits.
|
||||
func PrintTLSHelpAndDie() {
|
||||
|
||||
var tlsUsage = `
|
||||
TLS configuration is specified in the tls section of a configuration file:
|
||||
@@ -301,7 +301,7 @@ func parseTLS(tlsm map[string]interface{}) (*TLSConfigOpts, error) {
|
||||
case "cipher_suites":
|
||||
ra := mv.([]interface{})
|
||||
if len(ra) == 0 {
|
||||
return nil, fmt.Errorf("error parsing tls config, 'cipher_suites' cannot be empty.")
|
||||
return nil, fmt.Errorf("error parsing tls config, 'cipher_suites' cannot be empty")
|
||||
}
|
||||
tc.Ciphers = make([]uint16, 0, len(ra))
|
||||
for _, r := range ra {
|
||||
@@ -333,6 +333,7 @@ func parseTLS(tlsm map[string]interface{}) (*TLSConfigOpts, error) {
|
||||
return &tc, nil
|
||||
}
|
||||
|
||||
// GenTLSConfig loads TLS related configuration parameters.
|
||||
func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) {
|
||||
|
||||
// Now load in cert and private key
|
||||
@@ -429,6 +430,7 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
|
||||
return &opts
|
||||
}
|
||||
|
||||
// RoutesFromStr parses route URLs from a string
|
||||
func RoutesFromStr(routesStr string) []*url.URL {
|
||||
routes := strings.Split(routesStr, ",")
|
||||
if len(routes) == 0 {
|
||||
@@ -453,6 +455,7 @@ func mergeRoutes(opts, flagOpts *Options) {
|
||||
opts.RoutesStr = flagOpts.RoutesStr
|
||||
}
|
||||
|
||||
// RemoveSelfReference removes this server from an array of routes
|
||||
func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) {
|
||||
var cleanRoutes []*url.URL
|
||||
cport := strconv.Itoa(clusterPort)
|
||||
@@ -464,7 +467,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cport == port && isIpInList(selfIPs, getUrlIp(host)) {
|
||||
if cport == port && isIPInList(selfIPs, getURLIP(host)) {
|
||||
Noticef("Self referencing IP found: ", r)
|
||||
continue
|
||||
}
|
||||
@@ -474,7 +477,7 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)
|
||||
return cleanRoutes, nil
|
||||
}
|
||||
|
||||
func isIpInList(list1 []net.IP, list2 []net.IP) bool {
|
||||
func isIPInList(list1 []net.IP, list2 []net.IP) bool {
|
||||
for _, ip1 := range list1 {
|
||||
for _, ip2 := range list2 {
|
||||
if ip1.Equal(ip2) {
|
||||
@@ -485,7 +488,7 @@ func isIpInList(list1 []net.IP, list2 []net.IP) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func getUrlIp(ipStr string) []net.IP {
|
||||
func getURLIP(ipStr string) []net.IP {
|
||||
ipList := []net.IP{}
|
||||
|
||||
ip := net.ParseIP(ipStr)
|
||||
|
||||
@@ -24,6 +24,7 @@ type parseState struct {
|
||||
scratch [MAX_CONTROL_LINE_SIZE]byte
|
||||
}
|
||||
|
||||
// Parser constants
|
||||
const (
|
||||
OP_START = iota
|
||||
OP_PLUS
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Designate the router type
|
||||
// RouteType designates the router type
|
||||
type RouteType int
|
||||
|
||||
// Type of Route
|
||||
@@ -47,6 +47,7 @@ type connectInfo struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// Route protocol constants
|
||||
const (
|
||||
ConProto = "CONNECT %s" + _CRLF_
|
||||
InfoProto = "INFO %s" + _CRLF_
|
||||
@@ -378,7 +379,7 @@ const (
|
||||
|
||||
// FIXME(dlc) - Make these reserved and reject if they come in as a sid
|
||||
// from a client connection.
|
||||
|
||||
// Route constants
|
||||
const (
|
||||
RSID = "RSID"
|
||||
QRSID = "QRSID"
|
||||
@@ -588,18 +589,18 @@ func (s *Server) StartRouting() {
|
||||
s.solicitRoutes()
|
||||
}
|
||||
|
||||
func (s *Server) reConnectToRoute(rUrl *url.URL, rtype RouteType) {
|
||||
func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
|
||||
tryForEver := rtype == Explicit
|
||||
if tryForEver {
|
||||
time.Sleep(DEFAULT_ROUTE_RECONNECT)
|
||||
}
|
||||
s.connectToRoute(rUrl, tryForEver)
|
||||
s.connectToRoute(rURL, tryForEver)
|
||||
}
|
||||
|
||||
func (s *Server) connectToRoute(rUrl *url.URL, tryForEver bool) {
|
||||
for s.isRunning() && rUrl != nil {
|
||||
Debugf("Trying to connect to route on %s", rUrl.Host)
|
||||
conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL)
|
||||
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
|
||||
for s.isRunning() && rURL != nil {
|
||||
Debugf("Trying to connect to route on %s", rURL.Host)
|
||||
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
|
||||
if err != nil {
|
||||
Debugf("Error trying to connect to route: %v", err)
|
||||
select {
|
||||
@@ -614,7 +615,7 @@ func (s *Server) connectToRoute(rUrl *url.URL, tryForEver bool) {
|
||||
}
|
||||
// We have a route connection here.
|
||||
// Go ahead and create it and exit this func.
|
||||
s.createRoute(conn, rUrl)
|
||||
s.createRoute(conn, rURL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,9 +161,9 @@ func checkClusterFormed(t *testing.T, servers ...*Server) {
|
||||
// Helper function to generate next opts to make sure no port conflicts etc.
|
||||
func nextServerOpts(opts *Options) *Options {
|
||||
nopts := *opts
|
||||
nopts.Port += 1
|
||||
nopts.ClusterPort += 1
|
||||
nopts.HTTPPort += 1
|
||||
nopts.Port++
|
||||
nopts.ClusterPort++
|
||||
nopts.HTTPPort++
|
||||
return &nopts
|
||||
}
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ func New(opts *Options) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// Sets the authentication method for clients.
|
||||
// SetClientAuthMethod sets the authentication method for clients.
|
||||
func (s *Server) SetClientAuthMethod(authMethod Auth) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -139,7 +139,7 @@ func (s *Server) SetClientAuthMethod(authMethod Auth) {
|
||||
s.generateServerInfoJSON()
|
||||
}
|
||||
|
||||
// Sets the authentication method for routes.
|
||||
// SetRouteAuthMethod sets the authentication method for routes.
|
||||
func (s *Server) SetRouteAuthMethod(authMethod Auth) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -388,7 +388,7 @@ func (s *Server) StartHTTPMonitoring() {
|
||||
s.startMonitoring(false)
|
||||
}
|
||||
|
||||
// StartHTTPMonitoring will enable the HTTPS monitoring port.
|
||||
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
|
||||
func (s *Server) StartHTTPSMonitoring() {
|
||||
s.startMonitoring(true)
|
||||
}
|
||||
@@ -757,7 +757,7 @@ func (s *Server) GetRouteListenEndpoint() string {
|
||||
return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort))
|
||||
}
|
||||
|
||||
// Server's ID
|
||||
// Id returns the server's ID
|
||||
func (s *Server) Id() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -247,7 +247,7 @@ func TestSplitConnectArg(t *testing.T) {
|
||||
connectAll := []byte("CONNECT {\"verbose\":false,\"ssl_required\":false," +
|
||||
"\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n")
|
||||
|
||||
argJson := connectAll[8:]
|
||||
argJSON := connectAll[8:]
|
||||
|
||||
c1 := connectAll[:5]
|
||||
c2 := connectAll[5:22]
|
||||
@@ -267,8 +267,8 @@ func TestSplitConnectArg(t *testing.T) {
|
||||
if c.argBuf == nil {
|
||||
t.Fatalf("Expected argBug to not be nil.\n")
|
||||
}
|
||||
if !bytes.Equal(c.argBuf, argJson[:14]) {
|
||||
t.Fatalf("argBuf not correct, received %q, wanted %q\n", argJson[:14], c.argBuf)
|
||||
if !bytes.Equal(c.argBuf, argJSON[:14]) {
|
||||
t.Fatalf("argBuf not correct, received %q, wanted %q\n", argJSON[:14], c.argBuf)
|
||||
}
|
||||
|
||||
if err := c.parse(c3); err != nil {
|
||||
@@ -277,9 +277,9 @@ func TestSplitConnectArg(t *testing.T) {
|
||||
if c.argBuf == nil {
|
||||
t.Fatalf("Expected argBug to not be nil.\n")
|
||||
}
|
||||
if !bytes.Equal(c.argBuf, argJson[:len(argJson)-2]) {
|
||||
if !bytes.Equal(c.argBuf, argJSON[:len(argJSON)-2]) {
|
||||
t.Fatalf("argBuf not correct, received %q, wanted %q\n",
|
||||
argJson[:len(argJson)-2], c.argBuf)
|
||||
argJSON[:len(argJSON)-2], c.argBuf)
|
||||
}
|
||||
|
||||
if err := c.parse(c4); err != nil {
|
||||
|
||||
@@ -15,8 +15,8 @@ func genID() string {
|
||||
|
||||
// Ascii numbers 0-9
|
||||
const (
|
||||
ascii_0 = 48
|
||||
ascii_9 = 57
|
||||
asciiZero = 48
|
||||
asciiNine = 57
|
||||
)
|
||||
|
||||
// parseSize expects decimal positive numbers. We
|
||||
@@ -26,10 +26,10 @@ func parseSize(d []byte) (n int) {
|
||||
return -1
|
||||
}
|
||||
for _, dec := range d {
|
||||
if dec < ascii_0 || dec > ascii_9 {
|
||||
if dec < asciiZero || dec > asciiNine {
|
||||
return -1
|
||||
}
|
||||
n = n*10 + (int(dec) - ascii_0)
|
||||
n = n*10 + (int(dec) - asciiZero)
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -41,10 +41,10 @@ func parseInt64(d []byte) (n int64) {
|
||||
return -1
|
||||
}
|
||||
for _, dec := range d {
|
||||
if dec < ascii_0 || dec > ascii_9 {
|
||||
if dec < asciiZero || dec > asciiNine {
|
||||
return -1
|
||||
}
|
||||
n = n*10 + (int64(dec) - ascii_0)
|
||||
n = n*10 + (int64(dec) - asciiZero)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
// Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
// Sublist is a subject distribution data structure that can match subjects to
|
||||
// interested subscribers. Subscribers can have wildcard subjects to match
|
||||
// multiple published subjects.
|
||||
// Package sublist handles subject distribution and provides a facility
|
||||
// match subjects to interested subscribers. Subscribers can have wildcard
|
||||
// subjects to match multiple published subjects.
|
||||
package sublist
|
||||
|
||||
import (
|
||||
@@ -85,6 +85,7 @@ const (
|
||||
_SEP = byte('.')
|
||||
)
|
||||
|
||||
// Sublist related errors
|
||||
var (
|
||||
ErrInvalidSubject = errors.New("Invalid Subject")
|
||||
ErrNotFound = errors.New("No Matches Found")
|
||||
@@ -102,6 +103,7 @@ func split(subject []byte, tokens [][]byte) [][]byte {
|
||||
return append(tokens, subject[start:])
|
||||
}
|
||||
|
||||
// Insert adds a subject into the sublist
|
||||
func (s *Sublist) Insert(subject []byte, sub interface{}) error {
|
||||
tsa := [16][]byte{}
|
||||
toks := split(subject, tsa[:0])
|
||||
@@ -375,10 +377,10 @@ func (n *node) isEmpty() bool {
|
||||
func (l *level) numNodes() uint32 {
|
||||
num := l.nodes.Count()
|
||||
if l.pwc != nil {
|
||||
num += 1
|
||||
num++
|
||||
}
|
||||
if l.fwc != nil {
|
||||
num += 1
|
||||
num++
|
||||
}
|
||||
return num
|
||||
}
|
||||
@@ -415,10 +417,10 @@ func matchLiteral(literal, subject []byte) bool {
|
||||
ll := len(literal)
|
||||
for {
|
||||
if li >= ll || literal[li] == _SEP {
|
||||
li -= 1
|
||||
li--
|
||||
break
|
||||
}
|
||||
li += 1
|
||||
li++
|
||||
}
|
||||
case _FWC:
|
||||
return true
|
||||
@@ -427,7 +429,7 @@ func matchLiteral(literal, subject []byte) bool {
|
||||
return false
|
||||
}
|
||||
}
|
||||
li += 1
|
||||
li++
|
||||
}
|
||||
// Make sure we have processed all of the literal's chars..
|
||||
if li < ll {
|
||||
@@ -436,6 +438,7 @@ func matchLiteral(literal, subject []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// IsValidLiteralSubject returns true if a subject is valid, false otherwise
|
||||
func IsValidLiteralSubject(subject []byte) bool {
|
||||
tsa := [16][]byte{}
|
||||
toks := split(subject, tsa[:0])
|
||||
@@ -523,7 +526,7 @@ func visitLevel(l *level, depth int) int {
|
||||
return depth
|
||||
}
|
||||
|
||||
depth += 1
|
||||
depth++
|
||||
maxDepth := depth
|
||||
|
||||
all := l.nodes.All()
|
||||
|
||||
@@ -137,17 +137,17 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
expectMsgsB := expectMsgsCommand(t, expectB)
|
||||
|
||||
// Capture sids for checking later.
|
||||
qg1Sids_a := []string{"1", "2", "3"}
|
||||
qg1SidsA := []string{"1", "2", "3"}
|
||||
|
||||
// Three queue subscribers
|
||||
for _, sid := range qg1Sids_a {
|
||||
for _, sid := range qg1SidsA {
|
||||
sendA(fmt.Sprintf("SUB foo qg1 %s\r\n", sid))
|
||||
}
|
||||
sendA("PING\r\n")
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
|
||||
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids), srvB); err != nil {
|
||||
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -185,7 +185,7 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
|
||||
// Should receive 5.
|
||||
matches = expectMsgsA(5)
|
||||
checkForQueueSid(t, matches, qg1Sids_a)
|
||||
checkForQueueSid(t, matches, qg1SidsA)
|
||||
checkForPubSids(t, matches, pSids)
|
||||
|
||||
// Send to A
|
||||
@@ -193,19 +193,19 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
|
||||
// Should receive 5.
|
||||
matches = expectMsgsA(5)
|
||||
checkForQueueSid(t, matches, qg1Sids_a)
|
||||
checkForQueueSid(t, matches, qg1SidsA)
|
||||
checkForPubSids(t, matches, pSids)
|
||||
|
||||
// Now add queue subscribers to B
|
||||
qg2Sids_b := []string{"1", "2", "3"}
|
||||
for _, sid := range qg2Sids_b {
|
||||
qg2SidsB := []string{"1", "2", "3"}
|
||||
for _, sid := range qg2SidsB {
|
||||
sendB(fmt.Sprintf("SUB foo qg2 %s\r\n", sid))
|
||||
}
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvA before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids)+len(qg2Sids_b), srvA); err != nil {
|
||||
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids)+len(qg2SidsB), srvA); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -214,22 +214,22 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
|
||||
// Should receive 1 from B.
|
||||
matches = expectMsgsB(1)
|
||||
checkForQueueSid(t, matches, qg2Sids_b)
|
||||
checkForQueueSid(t, matches, qg2SidsB)
|
||||
|
||||
// Should receive 5 still from A.
|
||||
matches = expectMsgsA(5)
|
||||
checkForQueueSid(t, matches, qg1Sids_a)
|
||||
checkForQueueSid(t, matches, qg1SidsA)
|
||||
checkForPubSids(t, matches, pSids)
|
||||
|
||||
// Now drop queue subscribers from A
|
||||
for _, sid := range qg1Sids_a {
|
||||
for _, sid := range qg1SidsA {
|
||||
sendA(fmt.Sprintf("UNSUB %s\r\n", sid))
|
||||
}
|
||||
sendA("PING\r\n")
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(pSids)+len(qg2Sids_b), srvB); err != nil {
|
||||
if err := checkExpectedSubs(len(pSids)+len(qg2SidsB), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -238,7 +238,7 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
|
||||
// Should receive 1 from B.
|
||||
matches = expectMsgsB(1)
|
||||
checkForQueueSid(t, matches, qg2Sids_b)
|
||||
checkForQueueSid(t, matches, qg2SidsB)
|
||||
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
@@ -278,17 +278,17 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
expectMsgsA2 := expectMsgsCommand(t, expectA2)
|
||||
|
||||
// Capture sids for checking later.
|
||||
qg1Sids_a := []string{"1", "2", "3"}
|
||||
qg1SidsA := []string{"1", "2", "3"}
|
||||
|
||||
// Three queue subscribers
|
||||
for _, sid := range qg1Sids_a {
|
||||
for _, sid := range qg1SidsA {
|
||||
sendA1(fmt.Sprintf("SUB foo qg1 %s\r\n", sid))
|
||||
}
|
||||
sendA1("PING\r\n")
|
||||
expectA1(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
|
||||
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
// Make sure we get only 1.
|
||||
matches := expectMsgsA1(1)
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
checkForQueueSid(t, matches, qg1Sids_a)
|
||||
checkForQueueSid(t, matches, qg1SidsA)
|
||||
|
||||
// Add a FWC subscriber on A2
|
||||
sendA2("SUB > 1\r\n")
|
||||
@@ -309,7 +309,7 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
pSids := []string{"1", "2"}
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+2, srvB); err != nil {
|
||||
if err := checkExpectedSubs(len(qg1SidsA)+2, srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
@@ -319,7 +319,7 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
|
||||
matches = expectMsgsA1(1)
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
checkForQueueSid(t, matches, qg1Sids_a)
|
||||
checkForQueueSid(t, matches, qg1SidsA)
|
||||
|
||||
matches = expectMsgsA2(2)
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
|
||||
@@ -29,7 +29,7 @@ func runMonitorServer() *server.Server {
|
||||
return RunServer(&opts)
|
||||
}
|
||||
|
||||
func runMonitorServerNoHttpPort() *server.Server {
|
||||
func runMonitorServerNoHTTPPort() *server.Server {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultTestOptions
|
||||
opts.Port = CLIENT_PORT
|
||||
@@ -44,7 +44,7 @@ func resetPreviousHTTPConnections() {
|
||||
|
||||
// Make sure that we do not run the http server for monitoring unless asked.
|
||||
func TestNoMonitorPort(t *testing.T) {
|
||||
s := runMonitorServerNoHttpPort()
|
||||
s := runMonitorServerNoHTTPPort()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
|
||||
@@ -103,7 +103,7 @@ func TestQueueSub(t *testing.T) {
|
||||
matches := expectMsgs(sent)
|
||||
sids := make(map[string]int)
|
||||
for _, m := range matches {
|
||||
sids[string(m[SID_INDEX])]++
|
||||
sids[string(m[sidIndex])]++
|
||||
}
|
||||
if len(sids) != 2 {
|
||||
t.Fatalf("Expected only 2 sids, got %d\n", len(sids))
|
||||
@@ -140,7 +140,7 @@ func TestMultipleQueueSub(t *testing.T) {
|
||||
matches := expectMsgs(sent * 2)
|
||||
sids := make(map[string]int)
|
||||
for _, m := range matches {
|
||||
sids[string(m[SID_INDEX])]++
|
||||
sids[string(m[sidIndex])]++
|
||||
}
|
||||
if len(sids) != 4 {
|
||||
t.Fatalf("Expected 4 sids, got %d\n", len(sids))
|
||||
|
||||
@@ -4,7 +4,6 @@ package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
@@ -174,7 +173,7 @@ func TestSeedSolicitWorks(t *testing.T) {
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
rz := readHTTPRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
@@ -184,7 +183,7 @@ func TestSeedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
@@ -194,7 +193,7 @@ func TestSeedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
@@ -214,7 +213,7 @@ func checkConnected(t *testing.T, servers []serverInfo, current int, oneSeed boo
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", s.opts.Host, s.opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
rz := readHTTPRoutez(t, url)
|
||||
total := len(servers)
|
||||
var ids []string
|
||||
for i := 0; i < total; i++ {
|
||||
@@ -234,11 +233,11 @@ func checkConnected(t *testing.T, servers []serverInfo, current int, oneSeed boo
|
||||
s := servers[i]
|
||||
if current == 0 || ((oneSeed && i > 0) || (!oneSeed && (i != current-1))) {
|
||||
if ris[s.server.Id()].IsConfigured != false {
|
||||
return errors.New(fmt.Sprintf("Expected server %s:%d not to be configured", s.opts.Host, s.opts.Port))
|
||||
return fmt.Errorf("Expected server %s:%d not to be configured", s.opts.Host, s.opts.Port)
|
||||
}
|
||||
} else if oneSeed || (i == current-1) {
|
||||
if ris[s.server.Id()].IsConfigured != true {
|
||||
return errors.New(fmt.Sprintf("Expected server %s:%d to be configured", s.opts.Host, s.opts.Port))
|
||||
return fmt.Errorf("Expected server %s:%d to be configured", s.opts.Host, s.opts.Port)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -350,7 +349,7 @@ func TestChainedSolicitWorks(t *testing.T) {
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
rz := readHTTPRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
@@ -360,7 +359,7 @@ func TestChainedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
@@ -370,7 +369,7 @@ func TestChainedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s2.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected s2 server to be configured\n")
|
||||
@@ -488,7 +487,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) {
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
rz := readHTTPRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
@@ -498,7 +497,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
@@ -508,7 +507,7 @@ func TestAuthSeedSolicitWorks(t *testing.T) {
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
rz = readHTTPRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
@@ -534,7 +533,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri
|
||||
}
|
||||
if len(rids) != rz.NumRoutes {
|
||||
_, fn, line, _ := runtime.Caller(caller)
|
||||
return nil, errors.New(fmt.Sprintf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes))
|
||||
return nil, fmt.Errorf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes)
|
||||
}
|
||||
set := make(map[string]bool)
|
||||
for _, v := range rids {
|
||||
@@ -545,7 +544,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri
|
||||
for _, r := range rz.Routes {
|
||||
if set[r.RemoteId] != true {
|
||||
_, fn, line, _ := runtime.Caller(caller)
|
||||
return nil, errors.New(fmt.Sprintf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids))
|
||||
return nil, fmt.Errorf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids)
|
||||
}
|
||||
ri[r.RemoteId] = r
|
||||
}
|
||||
@@ -553,7 +552,7 @@ func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []stri
|
||||
}
|
||||
|
||||
// Helper to easily grab routez info.
|
||||
func readHttpRoutez(t *testing.T, url string) *server.Routez {
|
||||
func readHTTPRoutez(t *testing.T, url string) *server.Routez {
|
||||
resp, err := http.Get(url + "routez")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
|
||||
@@ -106,8 +106,8 @@ func TestSendRouteInfoOnConnect(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
}
|
||||
infoJson := fmt.Sprintf("INFO %s\r\n", b)
|
||||
routeSend(infoJson)
|
||||
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
|
||||
routeSend(infoJSON)
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
}
|
||||
@@ -200,9 +200,9 @@ func TestSendRouteSolicit(t *testing.T) {
|
||||
if len(opts.Routes) <= 0 {
|
||||
t.Fatalf("Need an outbound solicted route for this test")
|
||||
}
|
||||
rUrl := opts.Routes[0]
|
||||
rURL := opts.Routes[0]
|
||||
|
||||
conn := acceptRouteConn(t, rUrl.Host, server.DEFAULT_ROUTE_CONNECT)
|
||||
conn := acceptRouteConn(t, rURL.Host, server.DEFAULT_ROUTE_CONNECT)
|
||||
defer conn.Close()
|
||||
|
||||
// We should receive a connect message right away due to auth.
|
||||
@@ -391,8 +391,8 @@ func TestRouteQueueSemantics(t *testing.T) {
|
||||
matches = expectMsgs(2)
|
||||
|
||||
// Expect first to be the normal subscriber, next will be the queue one.
|
||||
if string(matches[0][SID_INDEX]) != "RSID:2:4" &&
|
||||
string(matches[1][SID_INDEX]) != "RSID:2:4" {
|
||||
if string(matches[0][sidIndex]) != "RSID:2:4" &&
|
||||
string(matches[1][sidIndex]) != "RSID:2:4" {
|
||||
t.Fatalf("Did not received routed sid\n")
|
||||
}
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
@@ -400,10 +400,10 @@ func TestRouteQueueSemantics(t *testing.T) {
|
||||
|
||||
// Check the rsid to verify it is one of the queue group subscribers.
|
||||
var rsid string
|
||||
if matches[0][SID_INDEX][0] == 'Q' {
|
||||
rsid = string(matches[0][SID_INDEX])
|
||||
if matches[0][sidIndex][0] == 'Q' {
|
||||
rsid = string(matches[0][sidIndex])
|
||||
} else {
|
||||
rsid = string(matches[1][SID_INDEX])
|
||||
rsid = string(matches[1][sidIndex])
|
||||
}
|
||||
if rsid != qrsid1 && rsid != qrsid2 {
|
||||
t.Fatalf("Expected a queue group rsid, got %s\n", rsid)
|
||||
@@ -443,15 +443,15 @@ func TestSolicitRouteReconnect(t *testing.T) {
|
||||
s, opts := runRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
rUrl := opts.Routes[0]
|
||||
rURL := opts.Routes[0]
|
||||
|
||||
route := acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT)
|
||||
route := acceptRouteConn(t, rURL.Host, 2*server.DEFAULT_ROUTE_CONNECT)
|
||||
|
||||
// Go ahead and close the Route.
|
||||
route.Close()
|
||||
|
||||
// We expect to get called back..
|
||||
route = acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT)
|
||||
route = acceptRouteConn(t, rURL.Host, 2*server.DEFAULT_ROUTE_CONNECT)
|
||||
route.Close()
|
||||
}
|
||||
|
||||
@@ -546,10 +546,10 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
}
|
||||
infoJson := fmt.Sprintf("INFO %s\r\n", b)
|
||||
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
|
||||
|
||||
// Trigger the send of local subs.
|
||||
routeSend(infoJson)
|
||||
routeSend(infoJSON)
|
||||
|
||||
routeExpect(subRe)
|
||||
|
||||
@@ -563,7 +563,7 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
|
||||
|
||||
routeExpect(infoRe)
|
||||
|
||||
routeSend(infoJson)
|
||||
routeSend(infoJSON)
|
||||
routeExpect(subRe)
|
||||
}
|
||||
|
||||
|
||||
56
test/test.go
56
test/test.go
@@ -32,6 +32,7 @@ type tLogger interface {
|
||||
Errorf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
// DefaultTestOptions are default options for the unit tests.
|
||||
var DefaultTestOptions = server.Options{
|
||||
Host: "localhost",
|
||||
Port: 4222,
|
||||
@@ -39,15 +40,17 @@ var DefaultTestOptions = server.Options{
|
||||
NoSigs: true,
|
||||
}
|
||||
|
||||
// RunDefaultServer starts a new Go routine based server using the default options
|
||||
func RunDefaultServer() *server.Server {
|
||||
return RunServer(&DefaultTestOptions)
|
||||
}
|
||||
|
||||
// New Go Routine based server
|
||||
// RunServer starts a new Go routine based server
|
||||
func RunServer(opts *server.Options) *server.Server {
|
||||
return RunServerWithAuth(opts, nil)
|
||||
}
|
||||
|
||||
// LoadConfig loads a configuration from a filename
|
||||
func LoadConfig(configFile string) (opts *server.Options) {
|
||||
opts, err := server.ProcessConfigFile(configFile)
|
||||
if err != nil {
|
||||
@@ -57,6 +60,7 @@ func LoadConfig(configFile string) (opts *server.Options) {
|
||||
return
|
||||
}
|
||||
|
||||
// RunServerWithConfig starts a new Go routine based server with a configuration file.
|
||||
func RunServerWithConfig(configFile string) (srv *server.Server, opts *server.Options) {
|
||||
opts = LoadConfig(configFile)
|
||||
|
||||
@@ -72,7 +76,7 @@ func RunServerWithConfig(configFile string) (srv *server.Server, opts *server.Op
|
||||
return
|
||||
}
|
||||
|
||||
// New Go Routine based server with auth
|
||||
// RunServerWithAuth starts a new Go routine based server with auth
|
||||
func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
|
||||
if opts == nil {
|
||||
opts = &DefaultTestOptions
|
||||
@@ -251,10 +255,10 @@ func doDefaultConnect(t tLogger, c net.Conn) {
|
||||
doConnect(t, c, false, false, false)
|
||||
}
|
||||
|
||||
const CONNECT_F = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n"
|
||||
const connectProto = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n"
|
||||
|
||||
func doRouteAuthConnect(t tLogger, c net.Conn, user, pass, id string) {
|
||||
cs := fmt.Sprintf(CONNECT_F, user, pass, id)
|
||||
cs := fmt.Sprintf(connectProto, user, pass, id)
|
||||
sendProto(t, c, cs)
|
||||
}
|
||||
|
||||
@@ -320,11 +324,11 @@ var (
|
||||
)
|
||||
|
||||
const (
|
||||
SUB_INDEX = 1
|
||||
SID_INDEX = 2
|
||||
REPLY_INDEX = 4
|
||||
LEN_INDEX = 5
|
||||
MSG_INDEX = 6
|
||||
subIndex = 1
|
||||
sidIndex = 2
|
||||
replyIndex = 4
|
||||
lenIndex = 5
|
||||
msgIndex = 6
|
||||
)
|
||||
|
||||
// Reuse expect buffer
|
||||
@@ -361,20 +365,20 @@ func expectNothing(t tLogger, c net.Conn) {
|
||||
|
||||
// This will check that we got what we expected.
|
||||
func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) {
|
||||
if string(m[SUB_INDEX]) != subject {
|
||||
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX])
|
||||
if string(m[subIndex]) != subject {
|
||||
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[subIndex])
|
||||
}
|
||||
if sid != "" && string(m[SID_INDEX]) != sid {
|
||||
stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[SID_INDEX])
|
||||
if sid != "" && string(m[sidIndex]) != sid {
|
||||
stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[sidIndex])
|
||||
}
|
||||
if string(m[REPLY_INDEX]) != reply {
|
||||
stackFatalf(t, "Did not get correct reply: expected '%s' got '%s'\n", reply, m[REPLY_INDEX])
|
||||
if string(m[replyIndex]) != reply {
|
||||
stackFatalf(t, "Did not get correct reply: expected '%s' got '%s'\n", reply, m[replyIndex])
|
||||
}
|
||||
if string(m[LEN_INDEX]) != len {
|
||||
stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[LEN_INDEX])
|
||||
if string(m[lenIndex]) != len {
|
||||
stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[lenIndex])
|
||||
}
|
||||
if string(m[MSG_INDEX]) != msg {
|
||||
stackFatalf(t, "Did not get correct msg: expected '%s' got '%s'\n", msg, m[MSG_INDEX])
|
||||
if string(m[msgIndex]) != msg {
|
||||
stackFatalf(t, "Did not get correct msg: expected '%s' got '%s'\n", msg, m[msgIndex])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,9 +402,9 @@ func checkForQueueSid(t tLogger, matches [][][]byte, sids []string) {
|
||||
seen[sid] = 0
|
||||
}
|
||||
for _, m := range matches {
|
||||
sid := string(m[SID_INDEX])
|
||||
sid := string(m[sidIndex])
|
||||
if _, ok := seen[sid]; ok {
|
||||
seen[sid] += 1
|
||||
seen[sid]++
|
||||
}
|
||||
}
|
||||
// Make sure we only see one and exactly one.
|
||||
@@ -421,9 +425,9 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) {
|
||||
seen[sid] = 0
|
||||
}
|
||||
for _, m := range matches {
|
||||
sid := string(m[SID_INDEX])
|
||||
sid := string(m[sidIndex])
|
||||
if _, ok := seen[sid]; ok {
|
||||
seen[sid] += 1
|
||||
seen[sid]++
|
||||
}
|
||||
}
|
||||
// Make sure we only see one and exactly one for each sid.
|
||||
@@ -438,8 +442,8 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) {
|
||||
// Helper function to generate next opts to make sure no port conflicts etc.
|
||||
func nextServerOpts(opts *server.Options) *server.Options {
|
||||
nopts := *opts
|
||||
nopts.Port += 1
|
||||
nopts.ClusterPort += 1
|
||||
nopts.HTTPPort += 1
|
||||
nopts.Port++
|
||||
nopts.ClusterPort++
|
||||
nopts.HTTPPort++
|
||||
return &nopts
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user