diff --git a/server/accounts_test.go b/server/accounts_test.go new file mode 100644 index 00000000..3dd6595a --- /dev/null +++ b/server/accounts_test.go @@ -0,0 +1,286 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "os" + "strings" + "testing" +) + +func simpleAccountServer(t *testing.T) *Server { + opts := defaultServerOptions + s := New(&opts) + + // Now create two accounts. + _, err := s.RegisterAccount("foo") + if err != nil { + t.Fatalf("Error creating account 'foo': %v", err) + } + _, err = s.RegisterAccount("bar") + if err != nil { + t.Fatalf("Error creating account 'bar': %v", err) + } + return s +} + +func TestRegisterDuplicateAccounts(t *testing.T) { + s := simpleAccountServer(t) + if _, err := s.RegisterAccount("foo"); err == nil { + t.Fatal("Expected an error registering 'foo' twice") + } +} + +func TestAccountIsolation(t *testing.T) { + s := simpleAccountServer(t) + fooAcc := s.LookupAccount("foo") + barAcc := s.LookupAccount("bar") + if fooAcc == nil || barAcc == nil { + t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") + } + cfoo, crFoo, _ := newClientForServer(s) + if err := cfoo.RegisterWithAccount(fooAcc); err != nil { + t.Fatalf("Error register client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + if err := cbar.RegisterWithAccount(barAcc); err != nil { + t.Fatalf("Error register client with 'bar' account: %v", err) + } + + // Make sure they are different accounts/sl. + if cfoo.acc == cbar.acc { + t.Fatalf("Error, accounts the same for both clients") + } + + // Now do quick test that makes sure messages do not cross over. + // setup bar as a foo subscriber. + go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n")) + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q\n", l) + } + + go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) + l, err = crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'foo' from server: %v", err) + } + + matches := msgPat.FindAllStringSubmatch(l, -1)[0] + if matches[SUB_INDEX] != "foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crFoo, []byte("hello\r\n"), t) + + // Now make sure nothing shows up on bar. + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q\n", l) + } +} + +func TestAccountFromOptions(t *testing.T) { + opts := defaultServerOptions + opts.Accounts = []*Account{ + &Account{Name: "foo"}, + &Account{Name: "bar"}, + } + s := New(&opts) + + if la := len(s.accounts); la != 2 { + t.Fatalf("Expected to have a server with two accounts, got %v", la) + } + // Check that sl is filled in. + fooAcc := s.LookupAccount("foo") + barAcc := s.LookupAccount("bar") + if fooAcc == nil || barAcc == nil { + t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") + } + if fooAcc.sl == nil || barAcc.sl == nil { + t.Fatal("Expected Sublists to be filled in on Opts.Accounts") + } +} + +func TestNewAccountsFromClients(t *testing.T) { + opts := defaultServerOptions + s := New(&opts) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\"}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + opts.AllowNewAccounts = true + s = New(&opts) + + c, _, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to create an account: %v", err) + } +} + +// Clients can ask that the account be forced to be new. If it exists this is an error. +func TestNewAccountRequireNew(t *testing.T) { + // This has foo and bar accounts already. + s := simpleAccountServer(t) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + // Now allow new accounts on the fly, make sure second time does not work. + opts := defaultServerOptions + opts.AllowNewAccounts = true + s = New(&opts) + + c, _, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to create an account: %v", err) + } + + c, cr, _ = newClientForServer(s) + go c.parse(connectOp) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } +} + +func accountNameExists(name string, accounts []*Account) bool { + for _, acc := range accounts { + if strings.Compare(acc.Name, name) == 0 { + return true + } + } + return false +} + +func TestAccountSimpleConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + } + if !accountNameExists("foo", opts.Accounts) { + t.Fatal("Expected a 'foo' account") + } + if !accountNameExists("bar", opts.Accounts) { + t.Fatal("Expected a 'bar' account") + } + + // Make sure double entries is an error. + confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`)) + defer os.Remove(confFileName) + _, err = ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double account entries") + } +} + +func TestAccountParseConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: derek, password: foo} + {user: ivan, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + } + + if lu := len(opts.Users); lu != 4 { + t.Fatalf("Expected 4 total Users, got %d\n", lu) + } + + var natsAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + break + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + + for _, u := range opts.Users { + if u.Username == "derek" { + if u.Account != natsAcc { + t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account) + break + } + } + } +} + +func TestAccountParseConfigDuplicateUsers(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: alice, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + _, err := ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double user entries") + } +} diff --git a/server/auth.go b/server/auth.go index 252ae6a8..146312eb 100644 --- a/server/auth.go +++ b/server/auth.go @@ -16,7 +16,6 @@ package server import ( "crypto/tls" "encoding/base64" - "fmt" "strings" "github.com/nats-io/nkeys" @@ -39,17 +38,25 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Accounts +type Account struct { + Name string + sl *Sublist +} + // Nkey is for multiple nkey based users type NkeyUser struct { Nkey string `json:"user"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // User is for multiple accounts/users. type User struct { Username string `json:"user"` Password string `json:"password"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // clone performs a deep copy of the User struct, returning a new clone with @@ -309,35 +316,6 @@ func (s *Server) isRouterAuthorized(c *client) bool { return true } -// removeUnauthorizedSubs removes any subscriptions the client has that are no -// longer authorized, e.g. due to a config reload. -func (s *Server) removeUnauthorizedSubs(c *client) { - c.mu.Lock() - if c.perms == nil { - c.mu.Unlock() - return - } - - subs := make(map[string]*subscription, len(c.subs)) - for sid, sub := range c.subs { - subs[sid] = sub - } - c.mu.Unlock() - - for sid, sub := range subs { - if !c.canSubscribe(sub.subject) { - _ = s.sl.Remove(sub) - c.mu.Lock() - delete(c.subs, sid) - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", - sub.subject, sub.sid)) - s.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) - } - } -} - // Support for bcrypt stored passwords and tokens. const bcryptPrefix = "$2a$" diff --git a/server/client.go b/server/client.go index 73d4f453..b822bb58 100644 --- a/server/client.go +++ b/server/client.go @@ -144,6 +144,8 @@ type client struct { ncs string out outbound srv *Server + acc *Account + sl *Sublist subs map[string]*subscription perms *permissions in readCache @@ -258,6 +260,8 @@ type clientOpts struct { Lang string `json:"lang"` Version string `json:"version"` Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -313,22 +317,33 @@ func (c *client) initClient() { } } +// RegisterWithAccount will register the given user with a specific +// account. This will change the subject namespace. +func (c *client) RegisterWithAccount(acc *Account) error { + if acc == nil || acc.sl == nil { + return ErrBadAccount + } + c.mu.Lock() + c.acc = acc + c.sl = acc.sl + c.mu.Unlock() + return nil +} + // RegisterUser allows auth to call back into a new client // with the authenticated user. This is used to map any permissions // into the client. func (c *client) RegisterUser(user *User) { - if user.Permissions == nil { - // Reset perms to nil in case client previously had them. - c.mu.Lock() - c.perms = nil - c.mu.Unlock() - return - } - // Process Permissions and map into client connection structures. c.mu.Lock() defer c.mu.Unlock() + if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.perms = nil + return + } + c.setPermissions(user.Permissions) } @@ -770,6 +785,8 @@ func (c *client) processConnect(arg []byte) error { proto := c.opts.Protocol verbose := c.opts.Verbose lang := c.opts.Lang + account := c.opts.Account + accountNew := c.opts.AccountNew c.mu.Unlock() if srv != nil { @@ -788,6 +805,38 @@ func (c *client) processConnect(arg []byte) error { c.authViolation() return ErrAuthorization } + + // Check for Account designation + if account != "" { + var acc *Account + var wasNew bool + if !srv.newAccountsAllowed() { + acc = srv.LookupAccount(account) + if acc == nil { + c.Errorf(ErrMissingAccount.Error()) + c.sendErr("Account Not Found") + return ErrMissingAccount + } else if accountNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } else { + // We can create this one on the fly. + acc, wasNew = srv.LookupOrRegisterAccount(account) + if accountNew && !wasNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } + // If we are here we can register ourselves with the new account. + if err := c.RegisterWithAccount(acc); err != nil { + c.Errorf("Problem registering with account [%s]", account) + c.sendErr("Account Failed Registration") + return ErrBadAccount + } + } } // Check client protocol request if it exists. @@ -828,7 +877,18 @@ func (c *client) authTimeout() { } func (c *client) authViolation() { - if c.srv != nil && c.srv.getOpts().Users != nil { + var hasNkeys, hasUsers bool + if s := c.srv; s != nil { + s.mu.Lock() + hasNkeys = s.nkeys != nil + hasUsers = s.users != nil + s.mu.Unlock() + } + if hasNkeys { + c.Errorf("%s - Nkey %q", + ErrAuthorization.Error(), + c.opts.Nkey) + } else if hasUsers { c.Errorf("%s - User %q", ErrAuthorization.Error(), c.opts.Username) @@ -1239,8 +1299,8 @@ func (c *client) processSub(argo []byte) (err error) { sid := string(sub.sid) if c.subs[sid] == nil { c.subs[sid] = sub - if c.srv != nil { - err = c.srv.sl.Insert(sub) + if c.sl != nil { + err = c.sl.Insert(sub) if err != nil { delete(c.subs, sid) } else { @@ -1297,8 +1357,8 @@ func (c *client) unsubscribe(sub *subscription) { c.traceOp("<-> %s", "DELSUB", sub.sid) delete(c.subs, string(sub.sid)) - if c.srv != nil { - c.srv.sl.Remove(sub) + if c.sl != nil { + c.sl.Remove(sub) } // If we are a queue subscriber on a client connection and we have routes, @@ -1562,7 +1622,7 @@ func (c *client) processMsg(msg []byte) { var r *SublistResult var ok bool - genid := atomic.LoadUint64(&srv.sl.genid) + genid := atomic.LoadUint64(&c.sl.genid) if genid == c.in.genid && c.in.results != nil { r, ok = c.in.results[string(c.pa.subject)] @@ -1574,7 +1634,7 @@ func (c *client) processMsg(msg []byte) { if !ok { subject := string(c.pa.subject) - r = srv.sl.Match(subject) + r = c.sl.Match(subject) c.in.results[subject] = r // Prune the results cache. Keeps us from unbounded growth. if len(c.in.results) > maxResultCacheSize { @@ -1782,6 +1842,35 @@ func (c *client) typeString() string { return "Unknown Type" } +// removeUnauthorizedSubs removes any subscriptions the client has that are no +// longer authorized, e.g. due to a config reload. +func (c *client) removeUnauthorizedSubs() { + c.mu.Lock() + if c.perms == nil { + c.mu.Unlock() + return + } + srv := c.srv + subs := make(map[string]*subscription, len(c.subs)) + for sid, sub := range c.subs { + subs[sid] = sub + } + c.mu.Unlock() + + for sid, sub := range subs { + if c.sl != nil && !c.canSubscribe(sub.subject) { + _ = c.sl.Remove(sub) + c.mu.Lock() + delete(c.subs, sid) + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + sub.subject, sub.sid)) + srv.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) + } + } +} + func (c *client) closeConnection(reason ClosedState) { c.mu.Lock() if c.nc == nil { @@ -1820,10 +1909,13 @@ func (c *client) closeConnection(reason ClosedState) { c.mu.Unlock() + // Remove clients subscriptions. + c.sl.RemoveBatch(subs) + if srv != nil { // This is a route that disconnected... if len(connectURLs) > 0 { - // Unless disabled, possibly update the server's INFO protcol + // Unless disabled, possibly update the server's INFO protocol // and send to clients that know how to handle async INFOs. if !srv.getOpts().Cluster.NoAdvertise { srv.removeClientConnectURLsAndSendINFOToClients(connectURLs) @@ -1833,9 +1925,8 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - // Remove clients subscriptions. - srv.sl.RemoveBatch(subs) - if c.typ == CLIENT { + // Remove remote subscriptions. + if c.typ != ROUTER { // Forward UNSUBs protocols to all routes srv.broadcastUnSubscribeBatch(subs) } diff --git a/server/client_test.go b/server/client_test.go index 29271a34..80574992 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -52,6 +52,17 @@ func createClientAsync(ch chan *client, s *Server, cli net.Conn) { }() } +func newClientForServer(s *Server) (*client, *bufio.Reader, string) { + cli, srv := net.Pipe() + cr := bufio.NewReaderSize(cli, maxBufSize) + ch := make(chan *client) + createClientAsync(ch, s, srv) + l, _ := cr.ReadString('\n') + // Grab client + c := <-ch + return c, cr, l +} + var defaultServerOptions = Options{ Trace: false, Debug: false, @@ -643,17 +654,17 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) { }() <-ch - if s.sl.Count() != 2 { - t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count()) + if c.sl.Count() != 2 { + t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count()) } c.closeConnection(ClientClosed) - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count()) } } func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { - s, c, _ := setupClient() + _, c, _ := setupClient() c.closeConnection(ClientClosed) subs := []byte("SUB foo 1\r\nSUB bar 2\r\n") @@ -664,8 +675,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { }() <-ch - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count()) } } diff --git a/server/errors.go b/server/errors.go index c722bfc4..2dd640c7 100644 --- a/server/errors.go +++ b/server/errors.go @@ -48,4 +48,14 @@ var ( // ErrClientConnectedToRoutePort represents an error condition when a client // attempted to connect to the route listen port. ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port") + + // ErrAccountExists is returned when an account is attempted to be registered + // but already exists. + ErrAccountExists = errors.New("Account Exists") + + // ErrBadAccount represents a malformed or incorrect account. + ErrBadAccount = errors.New("Bad Account") + + // ErrMissingAccount is returned when an account does not exist. + ErrMissingAccount = errors.New("Account Missing") ) diff --git a/server/monitor.go b/server/monitor.go index 9379b736..604f12a8 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -709,14 +709,15 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { } } - sz := &Subsz{s.sl.Stats(), 0, offset, limit, nil} + // FIXME(dlc) - Make account aware. + sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil} if subdetail { // Now add in subscription's details var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + s.gsl.localSubs(&subs) details := make([]SubDetail, len(subs)) i := 0 // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering. @@ -938,7 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) v.MaxPending = opts.MaxPending v.WriteDeadline = opts.WriteDeadline - v.Subscriptions = s.sl.Count() + v.Subscriptions = s.gsl.Count() v.ConfigLoadTime = s.configTime // Need a copy here since s.httpReqStats can change while doing // the marshaling down below. diff --git a/server/nkey_test.go b/server/nkey_test.go index f0837f2c..d3186e68 100644 --- a/server/nkey_test.go +++ b/server/nkey_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" mrand "math/rand" - "net" "os" "strings" "testing" @@ -56,17 +55,6 @@ func mixedSetup() (*Server, *client, *bufio.Reader, string) { return rawSetup(opts) } -func newClientForServer(s *Server) (*client, *bufio.Reader, string) { - cli, srv := net.Pipe() - cr := bufio.NewReaderSize(cli, maxBufSize) - ch := make(chan *client) - createClientAsync(ch, s, srv) - l, _ := cr.ReadString('\n') - // Grab client - c := <-ch - return c, cr, l -} - func TestServerInfoNonce(t *testing.T) { _, l := setUpClientWithResponse() if !strings.HasPrefix(l, "INFO ") { diff --git a/server/opts.go b/server/opts.go index a7efa8b1..0b758c93 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,6 +62,8 @@ type Options struct { MaxSubs int `json:"max_subscriptions,omitempty"` Nkeys []*NkeyUser `json:"-"` Users []*User `json:"-"` + Accounts []*Account `json:"-"` + AllowNewAccounts bool `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` @@ -227,7 +229,7 @@ func (e *configWarningErr) Error() string { } // ProcessConfigFile processes a configuration file. -// FIXME(dlc): Hacky +// FIXME(dlc): A bit hacky func ProcessConfigFile(configFile string) (*Options, error) { opts := &Options{} if err := opts.ProcessConfigFile(configFile); err != nil { @@ -306,6 +308,15 @@ func (o *Options) ProcessConfigFile(configFile string) error { o.Trace = v.(bool) case "logtime": o.Logtime = v.(bool) + case "accounts": + if pedantic { + err = parseAccounts(tk, o) + } else { + err = parseAccounts(v, o) + } + if err != nil { + return err + } case "authorization": var auth *authorization if pedantic { @@ -595,6 +606,82 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { } } +// parseAccounts will parse the different accounts syntax. +func parseAccounts(v interface{}, opts *Options) error { + var pedantic = opts.CheckConfig + var tk token + _, v = unwrapValue(v) + uorn := make(map[string]struct{}) + + switch v.(type) { + case []interface{}, []string: + m := make(map[string]struct{}, len(v.([]interface{}))) + for _, name := range v.([]interface{}) { + ns := name.(string) + if _, ok := m[ns]; ok { + return fmt.Errorf("Duplicate Account Entry: %s", ns) + } + opts.Accounts = append(opts.Accounts, &Account{Name: name.(string)}) + m[ns] = struct{}{} + } + case map[string]interface{}: + m := make(map[string]struct{}, len(v.(map[string]interface{}))) + for name, mv := range v.(map[string]interface{}) { + _, mv = unwrapValue(mv) + if _, ok := m[name]; ok { + return fmt.Errorf("Duplicate Account Entry: %s", name) + } + uv, ok := mv.(map[string]interface{}) + if !ok { + return fmt.Errorf("Expected map entry for users") + } + acc := &Account{Name: name} + opts.Accounts = append(opts.Accounts, acc) + m[name] = struct{}{} + + for k, v := range uv { + tk, mv = unwrapValue(v) + switch strings.ToLower(k) { + case "users": + var ( + users []*User + err error + nkeys []*NkeyUser + ) + if pedantic { + nkeys, users, err = parseUsers(tk, opts) + } else { + nkeys, users, err = parseUsers(mv, opts) + } + if err != nil { + return err + } + for _, u := range users { + if _, ok := uorn[u.Username]; ok { + return fmt.Errorf("Duplicate user %q detected", u.Username) + } + uorn[u.Username] = struct{}{} + u.Account = acc + } + opts.Users = append(opts.Users, users...) + + for _, u := range nkeys { + if _, ok := uorn[u.Nkey]; ok { + return fmt.Errorf("Duplicate nkey %q detected", u.Nkey) + } + uorn[u.Nkey] = struct{}{} + u.Account = acc + } + opts.Nkeys = append(opts.Nkeys, nkeys...) + } + } + } + default: + return fmt.Errorf("Expected an array or map of account entries, got %T", v) + } + return nil +} + // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options) (*authorization, error) { var ( diff --git a/server/reload.go b/server/reload.go index 410ede1d..3e57c90b 100644 --- a/server/reload.go +++ b/server/reload.go @@ -681,7 +681,7 @@ func (s *Server) reloadAuthorization() { } // Remove any unauthorized subscriptions. - s.removeUnauthorizedSubs(client) + client.removeUnauthorizedSubs() } for _, route := range routes { diff --git a/server/route.go b/server/route.go index 55d5fc0f..d206f233 100644 --- a/server/route.go +++ b/server/route.go @@ -578,7 +578,8 @@ func (s *Server) sendLocalSubsToRoute(route *client) { var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + // FIXME(dlc) this needs to be scoped per account when cluster proto changes. + s.gsl.localSubs(&subs) route.mu.Lock() closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool { @@ -691,7 +692,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} // Grab server variables s.mu.Lock() diff --git a/server/server.go b/server/server.go index 1f890d33..492c8a8d 100644 --- a/server/server.go +++ b/server/server.go @@ -69,13 +69,14 @@ type Server struct { mu sync.Mutex prand *rand.Rand info Info - sl *Sublist configFile string optsMu sync.RWMutex opts *Options running bool shutdown bool listener net.Listener + gsl *Sublist + accounts map[string]*Account clients map[uint64]*client routes map[uint64]*client remotes map[string]*client @@ -173,7 +174,7 @@ func New(opts *Options) *Server { configFile: opts.ConfigFile, info: info, prand: rand.New(rand.NewSource(time.Now().UnixNano())), - sl: NewSublist(), + gsl: NewSublist(), opts: opts, done: make(chan bool, 1), start: now, @@ -192,6 +193,9 @@ func New(opts *Options) *Server { // Used internally for quick look-ups. s.clientConnectURLsMap = make(map[string]struct{}) + // For tracking accounts + s.accounts = make(map[string]*Account) + // For tracking clients s.clients = make(map[uint64]*client) @@ -210,6 +214,9 @@ func New(opts *Options) *Server { // to shutdown. s.quitCh = make(chan struct{}) + // Used to setup Accounts. + s.configureAccounts() + // Used to setup Authorization. s.configureAuthorization() @@ -232,6 +239,16 @@ func (s *Server) setOpts(opts *Options) { s.optsMu.Unlock() } +func (s *Server) configureAccounts() { + // Check opts and walk through them. Making sure to create SLs. + for _, acc := range s.opts.Accounts { + if acc.sl == nil { + acc.sl = NewSublist() + } + s.accounts[acc.Name] = acc + } +} + func (s *Server) generateRouteInfoJSON() { b, _ := json.Marshal(s.routeInfo) pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} @@ -281,6 +298,52 @@ func (s *Server) logPid() error { return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) } +// newAccountsAllowed returns whether or not new accounts can be created on the fly. +func (s *Server) newAccountsAllowed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.opts.AllowNewAccounts +} + +func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) { + s.mu.Lock() + defer s.mu.Unlock() + if acc, ok := s.accounts[name]; ok { + return acc, false + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, true +} + +// RegisterAccount will register an account. The account must be new +// or this call will fail. +func (s *Server) RegisterAccount(name string) (*Account, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.accounts[name]; ok { + return nil, ErrAccountExists + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, nil +} + +// LookupAccount is a public function to return the account structure +// associated with name. +func (s *Server) LookupAccount(name string) *Account { + s.mu.Lock() + defer s.mu.Unlock() + return s.accounts[name] +} + // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { @@ -778,7 +841,7 @@ func (s *Server) createClient(conn net.Conn) *client { max_subs := opts.MaxSubs now := time.Now() - c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} // Grab JSON info string s.mu.Lock() @@ -1088,7 +1151,13 @@ func (s *Server) getClient(cid uint64) *client { // NumSubscriptions will report how many subscriptions are active. func (s *Server) NumSubscriptions() uint32 { s.mu.Lock() - subs := s.sl.Count() + var subs uint32 + for _, acc := range s.accounts { + if acc.sl != nil { + subs += acc.sl.Count() + } + } + subs += s.gsl.Count() s.mu.Unlock() return subs } diff --git a/server/split_test.go b/server/split_test.go index 77dd2efb..ff36c80e 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -24,8 +24,8 @@ func TestSplitBufferSubOp(t *testing.T) { defer cli.Close() defer trash.Close() - s := &Server{sl: NewSublist()} - c := &client{srv: s, subs: make(map[string]*subscription), nc: cli} + s := &Server{gsl: NewSublist()} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -43,7 +43,7 @@ func TestSplitBufferSubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r == nil || len(r.psubs) != 1 { t.Fatalf("Did not match subscription properly: %+v\n", r) } @@ -60,7 +60,7 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{sl: NewSublist()} + s := &Server{gsl: NewSublist()} c := &client{srv: s, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") @@ -87,7 +87,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r != nil && len(r.psubs) != 0 { t.Fatalf("Should be no subscriptions in results: %+v\n", r) } @@ -300,7 +300,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, subs: make(map[string]*subscription)} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues.