mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Address code review feedback
This commit is contained in:
@@ -664,7 +664,7 @@ Some unit tests make use of temporary symlinks for testing purposes. On Windows,
|
||||
FAIL
|
||||
```
|
||||
|
||||
Similarly, this can fail when creating a symlink on a network drive, which is typically now allowed by default:
|
||||
Similarly, this can fail when creating a symlink on a network drive, which is typically not allowed by default:
|
||||
|
||||
```
|
||||
--- FAIL: TestConfigReload (0.00s)
|
||||
|
||||
2
main.go
2
main.go
@@ -149,7 +149,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Snapshot flag options.
|
||||
*server.FlagSnapshot = *opts
|
||||
server.FlagSnapshot = opts.Clone()
|
||||
|
||||
// Parse config if given
|
||||
if configFile != "" {
|
||||
|
||||
@@ -15,6 +15,18 @@ type User struct {
|
||||
Permissions *Permissions `json:"permissions"`
|
||||
}
|
||||
|
||||
// clone performs a deep copy of the User struct, returning a new clone with
|
||||
// all values copied.
|
||||
func (u *User) clone() *User {
|
||||
if u == nil {
|
||||
return nil
|
||||
}
|
||||
clone := &User{}
|
||||
*clone = *u
|
||||
clone.Permissions = u.Permissions.clone()
|
||||
return clone
|
||||
}
|
||||
|
||||
// Permissions are the allowed subjects on a per
|
||||
// publish or subscribe basis.
|
||||
type Permissions struct {
|
||||
@@ -22,21 +34,40 @@ type Permissions struct {
|
||||
Subscribe []string `json:"subscribe"`
|
||||
}
|
||||
|
||||
// clone performs a deep copy of the Permissions struct, returning a new clone
|
||||
// with all values copied.
|
||||
func (p *Permissions) clone() *Permissions {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
clone := &Permissions{
|
||||
Publish: make([]string, len(p.Publish)),
|
||||
Subscribe: make([]string, len(p.Subscribe)),
|
||||
}
|
||||
copy(clone.Publish, p.Publish)
|
||||
copy(clone.Subscribe, p.Subscribe)
|
||||
return clone
|
||||
}
|
||||
|
||||
// configureAuthorization will do any setup needed for authorization.
|
||||
// Lock is assumed held.
|
||||
func (s *Server) configureAuthorization() {
|
||||
if s.opts == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Check for multiple users first
|
||||
// This just checks and sets up the user map if we have multiple users.
|
||||
if s.getOpts().Users != nil {
|
||||
if opts.Users != nil {
|
||||
s.users = make(map[string]*User)
|
||||
for _, u := range s.getOpts().Users {
|
||||
for _, u := range opts.Users {
|
||||
s.users[u.Username] = u
|
||||
}
|
||||
s.info.AuthRequired = true
|
||||
} else if s.getOpts().Username != "" || s.getOpts().Authorization != "" {
|
||||
} else if opts.Username != "" || opts.Authorization != "" {
|
||||
s.info.AuthRequired = true
|
||||
}
|
||||
}
|
||||
@@ -57,6 +88,9 @@ func (s *Server) checkAuthorization(c *client) bool {
|
||||
// isClientAuthorized will check the client against the proper authorization method and data.
|
||||
// This could be token or username/password based.
|
||||
func (s *Server) isClientAuthorized(c *client) bool {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Check multiple users first, then token, then single user/pass.
|
||||
if s.users != nil {
|
||||
user, ok := s.users[c.opts.Username]
|
||||
@@ -71,14 +105,14 @@ func (s *Server) isClientAuthorized(c *client) bool {
|
||||
}
|
||||
return ok
|
||||
|
||||
} else if s.getOpts().Authorization != "" {
|
||||
return comparePasswords(s.getOpts().Authorization, c.opts.Authorization)
|
||||
} else if opts.Authorization != "" {
|
||||
return comparePasswords(opts.Authorization, c.opts.Authorization)
|
||||
|
||||
} else if s.getOpts().Username != "" {
|
||||
if s.getOpts().Username != c.opts.Username {
|
||||
} else if opts.Username != "" {
|
||||
if opts.Username != c.opts.Username {
|
||||
return false
|
||||
}
|
||||
return comparePasswords(s.getOpts().Password, c.opts.Password)
|
||||
return comparePasswords(opts.Password, c.opts.Password)
|
||||
}
|
||||
|
||||
return true
|
||||
@@ -86,10 +120,13 @@ func (s *Server) isClientAuthorized(c *client) bool {
|
||||
|
||||
// checkRouterAuth checks optional router authorization which can be nil or username/password.
|
||||
func (s *Server) isRouterAuthorized(c *client) bool {
|
||||
if s.getOpts().Cluster.Username != c.opts.Username {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
if opts.Cluster.Username != c.opts.Username {
|
||||
return false
|
||||
}
|
||||
return comparePasswords(s.getOpts().Cluster.Password, c.opts.Password)
|
||||
return comparePasswords(opts.Cluster.Password, c.opts.Password)
|
||||
}
|
||||
|
||||
// Support for bcrypt stored passwords and tokens.
|
||||
|
||||
58
server/auth_test.go
Normal file
58
server/auth_test.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUserCloneNilPermissions(t *testing.T) {
|
||||
user := &User{
|
||||
Username: "foo",
|
||||
Password: "bar",
|
||||
}
|
||||
|
||||
clone := user.clone()
|
||||
|
||||
if !reflect.DeepEqual(user, clone) {
|
||||
t.Fatalf("Cloned Users are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
user, clone)
|
||||
}
|
||||
|
||||
clone.Password = "baz"
|
||||
if reflect.DeepEqual(user, clone) {
|
||||
t.Fatal("Expected Users to be different")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserClone(t *testing.T) {
|
||||
user := &User{
|
||||
Username: "foo",
|
||||
Password: "bar",
|
||||
Permissions: &Permissions{
|
||||
Publish: []string{"foo"},
|
||||
Subscribe: []string{"bar"},
|
||||
},
|
||||
}
|
||||
|
||||
clone := user.clone()
|
||||
|
||||
if !reflect.DeepEqual(user, clone) {
|
||||
t.Fatalf("Cloned Users are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
user, clone)
|
||||
}
|
||||
|
||||
clone.Permissions.Subscribe = []string{"baz"}
|
||||
if reflect.DeepEqual(user, clone) {
|
||||
t.Fatal("Expected Users to be different")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserCloneNil(t *testing.T) {
|
||||
user := (*User)(nil)
|
||||
clone := user.clone()
|
||||
if clone != nil {
|
||||
t.Fatalf("Expected nil, got: %+v", clone)
|
||||
}
|
||||
}
|
||||
@@ -266,6 +266,9 @@ func (c *client) readLoop() {
|
||||
// Start read buffer.
|
||||
b := make([]byte, startBufSize)
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
for {
|
||||
n, err := nc.Read(b)
|
||||
if err != nil {
|
||||
@@ -306,7 +309,7 @@ func (c *client) readLoop() {
|
||||
wfc := cp.wfc
|
||||
cp.wfc = 0
|
||||
|
||||
cp.nc.SetWriteDeadline(time.Now().Add(s.getOpts().WriteDeadline))
|
||||
cp.nc.SetWriteDeadline(time.Now().Add(opts.WriteDeadline))
|
||||
err := cp.bw.Flush()
|
||||
cp.nc.SetWriteDeadline(time.Time{})
|
||||
if err != nil {
|
||||
|
||||
@@ -31,7 +31,9 @@ type Logger interface {
|
||||
// ConfigureLogger configures and sets the logger for the server.
|
||||
func (s *Server) ConfigureLogger() {
|
||||
var (
|
||||
log Logger
|
||||
log Logger
|
||||
|
||||
// Snapshot server options.
|
||||
opts = s.getOpts()
|
||||
)
|
||||
|
||||
@@ -86,12 +88,16 @@ func (s *Server) ReOpenLogFile() {
|
||||
s.Noticef("File log re-open ignored, no logger")
|
||||
return
|
||||
}
|
||||
if s.getOpts().LogFile == "" {
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
if opts.LogFile == "" {
|
||||
s.Noticef("File log re-open ignored, not a file logger")
|
||||
} else {
|
||||
fileLog := logger.NewFileLogger(s.getOpts().LogFile,
|
||||
s.getOpts().Logtime, s.getOpts().Debug, s.getOpts().Trace, true)
|
||||
s.SetLogger(fileLog, s.getOpts().Debug, s.getOpts().Trace)
|
||||
fileLog := logger.NewFileLogger(opts.LogFile,
|
||||
opts.Logtime, s.getOpts().Debug, opts.Trace, true)
|
||||
s.SetLogger(fileLog, s.getOpts().Debug, opts.Trace)
|
||||
s.Noticef("File log re-opened")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -457,7 +457,10 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// HandleVarz will process HTTP requests for server information.
|
||||
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
v := &Varz{Info: &s.info, Options: s.getOpts(), MaxPayload: s.getOpts().MaxPayload, Start: s.start}
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
v := &Varz{Info: &s.info, Options: opts, MaxPayload: opts.MaxPayload, Start: s.start}
|
||||
v.Now = time.Now()
|
||||
v.Uptime = myUptime(time.Since(s.start))
|
||||
v.Port = v.Info.Port
|
||||
|
||||
@@ -72,6 +72,27 @@ type Options struct {
|
||||
WriteDeadline time.Duration `json:"-"`
|
||||
}
|
||||
|
||||
// Clone performs a deep copy of the Options struct, returning a new clone
|
||||
// with all values copied.
|
||||
func (o *Options) Clone() *Options {
|
||||
if o == nil {
|
||||
return nil
|
||||
}
|
||||
clone := &Options{}
|
||||
*clone = *o
|
||||
clone.Users = make([]*User, len(o.Users))
|
||||
for i, user := range o.Users {
|
||||
clone.Users[i] = user.clone()
|
||||
}
|
||||
clone.Routes = make([]*url.URL, len(o.Routes))
|
||||
for i, route := range o.Routes {
|
||||
routeCopy := &url.URL{}
|
||||
*routeCopy = *route
|
||||
clone.Routes[i] = routeCopy
|
||||
}
|
||||
return clone
|
||||
}
|
||||
|
||||
// Configuration file authorization section.
|
||||
type authorization struct {
|
||||
// Singles
|
||||
|
||||
@@ -694,3 +694,55 @@ func TestParseWriteDeadline(t *testing.T) {
|
||||
t.Fatalf("Expected write_deadline to be 2s, got %v", opts.WriteDeadline)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOptionsClone(t *testing.T) {
|
||||
opts := &Options{
|
||||
ConfigFile: "./configs/test.conf",
|
||||
Host: "localhost",
|
||||
Port: 2222,
|
||||
Username: "derek",
|
||||
Password: "spooky",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6789,
|
||||
Syslog: true,
|
||||
RemoteSyslog: "udp://foo.com:33",
|
||||
MaxControlLine: 2048,
|
||||
MaxPayload: 65536,
|
||||
MaxConn: 100,
|
||||
PingInterval: 60 * time.Second,
|
||||
MaxPingsOut: 3,
|
||||
Cluster: ClusterOpts{
|
||||
NoAdvertise: true,
|
||||
ConnectRetries: 2,
|
||||
},
|
||||
WriteDeadline: 3 * time.Second,
|
||||
Routes: []*url.URL{&url.URL{}},
|
||||
Users: []*User{&User{Username: "foo", Password: "bar"}},
|
||||
}
|
||||
|
||||
clone := opts.Clone()
|
||||
|
||||
if !reflect.DeepEqual(opts, clone) {
|
||||
t.Fatalf("Cloned Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
clone, opts)
|
||||
}
|
||||
|
||||
clone.Users[0].Password = "baz"
|
||||
if reflect.DeepEqual(opts, clone) {
|
||||
t.Fatal("Expected Options to be different")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOptionsCloneNil(t *testing.T) {
|
||||
opts := (*Options)(nil)
|
||||
clone := opts.Clone()
|
||||
if clone != nil {
|
||||
t.Fatalf("Expected nil, got: %+v", clone)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
// Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
@@ -9,7 +11,7 @@ import (
|
||||
|
||||
// FlagSnapshot captures the server options as specified by CLI flags at
|
||||
// startup. This should not be modified once the server has started.
|
||||
var FlagSnapshot = &Options{}
|
||||
var FlagSnapshot *Options
|
||||
|
||||
// option is a hot-swappable configuration setting.
|
||||
type option interface {
|
||||
@@ -32,6 +34,9 @@ func (t *traceOption) Apply(server *Server) {
|
||||
// changes. This returns an error if the server was not started with a config
|
||||
// file or an option which doesn't support hot-swapping was changed.
|
||||
func (s *Server) Reload() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.configFile == "" {
|
||||
return errors.New("Can only reload config when a file is provided using -c or --config")
|
||||
}
|
||||
@@ -63,7 +68,7 @@ func (s *Server) reloadOptions(newOpts *Options) error {
|
||||
// error.
|
||||
func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
var (
|
||||
oldConfig = reflect.ValueOf(s.opts).Elem()
|
||||
oldConfig = reflect.ValueOf(s.getOpts()).Elem()
|
||||
newConfig = reflect.ValueOf(newOpts).Elem()
|
||||
diffOpts = []option{}
|
||||
)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
// Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
@@ -44,7 +46,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
|
||||
processOptions(golden)
|
||||
|
||||
if err := os.Symlink("./configs/reload/test.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
opts, err := ProcessConfigFile(config)
|
||||
@@ -63,7 +65,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/reload_unsupported.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err)
|
||||
}
|
||||
|
||||
// This should fail because `debug` cannot be changed.
|
||||
|
||||
@@ -239,8 +239,12 @@ func (s *Server) processImplicitRoute(info *Info) {
|
||||
s.Debugf("Error parsing URL from INFO: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
if info.AuthRequired {
|
||||
r.User = url.UserPassword(s.getOpts().Cluster.Username, s.getOpts().Cluster.Password)
|
||||
r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
|
||||
}
|
||||
s.startGoRoutine(func() { s.connectToRoute(r, false) })
|
||||
}
|
||||
@@ -306,9 +310,12 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
}
|
||||
|
||||
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
didSolicit := rURL != nil
|
||||
r := &route{didSolicit: didSolicit}
|
||||
for _, route := range s.getOpts().Routes {
|
||||
for _, route := range opts.Routes {
|
||||
if rURL != nil && (strings.ToLower(rURL.Host) == strings.ToLower(route.Host)) {
|
||||
r.routeType = Explicit
|
||||
}
|
||||
@@ -340,7 +347,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// Check for TLS
|
||||
if tlsRequired {
|
||||
// Copy off the config to add in ServerName if we
|
||||
tlsConfig := util.CloneTLSConfig(s.getOpts().Cluster.TLSConfig)
|
||||
tlsConfig := util.CloneTLSConfig(opts.Cluster.TLSConfig)
|
||||
|
||||
// If we solicited, we will act like the client, otherwise the server.
|
||||
if didSolicit {
|
||||
@@ -357,7 +364,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
conn := c.nc.(*tls.Conn)
|
||||
|
||||
// Setup the timeout
|
||||
ttl := secondsToDuration(s.getOpts().Cluster.TLSTimeout)
|
||||
ttl := secondsToDuration(opts.Cluster.TLSTimeout)
|
||||
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
|
||||
conn.SetReadDeadline(time.Now().Add(ttl))
|
||||
|
||||
@@ -419,7 +426,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
|
||||
// Check for Auth required state for incoming connections.
|
||||
if authRequired && !didSolicit {
|
||||
ttl := secondsToDuration(s.getOpts().Cluster.AuthTimeout)
|
||||
ttl := secondsToDuration(opts.Cluster.AuthTimeout)
|
||||
c.setAuthTimer(ttl)
|
||||
}
|
||||
|
||||
@@ -592,13 +599,16 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
}
|
||||
|
||||
func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
hp := net.JoinHostPort(s.getOpts().Cluster.Host, strconv.Itoa(s.getOpts().Cluster.Port))
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(opts.Cluster.Port))
|
||||
s.Noticef("Listening for route connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
// We need to close this channel to avoid a deadlock
|
||||
close(ch)
|
||||
s.Fatalf("Error listening on router port: %d - %v", s.getOpts().Cluster.Port, e)
|
||||
s.Fatalf("Error listening on router port: %d - %v", opts.Cluster.Port, e)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -652,13 +662,16 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) {
|
||||
// clients know about us.
|
||||
clientConnectURLs := s.getClientConnectURLs()
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Check for TLSConfig
|
||||
tlsReq := s.getOpts().Cluster.TLSConfig != nil
|
||||
tlsReq := opts.Cluster.TLSConfig != nil
|
||||
info := Info{
|
||||
ID: s.info.ID,
|
||||
Version: s.info.Version,
|
||||
Host: s.getOpts().Cluster.Host,
|
||||
Port: s.getOpts().Cluster.Port,
|
||||
Host: opts.Cluster.Host,
|
||||
Port: opts.Cluster.Port,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
SSLRequired: tlsReq,
|
||||
@@ -667,7 +680,7 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) {
|
||||
ClientConnectURLs: clientConnectURLs,
|
||||
}
|
||||
// Check for Auth items
|
||||
if s.getOpts().Cluster.Username != "" {
|
||||
if opts.Cluster.Username != "" {
|
||||
info.AuthRequired = true
|
||||
}
|
||||
s.routeInfo = info
|
||||
@@ -692,6 +705,9 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
|
||||
}
|
||||
|
||||
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
defer s.grWG.Done()
|
||||
attempts := 0
|
||||
for s.isRunning() && rURL != nil {
|
||||
@@ -700,11 +716,11 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
|
||||
if err != nil {
|
||||
s.Debugf("Error trying to connect to route: %v", err)
|
||||
if !tryForEver {
|
||||
if s.getOpts().Cluster.ConnectRetries <= 0 {
|
||||
if opts.Cluster.ConnectRetries <= 0 {
|
||||
return
|
||||
}
|
||||
attempts++
|
||||
if attempts > s.getOpts().Cluster.ConnectRetries {
|
||||
if attempts > opts.Cluster.ConnectRetries {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,8 +237,11 @@ func (s *Server) Start() {
|
||||
s.grRunning = true
|
||||
s.grMu.Unlock()
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Log the pid to a file
|
||||
if s.getOpts().PidFile != _EMPTY_ {
|
||||
if opts.PidFile != _EMPTY_ {
|
||||
s.logPid()
|
||||
}
|
||||
|
||||
@@ -253,14 +256,14 @@ func (s *Server) Start() {
|
||||
clientListenReady := make(chan struct{})
|
||||
|
||||
// Start up routing as well if needed.
|
||||
if s.getOpts().Cluster.Port != 0 {
|
||||
if opts.Cluster.Port != 0 {
|
||||
s.startGoRoutine(func() {
|
||||
s.StartRouting(clientListenReady)
|
||||
})
|
||||
}
|
||||
|
||||
// Pprof http endpoint for the profiler.
|
||||
if s.getOpts().ProfPort != 0 {
|
||||
if opts.ProfPort != 0 {
|
||||
s.StartProfiler()
|
||||
}
|
||||
|
||||
@@ -356,7 +359,10 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
}
|
||||
}()
|
||||
|
||||
hp := net.JoinHostPort(s.getOpts().Host, strconv.Itoa(s.getOpts().Port))
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
||||
s.Noticef("Listening for client connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
@@ -365,7 +371,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
}
|
||||
|
||||
// Alert of TLS enabled.
|
||||
if s.getOpts().TLSConfig != nil {
|
||||
if opts.TLSConfig != nil {
|
||||
s.Noticef("TLS required for client connections")
|
||||
}
|
||||
|
||||
@@ -378,7 +384,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
|
||||
// If server was started with RANDOM_PORT (-1), opts.Port would be equal
|
||||
// to 0 at the beginning this function. So we need to get the actual port
|
||||
if s.getOpts().Port == 0 {
|
||||
if opts.Port == 0 {
|
||||
// Write resolved port back to options.
|
||||
_, port, err := net.SplitHostPort(l.Addr().String())
|
||||
if err != nil {
|
||||
@@ -392,7 +398,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.getOpts().Port = portNum
|
||||
opts.Port = portNum
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
@@ -430,8 +436,11 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
|
||||
// StartProfiler is called to enable dynamic profiling.
|
||||
func (s *Server) StartProfiler() {
|
||||
s.Noticef("Starting profiling on http port %d", s.getOpts().ProfPort)
|
||||
hp := net.JoinHostPort(s.getOpts().Host, strconv.Itoa(s.getOpts().ProfPort))
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
s.Noticef("Starting profiling on http port %d", opts.ProfPort)
|
||||
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.ProfPort))
|
||||
go func() {
|
||||
err := http.ListenAndServe(hp, nil)
|
||||
if err != nil {
|
||||
@@ -454,15 +463,18 @@ func (s *Server) StartHTTPSMonitoring() {
|
||||
|
||||
// StartMonitoring starts the HTTP or HTTPs server if needed.
|
||||
func (s *Server) StartMonitoring() error {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Specifying both HTTP and HTTPS ports is a misconfiguration
|
||||
if s.getOpts().HTTPPort != 0 && s.getOpts().HTTPSPort != 0 {
|
||||
return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", s.getOpts().HTTPPort, s.getOpts().HTTPSPort)
|
||||
if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
|
||||
return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
|
||||
}
|
||||
var err error
|
||||
if s.getOpts().HTTPPort != 0 {
|
||||
if opts.HTTPPort != 0 {
|
||||
err = s.startMonitoring(false)
|
||||
} else if s.getOpts().HTTPSPort != 0 {
|
||||
if s.getOpts().TLSConfig == nil {
|
||||
} else if opts.HTTPSPort != 0 {
|
||||
if opts.TLSConfig == nil {
|
||||
return fmt.Errorf("TLS cert and key required for HTTPS")
|
||||
}
|
||||
err = s.startMonitoring(true)
|
||||
@@ -482,6 +494,8 @@ const (
|
||||
|
||||
// Start the monitoring server
|
||||
func (s *Server) startMonitoring(secure bool) error {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Used to track HTTP requests
|
||||
s.httpReqStats = map[string]uint64{
|
||||
@@ -499,14 +513,14 @@ func (s *Server) startMonitoring(secure bool) error {
|
||||
)
|
||||
|
||||
if secure {
|
||||
hp = net.JoinHostPort(s.getOpts().HTTPHost, strconv.Itoa(s.getOpts().HTTPSPort))
|
||||
hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(opts.HTTPSPort))
|
||||
s.Noticef("Starting https monitor on %s", hp)
|
||||
config := util.CloneTLSConfig(s.getOpts().TLSConfig)
|
||||
config := util.CloneTLSConfig(opts.TLSConfig)
|
||||
config.ClientAuth = tls.NoClientCert
|
||||
httpListener, err = tls.Listen("tcp", hp, config)
|
||||
|
||||
} else {
|
||||
hp = net.JoinHostPort(s.getOpts().HTTPHost, strconv.Itoa(s.getOpts().HTTPPort))
|
||||
hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(opts.HTTPPort))
|
||||
s.Noticef("Starting http monitor on %s", hp)
|
||||
httpListener, err = net.Listen("tcp", hp)
|
||||
}
|
||||
@@ -600,9 +614,13 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
s.mu.Unlock()
|
||||
return c
|
||||
}
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// If there is a max connections specified, check that adding
|
||||
// this new client would not push us over the max
|
||||
if s.getOpts().MaxConn > 0 && len(s.clients) >= s.getOpts().MaxConn {
|
||||
if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {
|
||||
s.mu.Unlock()
|
||||
c.maxConnExceeded()
|
||||
return nil
|
||||
@@ -616,11 +634,11 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
// Check for TLS
|
||||
if tlsRequired {
|
||||
c.Debugf("Starting TLS client connection handshake")
|
||||
c.nc = tls.Server(c.nc, s.getOpts().TLSConfig)
|
||||
c.nc = tls.Server(c.nc, opts.TLSConfig)
|
||||
conn := c.nc.(*tls.Conn)
|
||||
|
||||
// Setup the timeout
|
||||
ttl := secondsToDuration(s.getOpts().TLSTimeout)
|
||||
ttl := secondsToDuration(opts.TLSTimeout)
|
||||
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
|
||||
conn.SetReadDeadline(time.Now().Add(ttl))
|
||||
|
||||
@@ -649,7 +667,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
// the race where the timer fires during the handshake and causes the
|
||||
// server to write bad data to the socket. See issue #432.
|
||||
if authRequired {
|
||||
c.setAuthTimer(secondsToDuration(s.getOpts().AuthTimeout))
|
||||
c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
|
||||
}
|
||||
|
||||
if tlsRequired {
|
||||
@@ -856,10 +874,13 @@ func (s *Server) Addr() net.Addr {
|
||||
// and, if routing is enabled, route connections. If after the duration
|
||||
// `dur` the server is still not ready, returns `false`.
|
||||
func (s *Server) ReadyForConnections(dur time.Duration) bool {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
end := time.Now().Add(dur)
|
||||
for time.Now().Before(end) {
|
||||
s.mu.Lock()
|
||||
ok := s.listener != nil && (s.getOpts().Cluster.Port == 0 || s.routeListener != nil)
|
||||
ok := s.listener != nil && (opts.Cluster.Port == 0 || s.routeListener != nil)
|
||||
s.mu.Unlock()
|
||||
if ok {
|
||||
return true
|
||||
@@ -889,13 +910,16 @@ func (s *Server) startGoRoutine(f func()) {
|
||||
// port based on the server options' Host and Port. If the Host corresponds to
|
||||
// "any" interfaces, this call returns the list of resolved IP addresses.
|
||||
func (s *Server) getClientConnectURLs() []string {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
sPort := strconv.Itoa(s.getOpts().Port)
|
||||
sPort := strconv.Itoa(opts.Port)
|
||||
urls := make([]string, 0, 1)
|
||||
|
||||
ipAddr, err := net.ResolveIPAddr("ip", s.getOpts().Host)
|
||||
ipAddr, err := net.ResolveIPAddr("ip", opts.Host)
|
||||
// If the host is "any" (0.0.0.0 or ::), get specific IPs from available
|
||||
// interfaces.
|
||||
if err == nil && ipAddr.IP.IsUnspecified() {
|
||||
@@ -926,10 +950,10 @@ func (s *Server) getClientConnectURLs() []string {
|
||||
// and not add any address in the array in the loop above, and we
|
||||
// ended-up returning 0.0.0.0, which is problematic for Windows clients.
|
||||
// Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
|
||||
if s.getOpts().Host == "0.0.0.0" || s.getOpts().Host == "::" {
|
||||
s.Errorf("Address %q can not be resolved properly", s.getOpts().Host)
|
||||
if opts.Host == "0.0.0.0" || opts.Host == "::" {
|
||||
s.Errorf("Address %q can not be resolved properly", opts.Host)
|
||||
} else {
|
||||
urls = append(urls, net.JoinHostPort(s.getOpts().Host, sPort))
|
||||
urls = append(urls, net.JoinHostPort(opts.Host, sPort))
|
||||
}
|
||||
}
|
||||
return urls
|
||||
|
||||
Reference in New Issue
Block a user