diff --git a/auth/multiuser.go b/auth/multiuser.go index 0adb4142..1c419eda 100644 --- a/auth/multiuser.go +++ b/auth/multiuser.go @@ -10,14 +10,14 @@ import ( // Plain authentication is a basic username and password type MultiUser struct { - users map[string]string + users map[string]*server.User } // Create a new multi-user -func NewMultiUser(users []server.User) *MultiUser { - m := &MultiUser{users: make(map[string]string)} +func NewMultiUser(users []*server.User) *MultiUser { + m := &MultiUser{users: make(map[string]*server.User)} for _, u := range users { - m.users[u.Username] = u.Password + m.users[u.Username] = u } return m } @@ -25,10 +25,12 @@ func NewMultiUser(users []server.User) *MultiUser { // Check authenticates the client using a username and password against a list of multiple users. func (m *MultiUser) Check(c server.ClientAuth) bool { opts := c.GetOpts() - pass, ok := m.users[opts.Username] + user, ok := m.users[opts.Username] if !ok { return false } + pass := user.Password + // Check to see if the password is a bcrypt hash if isBcrypt(pass) { if err := bcrypt.CompareHashAndPassword([]byte(pass), []byte(opts.Password)); err != nil { @@ -37,5 +39,8 @@ func (m *MultiUser) Check(c server.ClientAuth) bool { } else if pass != opts.Password { return false } + + c.RegisterUser(user) + return true } diff --git a/server/auth.go b/server/auth.go index 92a2b8f9..34f0f01e 100644 --- a/server/auth.go +++ b/server/auth.go @@ -12,4 +12,6 @@ type Auth interface { type ClientAuth interface { // Get options associated with a client GetOpts() *clientOpts + // Optionally map a user after auth. + RegisterUser(*User) } diff --git a/server/client.go b/server/client.go index 70d932c9..42702345 100644 --- a/server/client.go +++ b/server/client.go @@ -53,6 +53,7 @@ type client struct { bw *bufio.Writer srv *Server subs map[string]*subscription + perms *permissions cache readCache pcd map[*client]struct{} atmr *time.Timer @@ -68,6 +69,18 @@ type client struct { trace bool } +type permissions struct { + sub *Sublist + pub *Sublist + pcache map[string]bool +} + +const ( + maxResultCacheSize = 512 + maxPermCacheSize = 32 + pruneSize = 16 +) + // Used in readloop to cache hot subject lookups and group statistics. type readCache struct { genid uint64 @@ -146,6 +159,36 @@ func (c *client) initClient() { } } +// RegisterUser allows auth to call back into a new client +// with the authenticated user. This is used to map any permissions +// into the client. +func (c *client) RegisterUser(user *User) { + if user.Permissions == nil { + return + } + + // Process Permissions and map into client connection structures. + c.mu.Lock() + defer c.mu.Unlock() + + c.perms = &permissions{} + c.perms.sub = NewSublist() + c.perms.pub = NewSublist() + c.perms.pcache = make(map[string]bool) + + // Loop over publish permissions + for _, pubSubject := range user.Permissions.Publish { + sub := &subscription{subject: []byte(pubSubject)} + c.perms.pub.Insert(sub) + } + + // Loop over subscribe permissions + for _, subSubject := range user.Permissions.Subscribe { + sub := &subscription{subject: []byte(subSubject)} + c.perms.sub.Insert(sub) + } +} + func (c *client) readLoop() { // Grab the connection off the client, it will be cleared on a close. // We check for that after the loop, but want to avoid a nil dereference @@ -591,6 +634,17 @@ func (c *client) processSub(argo []byte) (err error) { return nil } + // Check permissions if applicable. + if c.perms != nil && c.perms.sub != nil { + r := c.perms.sub.Match(string(sub.subject)) + if len(r.psubs) == 0 { + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + c.Debugf("Permissions Violation for Subcription to %q", sub.subject) + return nil + } + } + // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. sid := string(sub.sid) @@ -814,9 +868,53 @@ func (c *client) processMsg(msg []byte) { if c.trace { c.traceMsg(msg) } + + // Disallow publish to _SYS.>, these are reserved for internals. + if c.pa.subject[0] == '_' && c.pa.subject[1] == 'S' && + c.pa.subject[2] == 'Y' && c.pa.subject[3] == 'S' { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + return + } + + // Check if published subject is allowed if we have permissions in place. + if c.perms != nil && c.perms.pub != nil { + allowed, ok := c.perms.pcache[string(c.pa.subject)] + if ok && !allowed { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + return + } + if !ok { + r := c.perms.pub.Match(string(c.pa.subject)) + if len(r.psubs) == 0 { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + c.perms.pcache[string(c.pa.subject)] = false + return + } else { + c.perms.pcache[string(c.pa.subject)] = true + } + // Prune if needed. + if len(c.perms.pcache) > maxPermCacheSize { + // Prune the permissions cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.perms.pcache { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } + } + } + if c.opts.Verbose { c.sendOK() } + + // Mostly under testing scenarios. if srv == nil { return } @@ -838,6 +936,17 @@ func (c *client) processMsg(msg []byte) { subject := string(c.pa.subject) r = srv.sl.Match(subject) c.cache.results[subject] = r + if len(c.cache.results) > maxResultCacheSize { + // Prune the results cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.cache.results { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } } // Check for no interest, short circuit if so. diff --git a/server/errors.go b/server/errors.go index 1387ebb1..c972ee4b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -1,19 +1,22 @@ -// Copyright 2012 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server import "errors" var ( - // ErrConnectionClosed represents error condition on a closed connection. + // ErrConnectionClosed represents an error condition on a closed connection. ErrConnectionClosed = errors.New("Connection Closed") - // ErrAuthorization represents error condition on failed authorization. + // ErrAuthorization represents an error condition on failed authorization. ErrAuthorization = errors.New("Authorization Error") - // ErrAuthTimeout represents error condition on failed authorization due to timeout. + // ErrAuthTimeout represents an error condition on failed authorization due to timeout. ErrAuthTimeout = errors.New("Authorization Timeout") - // ErrMaxPayload represents error condition when the payload is too big. + // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") + + // ErrReservedPublish represents an error condition when the payload is too big. + ErrReservedPublishSubject = errors.New("Reserved Internal Subject") ) diff --git a/server/opts_test.go b/server/opts_test.go index f6b5bc46..96d74623 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -415,6 +415,8 @@ func TestAuthorizationConfig(t *testing.T) { for _, u := range opts.Users { mu[u.Username] = u } + + // Alice alice, ok := mu["alice"] if !ok { t.Fatalf("Expected to see user Alice\n") @@ -446,6 +448,7 @@ func TestAuthorizationConfig(t *testing.T) { t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm) } + // Bob bob, ok := mu["bob"] if !ok { t.Fatalf("Expected to see user Bob\n") @@ -454,6 +457,7 @@ func TestAuthorizationConfig(t *testing.T) { t.Fatalf("Expected Bob's permissions to be non-nil\n") } + // Susan susan, ok := mu["susan"] if !ok { t.Fatalf("Expected to see user Susan\n") diff --git a/server/server.go b/server/server.go index 666b005e..683d2da1 100644 --- a/server/server.go +++ b/server/server.go @@ -516,8 +516,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Check for Auth if authRequired { - ttl := secondsToDuration(s.opts.AuthTimeout) - c.setAuthTimer(ttl) + c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } // Send our information. diff --git a/test/auth_test.go b/test/auth_test.go index c360a9e5..874e2258 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package test diff --git a/test/bench_test.go b/test/bench_test.go index 6f3558b9..16faaaeb 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -75,41 +75,41 @@ func sizedString(sz int) string { // Publish subject for pub benchmarks. var psub = "a" -func Benchmark____PubNo_Payload(b *testing.B) { +func Benchmark_____PubNo_Payload(b *testing.B) { benchPub(b, psub, "") } -func Benchmark____Pub8b_Payload(b *testing.B) { +func Benchmark_____Pub8b_Payload(b *testing.B) { b.StopTimer() s := sizedString(8) benchPub(b, psub, s) } -func Benchmark___Pub32b_Payload(b *testing.B) { +func Benchmark____Pub32b_Payload(b *testing.B) { b.StopTimer() s := sizedString(32) benchPub(b, psub, s) } -func Benchmark__Pub256B_Payload(b *testing.B) { +func Benchmark___Pub256B_Payload(b *testing.B) { b.StopTimer() s := sizedString(256) benchPub(b, psub, s) } -func Benchmark____Pub1K_Payload(b *testing.B) { +func Benchmark_____Pub1K_Payload(b *testing.B) { b.StopTimer() s := sizedString(1024) benchPub(b, psub, s) } -func Benchmark____Pub4K_Payload(b *testing.B) { +func Benchmark_____Pub4K_Payload(b *testing.B) { b.StopTimer() s := sizedString(4 * 1024) benchPub(b, psub, s) } -func Benchmark____Pub8K_Payload(b *testing.B) { +func Benchmark_____Pub8K_Payload(b *testing.B) { b.StopTimer() s := sizedString(8 * 1024) benchPub(b, psub, s) @@ -137,7 +137,33 @@ func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) { ch <- true } -func Benchmark___________PubSub(b *testing.B) { +// Benchmark the authorization code path. +func Benchmark_AuthPubNo_Payload(b *testing.B) { + b.StopTimer() + + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + c := createClientConn(b, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(b, c) + + cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench", DefaultPass) + sendProto(b, c, cs) + + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte("PUB a 0\r\n\r\n") + b.SetBytes(int64(len(sendOp))) + b.StartTimer() + for i := 0; i < b.N; i++ { + bw.Write(sendOp) + } + bw.Flush() + flushConnection(b, c) + b.StopTimer() +} + +func Benchmark____________PubSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -169,7 +195,7 @@ func Benchmark___________PubSub(b *testing.B) { s.Shutdown() } -func Benchmark___PubSubTwoConns(b *testing.B) { +func Benchmark____PubSubTwoConns(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -205,7 +231,7 @@ func Benchmark___PubSubTwoConns(b *testing.B) { s.Shutdown() } -func Benchmark___PubTwoQueueSub(b *testing.B) { +func Benchmark____PubTwoQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -238,7 +264,7 @@ func Benchmark___PubTwoQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark__PubFourQueueSub(b *testing.B) { +func Benchmark___PubFourQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -273,7 +299,7 @@ func Benchmark__PubFourQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark_PubEightQueueSub(b *testing.B) { +func Benchmark__PubEightQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) diff --git a/test/client_auth_test.go b/test/client_auth_test.go new file mode 100644 index 00000000..e784f429 --- /dev/null +++ b/test/client_auth_test.go @@ -0,0 +1,50 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "fmt" + "testing" + + "github.com/nats-io/nats" +) + +func TestMultipleUserAuth(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/multi_user.conf") + defer srv.Shutdown() + + if opts.Users == nil { + t.Fatal("Expected a user array that is not nil") + } + if len(opts.Users) != 2 { + t.Fatal("Expected a user array that had 2 users") + } + + // Test first user + url := fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[0].Username, + opts.Users[0].Password, + opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Expected a succesful connect, got %v\n", err) + } + defer nc.Close() + + if !nc.AuthRequired() { + t.Fatal("Expected auth to be required for the server") + } + + // Test second user + url = fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[1].Username, + opts.Users[1].Password, + opts.Host, opts.Port) + + nc, err = nats.Connect(url) + if err != nil { + t.Fatalf("Expected a succesful connect, got %v\n", err) + } + defer nc.Close() +} diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf new file mode 100644 index 00000000..3bbe15e0 --- /dev/null +++ b/test/configs/authorization.conf @@ -0,0 +1,44 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:2442 + +authorization { + # Our role based permissions. + + # Admin can do anything. + ADMIN = { + publish = ">" + subscribe = ">" + } + + # Can do requests on req.foo or req.bar, and subscribe to anything + # that is a response, e.g. _INBOX.* + # + # Notice that authorization filters can be singletons or arrays. + REQUESTOR = { + publish = ["req.foo", "req.bar"] + subscribe = "_INBOX.*" + } + + # Default permissions if none presented. e.g. Joe below. + DEFAULT_PERMISSIONS = { + publish = "SANDBOX.*" + subscribe = ["PUBLIC.>", "_INBOX.>"] + } + + # This is to benchmark pub performance. + BENCH = { + publish = "a" + } + + # Just foo for testing + PASS: $2a$10$UHR6GhotWhpLsKtVP0/i6.Nh9.fuY73cWjLoJjb2sKT8KISBcUW5q + + # Users listed with persmissions. + users = [ + {user: alice, password: $PASS, permissions: $ADMIN} + {user: bob, password: $PASS, permissions: $REQUESTOR} + {user: bench, password: $PASS, permissions: $BENCH} + {user: joe, password: $PASS} + ] +} diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go new file mode 100644 index 00000000..59aa6f8c --- /dev/null +++ b/test/user_authorization_test.go @@ -0,0 +1,86 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "regexp" + "testing" +) + +const DefaultPass = "foo" + +var permErrRe = regexp.MustCompile(`\A\-ERR\s+'Permissions Violation([^\r\n]+)\r\n`) + +func TestUserAuthorizationProto(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + // Alice can do anything, check a few for OK result. + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "alice", DefaultPass) + expectResult(t, c, okRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, okRe) + + // Check that we now reserve _SYS.> though for internal, so no clients. + sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n") + expectResult(t, c, permErrRe) + c.Close() + + // Bob is a requestor only, e.g. req.foo, req.bar for publish, subscribe only to INBOXes. + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "bob", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + c.Close() + + // Joe is a default user + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "joe", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo.bar.* 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo.bar.baz 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB PUBLIC.abcd 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "PUB SANDBOX.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB SANDBOX.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + + // Since only PWC, this should fail (too many tokens). + sendProto(t, c, "PUB SANDBOX.foo.bar 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + c.Close() + +}