From 1d74a0dea38e7491fc8618c66a0d5f7108a1e902 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 9 Jun 2016 12:16:16 -0700 Subject: [PATCH 1/9] First pass authorization parser --- server/opts.go | 22 ++++++++++++++++++++-- server/opts_test.go | 8 ++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/server/opts.go b/server/opts.go index 30d6aa5d..4952881c 100644 --- a/server/opts.go +++ b/server/opts.go @@ -20,8 +20,26 @@ import ( // For multiple accounts/users. type User struct { - Username string `json:"user"` - Password string `json:"password"` + Username string `json:"user"` + Password string `json:"password"` + Permissions Authorization `json:"permissions"` + MaxConns int `json:"max_connections"` + MaxSubs int `json:"max_subscriptions"` +} + +// Authorization are the allowed subjects on a per +// publish or subscribe basis. +type Authorization struct { + pub *Permission `json:"publish"` + sub *Permission `json:"subscribe"` +} + +// Permission is for describing the subjects and rate limits +// that an account connection can publish or subscribe to and +// what limits if any exist for message and/or byte rates. +type Permission struct { + Subjects []string `json:"subjects"` + // FIXME(dlc) figure out rates. } // Options block for gnatsd server. diff --git a/server/opts_test.go b/server/opts_test.go index e83c279e..43b8326f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -397,3 +397,11 @@ func TestMultipleUsersConfig(t *testing.T) { } processOptions(opts) } + +func TestAuthorizationConfig(t *testing.T) { + opts, err := ProcessConfigFile("./configs/authorization.conf") + if err != nil { + t.Fatalf("Received an error reading config file: %v\n", err) + } + processOptions(opts) +} From 449aa6e8efd56496169d601f423f7a7e91c940ef Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 16 Jun 2016 13:49:26 -0700 Subject: [PATCH 2/9] Finalized opts parser for subject authorization --- server/configs/authorization.conf | 37 +++++++ server/opts.go | 156 +++++++++++++++++++++++++----- server/opts_test.go | 75 ++++++++++++++ server/sublist.go | 24 ++++- 4 files changed, 267 insertions(+), 25 deletions(-) create mode 100644 server/configs/authorization.conf diff --git a/server/configs/authorization.conf b/server/configs/authorization.conf new file mode 100644 index 00000000..2cf47a03 --- /dev/null +++ b/server/configs/authorization.conf @@ -0,0 +1,37 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:4222 + +authorization { + # Our role based permissions. + + # Superuser can do anything. + super_user = { + publish = "*" + subscribe = ">" + } + # Can do requests on foo or bar, and subscribe to anything + # that is a response to an _INBOX. + # + # Notice that authorization filters can be singletons or arrays. + req_pub_user = { + publish = ["req.foo", "req.bar"] + subscribe = "_INBOX.>" + } + + # Setup a default user that can subscribe to anything, but has + # no publish capabilities. + default_user = { + subscribe = "PUBLIC.>" + } + + # Default permissions if none presented. e.g. susan below. + default_permissions: $default_user + + # Users listed with persmissions. + users = [ + {user: alice, password: foo, permissions: $super_user} + {user: bob, password: bar, permissions: $req_pub_user} + {user: susan, password: baz} + ] +} diff --git a/server/opts.go b/server/opts.go index 4952881c..bef89cb8 100644 --- a/server/opts.go +++ b/server/opts.go @@ -5,7 +5,6 @@ package server import ( "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "io/ioutil" "net" @@ -20,26 +19,16 @@ import ( // For multiple accounts/users. type User struct { - Username string `json:"user"` - Password string `json:"password"` - Permissions Authorization `json:"permissions"` - MaxConns int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions"` + Username string `json:"user"` + Password string `json:"password"` + Permissions *Permissions `json:"permissions"` } // Authorization are the allowed subjects on a per // publish or subscribe basis. -type Authorization struct { - pub *Permission `json:"publish"` - sub *Permission `json:"subscribe"` -} - -// Permission is for describing the subjects and rate limits -// that an account connection can publish or subscribe to and -// what limits if any exist for message and/or byte rates. -type Permission struct { - Subjects []string `json:"subjects"` - // FIXME(dlc) figure out rates. +type Permissions struct { + Publish []string `json:"publish"` + Subscribe []string `json:"subscribe"` } // Options block for gnatsd server. @@ -52,7 +41,7 @@ type Options struct { NoSigs bool `json:"-"` Logtime bool `json:"-"` MaxConn int `json:"max_connections"` - Users []User `json:"-"` + Users []*User `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` @@ -89,13 +78,15 @@ type Options struct { TLSConfig *tls.Config `json:"-"` } +// Configuration file quthorization section. type authorization struct { // Singles user string pass string // Multiple Users - users []User - timeout float64 + users []*User + timeout float64 + defaultPermissions *Permissions } // TLSConfigOpts holds the parsed tls config information, @@ -343,17 +334,134 @@ func parseAuthorization(am map[string]interface{}) (*authorization, error) { } auth.timeout = at case "users": - b, _ := json.Marshal(mv) - users := []User{} - if err := json.Unmarshal(b, &users); err != nil { - return nil, fmt.Errorf("Could not parse user array properly, %v", err) + users, err := parseUsers(mv) + if err != nil { + return nil, err } auth.users = users + case "default_permission", "default_permissions": + pm, ok := mv.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected default permissions to be a map/struct, got %+v", mv) + } + permissions, err := parseUserPermissions(pm) + if err != nil { + return nil, err + } + auth.defaultPermissions = permissions } + + // Now check for permission defaults with multiple users, etc. + if auth.users != nil && auth.defaultPermissions != nil { + for _, user := range auth.users { + if user.Permissions == nil { + user.Permissions = auth.defaultPermissions + } + } + } + } return auth, nil } +// Helper function to parse multiple users array with optional permissions. +func parseUsers(mv interface{}) ([]*User, error) { + // Make sure we have an array + uv, ok := mv.([]interface{}) + if !ok { + return nil, fmt.Errorf("Expected users field to be an array, got %v", mv) + } + users := []*User{} + for _, u := range uv { + // Check its a map/struct + um, ok := u.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected user entry to be a map/struct, got %v", u) + } + user := &User{} + for k, v := range um { + switch strings.ToLower(k) { + case "user", "username": + user.Username = v.(string) + case "pass", "password": + user.Password = v.(string) + case "permission", "permissions", "authroization": + pm, ok := v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Expected user permissions to be a map/struct, got %+v", v) + } + permissions, err := parseUserPermissions(pm) + if err != nil { + return nil, err + } + user.Permissions = permissions + } + } + // Check to make sure we have at least username and password + if user.Username == "" || user.Password == "" { + return nil, fmt.Errorf("User entry requires a user and a password") + } + users = append(users, user) + } + return users, nil +} + +// Helper function to parse user/account permissions +func parseUserPermissions(pm map[string]interface{}) (*Permissions, error) { + p := &Permissions{} + for k, v := range pm { + switch strings.ToLower(k) { + case "pub", "publish": + subjects, err := parseSubjects(v) + if err != nil { + return nil, err + } + p.Publish = subjects + case "sub", "subscribe": + subjects, err := parseSubjects(v) + if err != nil { + return nil, err + } + p.Subscribe = subjects + default: + return nil, fmt.Errorf("Unknown field %s parsing permissions", k) + } + } + return p, nil +} + +// Helper function to parse subject singeltons and/or arrays +func parseSubjects(v interface{}) ([]string, error) { + var subjects []string + switch v.(type) { + case string: + subjects = append(subjects, v.(string)) + case []string: + subjects = v.([]string) + case []interface{}: + for _, i := range v.([]interface{}) { + subject, ok := i.(string) + if !ok { + return nil, fmt.Errorf("Subject in permissions array can not be cast to string") + } + subjects = append(subjects, subject) + } + default: + return nil, fmt.Errorf("Expected subject permissions to be a subject, or array of subjects, got %T", v) + } + return checkSubjectArray(subjects) +} + +// Helper function to validate subjects, etc for account permissioning. +func checkSubjectArray(sa []string) ([]string, error) { + for _, s := range sa { + if !IsValidSubject(s) { + return nil, fmt.Errorf("Subject %q is not a valid subject", s) + } + } + return sa, nil +} + // PrintTLSHelpAndDie prints TLS usage and exits. func PrintTLSHelpAndDie() { fmt.Printf("%s\n", tlsUsage) diff --git a/server/opts_test.go b/server/opts_test.go index 43b8326f..f6b5bc46 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -398,10 +398,85 @@ func TestMultipleUsersConfig(t *testing.T) { processOptions(opts) } +// Test highly depends on contents of the config file listed below. Any changes to that file +// may very weel break this test. func TestAuthorizationConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/authorization.conf") if err != nil { t.Fatalf("Received an error reading config file: %v\n", err) } processOptions(opts) + lu := len(opts.Users) + if lu != 3 { + t.Fatalf("Expected 3 users, got %d\n", lu) + } + // Build a map + mu := make(map[string]*User) + for _, u := range opts.Users { + mu[u.Username] = u + } + alice, ok := mu["alice"] + if !ok { + t.Fatalf("Expected to see user Alice\n") + } + // Check for permissions details + if alice.Permissions == nil { + t.Fatalf("Expected Alice's permissions to be non-nil\n") + } + if alice.Permissions.Publish == nil { + t.Fatalf("Expected Alice's publish permissions to be non-nil\n") + } + if len(alice.Permissions.Publish) != 1 { + t.Fatalf("Expected Alice's publish permissions to have 1 element, got %d\n", + len(alice.Permissions.Publish)) + } + pubPerm := alice.Permissions.Publish[0] + if pubPerm != "*" { + t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm) + } + if alice.Permissions.Subscribe == nil { + t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n") + } + if len(alice.Permissions.Subscribe) != 1 { + t.Fatalf("Expected Alice's subscribe permissions to have 1 element, got %d\n", + len(alice.Permissions.Subscribe)) + } + subPerm := alice.Permissions.Subscribe[0] + if subPerm != ">" { + t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm) + } + + bob, ok := mu["bob"] + if !ok { + t.Fatalf("Expected to see user Bob\n") + } + if bob.Permissions == nil { + t.Fatalf("Expected Bob's permissions to be non-nil\n") + } + + susan, ok := mu["susan"] + if !ok { + t.Fatalf("Expected to see user Susan\n") + } + if susan.Permissions == nil { + t.Fatalf("Expected Susan's permissions to be non-nil\n") + } + // Check susan closely since she inherited the default permissions. + if susan.Permissions == nil { + t.Fatalf("Expected Susan's permissions to be non-nil\n") + } + if susan.Permissions.Publish != nil { + t.Fatalf("Expected Susan's publish permissions to be nil\n") + } + if susan.Permissions.Subscribe == nil { + t.Fatalf("Expected Susan's subscribe permissions to be non-nil\n") + } + if len(susan.Permissions.Subscribe) != 1 { + t.Fatalf("Expected Susan's subscribe permissions to have 1 element, got %d\n", + len(susan.Permissions.Subscribe)) + } + subPerm = susan.Permissions.Subscribe[0] + if subPerm != "PUBLIC.>" { + t.Fatalf("Expected Susan's subscribe permissions to be 'PUBLIC.>', got %q\n", subPerm) + } } diff --git a/server/sublist.go b/server/sublist.go index fd4cc12f..1fdd24b1 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -565,7 +565,29 @@ func visitLevel(l *level, depth int) int { return maxDepth } -// IsValidLiteralSubject returns true if a subject is valid, false otherwise +// IsValidSubject returns true if a subject is valid, false otherwise +func IsValidSubject(subject string) bool { + if subject == "" { + return false + } + sfwc := false + tokens := strings.Split(string(subject), tsep) + for _, t := range tokens { + if len(t) == 0 || sfwc { + return false + } + if len(t) > 1 { + continue + } + switch t[0] { + case fwc: + sfwc = true + } + } + return true +} + +// IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise func IsValidLiteralSubject(subject string) bool { tokens := strings.Split(string(subject), tsep) for _, t := range tokens { From 7730fac9e24d0016d4a1b9eb001461acd62ddb6f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 16 Jun 2016 17:07:25 -0700 Subject: [PATCH 3/9] Authorization enforcement and performance tweaks --- auth/multiuser.go | 15 +++-- server/auth.go | 2 + server/client.go | 109 ++++++++++++++++++++++++++++++++ server/errors.go | 13 ++-- server/opts_test.go | 4 ++ server/server.go | 3 +- test/auth_test.go | 2 +- test/bench_test.go | 50 +++++++++++---- test/client_auth_test.go | 50 +++++++++++++++ test/configs/authorization.conf | 44 +++++++++++++ test/user_authorization_test.go | 86 +++++++++++++++++++++++++ 11 files changed, 353 insertions(+), 25 deletions(-) create mode 100644 test/client_auth_test.go create mode 100644 test/configs/authorization.conf create mode 100644 test/user_authorization_test.go diff --git a/auth/multiuser.go b/auth/multiuser.go index 0adb4142..1c419eda 100644 --- a/auth/multiuser.go +++ b/auth/multiuser.go @@ -10,14 +10,14 @@ import ( // Plain authentication is a basic username and password type MultiUser struct { - users map[string]string + users map[string]*server.User } // Create a new multi-user -func NewMultiUser(users []server.User) *MultiUser { - m := &MultiUser{users: make(map[string]string)} +func NewMultiUser(users []*server.User) *MultiUser { + m := &MultiUser{users: make(map[string]*server.User)} for _, u := range users { - m.users[u.Username] = u.Password + m.users[u.Username] = u } return m } @@ -25,10 +25,12 @@ func NewMultiUser(users []server.User) *MultiUser { // Check authenticates the client using a username and password against a list of multiple users. func (m *MultiUser) Check(c server.ClientAuth) bool { opts := c.GetOpts() - pass, ok := m.users[opts.Username] + user, ok := m.users[opts.Username] if !ok { return false } + pass := user.Password + // Check to see if the password is a bcrypt hash if isBcrypt(pass) { if err := bcrypt.CompareHashAndPassword([]byte(pass), []byte(opts.Password)); err != nil { @@ -37,5 +39,8 @@ func (m *MultiUser) Check(c server.ClientAuth) bool { } else if pass != opts.Password { return false } + + c.RegisterUser(user) + return true } diff --git a/server/auth.go b/server/auth.go index 92a2b8f9..34f0f01e 100644 --- a/server/auth.go +++ b/server/auth.go @@ -12,4 +12,6 @@ type Auth interface { type ClientAuth interface { // Get options associated with a client GetOpts() *clientOpts + // Optionally map a user after auth. + RegisterUser(*User) } diff --git a/server/client.go b/server/client.go index 70d932c9..42702345 100644 --- a/server/client.go +++ b/server/client.go @@ -53,6 +53,7 @@ type client struct { bw *bufio.Writer srv *Server subs map[string]*subscription + perms *permissions cache readCache pcd map[*client]struct{} atmr *time.Timer @@ -68,6 +69,18 @@ type client struct { trace bool } +type permissions struct { + sub *Sublist + pub *Sublist + pcache map[string]bool +} + +const ( + maxResultCacheSize = 512 + maxPermCacheSize = 32 + pruneSize = 16 +) + // Used in readloop to cache hot subject lookups and group statistics. type readCache struct { genid uint64 @@ -146,6 +159,36 @@ func (c *client) initClient() { } } +// RegisterUser allows auth to call back into a new client +// with the authenticated user. This is used to map any permissions +// into the client. +func (c *client) RegisterUser(user *User) { + if user.Permissions == nil { + return + } + + // Process Permissions and map into client connection structures. + c.mu.Lock() + defer c.mu.Unlock() + + c.perms = &permissions{} + c.perms.sub = NewSublist() + c.perms.pub = NewSublist() + c.perms.pcache = make(map[string]bool) + + // Loop over publish permissions + for _, pubSubject := range user.Permissions.Publish { + sub := &subscription{subject: []byte(pubSubject)} + c.perms.pub.Insert(sub) + } + + // Loop over subscribe permissions + for _, subSubject := range user.Permissions.Subscribe { + sub := &subscription{subject: []byte(subSubject)} + c.perms.sub.Insert(sub) + } +} + func (c *client) readLoop() { // Grab the connection off the client, it will be cleared on a close. // We check for that after the loop, but want to avoid a nil dereference @@ -591,6 +634,17 @@ func (c *client) processSub(argo []byte) (err error) { return nil } + // Check permissions if applicable. + if c.perms != nil && c.perms.sub != nil { + r := c.perms.sub.Match(string(sub.subject)) + if len(r.psubs) == 0 { + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + c.Debugf("Permissions Violation for Subcription to %q", sub.subject) + return nil + } + } + // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. sid := string(sub.sid) @@ -814,9 +868,53 @@ func (c *client) processMsg(msg []byte) { if c.trace { c.traceMsg(msg) } + + // Disallow publish to _SYS.>, these are reserved for internals. + if c.pa.subject[0] == '_' && c.pa.subject[1] == 'S' && + c.pa.subject[2] == 'Y' && c.pa.subject[3] == 'S' { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + return + } + + // Check if published subject is allowed if we have permissions in place. + if c.perms != nil && c.perms.pub != nil { + allowed, ok := c.perms.pcache[string(c.pa.subject)] + if ok && !allowed { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + return + } + if !ok { + r := c.perms.pub.Match(string(c.pa.subject)) + if len(r.psubs) == 0 { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) + c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + c.perms.pcache[string(c.pa.subject)] = false + return + } else { + c.perms.pcache[string(c.pa.subject)] = true + } + // Prune if needed. + if len(c.perms.pcache) > maxPermCacheSize { + // Prune the permissions cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.perms.pcache { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } + } + } + if c.opts.Verbose { c.sendOK() } + + // Mostly under testing scenarios. if srv == nil { return } @@ -838,6 +936,17 @@ func (c *client) processMsg(msg []byte) { subject := string(c.pa.subject) r = srv.sl.Match(subject) c.cache.results[subject] = r + if len(c.cache.results) > maxResultCacheSize { + // Prune the results cache. Keeps us from unbounded growth. + r := 0 + for subject, _ := range c.cache.results { + delete(c.cache.results, subject) + r++ + if r > pruneSize { + break + } + } + } } // Check for no interest, short circuit if so. diff --git a/server/errors.go b/server/errors.go index 1387ebb1..c972ee4b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -1,19 +1,22 @@ -// Copyright 2012 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server import "errors" var ( - // ErrConnectionClosed represents error condition on a closed connection. + // ErrConnectionClosed represents an error condition on a closed connection. ErrConnectionClosed = errors.New("Connection Closed") - // ErrAuthorization represents error condition on failed authorization. + // ErrAuthorization represents an error condition on failed authorization. ErrAuthorization = errors.New("Authorization Error") - // ErrAuthTimeout represents error condition on failed authorization due to timeout. + // ErrAuthTimeout represents an error condition on failed authorization due to timeout. ErrAuthTimeout = errors.New("Authorization Timeout") - // ErrMaxPayload represents error condition when the payload is too big. + // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") + + // ErrReservedPublish represents an error condition when the payload is too big. + ErrReservedPublishSubject = errors.New("Reserved Internal Subject") ) diff --git a/server/opts_test.go b/server/opts_test.go index f6b5bc46..96d74623 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -415,6 +415,8 @@ func TestAuthorizationConfig(t *testing.T) { for _, u := range opts.Users { mu[u.Username] = u } + + // Alice alice, ok := mu["alice"] if !ok { t.Fatalf("Expected to see user Alice\n") @@ -446,6 +448,7 @@ func TestAuthorizationConfig(t *testing.T) { t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm) } + // Bob bob, ok := mu["bob"] if !ok { t.Fatalf("Expected to see user Bob\n") @@ -454,6 +457,7 @@ func TestAuthorizationConfig(t *testing.T) { t.Fatalf("Expected Bob's permissions to be non-nil\n") } + // Susan susan, ok := mu["susan"] if !ok { t.Fatalf("Expected to see user Susan\n") diff --git a/server/server.go b/server/server.go index 666b005e..683d2da1 100644 --- a/server/server.go +++ b/server/server.go @@ -516,8 +516,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Check for Auth if authRequired { - ttl := secondsToDuration(s.opts.AuthTimeout) - c.setAuthTimer(ttl) + c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } // Send our information. diff --git a/test/auth_test.go b/test/auth_test.go index c360a9e5..874e2258 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package test diff --git a/test/bench_test.go b/test/bench_test.go index 6f3558b9..16faaaeb 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -75,41 +75,41 @@ func sizedString(sz int) string { // Publish subject for pub benchmarks. var psub = "a" -func Benchmark____PubNo_Payload(b *testing.B) { +func Benchmark_____PubNo_Payload(b *testing.B) { benchPub(b, psub, "") } -func Benchmark____Pub8b_Payload(b *testing.B) { +func Benchmark_____Pub8b_Payload(b *testing.B) { b.StopTimer() s := sizedString(8) benchPub(b, psub, s) } -func Benchmark___Pub32b_Payload(b *testing.B) { +func Benchmark____Pub32b_Payload(b *testing.B) { b.StopTimer() s := sizedString(32) benchPub(b, psub, s) } -func Benchmark__Pub256B_Payload(b *testing.B) { +func Benchmark___Pub256B_Payload(b *testing.B) { b.StopTimer() s := sizedString(256) benchPub(b, psub, s) } -func Benchmark____Pub1K_Payload(b *testing.B) { +func Benchmark_____Pub1K_Payload(b *testing.B) { b.StopTimer() s := sizedString(1024) benchPub(b, psub, s) } -func Benchmark____Pub4K_Payload(b *testing.B) { +func Benchmark_____Pub4K_Payload(b *testing.B) { b.StopTimer() s := sizedString(4 * 1024) benchPub(b, psub, s) } -func Benchmark____Pub8K_Payload(b *testing.B) { +func Benchmark_____Pub8K_Payload(b *testing.B) { b.StopTimer() s := sizedString(8 * 1024) benchPub(b, psub, s) @@ -137,7 +137,33 @@ func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) { ch <- true } -func Benchmark___________PubSub(b *testing.B) { +// Benchmark the authorization code path. +func Benchmark_AuthPubNo_Payload(b *testing.B) { + b.StopTimer() + + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + c := createClientConn(b, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(b, c) + + cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench", DefaultPass) + sendProto(b, c, cs) + + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte("PUB a 0\r\n\r\n") + b.SetBytes(int64(len(sendOp))) + b.StartTimer() + for i := 0; i < b.N; i++ { + bw.Write(sendOp) + } + bw.Flush() + flushConnection(b, c) + b.StopTimer() +} + +func Benchmark____________PubSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -169,7 +195,7 @@ func Benchmark___________PubSub(b *testing.B) { s.Shutdown() } -func Benchmark___PubSubTwoConns(b *testing.B) { +func Benchmark____PubSubTwoConns(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -205,7 +231,7 @@ func Benchmark___PubSubTwoConns(b *testing.B) { s.Shutdown() } -func Benchmark___PubTwoQueueSub(b *testing.B) { +func Benchmark____PubTwoQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -238,7 +264,7 @@ func Benchmark___PubTwoQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark__PubFourQueueSub(b *testing.B) { +func Benchmark___PubFourQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) @@ -273,7 +299,7 @@ func Benchmark__PubFourQueueSub(b *testing.B) { s.Shutdown() } -func Benchmark_PubEightQueueSub(b *testing.B) { +func Benchmark__PubEightQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() c := createClientConn(b, "localhost", PERF_PORT) diff --git a/test/client_auth_test.go b/test/client_auth_test.go new file mode 100644 index 00000000..e784f429 --- /dev/null +++ b/test/client_auth_test.go @@ -0,0 +1,50 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "fmt" + "testing" + + "github.com/nats-io/nats" +) + +func TestMultipleUserAuth(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/multi_user.conf") + defer srv.Shutdown() + + if opts.Users == nil { + t.Fatal("Expected a user array that is not nil") + } + if len(opts.Users) != 2 { + t.Fatal("Expected a user array that had 2 users") + } + + // Test first user + url := fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[0].Username, + opts.Users[0].Password, + opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Expected a succesful connect, got %v\n", err) + } + defer nc.Close() + + if !nc.AuthRequired() { + t.Fatal("Expected auth to be required for the server") + } + + // Test second user + url = fmt.Sprintf("nats://%s:%s@%s:%d/", + opts.Users[1].Username, + opts.Users[1].Password, + opts.Host, opts.Port) + + nc, err = nats.Connect(url) + if err != nil { + t.Fatalf("Expected a succesful connect, got %v\n", err) + } + defer nc.Close() +} diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf new file mode 100644 index 00000000..3bbe15e0 --- /dev/null +++ b/test/configs/authorization.conf @@ -0,0 +1,44 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +listen: 127.0.0.1:2442 + +authorization { + # Our role based permissions. + + # Admin can do anything. + ADMIN = { + publish = ">" + subscribe = ">" + } + + # Can do requests on req.foo or req.bar, and subscribe to anything + # that is a response, e.g. _INBOX.* + # + # Notice that authorization filters can be singletons or arrays. + REQUESTOR = { + publish = ["req.foo", "req.bar"] + subscribe = "_INBOX.*" + } + + # Default permissions if none presented. e.g. Joe below. + DEFAULT_PERMISSIONS = { + publish = "SANDBOX.*" + subscribe = ["PUBLIC.>", "_INBOX.>"] + } + + # This is to benchmark pub performance. + BENCH = { + publish = "a" + } + + # Just foo for testing + PASS: $2a$10$UHR6GhotWhpLsKtVP0/i6.Nh9.fuY73cWjLoJjb2sKT8KISBcUW5q + + # Users listed with persmissions. + users = [ + {user: alice, password: $PASS, permissions: $ADMIN} + {user: bob, password: $PASS, permissions: $REQUESTOR} + {user: bench, password: $PASS, permissions: $BENCH} + {user: joe, password: $PASS} + ] +} diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go new file mode 100644 index 00000000..59aa6f8c --- /dev/null +++ b/test/user_authorization_test.go @@ -0,0 +1,86 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +package test + +import ( + "regexp" + "testing" +) + +const DefaultPass = "foo" + +var permErrRe = regexp.MustCompile(`\A\-ERR\s+'Permissions Violation([^\r\n]+)\r\n`) + +func TestUserAuthorizationProto(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + // Alice can do anything, check a few for OK result. + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "alice", DefaultPass) + expectResult(t, c, okRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, okRe) + + // Check that we now reserve _SYS.> though for internal, so no clients. + sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n") + expectResult(t, c, permErrRe) + c.Close() + + // Bob is a requestor only, e.g. req.foo, req.bar for publish, subscribe only to INBOXes. + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "bob", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB req.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + c.Close() + + // Joe is a default user + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "joe", DefaultPass) + expectResult(t, c, okRe) + + // These should error. + sendProto(t, c, "SUB foo.bar.* 1\r\n") + expectResult(t, c, permErrRe) + sendProto(t, c, "PUB foo.bar.baz 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + // These should work ok. + sendProto(t, c, "SUB _INBOX.abcd 1\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "SUB PUBLIC.abcd 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "PUB SANDBOX.foo 2\r\nok\r\n") + expectResult(t, c, okRe) + sendProto(t, c, "PUB SANDBOX.bar 2\r\nok\r\n") + expectResult(t, c, okRe) + + // Since only PWC, this should fail (too many tokens). + sendProto(t, c, "PUB SANDBOX.foo.bar 2\r\nok\r\n") + expectResult(t, c, permErrRe) + + c.Close() + +} From a55b7c2744db7aae858ff20b4073308444a4e6a1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 16 Jun 2016 20:42:54 -0700 Subject: [PATCH 4/9] log errors cleanup --- auth/multiuser.go | 3 +-- server/client.go | 24 ++++++++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/auth/multiuser.go b/auth/multiuser.go index 1c419eda..e3104f56 100644 --- a/auth/multiuser.go +++ b/auth/multiuser.go @@ -3,9 +3,8 @@ package auth import ( - "golang.org/x/crypto/bcrypt" - "github.com/nats-io/gnatsd/server" + "golang.org/x/crypto/bcrypt" ) // Plain authentication is a basic username and password diff --git a/server/client.go b/server/client.go index 42702345..d5ebbd76 100644 --- a/server/client.go +++ b/server/client.go @@ -404,7 +404,13 @@ func (c *client) authTimeout() { } func (c *client) authViolation() { - c.Errorf(ErrAuthorization.Error()) + if c.srv != nil && c.srv.opts.Users != nil { + c.Errorf("%s - User %q", + ErrAuthorization.Error(), + c.opts.Username) + } else { + c.Errorf(ErrAuthorization.Error()) + } c.sendErr("Authorization Violation") c.closeConnection() } @@ -640,7 +646,7 @@ func (c *client) processSub(argo []byte) (err error) { if len(r.psubs) == 0 { c.mu.Unlock() c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) - c.Debugf("Permissions Violation for Subcription to %q", sub.subject) + c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject) return nil } } @@ -872,8 +878,7 @@ func (c *client) processMsg(msg []byte) { // Disallow publish to _SYS.>, these are reserved for internals. if c.pa.subject[0] == '_' && c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' && c.pa.subject[3] == 'S' { - c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) - c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + c.pubPermissionViolation(c.pa.subject) return } @@ -881,15 +886,13 @@ func (c *client) processMsg(msg []byte) { if c.perms != nil && c.perms.pub != nil { allowed, ok := c.perms.pcache[string(c.pa.subject)] if ok && !allowed { - c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) - c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + c.pubPermissionViolation(c.pa.subject) return } if !ok { r := c.perms.pub.Match(string(c.pa.subject)) if len(r.psubs) == 0 { - c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", c.pa.subject)) - c.Debugf("Permissions Violation for Publish to %q", c.pa.subject) + c.pubPermissionViolation(c.pa.subject) c.perms.pcache[string(c.pa.subject)] = false return } else { @@ -1041,6 +1044,11 @@ func (c *client) processMsg(msg []byte) { } } +func (c *client) pubPermissionViolation(subject []byte) { + c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subject)) + c.Errorf("Publish Violation - User %q, Subject %q", c.opts.Username, subject) +} + func (c *client) processPingTimer() { c.mu.Lock() defer c.mu.Unlock() From 248bd8d3662a35a9d6434a27e29b844289529b2a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 16 Jun 2016 21:46:54 -0700 Subject: [PATCH 5/9] bug fix for _SYS. --- server/client.go | 5 +++-- test/user_authorization_test.go | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index d5ebbd76..d8c8385a 100644 --- a/server/client.go +++ b/server/client.go @@ -876,8 +876,9 @@ func (c *client) processMsg(msg []byte) { } // Disallow publish to _SYS.>, these are reserved for internals. - if c.pa.subject[0] == '_' && c.pa.subject[1] == 'S' && - c.pa.subject[2] == 'Y' && c.pa.subject[3] == 'S' { + if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 && + c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' && + c.pa.subject[3] == 'S' && c.pa.subject[4] == '.' { c.pubPermissionViolation(c.pa.subject) return } diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go index 59aa6f8c..793ccb70 100644 --- a/test/user_authorization_test.go +++ b/test/user_authorization_test.go @@ -29,6 +29,11 @@ func TestUserAuthorizationProto(t *testing.T) { // Check that we now reserve _SYS.> though for internal, so no clients. sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n") expectResult(t, c, permErrRe) + + // Check that _ is ok + sendProto(t, c, "PUB _ 2\r\nok\r\n") + expectResult(t, c, okRe) + c.Close() // Bob is a requestor only, e.g. req.foo, req.bar for publish, subscribe only to INBOXes. From 49e5dfd7971954259a720c1505bfca1bbef7ea1e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 16 Jun 2016 23:54:18 -0700 Subject: [PATCH 6/9] Updated completed items --- TODO.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index ab146711..39f7089c 100644 --- a/TODO.md +++ b/TODO.md @@ -1,23 +1,25 @@ # General +- [ ] Protocol updates, MAP, MPUB, etc - [ ] Multiple listen endpoints - [ ] Websocket / HTTP2 strategy -- [ ] Listen configure key vs addr and port -- [ ] Multiple Authorization / Access - [ ] T series reservations - [ ] _SYS. server events? - [ ] No downtime restart - [ ] Signal based reload of configuration - [ ] brew, apt-get, rpm, chocately (windows) -- [ ] Buffer pools/sync pools? - [ ] IOVec pools and writev for high fanout? -- [ ] Add ENV and variable support to dconf? ucl? - [ ] Modify cluster support for single message across routes between pub/sub and d-queue - [ ] Memory limits/warnings? - [ ] Limit number of subscriptions a client can have, total memory usage etc. - [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state +- [X] _SYS.> reserved for server events? +- [X] Listen configure key vs addr and port +- [X] Add ENV and variable support to dconf? ucl? +- [X] Buffer pools/sync pools? +- [X] Multiple Authorization / Access - [X] Write dynamic socket buffer sizes - [X] Read dynamic socket buffer sizes - [X] Info updates contain other implicit route servers From 67ed61c8375143c3f92200d68b410c3be7db4b13 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 17 Jun 2016 09:59:39 -0700 Subject: [PATCH 7/9] fixups for PR comments --- TODO.md | 1 + server/client.go | 5 +++-- server/errors.go | 2 +- server/opts.go | 4 ++-- server/opts_test.go | 2 +- test/client_auth_test.go | 2 +- test/configs/authorization.conf | 2 +- 7 files changed, 10 insertions(+), 8 deletions(-) diff --git a/TODO.md b/TODO.md index 39f7089c..5c507fcf 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,7 @@ # General +- [ ] Blacklist or ERR escalation to close connection for auth/permissions - [ ] Protocol updates, MAP, MPUB, etc - [ ] Multiple listen endpoints - [ ] Websocket / HTTP2 strategy diff --git a/server/client.go b/server/client.go index d8c8385a..7ef290a2 100644 --- a/server/client.go +++ b/server/client.go @@ -171,6 +171,7 @@ func (c *client) RegisterUser(user *User) { c.mu.Lock() defer c.mu.Unlock() + // Pre-allocate all to simplify checks later. c.perms = &permissions{} c.perms.sub = NewSublist() c.perms.pub = NewSublist() @@ -641,7 +642,7 @@ func (c *client) processSub(argo []byte) (err error) { } // Check permissions if applicable. - if c.perms != nil && c.perms.sub != nil { + if c.perms != nil { r := c.perms.sub.Match(string(sub.subject)) if len(r.psubs) == 0 { c.mu.Unlock() @@ -884,7 +885,7 @@ func (c *client) processMsg(msg []byte) { } // Check if published subject is allowed if we have permissions in place. - if c.perms != nil && c.perms.pub != nil { + if c.perms != nil { allowed, ok := c.perms.pcache[string(c.pa.subject)] if ok && !allowed { c.pubPermissionViolation(c.pa.subject) diff --git a/server/errors.go b/server/errors.go index c972ee4b..3354d3f3 100644 --- a/server/errors.go +++ b/server/errors.go @@ -17,6 +17,6 @@ var ( // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") - // ErrReservedPublish represents an error condition when the payload is too big. + // ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.> ErrReservedPublishSubject = errors.New("Reserved Internal Subject") ) diff --git a/server/opts.go b/server/opts.go index bef89cb8..56ff4327 100644 --- a/server/opts.go +++ b/server/opts.go @@ -78,7 +78,7 @@ type Options struct { TLSConfig *tls.Config `json:"-"` } -// Configuration file quthorization section. +// Configuration file authorization section. type authorization struct { // Singles user string @@ -442,7 +442,7 @@ func parseSubjects(v interface{}) ([]string, error) { for _, i := range v.([]interface{}) { subject, ok := i.(string) if !ok { - return nil, fmt.Errorf("Subject in permissions array can not be cast to string") + return nil, fmt.Errorf("Subject in permissions array cannot be cast to string") } subjects = append(subjects, subject) } diff --git a/server/opts_test.go b/server/opts_test.go index 96d74623..18adf2cf 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -399,7 +399,7 @@ func TestMultipleUsersConfig(t *testing.T) { } // Test highly depends on contents of the config file listed below. Any changes to that file -// may very weel break this test. +// may very well break this test. func TestAuthorizationConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/authorization.conf") if err != nil { diff --git a/test/client_auth_test.go b/test/client_auth_test.go index e784f429..bec4a875 100644 --- a/test/client_auth_test.go +++ b/test/client_auth_test.go @@ -44,7 +44,7 @@ func TestMultipleUserAuth(t *testing.T) { nc, err = nats.Connect(url) if err != nil { - t.Fatalf("Expected a succesful connect, got %v\n", err) + t.Fatalf("Expected a successful connect, got %v\n", err) } defer nc.Close() } diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf index 3bbe15e0..7b87be44 100644 --- a/test/configs/authorization.conf +++ b/test/configs/authorization.conf @@ -34,7 +34,7 @@ authorization { # Just foo for testing PASS: $2a$10$UHR6GhotWhpLsKtVP0/i6.Nh9.fuY73cWjLoJjb2sKT8KISBcUW5q - # Users listed with persmissions. + # Users listed with permissions. users = [ {user: alice, password: $PASS, permissions: $ADMIN} {user: bob, password: $PASS, permissions: $REQUESTOR} From fa95bd936b7b8ce7d00ef2df7ad43f89d65da546 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 17 Jun 2016 10:09:14 -0700 Subject: [PATCH 8/9] spelling --- server/client.go | 2 ++ server/pse/pse_windows.go | 2 +- test/client_auth_test.go | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index 7ef290a2..70f8641c 100644 --- a/server/client.go +++ b/server/client.go @@ -876,6 +876,8 @@ func (c *client) processMsg(msg []byte) { c.traceMsg(msg) } + // defintely + // Disallow publish to _SYS.>, these are reserved for internals. if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 && c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' && diff --git a/server/pse/pse_windows.go b/server/pse/pse_windows.go index 40cf1293..0f727426 100644 --- a/server/pse/pse_windows.go +++ b/server/pse/pse_windows.go @@ -119,7 +119,7 @@ func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) { var bufCount uint32 // Retrieving array data requires two calls, the first which - // requires an adressable empty buffer, and sets size fields. + // requires an addressable empty buffer, and sets size fields. // The second call returns the data. initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1) ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0]) diff --git a/test/client_auth_test.go b/test/client_auth_test.go index bec4a875..7c8890c7 100644 --- a/test/client_auth_test.go +++ b/test/client_auth_test.go @@ -28,7 +28,7 @@ func TestMultipleUserAuth(t *testing.T) { nc, err := nats.Connect(url) if err != nil { - t.Fatalf("Expected a succesful connect, got %v\n", err) + t.Fatalf("Expected a successful connect, got %v\n", err) } defer nc.Close() From 8d07e06b91836f760dd32c8825dfa4613bf808e0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 20 Jun 2016 09:10:54 -0700 Subject: [PATCH 9/9] Always check for pruning --- server/client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index 70f8641c..e16b8667 100644 --- a/server/client.go +++ b/server/client.go @@ -895,10 +895,10 @@ func (c *client) processMsg(msg []byte) { } if !ok { r := c.perms.pub.Match(string(c.pa.subject)) - if len(r.psubs) == 0 { + notAllowed := len(r.psubs) == 0 + if notAllowed { c.pubPermissionViolation(c.pa.subject) c.perms.pcache[string(c.pa.subject)] = false - return } else { c.perms.pcache[string(c.pa.subject)] = true } @@ -914,6 +914,10 @@ func (c *client) processMsg(msg []byte) { } } } + // Return here to allow the pruning code to run if needed. + if notAllowed { + return + } } }