mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
11
TODO.md
11
TODO.md
@@ -1,23 +1,26 @@
|
||||
|
||||
# General
|
||||
|
||||
- [ ] Blacklist or ERR escalation to close connection for auth/permissions
|
||||
- [ ] Protocol updates, MAP, MPUB, etc
|
||||
- [ ] Multiple listen endpoints
|
||||
- [ ] Websocket / HTTP2 strategy
|
||||
- [ ] Listen configure key vs addr and port
|
||||
- [ ] Multiple Authorization / Access
|
||||
- [ ] T series reservations
|
||||
- [ ] _SYS. server events?
|
||||
- [ ] No downtime restart
|
||||
- [ ] Signal based reload of configuration
|
||||
- [ ] brew, apt-get, rpm, chocately (windows)
|
||||
- [ ] Buffer pools/sync pools?
|
||||
- [ ] IOVec pools and writev for high fanout?
|
||||
- [ ] Add ENV and variable support to dconf? ucl?
|
||||
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
|
||||
- [ ] Memory limits/warnings?
|
||||
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
|
||||
- [ ] Multi-tenant accounts with isolation of subject space
|
||||
- [ ] Pedantic state
|
||||
- [X] _SYS.> reserved for server events?
|
||||
- [X] Listen configure key vs addr and port
|
||||
- [X] Add ENV and variable support to dconf? ucl?
|
||||
- [X] Buffer pools/sync pools?
|
||||
- [X] Multiple Authorization / Access
|
||||
- [X] Write dynamic socket buffer sizes
|
||||
- [X] Read dynamic socket buffer sizes
|
||||
- [X] Info updates contain other implicit route servers
|
||||
|
||||
@@ -3,21 +3,20 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
// Plain authentication is a basic username and password
|
||||
type MultiUser struct {
|
||||
users map[string]string
|
||||
users map[string]*server.User
|
||||
}
|
||||
|
||||
// Create a new multi-user
|
||||
func NewMultiUser(users []server.User) *MultiUser {
|
||||
m := &MultiUser{users: make(map[string]string)}
|
||||
func NewMultiUser(users []*server.User) *MultiUser {
|
||||
m := &MultiUser{users: make(map[string]*server.User)}
|
||||
for _, u := range users {
|
||||
m.users[u.Username] = u.Password
|
||||
m.users[u.Username] = u
|
||||
}
|
||||
return m
|
||||
}
|
||||
@@ -25,10 +24,12 @@ func NewMultiUser(users []server.User) *MultiUser {
|
||||
// Check authenticates the client using a username and password against a list of multiple users.
|
||||
func (m *MultiUser) Check(c server.ClientAuth) bool {
|
||||
opts := c.GetOpts()
|
||||
pass, ok := m.users[opts.Username]
|
||||
user, ok := m.users[opts.Username]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
pass := user.Password
|
||||
|
||||
// Check to see if the password is a bcrypt hash
|
||||
if isBcrypt(pass) {
|
||||
if err := bcrypt.CompareHashAndPassword([]byte(pass), []byte(opts.Password)); err != nil {
|
||||
@@ -37,5 +38,8 @@ func (m *MultiUser) Check(c server.ClientAuth) bool {
|
||||
} else if pass != opts.Password {
|
||||
return false
|
||||
}
|
||||
|
||||
c.RegisterUser(user)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -12,4 +12,6 @@ type Auth interface {
|
||||
type ClientAuth interface {
|
||||
// Get options associated with a client
|
||||
GetOpts() *clientOpts
|
||||
// Optionally map a user after auth.
|
||||
RegisterUser(*User)
|
||||
}
|
||||
|
||||
127
server/client.go
127
server/client.go
@@ -53,6 +53,7 @@ type client struct {
|
||||
bw *bufio.Writer
|
||||
srv *Server
|
||||
subs map[string]*subscription
|
||||
perms *permissions
|
||||
cache readCache
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
@@ -68,6 +69,18 @@ type client struct {
|
||||
trace bool
|
||||
}
|
||||
|
||||
type permissions struct {
|
||||
sub *Sublist
|
||||
pub *Sublist
|
||||
pcache map[string]bool
|
||||
}
|
||||
|
||||
const (
|
||||
maxResultCacheSize = 512
|
||||
maxPermCacheSize = 32
|
||||
pruneSize = 16
|
||||
)
|
||||
|
||||
// Used in readloop to cache hot subject lookups and group statistics.
|
||||
type readCache struct {
|
||||
genid uint64
|
||||
@@ -146,6 +159,37 @@ func (c *client) initClient() {
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterUser allows auth to call back into a new client
|
||||
// with the authenticated user. This is used to map any permissions
|
||||
// into the client.
|
||||
func (c *client) RegisterUser(user *User) {
|
||||
if user.Permissions == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Process Permissions and map into client connection structures.
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Pre-allocate all to simplify checks later.
|
||||
c.perms = &permissions{}
|
||||
c.perms.sub = NewSublist()
|
||||
c.perms.pub = NewSublist()
|
||||
c.perms.pcache = make(map[string]bool)
|
||||
|
||||
// Loop over publish permissions
|
||||
for _, pubSubject := range user.Permissions.Publish {
|
||||
sub := &subscription{subject: []byte(pubSubject)}
|
||||
c.perms.pub.Insert(sub)
|
||||
}
|
||||
|
||||
// Loop over subscribe permissions
|
||||
for _, subSubject := range user.Permissions.Subscribe {
|
||||
sub := &subscription{subject: []byte(subSubject)}
|
||||
c.perms.sub.Insert(sub)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) readLoop() {
|
||||
// Grab the connection off the client, it will be cleared on a close.
|
||||
// We check for that after the loop, but want to avoid a nil dereference
|
||||
@@ -361,7 +405,13 @@ func (c *client) authTimeout() {
|
||||
}
|
||||
|
||||
func (c *client) authViolation() {
|
||||
c.Errorf(ErrAuthorization.Error())
|
||||
if c.srv != nil && c.srv.opts.Users != nil {
|
||||
c.Errorf("%s - User %q",
|
||||
ErrAuthorization.Error(),
|
||||
c.opts.Username)
|
||||
} else {
|
||||
c.Errorf(ErrAuthorization.Error())
|
||||
}
|
||||
c.sendErr("Authorization Violation")
|
||||
c.closeConnection()
|
||||
}
|
||||
@@ -591,6 +641,17 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if c.perms != nil {
|
||||
r := c.perms.sub.Match(string(sub.subject))
|
||||
if len(r.psubs) == 0 {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q", c.opts.Username, sub.subject)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// We can have two SUB protocols coming from a route due to some
|
||||
// race conditions. We should make sure that we process only one.
|
||||
sid := string(sub.sid)
|
||||
@@ -814,9 +875,57 @@ func (c *client) processMsg(msg []byte) {
|
||||
if c.trace {
|
||||
c.traceMsg(msg)
|
||||
}
|
||||
|
||||
// defintely
|
||||
|
||||
// Disallow publish to _SYS.>, these are reserved for internals.
|
||||
if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 &&
|
||||
c.pa.subject[1] == 'S' && c.pa.subject[2] == 'Y' &&
|
||||
c.pa.subject[3] == 'S' && c.pa.subject[4] == '.' {
|
||||
c.pubPermissionViolation(c.pa.subject)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if published subject is allowed if we have permissions in place.
|
||||
if c.perms != nil {
|
||||
allowed, ok := c.perms.pcache[string(c.pa.subject)]
|
||||
if ok && !allowed {
|
||||
c.pubPermissionViolation(c.pa.subject)
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
r := c.perms.pub.Match(string(c.pa.subject))
|
||||
notAllowed := len(r.psubs) == 0
|
||||
if notAllowed {
|
||||
c.pubPermissionViolation(c.pa.subject)
|
||||
c.perms.pcache[string(c.pa.subject)] = false
|
||||
} else {
|
||||
c.perms.pcache[string(c.pa.subject)] = true
|
||||
}
|
||||
// Prune if needed.
|
||||
if len(c.perms.pcache) > maxPermCacheSize {
|
||||
// Prune the permissions cache. Keeps us from unbounded growth.
|
||||
r := 0
|
||||
for subject, _ := range c.perms.pcache {
|
||||
delete(c.cache.results, subject)
|
||||
r++
|
||||
if r > pruneSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return here to allow the pruning code to run if needed.
|
||||
if notAllowed {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
}
|
||||
|
||||
// Mostly under testing scenarios.
|
||||
if srv == nil {
|
||||
return
|
||||
}
|
||||
@@ -838,6 +947,17 @@ func (c *client) processMsg(msg []byte) {
|
||||
subject := string(c.pa.subject)
|
||||
r = srv.sl.Match(subject)
|
||||
c.cache.results[subject] = r
|
||||
if len(c.cache.results) > maxResultCacheSize {
|
||||
// Prune the results cache. Keeps us from unbounded growth.
|
||||
r := 0
|
||||
for subject, _ := range c.cache.results {
|
||||
delete(c.cache.results, subject)
|
||||
r++
|
||||
if r > pruneSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for no interest, short circuit if so.
|
||||
@@ -932,6 +1052,11 @@ func (c *client) processMsg(msg []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) pubPermissionViolation(subject []byte) {
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subject))
|
||||
c.Errorf("Publish Violation - User %q, Subject %q", c.opts.Username, subject)
|
||||
}
|
||||
|
||||
func (c *client) processPingTimer() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
37
server/configs/authorization.conf
Normal file
37
server/configs/authorization.conf
Normal file
@@ -0,0 +1,37 @@
|
||||
# Copyright 2016 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: 127.0.0.1:4222
|
||||
|
||||
authorization {
|
||||
# Our role based permissions.
|
||||
|
||||
# Superuser can do anything.
|
||||
super_user = {
|
||||
publish = "*"
|
||||
subscribe = ">"
|
||||
}
|
||||
# Can do requests on foo or bar, and subscribe to anything
|
||||
# that is a response to an _INBOX.
|
||||
#
|
||||
# Notice that authorization filters can be singletons or arrays.
|
||||
req_pub_user = {
|
||||
publish = ["req.foo", "req.bar"]
|
||||
subscribe = "_INBOX.>"
|
||||
}
|
||||
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
default_permissions: $default_user
|
||||
|
||||
# Users listed with persmissions.
|
||||
users = [
|
||||
{user: alice, password: foo, permissions: $super_user}
|
||||
{user: bob, password: bar, permissions: $req_pub_user}
|
||||
{user: susan, password: baz}
|
||||
]
|
||||
}
|
||||
@@ -1,19 +1,22 @@
|
||||
// Copyright 2012 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
// ErrConnectionClosed represents error condition on a closed connection.
|
||||
// ErrConnectionClosed represents an error condition on a closed connection.
|
||||
ErrConnectionClosed = errors.New("Connection Closed")
|
||||
|
||||
// ErrAuthorization represents error condition on failed authorization.
|
||||
// ErrAuthorization represents an error condition on failed authorization.
|
||||
ErrAuthorization = errors.New("Authorization Error")
|
||||
|
||||
// ErrAuthTimeout represents error condition on failed authorization due to timeout.
|
||||
// ErrAuthTimeout represents an error condition on failed authorization due to timeout.
|
||||
ErrAuthTimeout = errors.New("Authorization Timeout")
|
||||
|
||||
// ErrMaxPayload represents error condition when the payload is too big.
|
||||
// ErrMaxPayload represents an error condition when the payload is too big.
|
||||
ErrMaxPayload = errors.New("Maximum Payload Exceeded")
|
||||
|
||||
// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
|
||||
ErrReservedPublishSubject = errors.New("Reserved Internal Subject")
|
||||
)
|
||||
|
||||
146
server/opts.go
146
server/opts.go
@@ -5,7 +5,6 @@ package server
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
@@ -20,8 +19,16 @@ import (
|
||||
|
||||
// For multiple accounts/users.
|
||||
type User struct {
|
||||
Username string `json:"user"`
|
||||
Password string `json:"password"`
|
||||
Username string `json:"user"`
|
||||
Password string `json:"password"`
|
||||
Permissions *Permissions `json:"permissions"`
|
||||
}
|
||||
|
||||
// Authorization are the allowed subjects on a per
|
||||
// publish or subscribe basis.
|
||||
type Permissions struct {
|
||||
Publish []string `json:"publish"`
|
||||
Subscribe []string `json:"subscribe"`
|
||||
}
|
||||
|
||||
// Options block for gnatsd server.
|
||||
@@ -34,7 +41,7 @@ type Options struct {
|
||||
NoSigs bool `json:"-"`
|
||||
Logtime bool `json:"-"`
|
||||
MaxConn int `json:"max_connections"`
|
||||
Users []User `json:"-"`
|
||||
Users []*User `json:"-"`
|
||||
Username string `json:"-"`
|
||||
Password string `json:"-"`
|
||||
Authorization string `json:"-"`
|
||||
@@ -71,13 +78,15 @@ type Options struct {
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
}
|
||||
|
||||
// Configuration file authorization section.
|
||||
type authorization struct {
|
||||
// Singles
|
||||
user string
|
||||
pass string
|
||||
// Multiple Users
|
||||
users []User
|
||||
timeout float64
|
||||
users []*User
|
||||
timeout float64
|
||||
defaultPermissions *Permissions
|
||||
}
|
||||
|
||||
// TLSConfigOpts holds the parsed tls config information,
|
||||
@@ -325,17 +334,134 @@ func parseAuthorization(am map[string]interface{}) (*authorization, error) {
|
||||
}
|
||||
auth.timeout = at
|
||||
case "users":
|
||||
b, _ := json.Marshal(mv)
|
||||
users := []User{}
|
||||
if err := json.Unmarshal(b, &users); err != nil {
|
||||
return nil, fmt.Errorf("Could not parse user array properly, %v", err)
|
||||
users, err := parseUsers(mv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
auth.users = users
|
||||
case "default_permission", "default_permissions":
|
||||
pm, ok := mv.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Expected default permissions to be a map/struct, got %+v", mv)
|
||||
}
|
||||
permissions, err := parseUserPermissions(pm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
auth.defaultPermissions = permissions
|
||||
}
|
||||
|
||||
// Now check for permission defaults with multiple users, etc.
|
||||
if auth.users != nil && auth.defaultPermissions != nil {
|
||||
for _, user := range auth.users {
|
||||
if user.Permissions == nil {
|
||||
user.Permissions = auth.defaultPermissions
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return auth, nil
|
||||
}
|
||||
|
||||
// Helper function to parse multiple users array with optional permissions.
|
||||
func parseUsers(mv interface{}) ([]*User, error) {
|
||||
// Make sure we have an array
|
||||
uv, ok := mv.([]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Expected users field to be an array, got %v", mv)
|
||||
}
|
||||
users := []*User{}
|
||||
for _, u := range uv {
|
||||
// Check its a map/struct
|
||||
um, ok := u.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Expected user entry to be a map/struct, got %v", u)
|
||||
}
|
||||
user := &User{}
|
||||
for k, v := range um {
|
||||
switch strings.ToLower(k) {
|
||||
case "user", "username":
|
||||
user.Username = v.(string)
|
||||
case "pass", "password":
|
||||
user.Password = v.(string)
|
||||
case "permission", "permissions", "authroization":
|
||||
pm, ok := v.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Expected user permissions to be a map/struct, got %+v", v)
|
||||
}
|
||||
permissions, err := parseUserPermissions(pm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
user.Permissions = permissions
|
||||
}
|
||||
}
|
||||
// Check to make sure we have at least username and password
|
||||
if user.Username == "" || user.Password == "" {
|
||||
return nil, fmt.Errorf("User entry requires a user and a password")
|
||||
}
|
||||
users = append(users, user)
|
||||
}
|
||||
return users, nil
|
||||
}
|
||||
|
||||
// Helper function to parse user/account permissions
|
||||
func parseUserPermissions(pm map[string]interface{}) (*Permissions, error) {
|
||||
p := &Permissions{}
|
||||
for k, v := range pm {
|
||||
switch strings.ToLower(k) {
|
||||
case "pub", "publish":
|
||||
subjects, err := parseSubjects(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Publish = subjects
|
||||
case "sub", "subscribe":
|
||||
subjects, err := parseSubjects(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Subscribe = subjects
|
||||
default:
|
||||
return nil, fmt.Errorf("Unknown field %s parsing permissions", k)
|
||||
}
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Helper function to parse subject singeltons and/or arrays
|
||||
func parseSubjects(v interface{}) ([]string, error) {
|
||||
var subjects []string
|
||||
switch v.(type) {
|
||||
case string:
|
||||
subjects = append(subjects, v.(string))
|
||||
case []string:
|
||||
subjects = v.([]string)
|
||||
case []interface{}:
|
||||
for _, i := range v.([]interface{}) {
|
||||
subject, ok := i.(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Subject in permissions array cannot be cast to string")
|
||||
}
|
||||
subjects = append(subjects, subject)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("Expected subject permissions to be a subject, or array of subjects, got %T", v)
|
||||
}
|
||||
return checkSubjectArray(subjects)
|
||||
}
|
||||
|
||||
// Helper function to validate subjects, etc for account permissioning.
|
||||
func checkSubjectArray(sa []string) ([]string, error) {
|
||||
for _, s := range sa {
|
||||
if !IsValidSubject(s) {
|
||||
return nil, fmt.Errorf("Subject %q is not a valid subject", s)
|
||||
}
|
||||
}
|
||||
return sa, nil
|
||||
}
|
||||
|
||||
// PrintTLSHelpAndDie prints TLS usage and exits.
|
||||
func PrintTLSHelpAndDie() {
|
||||
fmt.Printf("%s\n", tlsUsage)
|
||||
|
||||
@@ -397,3 +397,90 @@ func TestMultipleUsersConfig(t *testing.T) {
|
||||
}
|
||||
processOptions(opts)
|
||||
}
|
||||
|
||||
// Test highly depends on contents of the config file listed below. Any changes to that file
|
||||
// may very well break this test.
|
||||
func TestAuthorizationConfig(t *testing.T) {
|
||||
opts, err := ProcessConfigFile("./configs/authorization.conf")
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
lu := len(opts.Users)
|
||||
if lu != 3 {
|
||||
t.Fatalf("Expected 3 users, got %d\n", lu)
|
||||
}
|
||||
// Build a map
|
||||
mu := make(map[string]*User)
|
||||
for _, u := range opts.Users {
|
||||
mu[u.Username] = u
|
||||
}
|
||||
|
||||
// Alice
|
||||
alice, ok := mu["alice"]
|
||||
if !ok {
|
||||
t.Fatalf("Expected to see user Alice\n")
|
||||
}
|
||||
// Check for permissions details
|
||||
if alice.Permissions == nil {
|
||||
t.Fatalf("Expected Alice's permissions to be non-nil\n")
|
||||
}
|
||||
if alice.Permissions.Publish == nil {
|
||||
t.Fatalf("Expected Alice's publish permissions to be non-nil\n")
|
||||
}
|
||||
if len(alice.Permissions.Publish) != 1 {
|
||||
t.Fatalf("Expected Alice's publish permissions to have 1 element, got %d\n",
|
||||
len(alice.Permissions.Publish))
|
||||
}
|
||||
pubPerm := alice.Permissions.Publish[0]
|
||||
if pubPerm != "*" {
|
||||
t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm)
|
||||
}
|
||||
if alice.Permissions.Subscribe == nil {
|
||||
t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n")
|
||||
}
|
||||
if len(alice.Permissions.Subscribe) != 1 {
|
||||
t.Fatalf("Expected Alice's subscribe permissions to have 1 element, got %d\n",
|
||||
len(alice.Permissions.Subscribe))
|
||||
}
|
||||
subPerm := alice.Permissions.Subscribe[0]
|
||||
if subPerm != ">" {
|
||||
t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm)
|
||||
}
|
||||
|
||||
// Bob
|
||||
bob, ok := mu["bob"]
|
||||
if !ok {
|
||||
t.Fatalf("Expected to see user Bob\n")
|
||||
}
|
||||
if bob.Permissions == nil {
|
||||
t.Fatalf("Expected Bob's permissions to be non-nil\n")
|
||||
}
|
||||
|
||||
// Susan
|
||||
susan, ok := mu["susan"]
|
||||
if !ok {
|
||||
t.Fatalf("Expected to see user Susan\n")
|
||||
}
|
||||
if susan.Permissions == nil {
|
||||
t.Fatalf("Expected Susan's permissions to be non-nil\n")
|
||||
}
|
||||
// Check susan closely since she inherited the default permissions.
|
||||
if susan.Permissions == nil {
|
||||
t.Fatalf("Expected Susan's permissions to be non-nil\n")
|
||||
}
|
||||
if susan.Permissions.Publish != nil {
|
||||
t.Fatalf("Expected Susan's publish permissions to be nil\n")
|
||||
}
|
||||
if susan.Permissions.Subscribe == nil {
|
||||
t.Fatalf("Expected Susan's subscribe permissions to be non-nil\n")
|
||||
}
|
||||
if len(susan.Permissions.Subscribe) != 1 {
|
||||
t.Fatalf("Expected Susan's subscribe permissions to have 1 element, got %d\n",
|
||||
len(susan.Permissions.Subscribe))
|
||||
}
|
||||
subPerm = susan.Permissions.Subscribe[0]
|
||||
if subPerm != "PUBLIC.>" {
|
||||
t.Fatalf("Expected Susan's subscribe permissions to be 'PUBLIC.>', got %q\n", subPerm)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,7 +119,7 @@ func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) {
|
||||
var bufCount uint32
|
||||
|
||||
// Retrieving array data requires two calls, the first which
|
||||
// requires an adressable empty buffer, and sets size fields.
|
||||
// requires an addressable empty buffer, and sets size fields.
|
||||
// The second call returns the data.
|
||||
initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1)
|
||||
ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0])
|
||||
|
||||
@@ -516,8 +516,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
|
||||
// Check for Auth
|
||||
if authRequired {
|
||||
ttl := secondsToDuration(s.opts.AuthTimeout)
|
||||
c.setAuthTimer(ttl)
|
||||
c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout))
|
||||
}
|
||||
|
||||
// Send our information.
|
||||
|
||||
@@ -565,7 +565,29 @@ func visitLevel(l *level, depth int) int {
|
||||
return maxDepth
|
||||
}
|
||||
|
||||
// IsValidLiteralSubject returns true if a subject is valid, false otherwise
|
||||
// IsValidSubject returns true if a subject is valid, false otherwise
|
||||
func IsValidSubject(subject string) bool {
|
||||
if subject == "" {
|
||||
return false
|
||||
}
|
||||
sfwc := false
|
||||
tokens := strings.Split(string(subject), tsep)
|
||||
for _, t := range tokens {
|
||||
if len(t) == 0 || sfwc {
|
||||
return false
|
||||
}
|
||||
if len(t) > 1 {
|
||||
continue
|
||||
}
|
||||
switch t[0] {
|
||||
case fwc:
|
||||
sfwc = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise
|
||||
func IsValidLiteralSubject(subject string) bool {
|
||||
tokens := strings.Split(string(subject), tsep)
|
||||
for _, t := range tokens {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2013 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
|
||||
@@ -75,41 +75,41 @@ func sizedString(sz int) string {
|
||||
// Publish subject for pub benchmarks.
|
||||
var psub = "a"
|
||||
|
||||
func Benchmark____PubNo_Payload(b *testing.B) {
|
||||
func Benchmark_____PubNo_Payload(b *testing.B) {
|
||||
benchPub(b, psub, "")
|
||||
}
|
||||
|
||||
func Benchmark____Pub8b_Payload(b *testing.B) {
|
||||
func Benchmark_____Pub8b_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(8)
|
||||
benchPub(b, psub, s)
|
||||
}
|
||||
|
||||
func Benchmark___Pub32b_Payload(b *testing.B) {
|
||||
func Benchmark____Pub32b_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(32)
|
||||
benchPub(b, psub, s)
|
||||
}
|
||||
|
||||
func Benchmark__Pub256B_Payload(b *testing.B) {
|
||||
func Benchmark___Pub256B_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(256)
|
||||
benchPub(b, psub, s)
|
||||
}
|
||||
|
||||
func Benchmark____Pub1K_Payload(b *testing.B) {
|
||||
func Benchmark_____Pub1K_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(1024)
|
||||
benchPub(b, psub, s)
|
||||
}
|
||||
|
||||
func Benchmark____Pub4K_Payload(b *testing.B) {
|
||||
func Benchmark_____Pub4K_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(4 * 1024)
|
||||
benchPub(b, psub, s)
|
||||
}
|
||||
|
||||
func Benchmark____Pub8K_Payload(b *testing.B) {
|
||||
func Benchmark_____Pub8K_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := sizedString(8 * 1024)
|
||||
benchPub(b, psub, s)
|
||||
@@ -137,7 +137,33 @@ func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) {
|
||||
ch <- true
|
||||
}
|
||||
|
||||
func Benchmark___________PubSub(b *testing.B) {
|
||||
// Benchmark the authorization code path.
|
||||
func Benchmark_AuthPubNo_Payload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
|
||||
srv, opts := RunServerWithConfig("./configs/authorization.conf")
|
||||
defer srv.Shutdown()
|
||||
|
||||
c := createClientConn(b, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(b, c)
|
||||
|
||||
cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench", DefaultPass)
|
||||
sendProto(b, c, cs)
|
||||
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte("PUB a 0\r\n\r\n")
|
||||
b.SetBytes(int64(len(sendOp)))
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bw.Write(sendOp)
|
||||
}
|
||||
bw.Flush()
|
||||
flushConnection(b, c)
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func Benchmark____________PubSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
@@ -169,7 +195,7 @@ func Benchmark___________PubSub(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark___PubSubTwoConns(b *testing.B) {
|
||||
func Benchmark____PubSubTwoConns(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
@@ -205,7 +231,7 @@ func Benchmark___PubSubTwoConns(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark___PubTwoQueueSub(b *testing.B) {
|
||||
func Benchmark____PubTwoQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
@@ -238,7 +264,7 @@ func Benchmark___PubTwoQueueSub(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark__PubFourQueueSub(b *testing.B) {
|
||||
func Benchmark___PubFourQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
@@ -273,7 +299,7 @@ func Benchmark__PubFourQueueSub(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark_PubEightQueueSub(b *testing.B) {
|
||||
func Benchmark__PubEightQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
|
||||
50
test/client_auth_test.go
Normal file
50
test/client_auth_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
// Copyright 2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/nats-io/nats"
|
||||
)
|
||||
|
||||
func TestMultipleUserAuth(t *testing.T) {
|
||||
srv, opts := RunServerWithConfig("./configs/multi_user.conf")
|
||||
defer srv.Shutdown()
|
||||
|
||||
if opts.Users == nil {
|
||||
t.Fatal("Expected a user array that is not nil")
|
||||
}
|
||||
if len(opts.Users) != 2 {
|
||||
t.Fatal("Expected a user array that had 2 users")
|
||||
}
|
||||
|
||||
// Test first user
|
||||
url := fmt.Sprintf("nats://%s:%s@%s:%d/",
|
||||
opts.Users[0].Username,
|
||||
opts.Users[0].Password,
|
||||
opts.Host, opts.Port)
|
||||
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a successful connect, got %v\n", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
if !nc.AuthRequired() {
|
||||
t.Fatal("Expected auth to be required for the server")
|
||||
}
|
||||
|
||||
// Test second user
|
||||
url = fmt.Sprintf("nats://%s:%s@%s:%d/",
|
||||
opts.Users[1].Username,
|
||||
opts.Users[1].Password,
|
||||
opts.Host, opts.Port)
|
||||
|
||||
nc, err = nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a successful connect, got %v\n", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
}
|
||||
44
test/configs/authorization.conf
Normal file
44
test/configs/authorization.conf
Normal file
@@ -0,0 +1,44 @@
|
||||
# Copyright 2016 Apcera Inc. All rights reserved.
|
||||
|
||||
listen: 127.0.0.1:2442
|
||||
|
||||
authorization {
|
||||
# Our role based permissions.
|
||||
|
||||
# Admin can do anything.
|
||||
ADMIN = {
|
||||
publish = ">"
|
||||
subscribe = ">"
|
||||
}
|
||||
|
||||
# Can do requests on req.foo or req.bar, and subscribe to anything
|
||||
# that is a response, e.g. _INBOX.*
|
||||
#
|
||||
# Notice that authorization filters can be singletons or arrays.
|
||||
REQUESTOR = {
|
||||
publish = ["req.foo", "req.bar"]
|
||||
subscribe = "_INBOX.*"
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. Joe below.
|
||||
DEFAULT_PERMISSIONS = {
|
||||
publish = "SANDBOX.*"
|
||||
subscribe = ["PUBLIC.>", "_INBOX.>"]
|
||||
}
|
||||
|
||||
# This is to benchmark pub performance.
|
||||
BENCH = {
|
||||
publish = "a"
|
||||
}
|
||||
|
||||
# Just foo for testing
|
||||
PASS: $2a$10$UHR6GhotWhpLsKtVP0/i6.Nh9.fuY73cWjLoJjb2sKT8KISBcUW5q
|
||||
|
||||
# Users listed with permissions.
|
||||
users = [
|
||||
{user: alice, password: $PASS, permissions: $ADMIN}
|
||||
{user: bob, password: $PASS, permissions: $REQUESTOR}
|
||||
{user: bench, password: $PASS, permissions: $BENCH}
|
||||
{user: joe, password: $PASS}
|
||||
]
|
||||
}
|
||||
91
test/user_authorization_test.go
Normal file
91
test/user_authorization_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
// Copyright 2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const DefaultPass = "foo"
|
||||
|
||||
var permErrRe = regexp.MustCompile(`\A\-ERR\s+'Permissions Violation([^\r\n]+)\r\n`)
|
||||
|
||||
func TestUserAuthorizationProto(t *testing.T) {
|
||||
srv, opts := RunServerWithConfig("./configs/authorization.conf")
|
||||
defer srv.Shutdown()
|
||||
|
||||
// Alice can do anything, check a few for OK result.
|
||||
c := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "alice", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "PUB foo 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "SUB foo 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
// Check that we now reserve _SYS.> though for internal, so no clients.
|
||||
sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
|
||||
// Check that _ is ok
|
||||
sendProto(t, c, "PUB _ 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
c.Close()
|
||||
|
||||
// Bob is a requestor only, e.g. req.foo, req.bar for publish, subscribe only to INBOXes.
|
||||
c = createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "bob", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
// These should error.
|
||||
sendProto(t, c, "SUB foo 1\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
sendProto(t, c, "PUB foo 2\r\nok\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
|
||||
// These should work ok.
|
||||
sendProto(t, c, "SUB _INBOX.abcd 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "PUB req.foo 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "PUB req.bar 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
c.Close()
|
||||
|
||||
// Joe is a default user
|
||||
c = createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "joe", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
// These should error.
|
||||
sendProto(t, c, "SUB foo.bar.* 1\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
sendProto(t, c, "PUB foo.bar.baz 2\r\nok\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
|
||||
// These should work ok.
|
||||
sendProto(t, c, "SUB _INBOX.abcd 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "SUB PUBLIC.abcd 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "PUB SANDBOX.foo 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
sendProto(t, c, "PUB SANDBOX.bar 2\r\nok\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
// Since only PWC, this should fail (too many tokens).
|
||||
sendProto(t, c, "PUB SANDBOX.foo.bar 2\r\nok\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
|
||||
c.Close()
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user