Basic account support

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-09-16 06:07:35 -07:00
parent d5ceade750
commit 1cbfbfa071
12 changed files with 609 additions and 87 deletions

286
server/accounts_test.go Normal file
View File

@@ -0,0 +1,286 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"os"
"strings"
"testing"
)
func simpleAccountServer(t *testing.T) *Server {
opts := defaultServerOptions
s := New(&opts)
// Now create two accounts.
_, err := s.RegisterAccount("foo")
if err != nil {
t.Fatalf("Error creating account 'foo': %v", err)
}
_, err = s.RegisterAccount("bar")
if err != nil {
t.Fatalf("Error creating account 'bar': %v", err)
}
return s
}
func TestRegisterDuplicateAccounts(t *testing.T) {
s := simpleAccountServer(t)
if _, err := s.RegisterAccount("foo"); err == nil {
t.Fatal("Expected an error registering 'foo' twice")
}
}
func TestAccountIsolation(t *testing.T) {
s := simpleAccountServer(t)
fooAcc := s.LookupAccount("foo")
barAcc := s.LookupAccount("bar")
if fooAcc == nil || barAcc == nil {
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
}
cfoo, crFoo, _ := newClientForServer(s)
if err := cfoo.RegisterWithAccount(fooAcc); err != nil {
t.Fatalf("Error register client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
if err := cbar.RegisterWithAccount(barAcc); err != nil {
t.Fatalf("Error register client with 'bar' account: %v", err)
}
// Make sure they are different accounts/sl.
if cfoo.acc == cbar.acc {
t.Fatalf("Error, accounts the same for both clients")
}
// Now do quick test that makes sure messages do not cross over.
// setup bar as a foo subscriber.
go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n"))
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
l, err = crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'foo' from server: %v", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
checkPayload(crFoo, []byte("hello\r\n"), t)
// Now make sure nothing shows up on bar.
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
}
func TestAccountFromOptions(t *testing.T) {
opts := defaultServerOptions
opts.Accounts = []*Account{
&Account{Name: "foo"},
&Account{Name: "bar"},
}
s := New(&opts)
if la := len(s.accounts); la != 2 {
t.Fatalf("Expected to have a server with two accounts, got %v", la)
}
// Check that sl is filled in.
fooAcc := s.LookupAccount("foo")
barAcc := s.LookupAccount("bar")
if fooAcc == nil || barAcc == nil {
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
}
if fooAcc.sl == nil || barAcc.sl == nil {
t.Fatal("Expected Sublists to be filled in on Opts.Accounts")
}
}
func TestNewAccountsFromClients(t *testing.T) {
opts := defaultServerOptions
s := New(&opts)
c, cr, _ := newClientForServer(s)
connectOp := []byte("CONNECT {\"account\":\"foo\"}\r\n")
go c.parse(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
opts.AllowNewAccounts = true
s = New(&opts)
c, _, _ = newClientForServer(s)
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received an error trying to create an account: %v", err)
}
}
// Clients can ask that the account be forced to be new. If it exists this is an error.
func TestNewAccountRequireNew(t *testing.T) {
// This has foo and bar accounts already.
s := simpleAccountServer(t)
c, cr, _ := newClientForServer(s)
connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n")
go c.parse(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
// Now allow new accounts on the fly, make sure second time does not work.
opts := defaultServerOptions
opts.AllowNewAccounts = true
s = New(&opts)
c, _, _ = newClientForServer(s)
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received an error trying to create an account: %v", err)
}
c, cr, _ = newClientForServer(s)
go c.parse(connectOp)
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
}
func accountNameExists(name string, accounts []*Account) bool {
for _, acc := range accounts {
if strings.Compare(acc.Name, name) == 0 {
return true
}
}
return false
}
func TestAccountSimpleConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`))
defer os.Remove(confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la)
}
if !accountNameExists("foo", opts.Accounts) {
t.Fatal("Expected a 'foo' account")
}
if !accountNameExists("bar", opts.Accounts) {
t.Fatal("Expected a 'bar' account")
}
// Make sure double entries is an error.
confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`))
defer os.Remove(confFileName)
_, err = ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double account entries")
}
}
func TestAccountParseConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: derek, password: foo}
{user: ivan, password: bar}
]
}
}
`))
defer os.Remove(confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la)
}
if lu := len(opts.Users); lu != 4 {
t.Fatalf("Expected 4 total Users, got %d\n", lu)
}
var natsAcc *Account
for _, acc := range opts.Accounts {
if acc.Name == "nats.io" {
natsAcc = acc
break
}
}
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
for _, u := range opts.Users {
if u.Username == "derek" {
if u.Account != natsAcc {
t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account)
break
}
}
}
}
func TestAccountParseConfigDuplicateUsers(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: alice, password: bar}
]
}
}
`))
defer os.Remove(confFileName)
_, err := ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double user entries")
}
}

View File

@@ -16,7 +16,6 @@ package server
import (
"crypto/tls"
"encoding/base64"
"fmt"
"strings"
"github.com/nats-io/nkeys"
@@ -39,17 +38,25 @@ type ClientAuthentication interface {
RegisterUser(*User)
}
// Accounts
type Account struct {
Name string
sl *Sublist
}
// Nkey is for multiple nkey based users
type NkeyUser struct {
Nkey string `json:"user"`
Permissions *Permissions `json:"permissions"`
Permissions *Permissions `json:"permissions,omitempty"`
Account *Account `json:"account,omitempty"`
}
// User is for multiple accounts/users.
type User struct {
Username string `json:"user"`
Password string `json:"password"`
Permissions *Permissions `json:"permissions"`
Permissions *Permissions `json:"permissions,omitempty"`
Account *Account `json:"account,omitempty"`
}
// clone performs a deep copy of the User struct, returning a new clone with
@@ -309,35 +316,6 @@ func (s *Server) isRouterAuthorized(c *client) bool {
return true
}
// removeUnauthorizedSubs removes any subscriptions the client has that are no
// longer authorized, e.g. due to a config reload.
func (s *Server) removeUnauthorizedSubs(c *client) {
c.mu.Lock()
if c.perms == nil {
c.mu.Unlock()
return
}
subs := make(map[string]*subscription, len(c.subs))
for sid, sub := range c.subs {
subs[sid] = sub
}
c.mu.Unlock()
for sid, sub := range subs {
if !c.canSubscribe(sub.subject) {
_ = s.sl.Remove(sub)
c.mu.Lock()
delete(c.subs, sid)
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
sub.subject, sub.sid))
s.Noticef("Removed sub %q for user %q - not authorized",
string(sub.subject), c.opts.Username)
}
}
}
// Support for bcrypt stored passwords and tokens.
const bcryptPrefix = "$2a$"

View File

@@ -144,6 +144,8 @@ type client struct {
ncs string
out outbound
srv *Server
acc *Account
sl *Sublist
subs map[string]*subscription
perms *permissions
in readCache
@@ -258,6 +260,8 @@ type clientOpts struct {
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Account string `json:"account,omitempty"`
AccountNew bool `json:"new_account,omitempty"`
// Routes only
Import *SubjectPermission `json:"import,omitempty"`
@@ -313,22 +317,33 @@ func (c *client) initClient() {
}
}
// RegisterWithAccount will register the given user with a specific
// account. This will change the subject namespace.
func (c *client) RegisterWithAccount(acc *Account) error {
if acc == nil || acc.sl == nil {
return ErrBadAccount
}
c.mu.Lock()
c.acc = acc
c.sl = acc.sl
c.mu.Unlock()
return nil
}
// RegisterUser allows auth to call back into a new client
// with the authenticated user. This is used to map any permissions
// into the client.
func (c *client) RegisterUser(user *User) {
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.mu.Lock()
c.perms = nil
c.mu.Unlock()
return
}
// Process Permissions and map into client connection structures.
c.mu.Lock()
defer c.mu.Unlock()
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.perms = nil
return
}
c.setPermissions(user.Permissions)
}
@@ -770,6 +785,8 @@ func (c *client) processConnect(arg []byte) error {
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
account := c.opts.Account
accountNew := c.opts.AccountNew
c.mu.Unlock()
if srv != nil {
@@ -788,6 +805,38 @@ func (c *client) processConnect(arg []byte) error {
c.authViolation()
return ErrAuthorization
}
// Check for Account designation
if account != "" {
var acc *Account
var wasNew bool
if !srv.newAccountsAllowed() {
acc = srv.LookupAccount(account)
if acc == nil {
c.Errorf(ErrMissingAccount.Error())
c.sendErr("Account Not Found")
return ErrMissingAccount
} else if accountNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
} else {
// We can create this one on the fly.
acc, wasNew = srv.LookupOrRegisterAccount(account)
if accountNew && !wasNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
}
// If we are here we can register ourselves with the new account.
if err := c.RegisterWithAccount(acc); err != nil {
c.Errorf("Problem registering with account [%s]", account)
c.sendErr("Account Failed Registration")
return ErrBadAccount
}
}
}
// Check client protocol request if it exists.
@@ -828,7 +877,18 @@ func (c *client) authTimeout() {
}
func (c *client) authViolation() {
if c.srv != nil && c.srv.getOpts().Users != nil {
var hasNkeys, hasUsers bool
if s := c.srv; s != nil {
s.mu.Lock()
hasNkeys = s.nkeys != nil
hasUsers = s.users != nil
s.mu.Unlock()
}
if hasNkeys {
c.Errorf("%s - Nkey %q",
ErrAuthorization.Error(),
c.opts.Nkey)
} else if hasUsers {
c.Errorf("%s - User %q",
ErrAuthorization.Error(),
c.opts.Username)
@@ -1239,8 +1299,8 @@ func (c *client) processSub(argo []byte) (err error) {
sid := string(sub.sid)
if c.subs[sid] == nil {
c.subs[sid] = sub
if c.srv != nil {
err = c.srv.sl.Insert(sub)
if c.sl != nil {
err = c.sl.Insert(sub)
if err != nil {
delete(c.subs, sid)
} else {
@@ -1297,8 +1357,8 @@ func (c *client) unsubscribe(sub *subscription) {
c.traceOp("<-> %s", "DELSUB", sub.sid)
delete(c.subs, string(sub.sid))
if c.srv != nil {
c.srv.sl.Remove(sub)
if c.sl != nil {
c.sl.Remove(sub)
}
// If we are a queue subscriber on a client connection and we have routes,
@@ -1562,7 +1622,7 @@ func (c *client) processMsg(msg []byte) {
var r *SublistResult
var ok bool
genid := atomic.LoadUint64(&srv.sl.genid)
genid := atomic.LoadUint64(&c.sl.genid)
if genid == c.in.genid && c.in.results != nil {
r, ok = c.in.results[string(c.pa.subject)]
@@ -1574,7 +1634,7 @@ func (c *client) processMsg(msg []byte) {
if !ok {
subject := string(c.pa.subject)
r = srv.sl.Match(subject)
r = c.sl.Match(subject)
c.in.results[subject] = r
// Prune the results cache. Keeps us from unbounded growth.
if len(c.in.results) > maxResultCacheSize {
@@ -1782,6 +1842,35 @@ func (c *client) typeString() string {
return "Unknown Type"
}
// removeUnauthorizedSubs removes any subscriptions the client has that are no
// longer authorized, e.g. due to a config reload.
func (c *client) removeUnauthorizedSubs() {
c.mu.Lock()
if c.perms == nil {
c.mu.Unlock()
return
}
srv := c.srv
subs := make(map[string]*subscription, len(c.subs))
for sid, sub := range c.subs {
subs[sid] = sub
}
c.mu.Unlock()
for sid, sub := range subs {
if c.sl != nil && !c.canSubscribe(sub.subject) {
_ = c.sl.Remove(sub)
c.mu.Lock()
delete(c.subs, sid)
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
sub.subject, sub.sid))
srv.Noticef("Removed sub %q for user %q - not authorized",
string(sub.subject), c.opts.Username)
}
}
}
func (c *client) closeConnection(reason ClosedState) {
c.mu.Lock()
if c.nc == nil {
@@ -1820,10 +1909,13 @@ func (c *client) closeConnection(reason ClosedState) {
c.mu.Unlock()
// Remove clients subscriptions.
c.sl.RemoveBatch(subs)
if srv != nil {
// This is a route that disconnected...
if len(connectURLs) > 0 {
// Unless disabled, possibly update the server's INFO protcol
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {
srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
@@ -1833,9 +1925,8 @@ func (c *client) closeConnection(reason ClosedState) {
// Unregister
srv.removeClient(c)
// Remove clients subscriptions.
srv.sl.RemoveBatch(subs)
if c.typ == CLIENT {
// Remove remote subscriptions.
if c.typ != ROUTER {
// Forward UNSUBs protocols to all routes
srv.broadcastUnSubscribeBatch(subs)
}

View File

@@ -52,6 +52,17 @@ func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
}()
}
func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, maxBufSize)
ch := make(chan *client)
createClientAsync(ch, s, srv)
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
return c, cr, l
}
var defaultServerOptions = Options{
Trace: false,
Debug: false,
@@ -643,17 +654,17 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) {
}()
<-ch
if s.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count())
if c.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count())
}
c.closeConnection(ClientClosed)
if s.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count())
}
}
func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
s, c, _ := setupClient()
_, c, _ := setupClient()
c.closeConnection(ClientClosed)
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
@@ -664,8 +675,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
}()
<-ch
if s.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count())
}
}

View File

@@ -48,4 +48,14 @@ var (
// ErrClientConnectedToRoutePort represents an error condition when a client
// attempted to connect to the route listen port.
ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port")
// ErrAccountExists is returned when an account is attempted to be registered
// but already exists.
ErrAccountExists = errors.New("Account Exists")
// ErrBadAccount represents a malformed or incorrect account.
ErrBadAccount = errors.New("Bad Account")
// ErrMissingAccount is returned when an account does not exist.
ErrMissingAccount = errors.New("Account Missing")
)

View File

@@ -709,14 +709,15 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
}
sz := &Subsz{s.sl.Stats(), 0, offset, limit, nil}
// FIXME(dlc) - Make account aware.
sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil}
if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.sl.localSubs(&subs)
s.gsl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
@@ -938,7 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.MaxPending = opts.MaxPending
v.WriteDeadline = opts.WriteDeadline
v.Subscriptions = s.sl.Count()
v.Subscriptions = s.gsl.Count()
v.ConfigLoadTime = s.configTime
// Need a copy here since s.httpReqStats can change while doing
// the marshaling down below.

View File

@@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
mrand "math/rand"
"net"
"os"
"strings"
"testing"
@@ -56,17 +55,6 @@ func mixedSetup() (*Server, *client, *bufio.Reader, string) {
return rawSetup(opts)
}
func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, maxBufSize)
ch := make(chan *client)
createClientAsync(ch, s, srv)
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
return c, cr, l
}
func TestServerInfoNonce(t *testing.T) {
_, l := setUpClientWithResponse()
if !strings.HasPrefix(l, "INFO ") {

View File

@@ -62,6 +62,8 @@ type Options struct {
MaxSubs int `json:"max_subscriptions,omitempty"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
AllowNewAccounts bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
@@ -227,7 +229,7 @@ func (e *configWarningErr) Error() string {
}
// ProcessConfigFile processes a configuration file.
// FIXME(dlc): Hacky
// FIXME(dlc): A bit hacky
func ProcessConfigFile(configFile string) (*Options, error) {
opts := &Options{}
if err := opts.ProcessConfigFile(configFile); err != nil {
@@ -306,6 +308,15 @@ func (o *Options) ProcessConfigFile(configFile string) error {
o.Trace = v.(bool)
case "logtime":
o.Logtime = v.(bool)
case "accounts":
if pedantic {
err = parseAccounts(tk, o)
} else {
err = parseAccounts(v, o)
}
if err != nil {
return err
}
case "authorization":
var auth *authorization
if pedantic {
@@ -595,6 +606,82 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) {
}
}
// parseAccounts will parse the different accounts syntax.
func parseAccounts(v interface{}, opts *Options) error {
var pedantic = opts.CheckConfig
var tk token
_, v = unwrapValue(v)
uorn := make(map[string]struct{})
switch v.(type) {
case []interface{}, []string:
m := make(map[string]struct{}, len(v.([]interface{})))
for _, name := range v.([]interface{}) {
ns := name.(string)
if _, ok := m[ns]; ok {
return fmt.Errorf("Duplicate Account Entry: %s", ns)
}
opts.Accounts = append(opts.Accounts, &Account{Name: name.(string)})
m[ns] = struct{}{}
}
case map[string]interface{}:
m := make(map[string]struct{}, len(v.(map[string]interface{})))
for name, mv := range v.(map[string]interface{}) {
_, mv = unwrapValue(mv)
if _, ok := m[name]; ok {
return fmt.Errorf("Duplicate Account Entry: %s", name)
}
uv, ok := mv.(map[string]interface{})
if !ok {
return fmt.Errorf("Expected map entry for users")
}
acc := &Account{Name: name}
opts.Accounts = append(opts.Accounts, acc)
m[name] = struct{}{}
for k, v := range uv {
tk, mv = unwrapValue(v)
switch strings.ToLower(k) {
case "users":
var (
users []*User
err error
nkeys []*NkeyUser
)
if pedantic {
nkeys, users, err = parseUsers(tk, opts)
} else {
nkeys, users, err = parseUsers(mv, opts)
}
if err != nil {
return err
}
for _, u := range users {
if _, ok := uorn[u.Username]; ok {
return fmt.Errorf("Duplicate user %q detected", u.Username)
}
uorn[u.Username] = struct{}{}
u.Account = acc
}
opts.Users = append(opts.Users, users...)
for _, u := range nkeys {
if _, ok := uorn[u.Nkey]; ok {
return fmt.Errorf("Duplicate nkey %q detected", u.Nkey)
}
uorn[u.Nkey] = struct{}{}
u.Account = acc
}
opts.Nkeys = append(opts.Nkeys, nkeys...)
}
}
}
default:
return fmt.Errorf("Expected an array or map of account entries, got %T", v)
}
return nil
}
// Helper function to parse Authorization configs.
func parseAuthorization(v interface{}, opts *Options) (*authorization, error) {
var (

View File

@@ -681,7 +681,7 @@ func (s *Server) reloadAuthorization() {
}
// Remove any unauthorized subscriptions.
s.removeUnauthorizedSubs(client)
client.removeUnauthorizedSubs()
}
for _, route := range routes {

View File

@@ -578,7 +578,8 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
var raw [4096]*subscription
subs := raw[:0]
s.sl.localSubs(&subs)
// FIXME(dlc) this needs to be scoped per account when cluster proto changes.
s.gsl.localSubs(&subs)
route.mu.Lock()
closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool {
@@ -691,7 +692,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
}
}
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
c := &client{srv: s, sl: s.gsl, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
// Grab server variables
s.mu.Lock()

View File

@@ -69,13 +69,14 @@ type Server struct {
mu sync.Mutex
prand *rand.Rand
info Info
sl *Sublist
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gsl *Sublist
accounts map[string]*Account
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
@@ -173,7 +174,7 @@ func New(opts *Options) *Server {
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
sl: NewSublist(),
gsl: NewSublist(),
opts: opts,
done: make(chan bool, 1),
start: now,
@@ -192,6 +193,9 @@ func New(opts *Options) *Server {
// Used internally for quick look-ups.
s.clientConnectURLsMap = make(map[string]struct{})
// For tracking accounts
s.accounts = make(map[string]*Account)
// For tracking clients
s.clients = make(map[uint64]*client)
@@ -210,6 +214,9 @@ func New(opts *Options) *Server {
// to shutdown.
s.quitCh = make(chan struct{})
// Used to setup Accounts.
s.configureAccounts()
// Used to setup Authorization.
s.configureAuthorization()
@@ -232,6 +239,16 @@ func (s *Server) setOpts(opts *Options) {
s.optsMu.Unlock()
}
func (s *Server) configureAccounts() {
// Check opts and walk through them. Making sure to create SLs.
for _, acc := range s.opts.Accounts {
if acc.sl == nil {
acc.sl = NewSublist()
}
s.accounts[acc.Name] = acc
}
}
func (s *Server) generateRouteInfoJSON() {
b, _ := json.Marshal(s.routeInfo)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
@@ -281,6 +298,52 @@ func (s *Server) logPid() error {
return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
}
// newAccountsAllowed returns whether or not new accounts can be created on the fly.
func (s *Server) newAccountsAllowed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.opts.AllowNewAccounts
}
func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if acc, ok := s.accounts[name]; ok {
return acc, false
}
acc := &Account{
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
return acc, true
}
// RegisterAccount will register an account. The account must be new
// or this call will fail.
func (s *Server) RegisterAccount(name string) (*Account, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.accounts[name]; ok {
return nil, ErrAccountExists
}
acc := &Account{
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
return acc, nil
}
// LookupAccount is a public function to return the account structure
// associated with name.
func (s *Server) LookupAccount(name string) *Account {
s.mu.Lock()
defer s.mu.Unlock()
return s.accounts[name]
}
// Start up the server, this will block.
// Start via a Go routine if needed.
func (s *Server) Start() {
@@ -778,7 +841,7 @@ func (s *Server) createClient(conn net.Conn) *client {
max_subs := opts.MaxSubs
now := time.Now()
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
// Grab JSON info string
s.mu.Lock()
@@ -1088,7 +1151,13 @@ func (s *Server) getClient(cid uint64) *client {
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
subs := s.sl.Count()
var subs uint32
for _, acc := range s.accounts {
if acc.sl != nil {
subs += acc.sl.Count()
}
}
subs += s.gsl.Count()
s.mu.Unlock()
return subs
}

View File

@@ -24,8 +24,8 @@ func TestSplitBufferSubOp(t *testing.T) {
defer cli.Close()
defer trash.Close()
s := &Server{sl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription), nc: cli}
s := &Server{gsl: NewSublist()}
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli}
subop := []byte("SUB foo 1\r\n")
subop1 := subop[:6]
@@ -43,7 +43,7 @@ func TestSplitBufferSubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
r := s.gsl.Match("foo")
if r == nil || len(r.psubs) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
@@ -60,7 +60,7 @@ func TestSplitBufferSubOp(t *testing.T) {
}
func TestSplitBufferUnsubOp(t *testing.T) {
s := &Server{sl: NewSublist()}
s := &Server{gsl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription)}
subop := []byte("SUB foo 1024\r\n")
@@ -87,7 +87,7 @@ func TestSplitBufferUnsubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
r := s.gsl.Match("foo")
if r != nil && len(r.psubs) != 0 {
t.Fatalf("Should be no subscriptions in results: %+v\n", r)
}
@@ -300,7 +300,7 @@ func TestSplitConnectArg(t *testing.T) {
func TestSplitDanglingArgBuf(t *testing.T) {
s := New(&defaultServerOptions)
c := &client{srv: s, subs: make(map[string]*subscription)}
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)}
// We test to make sure we do not dangle any argBufs after processing
// since that could lead to performance issues.