mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'master' of github.com:nats-io/gnatsd into config_reload
This commit is contained in:
@@ -192,8 +192,8 @@ func (c *client) initClient() {
|
||||
c.cid = atomic.AddUint64(&s.gcid, 1)
|
||||
c.bw = bufio.NewWriterSize(c.nc, startBufSize)
|
||||
c.subs = make(map[string]*subscription)
|
||||
c.debug = (atomic.LoadInt32(&debug) != 0)
|
||||
c.trace = (atomic.LoadInt32(&trace) != 0)
|
||||
c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0)
|
||||
c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0)
|
||||
|
||||
// This is a scratch buffer used for processMsg()
|
||||
// The msg header starts with "MSG ",
|
||||
@@ -1350,13 +1350,13 @@ func (c *client) closeConnection() {
|
||||
}
|
||||
|
||||
if rid != "" && srv.remotes[rid] != nil {
|
||||
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
|
||||
c.srv.Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
|
||||
return
|
||||
} else if rid == srv.info.ID {
|
||||
Debugf("Detected route to self, ignoring \"%s\"", rurl)
|
||||
c.srv.Debugf("Detected route to self, ignoring \"%s\"", rurl)
|
||||
return
|
||||
} else if rtype != Implicit || retryImplicit {
|
||||
Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
|
||||
c.srv.Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
|
||||
// Keep track of this go-routine so we can wait for it on
|
||||
// server shutdown.
|
||||
srv.startGoRoutine(func() { srv.reConnectToRoute(rurl, rtype) })
|
||||
@@ -1368,20 +1368,20 @@ func (c *client) closeConnection() {
|
||||
|
||||
func (c *client) Errorf(format string, v ...interface{}) {
|
||||
format = fmt.Sprintf("%s - %s", c, format)
|
||||
Errorf(format, v...)
|
||||
c.srv.Errorf(format, v...)
|
||||
}
|
||||
|
||||
func (c *client) Debugf(format string, v ...interface{}) {
|
||||
format = fmt.Sprintf("%s - %s", c, format)
|
||||
Debugf(format, v...)
|
||||
c.srv.Debugf(format, v...)
|
||||
}
|
||||
|
||||
func (c *client) Noticef(format string, v ...interface{}) {
|
||||
format = fmt.Sprintf("%s - %s", c, format)
|
||||
Noticef(format, v...)
|
||||
c.srv.Noticef(format, v...)
|
||||
}
|
||||
|
||||
func (c *client) Tracef(format string, v ...interface{}) {
|
||||
format = fmt.Sprintf("%s - %s", c, format)
|
||||
Tracef(format, v...)
|
||||
c.srv.Tracef(format, v...)
|
||||
}
|
||||
|
||||
@@ -591,7 +591,7 @@ func TestClientMapRemoval(t *testing.T) {
|
||||
func TestAuthorizationTimeout(t *testing.T) {
|
||||
serverOptions := defaultServerOptions
|
||||
serverOptions.Authorization = "my_token"
|
||||
serverOptions.AuthTimeout = 1
|
||||
serverOptions.AuthTimeout = 0.4
|
||||
s := RunServer(&serverOptions)
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -604,6 +604,7 @@ func TestAuthorizationTimeout(t *testing.T) {
|
||||
if _, err := client.ReadString('\n'); err != nil {
|
||||
t.Fatalf("Error receiving info from server: %v\n", err)
|
||||
}
|
||||
time.Sleep(2 * secondsToDuration(serverOptions.AuthTimeout))
|
||||
l, err := client.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving info from server: %v\n", err)
|
||||
|
||||
@@ -4,21 +4,11 @@ package server
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/nats-io/gnatsd/logger"
|
||||
)
|
||||
|
||||
// Package globals for performance checks
|
||||
var trace int32
|
||||
var debug int32
|
||||
|
||||
var log = struct {
|
||||
*sync.Mutex
|
||||
logger Logger
|
||||
}{Mutex: new(sync.Mutex)}
|
||||
|
||||
// Logger interface of the NATS Server
|
||||
type Logger interface {
|
||||
|
||||
@@ -68,19 +58,19 @@ func (s *Server) ConfigureLogger() {
|
||||
// SetLogger sets the logger of the server
|
||||
func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) {
|
||||
if debugFlag {
|
||||
atomic.StoreInt32(&debug, 1)
|
||||
atomic.StoreInt32(&s.logging.debug, 1)
|
||||
} else {
|
||||
atomic.StoreInt32(&debug, 0)
|
||||
atomic.StoreInt32(&s.logging.debug, 0)
|
||||
}
|
||||
if traceFlag {
|
||||
atomic.StoreInt32(&trace, 1)
|
||||
atomic.StoreInt32(&s.logging.trace, 1)
|
||||
} else {
|
||||
atomic.StoreInt32(&trace, 0)
|
||||
atomic.StoreInt32(&s.logging.trace, 0)
|
||||
}
|
||||
|
||||
log.Lock()
|
||||
log.logger = logger
|
||||
log.Unlock()
|
||||
s.logging.Lock()
|
||||
s.logging.logger = logger
|
||||
s.logging.Unlock()
|
||||
}
|
||||
|
||||
// If the logger is a file based logger, close and re-open the file.
|
||||
@@ -88,73 +78,73 @@ func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) {
|
||||
// the process to trigger this function.
|
||||
func (s *Server) ReOpenLogFile() {
|
||||
// Check to make sure this is a file logger.
|
||||
log.Lock()
|
||||
ll := log.logger
|
||||
log.Unlock()
|
||||
s.logging.RLock()
|
||||
ll := s.logging.logger
|
||||
s.logging.RUnlock()
|
||||
|
||||
if ll == nil {
|
||||
Noticef("File log re-open ignored, no logger")
|
||||
s.Noticef("File log re-open ignored, no logger")
|
||||
return
|
||||
}
|
||||
if s.getOpts().LogFile == "" {
|
||||
Noticef("File log re-open ignored, not a file logger")
|
||||
s.Noticef("File log re-open ignored, not a file logger")
|
||||
} else {
|
||||
fileLog := logger.NewFileLogger(s.getOpts().LogFile,
|
||||
s.getOpts().Logtime, s.getOpts().Debug, s.getOpts().Trace, true)
|
||||
s.SetLogger(fileLog, s.getOpts().Debug, s.getOpts().Trace)
|
||||
Noticef("File log re-opened")
|
||||
s.Noticef("File log re-opened")
|
||||
}
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func Noticef(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
func (s *Server) Noticef(format string, v ...interface{}) {
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Noticef(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Errorf logs an error
|
||||
func Errorf(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
func (s *Server) Errorf(format string, v ...interface{}) {
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Errorf(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal error
|
||||
func Fatalf(format string, v ...interface{}) {
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
func (s *Server) Fatalf(format string, v ...interface{}) {
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Fatalf(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func Debugf(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&debug) == 0 {
|
||||
func (s *Server) Debugf(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&s.logging.debug) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Debugf(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func Tracef(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&trace) == 0 {
|
||||
func (s *Server) Tracef(format string, v ...interface{}) {
|
||||
if atomic.LoadInt32(&s.logging.trace) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Tracef(format, v...)
|
||||
}, format, v...)
|
||||
}
|
||||
|
||||
func executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) {
|
||||
log.Lock()
|
||||
defer log.Unlock()
|
||||
if log.logger == nil {
|
||||
func (s *Server) executeLogCall(f func(logger Logger, format string, v ...interface{}), format string, args ...interface{}) {
|
||||
s.logging.RLock()
|
||||
defer s.logging.RUnlock()
|
||||
if s.logging.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
f(log.logger, format, args...)
|
||||
f(s.logging.logger, format, args...)
|
||||
}
|
||||
|
||||
@@ -20,46 +20,46 @@ func TestSetLogger(t *testing.T) {
|
||||
server.SetLogger(dl, true, true)
|
||||
|
||||
// We assert that the logger has change to the DummyLogger
|
||||
_ = log.logger.(*DummyLogger)
|
||||
_ = server.logging.logger.(*DummyLogger)
|
||||
|
||||
if debug != 1 {
|
||||
t.Fatalf("Expected debug 1, received value %d\n", debug)
|
||||
if server.logging.debug != 1 {
|
||||
t.Fatalf("Expected debug 1, received value %d\n", server.logging.debug)
|
||||
}
|
||||
|
||||
if trace != 1 {
|
||||
t.Fatalf("Expected trace 1, received value %d\n", trace)
|
||||
if server.logging.trace != 1 {
|
||||
t.Fatalf("Expected trace 1, received value %d\n", server.logging.trace)
|
||||
}
|
||||
|
||||
// Check traces
|
||||
expectedStr := "This is a Notice"
|
||||
Noticef(expectedStr)
|
||||
server.Noticef(expectedStr)
|
||||
dl.checkContent(t, expectedStr)
|
||||
expectedStr = "This is an Error"
|
||||
Errorf(expectedStr)
|
||||
server.Errorf(expectedStr)
|
||||
dl.checkContent(t, expectedStr)
|
||||
expectedStr = "This is a Fatal"
|
||||
Fatalf(expectedStr)
|
||||
server.Fatalf(expectedStr)
|
||||
dl.checkContent(t, expectedStr)
|
||||
expectedStr = "This is a Debug"
|
||||
Debugf(expectedStr)
|
||||
server.Debugf(expectedStr)
|
||||
dl.checkContent(t, expectedStr)
|
||||
expectedStr = "This is a Trace"
|
||||
Tracef(expectedStr)
|
||||
server.Tracef(expectedStr)
|
||||
dl.checkContent(t, expectedStr)
|
||||
|
||||
// Make sure that we can reset to fal
|
||||
server.SetLogger(dl, false, false)
|
||||
if debug != 0 {
|
||||
t.Fatalf("Expected debug 0, got %v", debug)
|
||||
if server.logging.debug != 0 {
|
||||
t.Fatalf("Expected debug 0, got %v", server.logging.debug)
|
||||
}
|
||||
if trace != 0 {
|
||||
t.Fatalf("Expected trace 0, got %v", trace)
|
||||
if server.logging.trace != 0 {
|
||||
t.Fatalf("Expected trace 0, got %v", server.logging.trace)
|
||||
}
|
||||
// Now, Debug and Trace should not produce anything
|
||||
dl.msg = ""
|
||||
Debugf("This Debug should not be traced")
|
||||
server.Debugf("This Debug should not be traced")
|
||||
dl.checkContent(t, "")
|
||||
Tracef("This Trace should not be traced")
|
||||
server.Tracef("This Trace should not be traced")
|
||||
dl.checkContent(t, "")
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ func TestReOpenLogFile(t *testing.T) {
|
||||
s.SetLogger(fileLog, false, false)
|
||||
// Add some log
|
||||
expectedStr := "This is a Notice"
|
||||
Noticef(expectedStr)
|
||||
s.Noticef(expectedStr)
|
||||
// Check content of log
|
||||
buf, err := ioutil.ReadFile(s.opts.LogFile)
|
||||
if err != nil {
|
||||
@@ -139,7 +139,7 @@ func TestReOpenLogFile(t *testing.T) {
|
||||
t.Fatalf("File should indicate that file log was re-opened, got: %v", string(buf))
|
||||
}
|
||||
// Make sure we can append to the log
|
||||
Noticef("New message")
|
||||
s.Noticef("New message")
|
||||
buf, err = ioutil.ReadFile(s.opts.LogFile)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading file: %v", err)
|
||||
|
||||
@@ -242,7 +242,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(c, "", " ")
|
||||
if err != nil {
|
||||
Errorf("Error marshaling response to /connz request: %v", err)
|
||||
s.Errorf("Error marshaling response to /connz request: %v", err)
|
||||
}
|
||||
|
||||
// Handle response
|
||||
@@ -333,7 +333,7 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(rs, "", " ")
|
||||
if err != nil {
|
||||
Errorf("Error marshaling response to /routez request: %v", err)
|
||||
s.Errorf("Error marshaling response to /routez request: %v", err)
|
||||
}
|
||||
|
||||
// Handle response
|
||||
@@ -349,7 +349,7 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
st := &Subsz{s.sl.Stats()}
|
||||
b, err := json.MarshalIndent(st, "", " ")
|
||||
if err != nil {
|
||||
Errorf("Error marshaling response to /subscriptionsz request: %v", err)
|
||||
s.Errorf("Error marshaling response to /subscriptionsz request: %v", err)
|
||||
}
|
||||
|
||||
// Handle response
|
||||
@@ -486,7 +486,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
b, err := json.MarshalIndent(v, "", " ")
|
||||
if err != nil {
|
||||
Errorf("Error marshaling response to /varz request: %v", err)
|
||||
s.Errorf("Error marshaling response to /varz request: %v", err)
|
||||
}
|
||||
|
||||
// Handle response
|
||||
|
||||
@@ -737,15 +737,21 @@ func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)
|
||||
var cleanRoutes []*url.URL
|
||||
cport := strconv.Itoa(clusterPort)
|
||||
|
||||
selfIPs := getInterfaceIPs()
|
||||
selfIPs, err := getInterfaceIPs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, r := range routes {
|
||||
host, port, err := net.SplitHostPort(r.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cport == port && isIPInList(selfIPs, getURLIP(host)) {
|
||||
Noticef("Self referencing IP found: ", r)
|
||||
ipList, err := getURLIP(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cport == port && isIPInList(selfIPs, ipList) {
|
||||
continue
|
||||
}
|
||||
cleanRoutes = append(cleanRoutes, r)
|
||||
@@ -765,19 +771,18 @@ func isIPInList(list1 []net.IP, list2 []net.IP) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func getURLIP(ipStr string) []net.IP {
|
||||
func getURLIP(ipStr string) ([]net.IP, error) {
|
||||
ipList := []net.IP{}
|
||||
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip != nil {
|
||||
ipList = append(ipList, ip)
|
||||
return ipList
|
||||
return ipList, nil
|
||||
}
|
||||
|
||||
hostAddr, err := net.LookupHost(ipStr)
|
||||
if err != nil {
|
||||
Errorf("Error looking up host with route hostname: %v", err)
|
||||
return ipList
|
||||
return nil, fmt.Errorf("Error looking up host with route hostname: %v", err)
|
||||
}
|
||||
for _, addr := range hostAddr {
|
||||
ip = net.ParseIP(addr)
|
||||
@@ -785,16 +790,15 @@ func getURLIP(ipStr string) []net.IP {
|
||||
ipList = append(ipList, ip)
|
||||
}
|
||||
}
|
||||
return ipList
|
||||
return ipList, nil
|
||||
}
|
||||
|
||||
func getInterfaceIPs() []net.IP {
|
||||
func getInterfaceIPs() ([]net.IP, error) {
|
||||
var localIPs []net.IP
|
||||
|
||||
interfaceAddr, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
Errorf("Error getting self referencing address: %v", err)
|
||||
return localIPs
|
||||
return nil, fmt.Errorf("Error getting self referencing address: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(interfaceAddr); i++ {
|
||||
@@ -802,10 +806,10 @@ func getInterfaceIPs() []net.IP {
|
||||
if net.ParseIP(interfaceIP.String()) != nil {
|
||||
localIPs = append(localIPs, interfaceIP)
|
||||
} else {
|
||||
Errorf("Error parsing self referencing address: %v", err)
|
||||
return nil, fmt.Errorf("Error parsing self referencing address: %v", err)
|
||||
}
|
||||
}
|
||||
return localIPs
|
||||
return localIPs, nil
|
||||
}
|
||||
|
||||
func processOptions(opts *Options) {
|
||||
|
||||
@@ -8,11 +8,11 @@ import (
|
||||
)
|
||||
|
||||
func dummyClient() *client {
|
||||
return &client{}
|
||||
return &client{srv: New(&defaultServerOptions)}
|
||||
}
|
||||
|
||||
func dummyRouteClient() *client {
|
||||
return &client{typ: ROUTER}
|
||||
return &client{srv: New(&defaultServerOptions), typ: ROUTER}
|
||||
}
|
||||
|
||||
func TestParsePing(t *testing.T) {
|
||||
|
||||
@@ -24,8 +24,8 @@ type traceOption struct {
|
||||
|
||||
// Apply the tracing change by reconfiguring the server's logger.
|
||||
func (t *traceOption) Apply(server *Server) {
|
||||
Noticef("Reloaded: trace = %v", t.newValue)
|
||||
server.ConfigureLogger()
|
||||
server.Noticef("Reloaded: trace = %v", t.newValue)
|
||||
}
|
||||
|
||||
// Reload reads the current configuration file and applies any supported
|
||||
@@ -95,5 +95,5 @@ func (s *Server) applyOptions(opts []option) {
|
||||
opt.Apply(s)
|
||||
}
|
||||
|
||||
Noticef("Reloaded server configuration")
|
||||
s.Noticef("Reloaded server configuration")
|
||||
}
|
||||
|
||||
@@ -140,14 +140,6 @@ func TestConfigReloadInvalidConfig(t *testing.T) {
|
||||
|
||||
// Ensure Reload returns nil and the config is changed on success.
|
||||
func TestConfigReload(t *testing.T) {
|
||||
// The server package uses a global logger that gets configured by calls to
|
||||
// server.ConfigureLogger(). We need to restore it to prevent side effects.
|
||||
// TODO: Consider getting rid of global logger.
|
||||
logBefore := log
|
||||
defer func() {
|
||||
log = logBefore
|
||||
}()
|
||||
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
|
||||
@@ -236,7 +236,7 @@ func (s *Server) processImplicitRoute(info *Info) {
|
||||
// Initiate the connection, using info.IP instead of info.URL here...
|
||||
r, err := url.Parse(info.IP)
|
||||
if err != nil {
|
||||
Debugf("Error parsing URL from INFO: %v\n", err)
|
||||
s.Debugf("Error parsing URL from INFO: %v\n", err)
|
||||
return
|
||||
}
|
||||
if info.AuthRequired {
|
||||
@@ -547,7 +547,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
|
||||
func (s *Server) broadcastInterestToRoutes(proto string) {
|
||||
var arg []byte
|
||||
if atomic.LoadInt32(&trace) == 1 {
|
||||
if atomic.LoadInt32(&s.logging.trace) == 1 {
|
||||
arg = []byte(proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
protoAsBytes := []byte(proto)
|
||||
@@ -593,12 +593,12 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
|
||||
func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
hp := net.JoinHostPort(s.getOpts().Cluster.Host, strconv.Itoa(s.getOpts().Cluster.Port))
|
||||
Noticef("Listening for route connections on %s", hp)
|
||||
s.Noticef("Listening for route connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
// We need to close this channel to avoid a deadlock
|
||||
close(ch)
|
||||
Fatalf("Error listening on router port: %d - %v", s.getOpts().Cluster.Port, e)
|
||||
s.Fatalf("Error listening on router port: %d - %v", s.getOpts().Cluster.Port, e)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -616,7 +616,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
Debugf("Temporary Route Accept Errorf(%v), sleeping %dms",
|
||||
s.Debugf("Temporary Route Accept Errorf(%v), sleeping %dms",
|
||||
ne, tmpDelay/time.Millisecond)
|
||||
time.Sleep(tmpDelay)
|
||||
tmpDelay *= 2
|
||||
@@ -624,7 +624,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else if s.isRunning() {
|
||||
Noticef("Accept error: %v", err)
|
||||
s.Noticef("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -634,7 +634,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
s.grWG.Done()
|
||||
})
|
||||
}
|
||||
Debugf("Router accept loop exiting..")
|
||||
s.Debugf("Router accept loop exiting..")
|
||||
s.done <- true
|
||||
}
|
||||
|
||||
@@ -695,10 +695,10 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
|
||||
defer s.grWG.Done()
|
||||
attempts := 0
|
||||
for s.isRunning() && rURL != nil {
|
||||
Debugf("Trying to connect to route on %s", rURL.Host)
|
||||
s.Debugf("Trying to connect to route on %s", rURL.Host)
|
||||
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
|
||||
if err != nil {
|
||||
Debugf("Error trying to connect to route: %v", err)
|
||||
s.Debugf("Error trying to connect to route: %v", err)
|
||||
if !tryForEver {
|
||||
if s.getOpts().Cluster.ConnectRetries <= 0 {
|
||||
return
|
||||
|
||||
@@ -78,6 +78,12 @@ type Server struct {
|
||||
grRunning bool
|
||||
grWG sync.WaitGroup // to wait on various go routines
|
||||
cproto int64 // number of clients supporting async INFO
|
||||
logging struct {
|
||||
sync.RWMutex
|
||||
logger Logger
|
||||
trace int32
|
||||
debug int32
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure all are 64bits for atomic use
|
||||
@@ -164,7 +170,7 @@ func (s *Server) generateServerInfoJSON() {
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(s.info)
|
||||
if err != nil {
|
||||
Fatalf("Error marshaling INFO JSON: %+v\n", err)
|
||||
s.Fatalf("Error marshaling INFO JSON: %+v\n", err)
|
||||
return
|
||||
}
|
||||
s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
|
||||
@@ -219,8 +225,8 @@ func (s *Server) logPid() {
|
||||
// Start up the server, this will block.
|
||||
// Start via a Go routine if needed.
|
||||
func (s *Server) Start() {
|
||||
Noticef("Starting nats-server version %s", VERSION)
|
||||
Debugf("Go build version %s", s.info.GoVersion)
|
||||
s.Noticef("Starting nats-server version %s", VERSION)
|
||||
s.Debugf("Go build version %s", s.info.GoVersion)
|
||||
|
||||
// Avoid RACE between Start() and Shutdown()
|
||||
s.mu.Lock()
|
||||
@@ -238,7 +244,7 @@ func (s *Server) Start() {
|
||||
|
||||
// Start monitoring if needed
|
||||
if err := s.StartMonitoring(); err != nil {
|
||||
Fatalf("Can't start monitoring: %v", err)
|
||||
s.Fatalf("Can't start monitoring: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -351,20 +357,20 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
}()
|
||||
|
||||
hp := net.JoinHostPort(s.getOpts().Host, strconv.Itoa(s.getOpts().Port))
|
||||
Noticef("Listening for client connections on %s", hp)
|
||||
s.Noticef("Listening for client connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
Fatalf("Error listening on port: %s, %q", hp, e)
|
||||
s.Fatalf("Error listening on port: %s, %q", hp, e)
|
||||
return
|
||||
}
|
||||
|
||||
// Alert of TLS enabled.
|
||||
if s.getOpts().TLSConfig != nil {
|
||||
Noticef("TLS required for client connections")
|
||||
s.Noticef("TLS required for client connections")
|
||||
}
|
||||
|
||||
Debugf("Server id is %s", s.info.ID)
|
||||
Noticef("Server is ready")
|
||||
s.Debugf("Server id is %s", s.info.ID)
|
||||
s.Noticef("Server is ready")
|
||||
|
||||
// Setup state that can enable shutdown
|
||||
s.mu.Lock()
|
||||
@@ -376,13 +382,13 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
// Write resolved port back to options.
|
||||
_, port, err := net.SplitHostPort(l.Addr().String())
|
||||
if err != nil {
|
||||
Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
portNum, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -400,7 +406,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
Debugf("Temporary Client Accept Error(%v), sleeping %dms",
|
||||
s.Debugf("Temporary Client Accept Error(%v), sleeping %dms",
|
||||
ne, tmpDelay/time.Millisecond)
|
||||
time.Sleep(tmpDelay)
|
||||
tmpDelay *= 2
|
||||
@@ -408,7 +414,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else if s.isRunning() {
|
||||
Noticef("Accept error: %v", err)
|
||||
s.Noticef("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -418,18 +424,18 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
s.grWG.Done()
|
||||
})
|
||||
}
|
||||
Noticef("Server Exiting..")
|
||||
s.Noticef("Server Exiting..")
|
||||
s.done <- true
|
||||
}
|
||||
|
||||
// StartProfiler is called to enable dynamic profiling.
|
||||
func (s *Server) StartProfiler() {
|
||||
Noticef("Starting profiling on http port %d", s.getOpts().ProfPort)
|
||||
s.Noticef("Starting profiling on http port %d", s.getOpts().ProfPort)
|
||||
hp := net.JoinHostPort(s.getOpts().Host, strconv.Itoa(s.getOpts().ProfPort))
|
||||
go func() {
|
||||
err := http.ListenAndServe(hp, nil)
|
||||
if err != nil {
|
||||
Fatalf("error starting monitor server: %s", err)
|
||||
s.Fatalf("error starting monitor server: %s", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -494,14 +500,14 @@ func (s *Server) startMonitoring(secure bool) error {
|
||||
|
||||
if secure {
|
||||
hp = net.JoinHostPort(s.getOpts().HTTPHost, strconv.Itoa(s.getOpts().HTTPSPort))
|
||||
Noticef("Starting https monitor on %s", hp)
|
||||
s.Noticef("Starting https monitor on %s", hp)
|
||||
config := util.CloneTLSConfig(s.getOpts().TLSConfig)
|
||||
config.ClientAuth = tls.NoClientCert
|
||||
httpListener, err = tls.Listen("tcp", hp, config)
|
||||
|
||||
} else {
|
||||
hp = net.JoinHostPort(s.getOpts().HTTPHost, strconv.Itoa(s.getOpts().HTTPPort))
|
||||
Noticef("Starting http monitor on %s", hp)
|
||||
s.Noticef("Starting http monitor on %s", hp)
|
||||
httpListener, err = net.Listen("tcp", hp)
|
||||
}
|
||||
|
||||
@@ -921,7 +927,7 @@ func (s *Server) getClientConnectURLs() []string {
|
||||
// ended-up returning 0.0.0.0, which is problematic for Windows clients.
|
||||
// Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
|
||||
if s.getOpts().Host == "0.0.0.0" || s.getOpts().Host == "::" {
|
||||
Errorf("Address %q can not be resolved properly", s.getOpts().Host)
|
||||
s.Errorf("Address %q can not be resolved properly", s.getOpts().Host)
|
||||
} else {
|
||||
urls = append(urls, net.JoinHostPort(s.getOpts().Host, sPort))
|
||||
}
|
||||
|
||||
@@ -20,10 +20,10 @@ func (s *Server) handleSignals() {
|
||||
|
||||
go func() {
|
||||
for sig := range c {
|
||||
Debugf("Trapped %q signal", sig)
|
||||
s.Debugf("Trapped %q signal", sig)
|
||||
switch sig {
|
||||
case syscall.SIGINT:
|
||||
Noticef("Server Exiting..")
|
||||
s.Noticef("Server Exiting..")
|
||||
os.Exit(0)
|
||||
case syscall.SIGUSR1:
|
||||
// File log re-open for rotating file logs.
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestSignalToReOpenLogFile(t *testing.T) {
|
||||
|
||||
// Add a trace
|
||||
expectedStr := "This is a Notice"
|
||||
Noticef(expectedStr)
|
||||
s.Noticef(expectedStr)
|
||||
buf, err := ioutil.ReadFile(logFile)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading file: %v", err)
|
||||
|
||||
@@ -288,7 +288,8 @@ func TestSplitConnectArg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitDanglingArgBuf(t *testing.T) {
|
||||
c := &client{subs: make(map[string]*subscription)}
|
||||
s := New(&defaultServerOptions)
|
||||
c := &client{srv: s, subs: make(map[string]*subscription)}
|
||||
|
||||
// We test to make sure we do not dangle any argBufs after processing
|
||||
// since that could lead to performance issues.
|
||||
|
||||
Reference in New Issue
Block a user