diff --git a/TODO.md b/TODO.md index ab146711..5c507fcf 100644 --- a/TODO.md +++ b/TODO.md @@ -1,23 +1,26 @@ # General +- [ ] Blacklist or ERR escalation to close connection for auth/permissions +- [ ] Protocol updates, MAP, MPUB, etc - [ ] Multiple listen endpoints - [ ] Websocket / HTTP2 strategy -- [ ] Listen configure key vs addr and port -- [ ] Multiple Authorization / Access - [ ] T series reservations - [ ] _SYS. server events? - [ ] No downtime restart - [ ] Signal based reload of configuration - [ ] brew, apt-get, rpm, chocately (windows) -- [ ] Buffer pools/sync pools? - [ ] IOVec pools and writev for high fanout? -- [ ] Add ENV and variable support to dconf? ucl? - [ ] Modify cluster support for single message across routes between pub/sub and d-queue - [ ] Memory limits/warnings? - [ ] Limit number of subscriptions a client can have, total memory usage etc. - [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state +- [X] _SYS.> reserved for server events? +- [X] Listen configure key vs addr and port +- [X] Add ENV and variable support to dconf? ucl? +- [X] Buffer pools/sync pools? +- [X] Multiple Authorization / Access - [X] Write dynamic socket buffer sizes - [X] Read dynamic socket buffer sizes - [X] Info updates contain other implicit route servers diff --git a/auth/multiuser.go b/auth/multiuser.go index 0adb4142..e3104f56 100644 --- a/auth/multiuser.go +++ b/auth/multiuser.go @@ -3,21 +3,20 @@ package auth import ( - "golang.org/x/crypto/bcrypt" - "github.com/nats-io/gnatsd/server" + "golang.org/x/crypto/bcrypt" ) // Plain authentication is a basic username and password type MultiUser struct { - users map[string]string + users map[string]*server.User } // Create a new multi-user -func NewMultiUser(users []server.User) *MultiUser { - m := &MultiUser{users: make(map[string]string)} +func NewMultiUser(users []*server.User) *MultiUser { + m := &MultiUser{users: make(map[string]*server.User)} for _, u := range users { - m.users[u.Username] = u.Password + m.users[u.Username] = u } return m } @@ -25,10 +24,12 @@ func NewMultiUser(users []server.User) *MultiUser { // Check authenticates the client using a username and password against a list of multiple users. func (m *MultiUser) Check(c server.ClientAuth) bool { opts := c.GetOpts() - pass, ok := m.users[opts.Username] + user, ok := m.users[opts.Username] if !ok { return false } + pass := user.Password + // Check to see if the password is a bcrypt hash if isBcrypt(pass) { if err := bcrypt.CompareHashAndPassword([]byte(pass), []byte(opts.Password)); err != nil { @@ -37,5 +38,8 @@ func (m *MultiUser) Check(c server.ClientAuth) bool { } else if pass != opts.Password { return false } + + c.RegisterUser(user) + return true } diff --git a/server/auth.go b/server/auth.go index 92a2b8f9..34f0f01e 100644 --- a/server/auth.go +++ b/server/auth.go @@ -12,4 +12,6 @@ type Auth interface { type ClientAuth interface { // Get options associated with a client GetOpts() *clientOpts + // Optionally map a user after auth. + RegisterUser(*User) } diff --git a/server/client.go b/server/client.go index 70d932c9..e16b8667 100644 --- a/server/client.go +++ b/server/client.go @@ -53,6 +53,7 @@ type client struct { bw *bufio.Writer srv *Server subs map[string]*subscription + perms *permissions cache readCache pcd map[*client]struct{} atmr *time.Timer @@ -68,6 +69,18 @@ type client struct { trace bool } +type permissions struct { + sub *Sublist + pub *Sublist + pcache map[string]bool +} + +const ( + maxResultCacheSize = 512 + maxPermCacheSize = 32 + pruneSize = 16 +) + // Used in readloop to cache hot subject lookups and group statistics. type readCache struct { genid uint64 @@ -146,6 +159,37 @@ func (c *client) initClient() { } } +// RegisterUser allows auth to call back into a new client +// with the authenticated user. This is used to map any permissions +// into the client. +func (c *client) RegisterUser(user *User) { + if user.Permissions == nil { + return + } + + // Process Permissions and map into client connection structures. + c.mu.Lock() + defer c.mu.Unlock() + + // Pre-allocate all to simplify checks later. + c.perms = &permissions{} + c.perms.sub = NewSublist() + c.perms.pub = NewSublist() + c.perms.pcache = make(map[string]bool) + + // Loop over publish permissions + for _, pubSubject := range user.Permissions.Publish { + sub := &subscription{subject: []byte(pubSubject)} + c.perms.pub.Insert(sub) + } + + // Loop over subscribe permissions + for _, subSubject := range user.Permissions.Subscribe { + sub := &subscription{subject: []byte(subSubject)} + c.perms.sub.Insert(sub) + } +} + func (c *client) readLoop() { // Grab the connection off the client, it will be cleared on a close. // We check for that after the loop, but want to avoid a nil dereference @@ -361,7 +405,13 @@ func (c *client) authTimeout() { } func (c *client) authViolation() { - c.Errorf(ErrAuthorization.Error()) + if c.srv != nil && c.srv.opts.Users != nil { + c.Errorf("%s - User %q", + ErrAuthorization.Error(), + c.opts.Username) + } else { + c.Errorf(ErrAuthorization.Error()) + } c.sendErr("Authorization Violation") c.closeConnection() } @@ -591,6 +641,17 @@ func (c *client) processSub(argo []byte) (err error) { return nil } + // Check permissions if applicable. + if c.perms != nil { + r := c.perms.sub.Match(string(sub.subject)) + if len(r.psubs) == 0 { + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject) + return nil + } + } + // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. sid := string(sub.sid) @@ -814,9 +875,57 @@ func (c *client) processMsg(msg []byte) { if c.trace { c.traceMsg(msg) } + + // defintely + + // Disallow publish to _SYS.>, these are reserved for internals. + if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 && + c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' && + c.pa.subject[3] == 'S' && c.pa.subject[4] == '.' { + c.pubPermissionViolation(c.pa.subject) + return + } + + // Check if published subject is allowed if we have permissions in place. + if c.perms != nil { + allowed, ok := c.perms.pcache[string(c.pa.subject)] + if ok && !allowed { + c.pubPermissionViolation(c.pa.subject) + return + } + if !ok { + r := c.perms.pub.Match(string(c.pa.subject)) + notAllowed := len(r.psubs) == 0 + if notAllowed { + c.pubPermissionViolation(c.pa.subject) + c.perms.pcache[string(c.pa.subject)] = false + } else { + c.perms.pcache[string(c.pa.subject)] = true + } + // Prune if needed. + if len(c.perms.pcache) > maxPermCacheSize { + // Prune the permissions cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.perms.pcache { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } + // Return here to allow the pruning code to run if needed. + if notAllowed { + return + } + } + } + if c.opts.Verbose { c.sendOK() } + + // Mostly under testing scenarios. if srv == nil { return } @@ -838,6 +947,17 @@ func (c *client) processMsg(msg []byte) { subject := string(c.pa.subject) r = srv.sl.Match(subject) c.cache.results[subject] = r + if len(c.cache.results) > maxResultCacheSize { + // Prune the results cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.cache.results { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } } // Check for no interest, short circuit if so. @@ -932,6 +1052,11 @@ func (c *client) processMsg(msg []byte) { } } +func (c *client) pubPermissionViolation(subject []byte) { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subject)) + c.Errorf("Publish Violation - User %q, Subject %q", c.opts.Username, subject) +} + func (c *client) processPingTimer() { c.mu.Lock() defer c.mu.Unlock() diff --git a/server/configs/authorization.conf b/server/configs/authorization.conf new file mode 100644 index 00000000..2cf47a03 --- /dev/null +++ b/server/configs/authorization.conf @@ -0,0 +1,37 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:4222 + +authorization { + # Our role based permissions. + + # Superuser can do anything. + super_user = { + publish = "*" + subscribe = ">" + } + # Can do requests on foo or bar, and subscribe to anything + # that is a response to an _INBOX. + # + # Notice that authorization filters can be singletons or arrays. + req_pub_user = { + publish = ["req.foo", "req.bar"] + subscribe = "_INBOX.>" + } + + # Setup a default user that can subscribe to anything, but has + # no publish capabilities. + default_user = { + subscribe = "PUBLIC.>" + } + + # Default permissions if none presented. e.g. susan below. + default_permissions: $default_user + + # Users listed with persmissions. + users = [ + {user: alice, password: foo, permissions: $super_user} + {user: bob, password: bar, permissions: $req_pub_user} + {user: susan, password: baz} + ] +} diff --git a/server/errors.go b/server/errors.go index 1387ebb1..3354d3f3 100644 --- a/server/errors.go +++ b/server/errors.go @@ -1,19 +1,22 @@ -// Copyright 2012 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server import "errors" var ( - // ErrConnectionClosed represents error condition on a closed connection. + // ErrConnectionClosed represents an error condition on a closed connection. ErrConnectionClosed = errors.New("Connection Closed") - // ErrAuthorization represents error condition on failed authorization. + // ErrAuthorization represents an error condition on failed authorization. ErrAuthorization = errors.New("Authorization Error") - // ErrAuthTimeout represents error condition on failed authorization due to timeout. + // ErrAuthTimeout represents an error condition on failed authorization due to timeout. ErrAuthTimeout = errors.New("Authorization Timeout") - // ErrMaxPayload represents error condition when the payload is too big. + // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") + + // ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.> + ErrReservedPublishSubject = errors.New("Reserved Internal Subject") ) diff --git a/server/opts.go b/server/opts.go index 30d6aa5d..56ff4327 100644 --- a/server/opts.go +++ b/server/opts.go @@ -5,7 +5,6 @@ package server import ( "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "io/ioutil" "net" @@ -20,8 +19,16 @@ import ( // For multiple accounts/users. type User struct { - Username string `json:"user"` - Password string `json:"password"` + Username string `json:"user"` + Password string `json:"password"` + Permissions *Permissions `json:"permissions"` +} + +// Authorization are the allowed subjects on a per +// publish or subscribe basis. +type Permissions struct { + Publish []string `json:"publish"` + Subscribe []string `json:"subscribe"` } // Options block for gnatsd server. @@ -34,7 +41,7 @@ type Options struct { NoSigs bool `json:"-"` Logtime bool `json:"-"` MaxConn int `json:"max_connections"` - Users []User `json:"-"` + Users []*User `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` @@ -71,13 +78,15 @@ type Options struct { TLSConfig *tls.Config `json:"-"` } +// Configuration file authorization section. type authorization struct { // Singles user string pass string // Multiple Users - users []User - timeout float64 + users []*User + timeout float64 + defaultPermissions *Permissions } // TLSConfigOpts holds the parsed tls config information, @@ -325,17 +334,134 @@ func parseAuthorization(am map[string]interface{}) (*authorization, error) { } auth.timeout = at case "users": - b, _ := json.Marshal(mv) - users := []User{} - if err := json.Unmarshal(b, &users); err != nil { - return nil, fmt.Errorf("Could not parse user array properly, %v", err) + users, err := parseUsers(mv) + if err != nil { + return nil, err } auth.users = users + case "default_permission", "default_permissions": + pm, ok := mv.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected default permissions to be a map/struct, got %+v", mv) + } + permissions, err := parseUserPermissions(pm) + if err != nil { + return nil, err + } + auth.defaultPermissions = permissions } + + // Now check for permission defaults with multiple users, etc. + if auth.users != nil && auth.defaultPermissions != nil { + for _, user := range auth.users { + if user.Permissions == nil { + user.Permissions = auth.defaultPermissions + } + } + } + } return auth, nil } +// Helper function to parse multiple users array with optional permissions. +func parseUsers(mv interface{}) ([]*User, error) { + // Make sure we have an array + uv, ok := mv.([]interface{}) + if !ok { + return nil, fmt.Errorf("Expected users field to be an array, got %v", mv) + } + users := []*User{} + for _, u := range uv { + // Check its a map/struct + um, ok := u.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected user entry to be a map/struct, got %v", u) + } + user := &User{} + for k, v := range um { + switch strings.ToLower(k) { + case "user", "username": + user.Username = v.(string) + case "pass", "password": + user.Password = v.(string) + case "permission", "permissions", "authroization": + pm, ok := v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected user permissions to be a map/struct, got %+v", v) + } + permissions, err := parseUserPermissions(pm) + if err != nil { + return nil, err + } + user.Permissions = permissions + } + } + // Check to make sure we have at least username and password + if user.Username == "" || user.Password == "" { + return nil, fmt.Errorf("User entry requires a user and a password") + } + users = append(users, user) + } + return users, nil +} + +// Helper function to parse user/account permissions +func parseUserPermissions(pm map[string]interface{}) (*Permissions, error) { + p := &Permissions{} + for k, v := range pm { + switch strings.ToLower(k) { + case "pub", "publish": + subjects, err := parseSubjects(v) + if err != nil { + return nil, err + } + p.Publish = subjects + case "sub", "subscribe": + subjects, err := parseSubjects(v) + if err != nil { + return nil, err + } + p.Subscribe = subjects + default: + return nil, fmt.Errorf("Unknown field %s parsing permissions", k) + } + } + return p, nil +} + +// Helper function to parse subject singeltons and/or arrays +func parseSubjects(v interface{}) ([]string, error) { + var subjects []string + switch v.(type) { + case string: + subjects = append(subjects, v.(string)) + case []string: + subjects = v.([]string) + case []interface{}: + for _, i := range v.([]interface{}) { + subject, ok := i.(string) + if !ok { + return nil, fmt.Errorf("Subject in permissions array cannot be cast to string") + } + subjects = append(subjects, subject) + } + default: + return nil, fmt.Errorf("Expected subject permissions to be a subject, or array of subjects, got %T", v) + } + return checkSubjectArray(subjects) +} + +// Helper function to validate subjects, etc for account permissioning. +func checkSubjectArray(sa []string) ([]string, error) { + for _, s := range sa { + if !IsValidSubject(s) { + return nil, fmt.Errorf("Subject %q is not a valid subject", s) + } + } + return sa, nil +} + // PrintTLSHelpAndDie prints TLS usage and exits. func PrintTLSHelpAndDie() { fmt.Printf("%s\n", tlsUsage) diff --git a/server/opts_test.go b/server/opts_test.go index e83c279e..18adf2cf 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -397,3 +397,90 @@ func TestMultipleUsersConfig(t *testing.T) { } processOptions(opts) } + +// Test highly depends on contents of the config file listed below. Any changes to that file +// may very well break this test. +func TestAuthorizationConfig(t *testing.T) { + opts, err := ProcessConfigFile("./configs/authorization.conf") + if err != nil { + t.Fatalf("Received an error reading config file: %v\n", err) + } + processOptions(opts) + lu := len(opts.Users) + if lu != 3 { + t.Fatalf("Expected 3 users, got %d\n", lu) + } + // Build a map + mu := make(map[string]*User) + for _, u := range opts.Users { + mu[u.Username] = u + } + + // Alice + alice, ok := mu["alice"] + if !ok { + t.Fatalf("Expected to see user Alice\n") + } + // Check for permissions details + if alice.Permissions == nil { + t.Fatalf("Expected Alice's permissions to be non-nil\n") + } + if alice.Permissions.Publish == nil { + t.Fatalf("Expected Alice's publish permissions to be non-nil\n") + } + if len(alice.Permissions.Publish) != 1 { + t.Fatalf("Expected Alice's publish permissions to have 1 element, got %d\n", + len(alice.Permissions.Publish)) + } + pubPerm := alice.Permissions.Publish[0] + if pubPerm != "*" { + t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm) + } + if alice.Permissions.Subscribe == nil { + t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n") + } + if len(alice.Permissions.Subscribe) != 1 { + t.Fatalf("Expected Alice's subscribe permissions to have 1 element, got %d\n", + len(alice.Permissions.Subscribe)) + } + subPerm := alice.Permissions.Subscribe[0] + if subPerm != ">" { + t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm) + } + + // Bob + bob, ok := mu["bob"] + if !ok { + t.Fatalf("Expected to see user Bob\n") + } + if bob.Permissions == nil { + t.Fatalf("Expected Bob's permissions to be non-nil\n") + } + + // Susan + susan, ok := mu["susan"] + if !ok { + t.Fatalf("Expected to see user Susan\n") + } + if susan.Permissions == nil { + t.Fatalf("Expected Susan's permissions to be non-nil\n") + } + // Check susan closely since she inherited the default permissions. + if susan.Permissions == nil { + t.Fatalf("Expected Susan's permissions to be non-nil\n") + } + if susan.Permissions.Publish != nil { + t.Fatalf("Expected Susan's publish permissions to be nil\n") + } + if susan.Permissions.Subscribe == nil { + t.Fatalf("Expected Susan's subscribe permissions to be non-nil\n") + } + if len(susan.Permissions.Subscribe) != 1 { + t.Fatalf("Expected Susan's subscribe permissions to have 1 element, got %d\n", + len(susan.Permissions.Subscribe)) + } + subPerm = susan.Permissions.Subscribe[0] + if subPerm != "PUBLIC.>" { + t.Fatalf("Expected Susan's subscribe permissions to be 'PUBLIC.>', got %q\n", subPerm) + } +} diff --git a/server/pse/pse_windows.go b/server/pse/pse_windows.go index 40cf1293..0f727426 100644 --- a/server/pse/pse_windows.go +++ b/server/pse/pse_windows.go @@ -119,7 +119,7 @@ func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) { var bufCount uint32 // Retrieving array data requires two calls, the first which - // requires an adressable empty buffer, and sets size fields. + // requires an addressable empty buffer, and sets size fields. // The second call returns the data. initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1) ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0]) diff --git a/server/server.go b/server/server.go index 666b005e..683d2da1 100644 --- a/server/server.go +++ b/server/server.go @@ -516,8 +516,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Check for Auth if authRequired { - ttl := secondsToDuration(s.opts.AuthTimeout) - c.setAuthTimer(ttl) + c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } // Send our information. diff --git a/server/sublist.go b/server/sublist.go index fd4cc12f..1fdd24b1 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -565,7 +565,29 @@ func visitLevel(l *level, depth int) int { return maxDepth } -// IsValidLiteralSubject returns true if a subject is valid, false otherwise +// IsValidSubject returns true if a subject is valid, false otherwise +func IsValidSubject(subject string) bool { + if subject == "" { + return false + } + sfwc := false + tokens := strings.Split(string(subject), tsep) + for _, t := range tokens { + if len(t) == 0 || sfwc { + return false + } + if len(t) > 1 { + continue + } + switch t[0] { + case fwc: + sfwc = true + } + } + return true +} + +// IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise func IsValidLiteralSubject(subject string) bool { tokens := strings.Split(string(subject), tsep) for _, t := range tokens { diff --git a/test/auth_test.go b/test/auth_test.go index c360a9e5..874e2258 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package test diff --git a/test/bench_test.go b/test/bench_test.go index 6f3558b9..16faaaeb 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -75,41 +75,41 @@ func sizedString(sz int) string { // Publish subject for pub benchmarks. var psub = "a" -func Benchmark____PubNo_Payload(b *testing.B) { +func Benchmark_____PubNo_Payload(b *testing.B) { benchPub(b, psub, "") } -func Benchmark____Pub8b_Payload(b *testing.B) { +func Benchmark_____Pub8b_Payload(b *testing.B) { b.StopTimer() s := sizedString(8) benchPub(b, psub, s) } -func Benchmark___Pub32b_Payload(b *testing.B) { +func Benchmark____Pub32b_Payload(b *testing.B) { b.StopTimer() s := sizedString(32) benchPub(b, psub, s) } -func Benchmark__Pub256B_Payload(b *testing.B) { +func Benchmark___Pub256B_Payload(b *testing.B) { b.StopTimer() s := sizedString(256) benchPub(b, psub, s) } -func Benchmark____Pub1K_Payload(b *testing.B) { +func Benchmark_____Pub1K_Payload(b *testing.B) { b.StopTimer() s := sizedString(1024) benchPub(b, psub, s) } -func Benchmark____Pub4K_Payload(b *testing.B) { +func Benchmark_____Pub4K_Payload(b *testing.B) { b.StopTimer() s := sizedString(4 * 1024) benchPub(b, psub, s) } -func Benchmark____Pub8K_Payload(b *testing.B) { +func Benchmark_____Pub8K_Payload(b *testing.B) { b.StopTimer() s := sizedString(8 * 1024) benchPub(b, psub, s) @@ -137,7 +137,33 @@ func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) { ch <- true } -func Benchmark___________PubSub(b *testing.B) { +// Benchmark the authorization code path. +func Benchmark_AuthPubNo_Payload(b *testing.B) { + b.StopTimer() + + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + c := createClientConn(b, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(b, c) + + cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench", DefaultPass) + sendProto(b, c, cs) + + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte("PUB a 0\r\n\r\n") + b.SetBytes(int64(len(sendOp))) + b.StartTimer() + for i := 0; i < b.N; i++ { + bw.Write(sendOp) + } + bw.Flush() + flushConnection(b, c) + b.StopTimer() +} + +func Benchmark____________PubSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -169,7 +195,7 @@ func Benchmark___________PubSub(b *testing.B) { s.Shutdown() } -func Benchmark___PubSubTwoConns(b *testing.B) { +func Benchmark____PubSubTwoConns(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -205,7 +231,7 @@ func Benchmark___PubSubTwoConns(b *testing.B) { s.Shutdown() } -func Benchmark___PubTwoQueueSub(b *testing.B) { +func Benchmark____PubTwoQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -238,7 +264,7 @@ func Benchmark___PubTwoQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark__PubFourQueueSub(b *testing.B) { +func Benchmark___PubFourQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -273,7 +299,7 @@ func Benchmark__PubFourQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark_PubEightQueueSub(b *testing.B) { +func Benchmark__PubEightQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) diff --git a/test/client_auth_test.go b/test/client_auth_test.go new file mode 100644 index 00000000..7c8890c7 --- /dev/null +++ b/test/client_auth_test.go @@ -0,0 +1,50 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "fmt" + "testing" + + "github.com/nats-io/nats" +) + +func TestMultipleUserAuth(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/multi_user.conf") + defer srv.Shutdown() + + if opts.Users == nil { + t.Fatal("Expected a user array that is not nil") + } + if len(opts.Users) != 2 { + t.Fatal("Expected a user array that had 2 users") + } + + // Test first user + url := fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[0].Username, + opts.Users[0].Password, + opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Expected a successful connect, got %v\n", err) + } + defer nc.Close() + + if !nc.AuthRequired() { + t.Fatal("Expected auth to be required for the server") + } + + // Test second user + url = fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[1].Username, + opts.Users[1].Password, + opts.Host, opts.Port) + + nc, err = nats.Connect(url) + if err != nil { + t.Fatalf("Expected a successful connect, got %v\n", err) + } + defer nc.Close() +} diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf new file mode 100644 index 00000000..7b87be44 --- /dev/null +++ b/test/configs/authorization.conf @@ -0,0 +1,44 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:2442 + +authorization { + # Our role based permissions. + + # Admin can do anything. + ADMIN = { + publish = ">" + subscribe = ">" + } + + # Can do requests on req.foo or req.bar, and subscribe to anything + # that is a response, e.g. _INBOX.* + # + # Notice that authorization filters can be singletons or arrays. + REQUESTOR = { + publish = ["req.foo", "req.bar"] + subscribe = "_INBOX.*" + } + + # Default permissions if none presented. e.g. Joe below. + DEFAULT_PERMISSIONS = { + publish = "SANDBOX.*" + subscribe = ["PUBLIC.>", "_INBOX.>"] + } + + # This is to benchmark pub performance. + BENCH = { + publish = "a" + } + + # Just foo for testing + PASS: $2a$10$UHR6GhotWhpLsKtVP0/i6.Nh9.fuY73cWjLoJjb2sKT8KISBcUW5q + + # Users listed with permissions. + users = [ + {user: alice, password: $PASS, permissions: $ADMIN} + {user: bob, password: $PASS, permissions: $REQUESTOR} + {user: bench, password: $PASS, permissions: $BENCH} + {user: joe, password: $PASS} + ] +} diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go new file mode 100644 index 00000000..793ccb70 --- /dev/null +++ b/test/user_authorization_test.go @@ -0,0 +1,91 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "regexp" + "testing" +) + +const DefaultPass = "foo" + +var permErrRe = regexp.MustCompile(`\A\-ERR\s+'Permissions Violation([^\r\n]+)\r\n`) + +func TestUserAuthorizationProto(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + // Alice can do anything, check a few for OK result. + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "alice", DefaultPass) + expectResult(t, c, okRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, okRe) + + // Check that we now reserve _SYS.> though for internal, so no clients. + sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // Check that _ is ok + sendProto(t, c, "PUB _ 2\r\nok\r\n") + expectResult(t, c, okRe) + + c.Close() + + // Bob is a requestor only, e.g. req.foo, req.bar for publish, subscribe only to INBOXes. + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "bob", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + c.Close() + + // Joe is a default user + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "joe", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo.bar.* 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo.bar.baz 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB PUBLIC.abcd 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "PUB SANDBOX.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB SANDBOX.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + + // Since only PWC, this should fail (too many tokens). + sendProto(t, c, "PUB SANDBOX.foo.bar 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + c.Close() + +}