mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Merge pull request #515 from nats-io/users_reload
Implement users/authorization support for config reload
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
@@ -72,6 +73,9 @@ func (s *Server) configureAuthorization() {
|
||||
s.info.AuthRequired = true
|
||||
} else if opts.Username != "" || opts.Authorization != "" {
|
||||
s.info.AuthRequired = true
|
||||
} else {
|
||||
s.users = nil
|
||||
s.info.AuthRequired = false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,6 +136,35 @@ func (s *Server) isRouterAuthorized(c *client) bool {
|
||||
return comparePasswords(opts.Cluster.Password, c.opts.Password)
|
||||
}
|
||||
|
||||
// removeUnauthorizedSubs removes any subscriptions the client has that are no
|
||||
// longer authorized, e.g. due to a config reload.
|
||||
func (s *Server) removeUnauthorizedSubs(c *client) {
|
||||
c.mu.Lock()
|
||||
if c.perms == nil {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
subs := make(map[string]*subscription, len(c.subs))
|
||||
for sid, sub := range c.subs {
|
||||
subs[sid] = sub
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
for sid, sub := range subs {
|
||||
if !c.canSubscribe(sub.subject) {
|
||||
_ = s.sl.Remove(sub)
|
||||
c.mu.Lock()
|
||||
delete(c.subs, sid)
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
|
||||
sub.subject, sub.sid))
|
||||
s.Noticef("Removed sub %q for user %q - not authorized",
|
||||
string(sub.subject), c.opts.Username)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Support for bcrypt stored passwords and tokens.
|
||||
const bcryptPrefix = "$2a$"
|
||||
|
||||
|
||||
@@ -224,6 +224,10 @@ func (c *client) initClient() {
|
||||
// into the client.
|
||||
func (c *client) RegisterUser(user *User) {
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.mu.Lock()
|
||||
c.perms = nil
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -775,14 +779,12 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if c.perms != nil {
|
||||
r := c.perms.sub.Match(string(sub.subject))
|
||||
if len(r.psubs) == 0 {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject)
|
||||
return nil
|
||||
}
|
||||
if !c.canSubscribe(sub.subject) {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
|
||||
c.opts.Username, sub.subject, sub.sid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// We can have two SUB protocols coming from a route due to some
|
||||
@@ -813,6 +815,15 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(sub []byte) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
return len(c.perms.sub.Match(string(sub)).psubs) > 0
|
||||
}
|
||||
|
||||
func (c *client) unsubscribe(sub *subscription) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -1011,8 +1022,6 @@ func (c *client) processMsg(msg []byte) {
|
||||
c.traceMsg(msg)
|
||||
}
|
||||
|
||||
// defintely
|
||||
|
||||
// Disallow publish to _SYS.>, these are reserved for internals.
|
||||
if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 &&
|
||||
c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' &&
|
||||
|
||||
@@ -604,7 +604,7 @@ func TestAuthorizationTimeout(t *testing.T) {
|
||||
if _, err := client.ReadString('\n'); err != nil {
|
||||
t.Fatalf("Error receiving info from server: %v\n", err)
|
||||
}
|
||||
time.Sleep(2 * secondsToDuration(serverOptions.AuthTimeout))
|
||||
time.Sleep(3 * secondsToDuration(serverOptions.AuthTimeout))
|
||||
l, err := client.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving info from server: %v\n", err)
|
||||
@@ -686,6 +686,7 @@ func TestTLSCloseClientConnection(t *testing.T) {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
opts.TLSTimeout = 100
|
||||
opts.NoLog = true
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
listen: localhost:4443
|
||||
@@ -1,6 +1,6 @@
|
||||
# Copyright 2016 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: 127.0.0.1:4222
|
||||
listen: localhost:4443
|
||||
|
||||
authorization {
|
||||
users = [
|
||||
|
||||
38
server/configs/reload/authorization_1.conf
Normal file
38
server/configs/reload/authorization_1.conf
Normal file
@@ -0,0 +1,38 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
# Our role based permissions.
|
||||
|
||||
# Superuser can do anything.
|
||||
super_user = {
|
||||
publish = ">"
|
||||
subscribe = ">"
|
||||
}
|
||||
# Can do requests on foo or bar, and subscribe to anything
|
||||
# that is a response to an _INBOX.
|
||||
#
|
||||
# Notice that authorization filters can be singletons or arrays.
|
||||
req_pub_user = {
|
||||
publish = ["req.foo", "req.bar"]
|
||||
subscribe = "_INBOX.>"
|
||||
}
|
||||
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
default_permissions: $default_user
|
||||
|
||||
# Users listed with persmissions.
|
||||
users = [
|
||||
{user: alice, password: foo, permissions: $super_user}
|
||||
{user: bob, password: bar, permissions: $req_pub_user}
|
||||
{user: susan, password: baz}
|
||||
]
|
||||
}
|
||||
38
server/configs/reload/authorization_2.conf
Normal file
38
server/configs/reload/authorization_2.conf
Normal file
@@ -0,0 +1,38 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
# Our role based permissions.
|
||||
|
||||
# Superuser can do anything.
|
||||
super_user = {
|
||||
publish = ">"
|
||||
subscribe = ">"
|
||||
}
|
||||
# Can do requests on _INBOX.foo.bar, and subscribe to anything
|
||||
# that is a response to an _INBOX.foo.
|
||||
#
|
||||
# Notice that authorization filters can be singletons or arrays.
|
||||
req_pub_user = {
|
||||
publish = ["_INBOX.foo.bar"]
|
||||
subscribe = "_INBOX.foo.>"
|
||||
}
|
||||
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
default_permissions: $default_user
|
||||
|
||||
# Users listed with persmissions.
|
||||
users = [
|
||||
{user: alice, password: foo, permissions: $super_user}
|
||||
{user: bob, password: bar, permissions: $req_pub_user}
|
||||
{user: susan, password: baz}
|
||||
]
|
||||
}
|
||||
@@ -1 +1,4 @@
|
||||
listen: localhost:-1
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# Invalid config file
|
||||
trace:
|
||||
|
||||
12
server/configs/reload/multiple_users_1.conf
Normal file
12
server/configs/reload/multiple_users_1.conf
Normal file
@@ -0,0 +1,12 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
users = [
|
||||
{user: alice, password: foo}
|
||||
{user: bob, password: bar}
|
||||
]
|
||||
timeout: 0.5
|
||||
}
|
||||
12
server/configs/reload/multiple_users_2.conf
Normal file
12
server/configs/reload/multiple_users_2.conf
Normal file
@@ -0,0 +1,12 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
users = [
|
||||
{user: alice, password: baz}
|
||||
{user: bob, password: bar}
|
||||
]
|
||||
timeout: 0.5
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: true # enable on reload
|
||||
trace: true # enable on reload
|
||||
@@ -11,3 +13,10 @@ tls {
|
||||
ca_file: "../test/configs/certs/ca.pem"
|
||||
verify: true
|
||||
}
|
||||
|
||||
# Enable authorization on reload
|
||||
authorization {
|
||||
user: tyler
|
||||
password: T0pS3cr3t
|
||||
timeout: 2
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: false
|
||||
trace: true
|
||||
|
||||
9
server/configs/reload/single_user_authentication_1.conf
Normal file
9
server/configs/reload/single_user_authentication_1.conf
Normal file
@@ -0,0 +1,9 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
user: tyler
|
||||
password: T0pS3cr3t
|
||||
}
|
||||
9
server/configs/reload/single_user_authentication_2.conf
Normal file
9
server/configs/reload/single_user_authentication_2.conf
Normal file
@@ -0,0 +1,9 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
user: derek
|
||||
password: passw0rd
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: false
|
||||
trace: false
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:-1
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/server.pem"
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:-1
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/cert.new.pem"
|
||||
|
||||
8
server/configs/reload/token_authentication_1.conf
Normal file
8
server/configs/reload/token_authentication_1.conf
Normal file
@@ -0,0 +1,8 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
token: T0pS3cr3t
|
||||
}
|
||||
8
server/configs/reload/token_authentication_2.conf
Normal file
8
server/configs/reload/token_authentication_2.conf
Normal file
@@ -0,0 +1,8 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:-1
|
||||
log_file: "/tmp/gnatsd.log"
|
||||
|
||||
authorization {
|
||||
token: passw0rd
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# Simple config file
|
||||
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/server.pem"
|
||||
key_file: "./configs/certs/key.pem"
|
||||
timeout: 2
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/cert.new.pem"
|
||||
key_file: "./configs/certs/key.new.pem"
|
||||
ca_file: "./configs/certs/cert.new.pem"
|
||||
verify: true
|
||||
timeout: 2
|
||||
}
|
||||
201
server/reload.go
201
server/reload.go
@@ -18,37 +18,70 @@ var FlagSnapshot *Options
|
||||
type option interface {
|
||||
// Apply the server option.
|
||||
Apply(server *Server)
|
||||
|
||||
// IsLoggingChange indicates if this option requires reloading the logger.
|
||||
IsLoggingChange() bool
|
||||
|
||||
// IsAuthChange indicates if this option requires reloading authorization.
|
||||
IsAuthChange() bool
|
||||
}
|
||||
|
||||
// loggingOption is a base struct that provides default option behaviors for
|
||||
// logging-related options.
|
||||
type loggingOption struct{}
|
||||
|
||||
func (l loggingOption) IsLoggingChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (l loggingOption) IsAuthChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// traceOption implements the option interface for the `trace` setting.
|
||||
type traceOption struct {
|
||||
loggingOption
|
||||
newValue bool
|
||||
}
|
||||
|
||||
// Apply the tracing change by reconfiguring the server's logger.
|
||||
// Apply is a no-op because authorization will be reloaded after options are
|
||||
// applied
|
||||
func (t *traceOption) Apply(server *Server) {
|
||||
server.ConfigureLogger()
|
||||
server.Noticef("Reloaded: trace = %v", t.newValue)
|
||||
}
|
||||
|
||||
// debugOption implements the option interface for the `debug` setting.
|
||||
type debugOption struct {
|
||||
loggingOption
|
||||
newValue bool
|
||||
}
|
||||
|
||||
// Apply the debug change by reconfiguring the server's logger.
|
||||
// Apply is a no-op because authorization will be reloaded after options are
|
||||
// applied
|
||||
func (d *debugOption) Apply(server *Server) {
|
||||
server.ConfigureLogger()
|
||||
server.Noticef("Reloaded: debug = %v", d.newValue)
|
||||
}
|
||||
|
||||
// noopOption is a base struct that provides default no-op behaviors.
|
||||
type noopOption struct{}
|
||||
|
||||
func (n noopOption) IsLoggingChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (n noopOption) IsAuthChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// tlsOption implements the option interface for the `tls` setting.
|
||||
type tlsOption struct {
|
||||
noopOption
|
||||
newValue *tls.Config
|
||||
}
|
||||
|
||||
// Apply the tls change.
|
||||
func (t *tlsOption) Apply(server *Server) {
|
||||
server.mu.Lock()
|
||||
tlsRequired := t.newValue != nil
|
||||
server.info.TLSRequired = tlsRequired
|
||||
message := "disabled"
|
||||
@@ -57,24 +90,109 @@ func (t *tlsOption) Apply(server *Server) {
|
||||
message = "enabled"
|
||||
}
|
||||
server.generateServerInfoJSON()
|
||||
server.mu.Unlock()
|
||||
server.Noticef("Reloaded: tls = %s", message)
|
||||
}
|
||||
|
||||
// tlsTimeoutOption implements the option interface for the tls `timeout`
|
||||
// setting.
|
||||
type tlsTimeoutOption struct {
|
||||
noopOption
|
||||
newValue float64
|
||||
}
|
||||
|
||||
// Apply is a no-op because the timeout will be reloaded after options are
|
||||
// applied.
|
||||
func (t *tlsTimeoutOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: tls timeout = %v", t.newValue)
|
||||
}
|
||||
|
||||
// authOption is a base struct that provides default option behaviors.
|
||||
type authOption struct{}
|
||||
|
||||
func (o authOption) IsLoggingChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (o authOption) IsAuthChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// usernameOption implements the option interface for the `username` setting.
|
||||
type usernameOption struct {
|
||||
authOption
|
||||
}
|
||||
|
||||
// Apply is a no-op because authorization will be reloaded after options are
|
||||
// applied.
|
||||
func (u *usernameOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: authorization username")
|
||||
}
|
||||
|
||||
// passwordOption implements the option interface for the `password` setting.
|
||||
type passwordOption struct {
|
||||
authOption
|
||||
}
|
||||
|
||||
// Apply is a no-op because authorization will be reloaded after options are
|
||||
// applied.
|
||||
func (p *passwordOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: authorization password")
|
||||
}
|
||||
|
||||
// authorizationOption implements the option interface for the `token`
|
||||
// authorization setting.
|
||||
type authorizationOption struct {
|
||||
authOption
|
||||
}
|
||||
|
||||
// Apply is a no-op because authorization will be reloaded after options are
|
||||
// applied.
|
||||
func (a *authorizationOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: authorization token")
|
||||
}
|
||||
|
||||
// authTimeoutOption implements the option interface for the authorization
|
||||
// `timeout` setting.
|
||||
type authTimeoutOption struct {
|
||||
noopOption // Not authOption because this is a no-op; will be reloaded with options.
|
||||
newValue float64
|
||||
}
|
||||
|
||||
// Apply is a no-op because the timeout will be reloaded after options are
|
||||
// applied.
|
||||
func (a *authTimeoutOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: authorization timeout = %v", a.newValue)
|
||||
}
|
||||
|
||||
// usersOption implements the option interface for the authorization `users`
|
||||
// setting.
|
||||
type usersOption struct {
|
||||
authOption
|
||||
newValue []*User
|
||||
}
|
||||
|
||||
func (u *usersOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: authorization users")
|
||||
}
|
||||
|
||||
// Reload reads the current configuration file and applies any supported
|
||||
// changes. This returns an error if the server was not started with a config
|
||||
// file or an option which doesn't support hot-swapping was changed.
|
||||
func (s *Server) Reload() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.configFile == "" {
|
||||
s.mu.Unlock()
|
||||
return errors.New("Can only reload config when a file is provided using -c or --config")
|
||||
}
|
||||
newOpts, err := ProcessConfigFile(s.configFile)
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
// TODO: Dump previous good config to a .bak file?
|
||||
return fmt.Errorf("Config reload failed: %s", err)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Apply flags over config file settings.
|
||||
newOpts = MergeOptions(newOpts, FlagSnapshot)
|
||||
processOptions(newOpts)
|
||||
@@ -115,31 +233,34 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
}
|
||||
switch strings.ToLower(field.Name) {
|
||||
case "trace":
|
||||
diffOpts = append(diffOpts, &traceOption{newValue.(bool)})
|
||||
diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)})
|
||||
case "debug":
|
||||
diffOpts = append(diffOpts, &debugOption{newValue.(bool)})
|
||||
diffOpts = append(diffOpts, &debugOption{newValue: newValue.(bool)})
|
||||
case "tlsconfig":
|
||||
diffOpts = append(diffOpts, &tlsOption{newValue.(*tls.Config)})
|
||||
diffOpts = append(diffOpts, &tlsOption{newValue: newValue.(*tls.Config)})
|
||||
case "tlstimeout":
|
||||
// TLSTimeout change is picked up when Options is swapped.
|
||||
continue
|
||||
diffOpts = append(diffOpts, &tlsTimeoutOption{newValue: newValue.(float64)})
|
||||
case "username":
|
||||
diffOpts = append(diffOpts, &usernameOption{})
|
||||
case "password":
|
||||
diffOpts = append(diffOpts, &passwordOption{})
|
||||
case "authorization":
|
||||
diffOpts = append(diffOpts, &authorizationOption{})
|
||||
case "authtimeout":
|
||||
diffOpts = append(diffOpts, &authTimeoutOption{newValue: newValue.(float64)})
|
||||
case "users":
|
||||
diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)})
|
||||
case "port":
|
||||
// check to see if newValue == 0 and continue if so.
|
||||
if newValue == 0 {
|
||||
// ignore RANDOM_PORT
|
||||
continue
|
||||
} else {
|
||||
return nil, fmt.Errorf("Config reload not supported for %s", field.Name)
|
||||
}
|
||||
case "http_port", "https_port":
|
||||
// check to see if newValue == -1 (RANDOM_PORT for http/https monitoring port)
|
||||
if newValue == -1 {
|
||||
continue
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
// Bail out if attempting to reload any unsupported options.
|
||||
return nil, fmt.Errorf("Config reload not supported for %s: old=%v, new=%v", field.Name, oldValue, newValue)
|
||||
return nil, fmt.Errorf("Config reload not supported for %s: old=%v, new=%v",
|
||||
field.Name, oldValue, newValue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,9 +268,51 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
}
|
||||
|
||||
func (s *Server) applyOptions(opts []option) {
|
||||
var (
|
||||
reloadLogging = false
|
||||
reloadAuth = false
|
||||
)
|
||||
for _, opt := range opts {
|
||||
opt.Apply(s)
|
||||
if opt.IsLoggingChange() {
|
||||
reloadLogging = true
|
||||
}
|
||||
if opt.IsAuthChange() {
|
||||
reloadAuth = true
|
||||
}
|
||||
}
|
||||
|
||||
if reloadLogging {
|
||||
s.ConfigureLogger()
|
||||
}
|
||||
if reloadAuth {
|
||||
s.reloadAuthorization()
|
||||
}
|
||||
|
||||
s.Noticef("Reloaded server configuration")
|
||||
}
|
||||
|
||||
// reloadAuthorization reconfigures the server authorization settings,
|
||||
// disconnects any clients who are no longer authorized, and removes any
|
||||
// unauthorized subscriptions.
|
||||
func (s *Server) reloadAuthorization() {
|
||||
s.mu.Lock()
|
||||
s.configureAuthorization()
|
||||
s.generateServerInfoJSON()
|
||||
clients := make(map[uint64]*client, len(s.clients))
|
||||
for i, client := range s.clients {
|
||||
clients[i] = client
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
// Disconnect any unauthorized clients.
|
||||
if !s.isClientAuthorized(client) {
|
||||
client.authViolation()
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove any unauthorized subscriptions.
|
||||
s.removeUnauthorizedSubs(client)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -216,6 +217,18 @@ func TestConfigReload(t *testing.T) {
|
||||
if !server.info.TLSVerify {
|
||||
t.Fatal("Expected TLSVerify to be true")
|
||||
}
|
||||
if updated.Username != "tyler" {
|
||||
t.Fatalf("Username is incorrect.\nexpected: tyler\ngot: %s", updated.Username)
|
||||
}
|
||||
if updated.Password != "T0pS3cr3t" {
|
||||
t.Fatalf("Password is incorrect.\nexpected: T0pS3cr3t\ngot: %s", updated.Password)
|
||||
}
|
||||
if updated.AuthTimeout != 2 {
|
||||
t.Fatalf("AuthTimeout is incorrect.\nexpected: 2\ngot: %f", updated.AuthTimeout)
|
||||
}
|
||||
if !server.info.AuthRequired {
|
||||
t.Fatal("Expected AuthRequired to be true")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports TLS config changes. Test this by starting a server
|
||||
@@ -402,3 +415,827 @@ func TestConfigReloadDisableTLS(t *testing.T) {
|
||||
}
|
||||
nc.Close()
|
||||
}
|
||||
|
||||
// Ensure Reload supports single user authentication config changes. Test this
|
||||
// by starting a server with authentication enabled, connect to it to verify,
|
||||
// reload config using a different username/password, ensure reconnect fails,
|
||||
// then ensure reconnect succeeds when using the correct credentials.
|
||||
func TestConfigReloadRotateUserAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/single_user_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("tyler", "T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// Change user credentials.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/single_user_authentication_2.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr, nats.UserInfo("tyler", "T0pS3cr3t")); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("derek", "passw0rd"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports enabling single user authentication. Test this by
|
||||
// starting a server with authentication disabled, connect to it to verify,
|
||||
// reload config using with a username/password, ensure reconnect fails, then
|
||||
// ensure reconnect succeeds when using the correct credentials.
|
||||
func TestConfigReloadEnableUserAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// Enable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/single_user_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("tyler", "T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports disabling single user authentication. Test this by
|
||||
// starting a server with authentication enabled, connect to it to verify,
|
||||
// reload config using with authentication disabled, then ensure connecting
|
||||
// with no credentials succeeds.
|
||||
func TestConfigReloadDisableUserAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/single_user_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("tyler", "T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
t.Fatalf("Client received an unexpected error: %v", err)
|
||||
})
|
||||
|
||||
// Disable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds with no credentials.
|
||||
conn, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Ensure Reload supports token authentication config changes. Test this by
|
||||
// starting a server with token authentication enabled, connect to it to
|
||||
// verify, reload config using a different token, ensure reconnect fails, then
|
||||
// ensure reconnect succeeds when using the correct token.
|
||||
func TestConfigReloadRotateTokenAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/token_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.Token("T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// Change authentication token.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/token_authentication_2.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr, nats.Token("T0pS3cr3t")); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.Token("passw0rd"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports enabling token authentication. Test this by starting
|
||||
// a server with authentication disabled, connect to it to verify, reload
|
||||
// config using with a token, ensure reconnect fails, then ensure reconnect
|
||||
// succeeds when using the correct token.
|
||||
func TestConfigReloadEnableTokenAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// Enable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/token_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.Token("T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports disabling single token authentication. Test this by
|
||||
// starting a server with authentication enabled, connect to it to verify,
|
||||
// reload config using with authentication disabled, then ensure connecting
|
||||
// with no token succeeds.
|
||||
func TestConfigReloadDisableTokenAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/token_authentication_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.Token("T0pS3cr3t"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
t.Fatalf("Client received an unexpected error: %v", err)
|
||||
})
|
||||
|
||||
// Disable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds with no credentials.
|
||||
conn, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Ensure Reload supports users authentication config changes. Test this by
|
||||
// starting a server with users authentication enabled, connect to it to
|
||||
// verify, reload config using a different user, ensure reconnect fails, then
|
||||
// ensure reconnect succeeds when using the correct credentials.
|
||||
func TestConfigReloadRotateUsersAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/multiple_users_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("alice", "foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// These credentials won't change.
|
||||
nc, err = nats.Connect(addr, nats.UserInfo("bob", "bar"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
sub, err := nc.SubscribeSync("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Change users credentials.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/multiple_users_2.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr, nats.UserInfo("alice", "foo")); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("alice", "baz"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
|
||||
// Ensure the connection using unchanged credentials can still
|
||||
// publish/receive.
|
||||
if err := nc.Publish("foo", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error publishing: %v", err)
|
||||
}
|
||||
msg, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports enabling users authentication. Test this by starting
|
||||
// a server with authentication disabled, connect to it to verify, reload
|
||||
// config using with users, ensure reconnect fails, then ensure reconnect
|
||||
// succeeds when using the correct credentials.
|
||||
func TestConfigReloadEnableUsersAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
disconnected := make(chan struct{})
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
nc.SetDisconnectHandler(func(*nats.Conn) {
|
||||
disconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// Enable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/multiple_users_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting fails.
|
||||
if _, err := nats.Connect(addr); err == nil {
|
||||
t.Fatal("Expected connect to fail")
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds when using new credentials.
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("alice", "foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Ensure the previous connection received an authorization error.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if err != nats.ErrAuthorization {
|
||||
t.Fatalf("Expected ErrAuthorization, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected authorization error")
|
||||
}
|
||||
|
||||
// Ensure the previous connection was disconnected.
|
||||
select {
|
||||
case <-disconnected:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected connection to be disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload supports disabling users authentication. Test this by starting
|
||||
// a server with authentication enabled, connect to it to verify,
|
||||
// reload config using with authentication disabled, then ensure connecting
|
||||
// with no credentials succeeds.
|
||||
func TestConfigReloadDisableUsersAuthentication(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/multiple_users_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("alice", "foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
t.Fatalf("Client received an unexpected error: %v", err)
|
||||
})
|
||||
|
||||
// Disable authentication.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure connecting succeeds with no credentials.
|
||||
conn, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Ensure Reload supports changing permissions. Test this by starting a server
|
||||
// with a user configured with certain permissions, test publish and subscribe,
|
||||
// reload config with new permissions, ensure the previous subscription was
|
||||
// closed and publishes fail, then ensure the new permissions succeed.
|
||||
func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/reload/authorization_1.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("bob", "bar"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
// Ensure we can publish and receive messages as a sanity check.
|
||||
sub, err := nc.SubscribeSync("_INBOX.>")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("alice", "foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
sub2, err := conn.SubscribeSync("req.foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
if err := conn.Publish("_INBOX.foo", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data)
|
||||
}
|
||||
|
||||
if err := nc.Publish("req.foo", []byte("world")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
msg, err = sub2.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "world" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data)
|
||||
}
|
||||
|
||||
// Change permissions.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/authorization_2.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we receive an error for the subscription that is no longer
|
||||
// authorized.
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if !strings.Contains(err.Error(), "permissions violation for subscription to \"_inbox.>\"") {
|
||||
t.Fatalf("Expected permissions violation error, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected permissions violation error")
|
||||
}
|
||||
|
||||
// Ensure we receive an error when publishing to req.foo and we no longer
|
||||
// receive messages on _INBOX.>.
|
||||
if err := nc.Publish("req.foo", []byte("hola")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
if err := conn.Publish("_INBOX.foo", []byte("mundo")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if !strings.Contains(err.Error(), "permissions violation for publish to \"req.foo\"") {
|
||||
t.Fatalf("Expected permissions violation error, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected permissions violation error")
|
||||
}
|
||||
|
||||
queued, _, err := sub2.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pending messaged: %v", err)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued)
|
||||
}
|
||||
|
||||
queued, _, err = sub.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pending messaged: %v", err)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued)
|
||||
}
|
||||
|
||||
// Ensure we can publish to _INBOX.foo.bar and subscribe to _INBOX.foo.>.
|
||||
sub, err = nc.SubscribeSync("_INBOX.foo.>")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
if err := nc.Publish("_INBOX.foo.bar", []byte("testing")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
msg, err = sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "testing" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("testing"), msg.Data)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
t.Fatalf("Received unexpected error: %v", err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -404,6 +404,7 @@ func TestTLSChainedSolicitWorks(t *testing.T) {
|
||||
|
||||
func TestRouteTLSHandshakeError(t *testing.T) {
|
||||
optsSeed, _ := ProcessConfigFile("./configs/seed_tls.conf")
|
||||
optsSeed.NoLog = true
|
||||
srvSeed := RunServer(optsSeed)
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
|
||||
@@ -334,7 +334,7 @@ func TestWriteDeadline(t *testing.T) {
|
||||
dur := time.Since(start)
|
||||
// user more than the write deadline to account for calls
|
||||
// overhead, running with -race, etc...
|
||||
if dur > 100*time.Millisecond {
|
||||
if dur > 110*time.Millisecond {
|
||||
t.Fatalf("Flush should have returned sooner, took: %v", dur)
|
||||
}
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
|
||||
Reference in New Issue
Block a user