diff --git a/server/auth.go b/server/auth.go index d81d328d..c1d9bcbe 100644 --- a/server/auth.go +++ b/server/auth.go @@ -135,6 +135,26 @@ func (s *Server) isRouterAuthorized(c *client) bool { return comparePasswords(opts.Cluster.Password, c.opts.Password) } +// removeUnauthorizedSubs removes any subscriptions the client has that are no +// longer authorized, e.g. due to a config reload. +func (s *Server) removeUnauthorizedSubs(c *client) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.perms == nil || c.perms.sub == nil { + return + } + + for sid, sub := range c.subs { + if !c.canSubscribe(sub.subject) { + delete(c.subs, sid) + s.sl.Remove(sub) + s.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) + } + } +} + // Support for bcrypt stored passwords and tokens. const bcryptPrefix = "$2a$" diff --git a/server/client.go b/server/client.go index a5921adc..2c3f8c1e 100644 --- a/server/client.go +++ b/server/client.go @@ -224,6 +224,10 @@ func (c *client) initClient() { // into the client. func (c *client) RegisterUser(user *User) { if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.mu.Lock() + c.perms = nil + c.mu.Unlock() return } @@ -775,14 +779,11 @@ func (c *client) processSub(argo []byte) (err error) { } // Check permissions if applicable. - if c.perms != nil { - r := c.perms.sub.Match(string(sub.subject)) - if len(r.psubs) == 0 { - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) - c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject) - return nil - } + if !c.canSubscribe(sub.subject) { + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject) + return nil } // We can have two SUB protocols coming from a route due to some @@ -813,6 +814,15 @@ func (c *client) processSub(argo []byte) (err error) { return nil } +// canSubscribe determines if the client is authorized to subscribe to the +// given subject. Assumes caller is holding lock. +func (c *client) canSubscribe(sub []byte) bool { + if c.perms == nil { + return true + } + return len(c.perms.sub.Match(string(sub)).psubs) > 0 +} + func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() @@ -1011,8 +1021,6 @@ func (c *client) processMsg(msg []byte) { c.traceMsg(msg) } - // defintely - // Disallow publish to _SYS.>, these are reserved for internals. if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 && c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' && diff --git a/server/configs/authorization.conf b/server/configs/authorization.conf index 2cf47a03..cb7a7829 100644 --- a/server/configs/authorization.conf +++ b/server/configs/authorization.conf @@ -7,7 +7,7 @@ authorization { # Superuser can do anything. super_user = { - publish = "*" + publish = ">" subscribe = ">" } # Can do requests on foo or bar, and subscribe to anything diff --git a/server/configs/basic.conf b/server/configs/basic.conf index f826ee21..a4ede5f1 100644 --- a/server/configs/basic.conf +++ b/server/configs/basic.conf @@ -1 +1,3 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + listen: localhost:4443 diff --git a/server/configs/reload/authorization.conf b/server/configs/reload/authorization.conf new file mode 100644 index 00000000..a81d49cb --- /dev/null +++ b/server/configs/reload/authorization.conf @@ -0,0 +1,37 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:4222 + +authorization { + # Our role based permissions. + + # Superuser can do anything. + super_user = { + publish = ">" + subscribe = ">" + } + # Can do requests on _INBOX.foo.bar, and subscribe to anything + # that is a response to an _INBOX.foo. + # + # Notice that authorization filters can be singletons or arrays. + req_pub_user = { + publish = ["_INBOX.foo.bar"] + subscribe = "_INBOX.foo.>" + } + + # Setup a default user that can subscribe to anything, but has + # no publish capabilities. + default_user = { + subscribe = "PUBLIC.>" + } + + # Default permissions if none presented. e.g. susan below. + default_permissions: $default_user + + # Users listed with persmissions. + users = [ + {user: alice, password: foo, permissions: $super_user} + {user: bob, password: bar, permissions: $req_pub_user} + {user: susan, password: baz} + ] +} diff --git a/server/configs/reload/invalid.conf b/server/configs/reload/invalid.conf index ad80c5a2..f769d1e3 100644 --- a/server/configs/reload/invalid.conf +++ b/server/configs/reload/invalid.conf @@ -1,2 +1,4 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + # Invalid config file trace: diff --git a/server/configs/reload/multiple_users.conf b/server/configs/reload/multiple_users.conf index b35a3dd9..2b0ae0f0 100644 --- a/server/configs/reload/multiple_users.conf +++ b/server/configs/reload/multiple_users.conf @@ -1,4 +1,4 @@ -# Copyright 2016 Apcera Inc. All rights reserved. +# Copyright 2017 Apcera Inc. All rights reserved. listen: localhost:4443 diff --git a/server/configs/reload/reload.conf b/server/configs/reload/reload.conf index 74ed9db8..8da91d90 100644 --- a/server/configs/reload/reload.conf +++ b/server/configs/reload/reload.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + # logging options debug: true # enable on reload trace: true # enable on reload diff --git a/server/configs/reload/reload_unsupported.conf b/server/configs/reload/reload_unsupported.conf index ef0e70e8..b7a94046 100644 --- a/server/configs/reload/reload_unsupported.conf +++ b/server/configs/reload/reload_unsupported.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + # logging options debug: false trace: true diff --git a/server/configs/reload/single_user_authentication.conf b/server/configs/reload/single_user_authentication.conf index 26efcee4..7a03dde5 100644 --- a/server/configs/reload/single_user_authentication.conf +++ b/server/configs/reload/single_user_authentication.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + listen: localhost:4443 authorization { diff --git a/server/configs/reload/test.conf b/server/configs/reload/test.conf index 178f8b51..8bb78845 100644 --- a/server/configs/reload/test.conf +++ b/server/configs/reload/test.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + # logging options debug: false trace: false diff --git a/server/configs/reload/token_authentication.conf b/server/configs/reload/token_authentication.conf index add902fc..6cbd7604 100644 --- a/server/configs/reload/token_authentication.conf +++ b/server/configs/reload/token_authentication.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + listen: localhost:4443 authorization { diff --git a/server/configs/single_user_authentication.conf b/server/configs/single_user_authentication.conf index 3c36e3bc..a081eee0 100644 --- a/server/configs/single_user_authentication.conf +++ b/server/configs/single_user_authentication.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + listen: localhost:4443 authorization { diff --git a/server/configs/token_authentication.conf b/server/configs/token_authentication.conf index 6af54616..e10e7f49 100644 --- a/server/configs/token_authentication.conf +++ b/server/configs/token_authentication.conf @@ -1,3 +1,5 @@ +# Copyright 2017 Apcera Inc. All rights reserved. + listen: localhost:4443 authorization { diff --git a/server/opts_test.go b/server/opts_test.go index d8a892cb..96b4896d 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -563,8 +563,8 @@ func TestAuthorizationConfig(t *testing.T) { len(alice.Permissions.Publish)) } pubPerm := alice.Permissions.Publish[0] - if pubPerm != "*" { - t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm) + if pubPerm != ">" { + t.Fatalf("Expected Alice's publish permissions to be '>', got %q\n", pubPerm) } if alice.Permissions.Subscribe == nil { t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n") diff --git a/server/reload.go b/server/reload.go index e14f3d1f..0ce81b5a 100644 --- a/server/reload.go +++ b/server/reload.go @@ -284,8 +284,9 @@ func (s *Server) applyOptions(opts []option) { s.Noticef("Reloaded server configuration") } -// reloadAuthorization reconfigures the server authorization settings and -// disconnects any clients who are no longer authorized. +// reloadAuthorization reconfigures the server authorization settings, +// disconnects any clients who are no longer authorized, and removes any +// unauthorized subscriptions. func (s *Server) reloadAuthorization() { s.mu.Lock() s.configureAuthorization() @@ -296,10 +297,14 @@ func (s *Server) reloadAuthorization() { } s.mu.Unlock() - // Disconnect any unauthorized clients. for _, client := range clients { + // Disconnect any unauthorized clients. if !s.isClientAuthorized(client) { client.authViolation() + continue } + + // Remove any unauthorized subscriptions. + s.removeUnauthorizedSubs(client) } } diff --git a/server/reload_test.go b/server/reload_test.go index e0224105..30fe73db 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" @@ -1077,3 +1078,152 @@ func TestConfigReloadDisableUsersAuthentication(t *testing.T) { } conn.Close() } + +// Ensure Reload supports changing permissions. Test this by starting a server +// with a user configured with certain permissions, test publish and subscribe, +// reload config with new permissions, ensure the previous subscription was +// closed and publishes fail, then ensure the new permissions succeed. +func TestConfigReloadChangePermissions(t *testing.T) { + dir, err := os.Getwd() + if err != nil { + t.Fatalf("Error getting working directory: %v", err) + } + config := filepath.Join(dir, "tmp.conf") + + if err := os.Symlink("./configs/authorization.conf", config); err != nil { + t.Fatalf("Error creating symlink: %v", err) + } + defer os.Remove(config) + + opts, err := ProcessConfigFile(config) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + + server := RunServer(opts) + defer server.Shutdown() + + addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(addr, nats.UserInfo("bob", "bar")) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc.Close() + asyncErr := make(chan error) + nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + asyncErr <- err + }) + // Ensure we can publish and receive messages as a sanity check. + sub, err := nc.SubscribeSync("_INBOX.>") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + nc.Flush() + + conn, err := nats.Connect(addr, nats.UserInfo("alice", "foo")) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer conn.Close() + + sub2, err := conn.SubscribeSync("req.foo") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + if err := conn.Publish("_INBOX.foo", []byte("hello")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if string(msg.Data) != "hello" { + t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data) + } + + if err := nc.Publish("req.foo", []byte("world")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + nc.Flush() + + msg, err = sub2.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if string(msg.Data) != "world" { + t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data) + } + + // Change permissions. + if err := os.Remove(config); err != nil { + t.Fatalf("Error deleting symlink: %v", err) + } + if err := os.Symlink("./configs/reload/authorization.conf", config); err != nil { + t.Fatalf("Error creating symlink: %v", err) + } + if err := server.Reload(); err != nil { + t.Fatalf("Error reloading config: %v", err) + } + + // Ensure we receive an error when publishing to req.foo and we no longer + // receive messages on _INBOX.>. + if err := nc.Publish("req.foo", []byte("hola")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + nc.Flush() + if err := conn.Publish("_INBOX.foo", []byte("mundo")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + select { + case err := <-asyncErr: + if !strings.Contains(err.Error(), "permissions violation for publish to \"req.foo\"") { + t.Fatalf("Expected permissions violation error, got %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Expected permissions violation error") + } + + queued, _, err := sub2.Pending() + if err != nil { + t.Fatalf("Failed to get pending messaged: %v", err) + } + if queued != 0 { + t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued) + } + + queued, _, err = sub.Pending() + if err != nil { + t.Fatalf("Failed to get pending messaged: %v", err) + } + if queued != 0 { + t.Fatalf("Pending is incorrect.\nexpected: 0\ngot: %d", queued) + } + + // Ensure we can publish to _INBOX.foo.bar and subscribe to _INBOX.foo.>. + sub, err = nc.SubscribeSync("_INBOX.foo.>") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + nc.Flush() + if err := nc.Publish("_INBOX.foo.bar", []byte("testing")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + nc.Flush() + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if string(msg.Data) != "testing" { + t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("testing"), msg.Data) + } + + select { + case err := <-asyncErr: + t.Fatalf("Received unexpected error: %v", err) + default: + } +}