mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Handle permissions changes on config reload
This commit is contained in:
@@ -135,6 +135,26 @@ func (s *Server) isRouterAuthorized(c *client) bool {
|
||||
return comparePasswords(opts.Cluster.Password, c.opts.Password)
|
||||
}
|
||||
|
||||
// removeUnauthorizedSubs removes any subscriptions the client has that are no
|
||||
// longer authorized, e.g. due to a config reload.
|
||||
func (s *Server) removeUnauthorizedSubs(c *client) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.perms == nil || c.perms.sub == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for sid, sub := range c.subs {
|
||||
if !c.canSubscribe(sub.subject) {
|
||||
delete(c.subs, sid)
|
||||
s.sl.Remove(sub)
|
||||
s.Noticef("Removed sub %q for user %q - not authorized",
|
||||
string(sub.subject), c.opts.Username)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Support for bcrypt stored passwords and tokens.
|
||||
const bcryptPrefix = "$2a$"
|
||||
|
||||
|
||||
@@ -224,6 +224,10 @@ func (c *client) initClient() {
|
||||
// into the client.
|
||||
func (c *client) RegisterUser(user *User) {
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.mu.Lock()
|
||||
c.perms = nil
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -775,14 +779,11 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if c.perms != nil {
|
||||
r := c.perms.sub.Match(string(sub.subject))
|
||||
if len(r.psubs) == 0 {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject)
|
||||
return nil
|
||||
}
|
||||
if !c.canSubscribe(sub.subject) {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject)
|
||||
return nil
|
||||
}
|
||||
|
||||
// We can have two SUB protocols coming from a route due to some
|
||||
@@ -813,6 +814,15 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(sub []byte) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
return len(c.perms.sub.Match(string(sub)).psubs) > 0
|
||||
}
|
||||
|
||||
func (c *client) unsubscribe(sub *subscription) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -1011,8 +1021,6 @@ func (c *client) processMsg(msg []byte) {
|
||||
c.traceMsg(msg)
|
||||
}
|
||||
|
||||
// defintely
|
||||
|
||||
// Disallow publish to _SYS.>, these are reserved for internals.
|
||||
if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 &&
|
||||
c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' &&
|
||||
|
||||
@@ -7,7 +7,7 @@ authorization {
|
||||
|
||||
# Superuser can do anything.
|
||||
super_user = {
|
||||
publish = "*"
|
||||
publish = ">"
|
||||
subscribe = ">"
|
||||
}
|
||||
# Can do requests on foo or bar, and subscribe to anything
|
||||
|
||||
@@ -1 +1,3 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
37
server/configs/reload/authorization.conf
Normal file
37
server/configs/reload/authorization.conf
Normal file
@@ -0,0 +1,37 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: 127.0.0.1:4222
|
||||
|
||||
authorization {
|
||||
# Our role based permissions.
|
||||
|
||||
# Superuser can do anything.
|
||||
super_user = {
|
||||
publish = ">"
|
||||
subscribe = ">"
|
||||
}
|
||||
# Can do requests on _INBOX.foo.bar, and subscribe to anything
|
||||
# that is a response to an _INBOX.foo.
|
||||
#
|
||||
# Notice that authorization filters can be singletons or arrays.
|
||||
req_pub_user = {
|
||||
publish = ["_INBOX.foo.bar"]
|
||||
subscribe = "_INBOX.foo.>"
|
||||
}
|
||||
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
default_permissions: $default_user
|
||||
|
||||
# Users listed with persmissions.
|
||||
users = [
|
||||
{user: alice, password: foo, permissions: $super_user}
|
||||
{user: bob, password: bar, permissions: $req_pub_user}
|
||||
{user: susan, password: baz}
|
||||
]
|
||||
}
|
||||
@@ -1,2 +1,4 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# Invalid config file
|
||||
trace:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright 2016 Apcera Inc. All rights reserved.
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: true # enable on reload
|
||||
trace: true # enable on reload
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: false
|
||||
trace: true
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
authorization {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
# logging options
|
||||
debug: false
|
||||
trace: false
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
authorization {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
authorization {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Copyright 2017 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: localhost:4443
|
||||
|
||||
authorization {
|
||||
|
||||
@@ -563,8 +563,8 @@ func TestAuthorizationConfig(t *testing.T) {
|
||||
len(alice.Permissions.Publish))
|
||||
}
|
||||
pubPerm := alice.Permissions.Publish[0]
|
||||
if pubPerm != "*" {
|
||||
t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm)
|
||||
if pubPerm != ">" {
|
||||
t.Fatalf("Expected Alice's publish permissions to be '>', got %q\n", pubPerm)
|
||||
}
|
||||
if alice.Permissions.Subscribe == nil {
|
||||
t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n")
|
||||
|
||||
@@ -284,8 +284,9 @@ func (s *Server) applyOptions(opts []option) {
|
||||
s.Noticef("Reloaded server configuration")
|
||||
}
|
||||
|
||||
// reloadAuthorization reconfigures the server authorization settings and
|
||||
// disconnects any clients who are no longer authorized.
|
||||
// reloadAuthorization reconfigures the server authorization settings,
|
||||
// disconnects any clients who are no longer authorized, and removes any
|
||||
// unauthorized subscriptions.
|
||||
func (s *Server) reloadAuthorization() {
|
||||
s.mu.Lock()
|
||||
s.configureAuthorization()
|
||||
@@ -296,10 +297,14 @@ func (s *Server) reloadAuthorization() {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Disconnect any unauthorized clients.
|
||||
for _, client := range clients {
|
||||
// Disconnect any unauthorized clients.
|
||||
if !s.isClientAuthorized(client) {
|
||||
client.authViolation()
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove any unauthorized subscriptions.
|
||||
s.removeUnauthorizedSubs(client)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -1077,3 +1078,152 @@ func TestConfigReloadDisableUsersAuthentication(t *testing.T) {
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Ensure Reload supports changing permissions. Test this by starting a server
|
||||
// with a user configured with certain permissions, test publish and subscribe,
|
||||
// reload config with new permissions, ensure the previous subscription was
|
||||
// closed and publishes fail, then ensure the new permissions succeed.
|
||||
func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting working directory: %v", err)
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/authorization.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
|
||||
opts, err := ProcessConfigFile(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
|
||||
server := RunServer(opts)
|
||||
defer server.Shutdown()
|
||||
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(addr, nats.UserInfo("bob", "bar"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
asyncErr := make(chan error)
|
||||
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr <- err
|
||||
})
|
||||
// Ensure we can publish and receive messages as a sanity check.
|
||||
sub, err := nc.SubscribeSync("_INBOX.>")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
conn, err := nats.Connect(addr, nats.UserInfo("alice", "foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
sub2, err := conn.SubscribeSync("req.foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
if err := conn.Publish("_INBOX.foo", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data)
|
||||
}
|
||||
|
||||
if err := nc.Publish("req.foo", []byte("world")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
msg, err = sub2.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "world" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data)
|
||||
}
|
||||
|
||||
// Change permissions.
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/reload/authorization.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we receive an error when publishing to req.foo and we no longer
|
||||
// receive messages on _INBOX.>.
|
||||
if err := nc.Publish("req.foo", []byte("hola")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
if err := conn.Publish("_INBOX.foo", []byte("mundo")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
if !strings.Contains(err.Error(), "permissions violation for publish to \"req.foo\"") {
|
||||
t.Fatalf("Expected permissions violation error, got %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Expected permissions violation error")
|
||||
}
|
||||
|
||||
queued, _, err := sub2.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pending messaged: %v", err)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued)
|
||||
}
|
||||
|
||||
queued, _, err = sub.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pending messaged: %v", err)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued)
|
||||
}
|
||||
|
||||
// Ensure we can publish to _INBOX.foo.bar and subscribe to _INBOX.foo.>.
|
||||
sub, err = nc.SubscribeSync("_INBOX.foo.>")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
if err := nc.Publish("_INBOX.foo.bar", []byte("testing")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
msg, err = sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "testing" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("testing"), msg.Data)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-asyncErr:
|
||||
t.Fatalf("Received unexpected error: %v", err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user