From 1cbfbfa0712697d4100c4e08115fc5cf6db056e0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 16 Sep 2018 06:07:35 -0700 Subject: [PATCH 1/8] Basic account support Signed-off-by: Derek Collison --- server/accounts_test.go | 286 ++++++++++++++++++++++++++++++++++++++++ server/auth.go | 42 ++---- server/client.go | 129 +++++++++++++++--- server/client_test.go | 25 +++- server/errors.go | 10 ++ server/monitor.go | 7 +- server/nkey_test.go | 12 -- server/opts.go | 89 ++++++++++++- server/reload.go | 2 +- server/route.go | 5 +- server/server.go | 77 ++++++++++- server/split_test.go | 12 +- 12 files changed, 609 insertions(+), 87 deletions(-) create mode 100644 server/accounts_test.go diff --git a/server/accounts_test.go b/server/accounts_test.go new file mode 100644 index 00000000..3dd6595a --- /dev/null +++ b/server/accounts_test.go @@ -0,0 +1,286 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "os" + "strings" + "testing" +) + +func simpleAccountServer(t *testing.T) *Server { + opts := defaultServerOptions + s := New(&opts) + + // Now create two accounts. + _, err := s.RegisterAccount("foo") + if err != nil { + t.Fatalf("Error creating account 'foo': %v", err) + } + _, err = s.RegisterAccount("bar") + if err != nil { + t.Fatalf("Error creating account 'bar': %v", err) + } + return s +} + +func TestRegisterDuplicateAccounts(t *testing.T) { + s := simpleAccountServer(t) + if _, err := s.RegisterAccount("foo"); err == nil { + t.Fatal("Expected an error registering 'foo' twice") + } +} + +func TestAccountIsolation(t *testing.T) { + s := simpleAccountServer(t) + fooAcc := s.LookupAccount("foo") + barAcc := s.LookupAccount("bar") + if fooAcc == nil || barAcc == nil { + t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") + } + cfoo, crFoo, _ := newClientForServer(s) + if err := cfoo.RegisterWithAccount(fooAcc); err != nil { + t.Fatalf("Error register client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + if err := cbar.RegisterWithAccount(barAcc); err != nil { + t.Fatalf("Error register client with 'bar' account: %v", err) + } + + // Make sure they are different accounts/sl. + if cfoo.acc == cbar.acc { + t.Fatalf("Error, accounts the same for both clients") + } + + // Now do quick test that makes sure messages do not cross over. + // setup bar as a foo subscriber. + go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n")) + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q\n", l) + } + + go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) + l, err = crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'foo' from server: %v", err) + } + + matches := msgPat.FindAllStringSubmatch(l, -1)[0] + if matches[SUB_INDEX] != "foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crFoo, []byte("hello\r\n"), t) + + // Now make sure nothing shows up on bar. + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q\n", l) + } +} + +func TestAccountFromOptions(t *testing.T) { + opts := defaultServerOptions + opts.Accounts = []*Account{ + &Account{Name: "foo"}, + &Account{Name: "bar"}, + } + s := New(&opts) + + if la := len(s.accounts); la != 2 { + t.Fatalf("Expected to have a server with two accounts, got %v", la) + } + // Check that sl is filled in. + fooAcc := s.LookupAccount("foo") + barAcc := s.LookupAccount("bar") + if fooAcc == nil || barAcc == nil { + t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") + } + if fooAcc.sl == nil || barAcc.sl == nil { + t.Fatal("Expected Sublists to be filled in on Opts.Accounts") + } +} + +func TestNewAccountsFromClients(t *testing.T) { + opts := defaultServerOptions + s := New(&opts) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\"}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + opts.AllowNewAccounts = true + s = New(&opts) + + c, _, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to create an account: %v", err) + } +} + +// Clients can ask that the account be forced to be new. If it exists this is an error. +func TestNewAccountRequireNew(t *testing.T) { + // This has foo and bar accounts already. + s := simpleAccountServer(t) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + // Now allow new accounts on the fly, make sure second time does not work. + opts := defaultServerOptions + opts.AllowNewAccounts = true + s = New(&opts) + + c, _, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to create an account: %v", err) + } + + c, cr, _ = newClientForServer(s) + go c.parse(connectOp) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } +} + +func accountNameExists(name string, accounts []*Account) bool { + for _, acc := range accounts { + if strings.Compare(acc.Name, name) == 0 { + return true + } + } + return false +} + +func TestAccountSimpleConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + } + if !accountNameExists("foo", opts.Accounts) { + t.Fatal("Expected a 'foo' account") + } + if !accountNameExists("bar", opts.Accounts) { + t.Fatal("Expected a 'bar' account") + } + + // Make sure double entries is an error. + confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`)) + defer os.Remove(confFileName) + _, err = ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double account entries") + } +} + +func TestAccountParseConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: derek, password: foo} + {user: ivan, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + } + + if lu := len(opts.Users); lu != 4 { + t.Fatalf("Expected 4 total Users, got %d\n", lu) + } + + var natsAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + break + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + + for _, u := range opts.Users { + if u.Username == "derek" { + if u.Account != natsAcc { + t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account) + break + } + } + } +} + +func TestAccountParseConfigDuplicateUsers(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: alice, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + _, err := ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double user entries") + } +} diff --git a/server/auth.go b/server/auth.go index 252ae6a8..146312eb 100644 --- a/server/auth.go +++ b/server/auth.go @@ -16,7 +16,6 @@ package server import ( "crypto/tls" "encoding/base64" - "fmt" "strings" "github.com/nats-io/nkeys" @@ -39,17 +38,25 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Accounts +type Account struct { + Name string + sl *Sublist +} + // Nkey is for multiple nkey based users type NkeyUser struct { Nkey string `json:"user"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // User is for multiple accounts/users. type User struct { Username string `json:"user"` Password string `json:"password"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // clone performs a deep copy of the User struct, returning a new clone with @@ -309,35 +316,6 @@ func (s *Server) isRouterAuthorized(c *client) bool { return true } -// removeUnauthorizedSubs removes any subscriptions the client has that are no -// longer authorized, e.g. due to a config reload. -func (s *Server) removeUnauthorizedSubs(c *client) { - c.mu.Lock() - if c.perms == nil { - c.mu.Unlock() - return - } - - subs := make(map[string]*subscription, len(c.subs)) - for sid, sub := range c.subs { - subs[sid] = sub - } - c.mu.Unlock() - - for sid, sub := range subs { - if !c.canSubscribe(sub.subject) { - _ = s.sl.Remove(sub) - c.mu.Lock() - delete(c.subs, sid) - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", - sub.subject, sub.sid)) - s.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) - } - } -} - // Support for bcrypt stored passwords and tokens. const bcryptPrefix = "$2a$" diff --git a/server/client.go b/server/client.go index 73d4f453..b822bb58 100644 --- a/server/client.go +++ b/server/client.go @@ -144,6 +144,8 @@ type client struct { ncs string out outbound srv *Server + acc *Account + sl *Sublist subs map[string]*subscription perms *permissions in readCache @@ -258,6 +260,8 @@ type clientOpts struct { Lang string `json:"lang"` Version string `json:"version"` Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -313,22 +317,33 @@ func (c *client) initClient() { } } +// RegisterWithAccount will register the given user with a specific +// account. This will change the subject namespace. +func (c *client) RegisterWithAccount(acc *Account) error { + if acc == nil || acc.sl == nil { + return ErrBadAccount + } + c.mu.Lock() + c.acc = acc + c.sl = acc.sl + c.mu.Unlock() + return nil +} + // RegisterUser allows auth to call back into a new client // with the authenticated user. This is used to map any permissions // into the client. func (c *client) RegisterUser(user *User) { - if user.Permissions == nil { - // Reset perms to nil in case client previously had them. - c.mu.Lock() - c.perms = nil - c.mu.Unlock() - return - } - // Process Permissions and map into client connection structures. c.mu.Lock() defer c.mu.Unlock() + if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.perms = nil + return + } + c.setPermissions(user.Permissions) } @@ -770,6 +785,8 @@ func (c *client) processConnect(arg []byte) error { proto := c.opts.Protocol verbose := c.opts.Verbose lang := c.opts.Lang + account := c.opts.Account + accountNew := c.opts.AccountNew c.mu.Unlock() if srv != nil { @@ -788,6 +805,38 @@ func (c *client) processConnect(arg []byte) error { c.authViolation() return ErrAuthorization } + + // Check for Account designation + if account != "" { + var acc *Account + var wasNew bool + if !srv.newAccountsAllowed() { + acc = srv.LookupAccount(account) + if acc == nil { + c.Errorf(ErrMissingAccount.Error()) + c.sendErr("Account Not Found") + return ErrMissingAccount + } else if accountNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } else { + // We can create this one on the fly. + acc, wasNew = srv.LookupOrRegisterAccount(account) + if accountNew && !wasNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } + // If we are here we can register ourselves with the new account. + if err := c.RegisterWithAccount(acc); err != nil { + c.Errorf("Problem registering with account [%s]", account) + c.sendErr("Account Failed Registration") + return ErrBadAccount + } + } } // Check client protocol request if it exists. @@ -828,7 +877,18 @@ func (c *client) authTimeout() { } func (c *client) authViolation() { - if c.srv != nil && c.srv.getOpts().Users != nil { + var hasNkeys, hasUsers bool + if s := c.srv; s != nil { + s.mu.Lock() + hasNkeys = s.nkeys != nil + hasUsers = s.users != nil + s.mu.Unlock() + } + if hasNkeys { + c.Errorf("%s - Nkey %q", + ErrAuthorization.Error(), + c.opts.Nkey) + } else if hasUsers { c.Errorf("%s - User %q", ErrAuthorization.Error(), c.opts.Username) @@ -1239,8 +1299,8 @@ func (c *client) processSub(argo []byte) (err error) { sid := string(sub.sid) if c.subs[sid] == nil { c.subs[sid] = sub - if c.srv != nil { - err = c.srv.sl.Insert(sub) + if c.sl != nil { + err = c.sl.Insert(sub) if err != nil { delete(c.subs, sid) } else { @@ -1297,8 +1357,8 @@ func (c *client) unsubscribe(sub *subscription) { c.traceOp("<-> %s", "DELSUB", sub.sid) delete(c.subs, string(sub.sid)) - if c.srv != nil { - c.srv.sl.Remove(sub) + if c.sl != nil { + c.sl.Remove(sub) } // If we are a queue subscriber on a client connection and we have routes, @@ -1562,7 +1622,7 @@ func (c *client) processMsg(msg []byte) { var r *SublistResult var ok bool - genid := atomic.LoadUint64(&srv.sl.genid) + genid := atomic.LoadUint64(&c.sl.genid) if genid == c.in.genid && c.in.results != nil { r, ok = c.in.results[string(c.pa.subject)] @@ -1574,7 +1634,7 @@ func (c *client) processMsg(msg []byte) { if !ok { subject := string(c.pa.subject) - r = srv.sl.Match(subject) + r = c.sl.Match(subject) c.in.results[subject] = r // Prune the results cache. Keeps us from unbounded growth. if len(c.in.results) > maxResultCacheSize { @@ -1782,6 +1842,35 @@ func (c *client) typeString() string { return "Unknown Type" } +// removeUnauthorizedSubs removes any subscriptions the client has that are no +// longer authorized, e.g. due to a config reload. +func (c *client) removeUnauthorizedSubs() { + c.mu.Lock() + if c.perms == nil { + c.mu.Unlock() + return + } + srv := c.srv + subs := make(map[string]*subscription, len(c.subs)) + for sid, sub := range c.subs { + subs[sid] = sub + } + c.mu.Unlock() + + for sid, sub := range subs { + if c.sl != nil && !c.canSubscribe(sub.subject) { + _ = c.sl.Remove(sub) + c.mu.Lock() + delete(c.subs, sid) + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + sub.subject, sub.sid)) + srv.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) + } + } +} + func (c *client) closeConnection(reason ClosedState) { c.mu.Lock() if c.nc == nil { @@ -1820,10 +1909,13 @@ func (c *client) closeConnection(reason ClosedState) { c.mu.Unlock() + // Remove clients subscriptions. + c.sl.RemoveBatch(subs) + if srv != nil { // This is a route that disconnected... if len(connectURLs) > 0 { - // Unless disabled, possibly update the server's INFO protcol + // Unless disabled, possibly update the server's INFO protocol // and send to clients that know how to handle async INFOs. if !srv.getOpts().Cluster.NoAdvertise { srv.removeClientConnectURLsAndSendINFOToClients(connectURLs) @@ -1833,9 +1925,8 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - // Remove clients subscriptions. - srv.sl.RemoveBatch(subs) - if c.typ == CLIENT { + // Remove remote subscriptions. + if c.typ != ROUTER { // Forward UNSUBs protocols to all routes srv.broadcastUnSubscribeBatch(subs) } diff --git a/server/client_test.go b/server/client_test.go index 29271a34..80574992 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -52,6 +52,17 @@ func createClientAsync(ch chan *client, s *Server, cli net.Conn) { }() } +func newClientForServer(s *Server) (*client, *bufio.Reader, string) { + cli, srv := net.Pipe() + cr := bufio.NewReaderSize(cli, maxBufSize) + ch := make(chan *client) + createClientAsync(ch, s, srv) + l, _ := cr.ReadString('\n') + // Grab client + c := <-ch + return c, cr, l +} + var defaultServerOptions = Options{ Trace: false, Debug: false, @@ -643,17 +654,17 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) { }() <-ch - if s.sl.Count() != 2 { - t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count()) + if c.sl.Count() != 2 { + t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count()) } c.closeConnection(ClientClosed) - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count()) } } func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { - s, c, _ := setupClient() + _, c, _ := setupClient() c.closeConnection(ClientClosed) subs := []byte("SUB foo 1\r\nSUB bar 2\r\n") @@ -664,8 +675,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { }() <-ch - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count()) } } diff --git a/server/errors.go b/server/errors.go index c722bfc4..2dd640c7 100644 --- a/server/errors.go +++ b/server/errors.go @@ -48,4 +48,14 @@ var ( // ErrClientConnectedToRoutePort represents an error condition when a client // attempted to connect to the route listen port. ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port") + + // ErrAccountExists is returned when an account is attempted to be registered + // but already exists. + ErrAccountExists = errors.New("Account Exists") + + // ErrBadAccount represents a malformed or incorrect account. + ErrBadAccount = errors.New("Bad Account") + + // ErrMissingAccount is returned when an account does not exist. + ErrMissingAccount = errors.New("Account Missing") ) diff --git a/server/monitor.go b/server/monitor.go index 9379b736..604f12a8 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -709,14 +709,15 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { } } - sz := &Subsz{s.sl.Stats(), 0, offset, limit, nil} + // FIXME(dlc) - Make account aware. + sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil} if subdetail { // Now add in subscription's details var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + s.gsl.localSubs(&subs) details := make([]SubDetail, len(subs)) i := 0 // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering. @@ -938,7 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) v.MaxPending = opts.MaxPending v.WriteDeadline = opts.WriteDeadline - v.Subscriptions = s.sl.Count() + v.Subscriptions = s.gsl.Count() v.ConfigLoadTime = s.configTime // Need a copy here since s.httpReqStats can change while doing // the marshaling down below. diff --git a/server/nkey_test.go b/server/nkey_test.go index f0837f2c..d3186e68 100644 --- a/server/nkey_test.go +++ b/server/nkey_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" mrand "math/rand" - "net" "os" "strings" "testing" @@ -56,17 +55,6 @@ func mixedSetup() (*Server, *client, *bufio.Reader, string) { return rawSetup(opts) } -func newClientForServer(s *Server) (*client, *bufio.Reader, string) { - cli, srv := net.Pipe() - cr := bufio.NewReaderSize(cli, maxBufSize) - ch := make(chan *client) - createClientAsync(ch, s, srv) - l, _ := cr.ReadString('\n') - // Grab client - c := <-ch - return c, cr, l -} - func TestServerInfoNonce(t *testing.T) { _, l := setUpClientWithResponse() if !strings.HasPrefix(l, "INFO ") { diff --git a/server/opts.go b/server/opts.go index a7efa8b1..0b758c93 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,6 +62,8 @@ type Options struct { MaxSubs int `json:"max_subscriptions,omitempty"` Nkeys []*NkeyUser `json:"-"` Users []*User `json:"-"` + Accounts []*Account `json:"-"` + AllowNewAccounts bool `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` @@ -227,7 +229,7 @@ func (e *configWarningErr) Error() string { } // ProcessConfigFile processes a configuration file. -// FIXME(dlc): Hacky +// FIXME(dlc): A bit hacky func ProcessConfigFile(configFile string) (*Options, error) { opts := &Options{} if err := opts.ProcessConfigFile(configFile); err != nil { @@ -306,6 +308,15 @@ func (o *Options) ProcessConfigFile(configFile string) error { o.Trace = v.(bool) case "logtime": o.Logtime = v.(bool) + case "accounts": + if pedantic { + err = parseAccounts(tk, o) + } else { + err = parseAccounts(v, o) + } + if err != nil { + return err + } case "authorization": var auth *authorization if pedantic { @@ -595,6 +606,82 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { } } +// parseAccounts will parse the different accounts syntax. +func parseAccounts(v interface{}, opts *Options) error { + var pedantic = opts.CheckConfig + var tk token + _, v = unwrapValue(v) + uorn := make(map[string]struct{}) + + switch v.(type) { + case []interface{}, []string: + m := make(map[string]struct{}, len(v.([]interface{}))) + for _, name := range v.([]interface{}) { + ns := name.(string) + if _, ok := m[ns]; ok { + return fmt.Errorf("Duplicate Account Entry: %s", ns) + } + opts.Accounts = append(opts.Accounts, &Account{Name: name.(string)}) + m[ns] = struct{}{} + } + case map[string]interface{}: + m := make(map[string]struct{}, len(v.(map[string]interface{}))) + for name, mv := range v.(map[string]interface{}) { + _, mv = unwrapValue(mv) + if _, ok := m[name]; ok { + return fmt.Errorf("Duplicate Account Entry: %s", name) + } + uv, ok := mv.(map[string]interface{}) + if !ok { + return fmt.Errorf("Expected map entry for users") + } + acc := &Account{Name: name} + opts.Accounts = append(opts.Accounts, acc) + m[name] = struct{}{} + + for k, v := range uv { + tk, mv = unwrapValue(v) + switch strings.ToLower(k) { + case "users": + var ( + users []*User + err error + nkeys []*NkeyUser + ) + if pedantic { + nkeys, users, err = parseUsers(tk, opts) + } else { + nkeys, users, err = parseUsers(mv, opts) + } + if err != nil { + return err + } + for _, u := range users { + if _, ok := uorn[u.Username]; ok { + return fmt.Errorf("Duplicate user %q detected", u.Username) + } + uorn[u.Username] = struct{}{} + u.Account = acc + } + opts.Users = append(opts.Users, users...) + + for _, u := range nkeys { + if _, ok := uorn[u.Nkey]; ok { + return fmt.Errorf("Duplicate nkey %q detected", u.Nkey) + } + uorn[u.Nkey] = struct{}{} + u.Account = acc + } + opts.Nkeys = append(opts.Nkeys, nkeys...) + } + } + } + default: + return fmt.Errorf("Expected an array or map of account entries, got %T", v) + } + return nil +} + // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options) (*authorization, error) { var ( diff --git a/server/reload.go b/server/reload.go index 410ede1d..3e57c90b 100644 --- a/server/reload.go +++ b/server/reload.go @@ -681,7 +681,7 @@ func (s *Server) reloadAuthorization() { } // Remove any unauthorized subscriptions. - s.removeUnauthorizedSubs(client) + client.removeUnauthorizedSubs() } for _, route := range routes { diff --git a/server/route.go b/server/route.go index 55d5fc0f..d206f233 100644 --- a/server/route.go +++ b/server/route.go @@ -578,7 +578,8 @@ func (s *Server) sendLocalSubsToRoute(route *client) { var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + // FIXME(dlc) this needs to be scoped per account when cluster proto changes. + s.gsl.localSubs(&subs) route.mu.Lock() closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool { @@ -691,7 +692,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} // Grab server variables s.mu.Lock() diff --git a/server/server.go b/server/server.go index 1f890d33..492c8a8d 100644 --- a/server/server.go +++ b/server/server.go @@ -69,13 +69,14 @@ type Server struct { mu sync.Mutex prand *rand.Rand info Info - sl *Sublist configFile string optsMu sync.RWMutex opts *Options running bool shutdown bool listener net.Listener + gsl *Sublist + accounts map[string]*Account clients map[uint64]*client routes map[uint64]*client remotes map[string]*client @@ -173,7 +174,7 @@ func New(opts *Options) *Server { configFile: opts.ConfigFile, info: info, prand: rand.New(rand.NewSource(time.Now().UnixNano())), - sl: NewSublist(), + gsl: NewSublist(), opts: opts, done: make(chan bool, 1), start: now, @@ -192,6 +193,9 @@ func New(opts *Options) *Server { // Used internally for quick look-ups. s.clientConnectURLsMap = make(map[string]struct{}) + // For tracking accounts + s.accounts = make(map[string]*Account) + // For tracking clients s.clients = make(map[uint64]*client) @@ -210,6 +214,9 @@ func New(opts *Options) *Server { // to shutdown. s.quitCh = make(chan struct{}) + // Used to setup Accounts. + s.configureAccounts() + // Used to setup Authorization. s.configureAuthorization() @@ -232,6 +239,16 @@ func (s *Server) setOpts(opts *Options) { s.optsMu.Unlock() } +func (s *Server) configureAccounts() { + // Check opts and walk through them. Making sure to create SLs. + for _, acc := range s.opts.Accounts { + if acc.sl == nil { + acc.sl = NewSublist() + } + s.accounts[acc.Name] = acc + } +} + func (s *Server) generateRouteInfoJSON() { b, _ := json.Marshal(s.routeInfo) pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} @@ -281,6 +298,52 @@ func (s *Server) logPid() error { return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) } +// newAccountsAllowed returns whether or not new accounts can be created on the fly. +func (s *Server) newAccountsAllowed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.opts.AllowNewAccounts +} + +func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) { + s.mu.Lock() + defer s.mu.Unlock() + if acc, ok := s.accounts[name]; ok { + return acc, false + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, true +} + +// RegisterAccount will register an account. The account must be new +// or this call will fail. +func (s *Server) RegisterAccount(name string) (*Account, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.accounts[name]; ok { + return nil, ErrAccountExists + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, nil +} + +// LookupAccount is a public function to return the account structure +// associated with name. +func (s *Server) LookupAccount(name string) *Account { + s.mu.Lock() + defer s.mu.Unlock() + return s.accounts[name] +} + // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { @@ -778,7 +841,7 @@ func (s *Server) createClient(conn net.Conn) *client { max_subs := opts.MaxSubs now := time.Now() - c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} // Grab JSON info string s.mu.Lock() @@ -1088,7 +1151,13 @@ func (s *Server) getClient(cid uint64) *client { // NumSubscriptions will report how many subscriptions are active. func (s *Server) NumSubscriptions() uint32 { s.mu.Lock() - subs := s.sl.Count() + var subs uint32 + for _, acc := range s.accounts { + if acc.sl != nil { + subs += acc.sl.Count() + } + } + subs += s.gsl.Count() s.mu.Unlock() return subs } diff --git a/server/split_test.go b/server/split_test.go index 77dd2efb..ff36c80e 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -24,8 +24,8 @@ func TestSplitBufferSubOp(t *testing.T) { defer cli.Close() defer trash.Close() - s := &Server{sl: NewSublist()} - c := &client{srv: s, subs: make(map[string]*subscription), nc: cli} + s := &Server{gsl: NewSublist()} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -43,7 +43,7 @@ func TestSplitBufferSubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r == nil || len(r.psubs) != 1 { t.Fatalf("Did not match subscription properly: %+v\n", r) } @@ -60,7 +60,7 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{sl: NewSublist()} + s := &Server{gsl: NewSublist()} c := &client{srv: s, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") @@ -87,7 +87,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r != nil && len(r.psubs) != 0 { t.Fatalf("Should be no subscriptions in results: %+v\n", r) } @@ -300,7 +300,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, subs: make(map[string]*subscription)} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. From 620e1d73640e942206f5e62d81a35d6ba02eef7b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 19 Sep 2018 21:38:55 -0700 Subject: [PATCH 2/8] Basic account mapping via import and export Signed-off-by: Derek Collison --- server/accounts_test.go | 310 ++++++++++++++++++++++++++++++++++++++-- server/auth.go | 102 ++++++++++++- server/client.go | 84 ++++++++++- server/client_test.go | 8 +- server/errors.go | 2 + server/sublist.go | 48 ++++++- server/sublist_test.go | 4 +- 7 files changed, 535 insertions(+), 23 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 3dd6595a..a40e2245 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -19,42 +19,40 @@ import ( "testing" ) -func simpleAccountServer(t *testing.T) *Server { +func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { opts := defaultServerOptions s := New(&opts) // Now create two accounts. - _, err := s.RegisterAccount("foo") + f, err := s.RegisterAccount("foo") if err != nil { t.Fatalf("Error creating account 'foo': %v", err) } - _, err = s.RegisterAccount("bar") + b, err := s.RegisterAccount("bar") if err != nil { t.Fatalf("Error creating account 'bar': %v", err) } - return s + return s, f, b } func TestRegisterDuplicateAccounts(t *testing.T) { - s := simpleAccountServer(t) + s, _, _ := simpleAccountServer(t) if _, err := s.RegisterAccount("foo"); err == nil { t.Fatal("Expected an error registering 'foo' twice") } } func TestAccountIsolation(t *testing.T) { - s := simpleAccountServer(t) - fooAcc := s.LookupAccount("foo") - barAcc := s.LookupAccount("bar") + s, fooAcc, barAcc := simpleAccountServer(t) if fooAcc == nil || barAcc == nil { t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") } cfoo, crFoo, _ := newClientForServer(s) - if err := cfoo.RegisterWithAccount(fooAcc); err != nil { + if err := cfoo.registerWithAccount(fooAcc); err != nil { t.Fatalf("Error register client with 'foo' account: %v", err) } cbar, crBar, _ := newClientForServer(s) - if err := cbar.RegisterWithAccount(barAcc); err != nil { + if err := cbar.registerWithAccount(barAcc); err != nil { t.Fatalf("Error register client with 'bar' account: %v", err) } @@ -146,7 +144,7 @@ func TestNewAccountsFromClients(t *testing.T) { // Clients can ask that the account be forced to be new. If it exists this is an error. func TestNewAccountRequireNew(t *testing.T) { // This has foo and bar accounts already. - s := simpleAccountServer(t) + s, _, _ := simpleAccountServer(t) c, cr, _ := newClientForServer(s) connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n") @@ -284,3 +282,293 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) { t.Fatalf("Expected an error with double user entries") } } + +func TestImportAuthorized(t *testing.T) { + _, foo, bar := simpleAccountServer(t) + + checkBool(foo.checkImportAuthorized(bar, "foo"), false, t) + checkBool(foo.checkImportAuthorized(bar, "*"), false, t) + checkBool(foo.checkImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkImportAuthorized(bar, "foo.>"), false, t) + + foo.addExport("foo", isPublicExport) + checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkImportAuthorized(bar, "bar"), false, t) + checkBool(foo.checkImportAuthorized(bar, "*"), false, t) + + foo.addExport("*", []*Account{bar}) + checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.bar"), false, t) + checkBool(foo.checkImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkImportAuthorized(bar, "*.*"), false, t) + checkBool(foo.checkImportAuthorized(bar, "*.>"), false, t) + + // Reset and test '>' public export + _, foo, bar = simpleAccountServer(t) + foo.addExport(">", nil) + // Everything should work. + checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.bar"), true, t) + checkBool(foo.checkImportAuthorized(bar, ">"), true, t) + checkBool(foo.checkImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.*"), true, t) + checkBool(foo.checkImportAuthorized(bar, "*.*"), true, t) + checkBool(foo.checkImportAuthorized(bar, "*.>"), true, t) + + // Reset and test pwc and fwc + s, foo, bar := simpleAccountServer(t) + foo.addExport("foo.*.baz.>", []*Account{bar}) + checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.1"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.*"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.*.baz.1.1"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.22.baz.22"), true, t) + checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz"), false, t) + checkBool(foo.checkImportAuthorized(bar, ""), false, t) + checkBool(foo.checkImportAuthorized(bar, "foo.bar.*.*"), false, t) + + // Make sure we match the account as well + + fb, _ := s.RegisterAccount("foobar") + bz, _ := s.RegisterAccount("baz") + + checkBool(foo.checkImportAuthorized(fb, "foo.bar.baz.1"), false, t) + checkBool(foo.checkImportAuthorized(bz, "foo.bar.baz.1"), false, t) +} + +func TestSimpleMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + // Test first that trying to import with no matching export permission returns an error. + if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != ErrAccountImportAuthorization { + t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err) + } + + // Now map the subject space between foo and bar. + // Need to do export first. + if err := cfoo.acc.addExport("foo", nil); err != nil { // Public with no accounts defined. + t.Fatalf("Error adding account export to client foo: %v", err) + } + if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != nil { + t.Fatalf("Error adding account import to client bar: %v", err) + } + + // Normal Subscription on bar client. + go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + checkMsg := func(l, sid string) { + t.Helper() + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "import.foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != sid { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + } + + // Now check we got the message from normal subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + checkMsg(l, "1") + checkPayload(crBar, []byte("hello\r\n"), t) + + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'baz': %v", err) + } + checkMsg(l, "2") + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestNoPrefixWildcardMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + t.Fatalf("Error adding account export to client foo: %v", err) + } + if err := cbar.acc.addImport(fooAcc, "*", ""); err != nil { + t.Fatalf("Error adding account import to client bar: %v", err) + } + + // Normal Subscription on bar client for literal "foo". + go cbar.parse([]byte("SUB foo 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the message from normal subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestPrefixWildcardMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + t.Fatalf("Error adding account export to client foo: %v", err) + } + if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + t.Fatalf("Error adding account import to client bar: %v", err) + } + + // Normal Subscription on bar client for wildcard. + go cbar.parse([]byte("SUB pub.imports.* 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the messages from wildcard subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "pub.imports.foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + t.Fatalf("Error adding account export to client foo: %v", err) + } + if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + t.Fatalf("Error adding account import to client bar: %v", err) + } + + // Normal Subscription on bar client for wildcard. + go cbar.parse([]byte("SUB pub.imports.foo 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the messages from wildcard subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "pub.imports.foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} diff --git a/server/auth.go b/server/auth.go index 146312eb..61279e98 100644 --- a/server/auth.go +++ b/server/auth.go @@ -17,6 +17,7 @@ import ( "crypto/tls" "encoding/base64" "strings" + "sync" "github.com/nats-io/nkeys" "golang.org/x/crypto/bcrypt" @@ -38,10 +39,107 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Import mapping struct +type importMap struct { + acc *Account + from string + prefix string +} + // Accounts type Account struct { - Name string - sl *Sublist + Name string + mu sync.RWMutex + sl *Sublist + imports map[string]*importMap + exports map[string]map[string]*Account +} + +// addImport will add in the import +func (a *Account) addImport(account *Account, from, prefix string) error { + // First check to see if the account has authorized export of the subject. + if !account.checkImportAuthorized(a, from) { + return ErrAccountImportAuthorization + } + + a.mu.Lock() + defer a.mu.Unlock() + if account == nil { + return ErrMissingAccount + } + if a.imports == nil { + a.imports = make(map[string]*importMap) + } + if prefix != "" && prefix[len(prefix)-1] != btsep { + prefix = prefix + string(btsep) + } + // TODO(dlc) - collisions, etc. + a.imports[from] = &importMap{account, from, prefix} + return nil +} + +// Placeholder to denote public export. +var isPublicExport = []*Account(nil) + +// addExport will add an export to the account. If accounts is nil +// it will signify a public export, meaning anyone can impoort. +func (a *Account) addExport(subject string, accounts []*Account) error { + a.mu.Lock() + defer a.mu.Unlock() + if a == nil { + return ErrMissingAccount + } + if a.exports == nil { + a.exports = make(map[string]map[string]*Account) + } + var ma map[string]*Account + for _, aa := range accounts { + if ma == nil { + ma = make(map[string]*Account, len(accounts)) + } + ma[aa.Name] = aa + } + a.exports[subject] = ma + return nil +} + +// Check if another account is authorized to import from us. +func (a *Account) checkImportAuthorized(account *Account, subject string) bool { + // Find the subject in the exports list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.exports == nil || !IsValidSubject(subject) { + return false + } + + // Check direct match of subject first + am, ok := a.exports[subject] + if ok { + // if am is nil that denotes a public export + if am == nil { + return true + } + // If we have a matching account we are authorized + _, ok := am[account.Name] + return ok + } + // ok if we are here we did not match directly so we need to test each one. + // The import subject arg has to take precedence, meaning the export + // has to be a true subset of the import claim. We already checked for + // exact matches above. + + tokens := strings.Split(subject, tsep) + for subj, am := range a.exports { + if isSubsetMatch(tokens, subj) { + if am == nil { + return true + } + _, ok := am[account.Name] + return ok + } + } + return false } // Nkey is for multiple nkey based users diff --git a/server/client.go b/server/client.go index b822bb58..f2e72d10 100644 --- a/server/client.go +++ b/server/client.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "regexp" + "strings" "sync" "sync/atomic" "time" @@ -239,6 +240,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { type subscription struct { client *client + im *importMap subject []byte queue []byte sid []byte @@ -319,7 +321,7 @@ func (c *client) initClient() { // RegisterWithAccount will register the given user with a specific // account. This will change the subject namespace. -func (c *client) RegisterWithAccount(acc *Account) error { +func (c *client) registerWithAccount(acc *Account) error { if acc == nil || acc.sl == nil { return ErrBadAccount } @@ -831,7 +833,7 @@ func (c *client) processConnect(arg []byte) error { } } // If we are here we can register ourselves with the new account. - if err := c.RegisterWithAccount(acc); err != nil { + if err := c.registerWithAccount(acc); err != nil { c.Errorf("Problem registering with account [%s]", account) c.sendErr("Account Failed Registration") return ErrBadAccount @@ -962,7 +964,7 @@ func (c *client) queueOutbound(data []byte) bool { c.out.p = nil } // Check for a big message, and if found place directly on nb - // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < + // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize if len(data) > maxBufSize { c.out.nb = append(c.out.nb, data) referenced = true @@ -1287,16 +1289,18 @@ func (c *client) processSub(argo []byte) (err error) { return nil } + // Check if we have a maximum on the number of subscriptions. if c.msubs > 0 && len(c.subs) >= c.msubs { c.mu.Unlock() c.maxSubsExceeded() return nil } - // Check if we have a maximum on the number of subscriptions. // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. sid := string(sub.sid) + var chkImports bool + if c.subs[sid] == nil { c.subs[sid] = sub if c.sl != nil { @@ -1304,17 +1308,26 @@ func (c *client) processSub(argo []byte) (err error) { if err != nil { delete(c.subs, sid) } else { + if c.acc != nil { + chkImports = true + } shouldForward = c.typ != ROUTER } } } c.mu.Unlock() + if err != nil { c.sendErr("Invalid Subject") return nil } else if c.opts.Verbose { c.sendOK() } + if chkImports { + if err := c.checkAccountImports(sub); err != nil { + c.Errorf(err.Error()) + } + } if shouldForward { c.srv.broadcastSubscribe(sub) } @@ -1322,6 +1335,49 @@ func (c *client) processSub(argo []byte) (err error) { return nil } +// Check to see if we need to create a shadow subscription due to imports +// in other accounts. +// Assume lock is held +func (c *client) checkAccountImports(sub *subscription) error { + c.mu.Lock() + acc := c.acc + c.mu.Unlock() + + if acc == nil { + return ErrMissingAccount + } + + subject := string(sub.subject) + tokens := strings.Split(subject, tsep) + + var rims [32]*importMap + var ims = rims[:0] + acc.mu.RLock() + for _, im := range acc.imports { + if isSubsetMatch(tokens, im.prefix+im.from) { + ims = append(ims, im) + } + } + acc.mu.RUnlock() + + // Now walk through collected importMaps + for _, im := range ims { + // We have a match for a local subscription with an import from another account. + // We will create a shadow subscription. + nsub := *sub // copy + nsub.im = im + if im.prefix != "" { + // redo subject here to match subject in the publisher account space. + // Just remove prefix from what they gave us. That maps into other space. + nsub.subject = sub.subject[len(im.prefix):] + } + if err := im.acc.sl.Insert(&nsub); err != nil { + return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name) + } + } + return nil +} + // canSubscribe determines if the client is authorized to subscribe to the // given subject. Assumes caller is holding lock. func (c *client) canSubscribe(subject []byte) bool { @@ -1627,7 +1683,7 @@ func (c *client) processMsg(msg []byte) { if genid == c.in.genid && c.in.results != nil { r, ok = c.in.results[string(c.pa.subject)] } else { - // reset our L1 completely. + // Reset our L1 completely. c.in.results = make(map[string]*SublistResult) c.in.genid = genid } @@ -1694,6 +1750,15 @@ func (c *client) processMsg(msg []byte) { rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() } + // Check for mapped subs + if sub.im != nil && sub.im.prefix != "" { + // Redo the subject here on the fly. + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, sub.im.prefix...) + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') + si = len(msgh) + } // Normal delivery mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) @@ -1714,6 +1779,15 @@ func (c *client) processMsg(msg []byte) { index := (startIndex + i) % len(qsubs) sub := qsubs[index] if sub != nil { + // Check for mapped subs + if sub.im != nil && sub.im.prefix != "" { + // Redo the subject here on the fly. + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, sub.im.prefix...) + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') + si = len(msgh) + } mh := c.msgHeader(msgh[:si], sub) if c.deliverMsg(sub, mh, msg) { break diff --git a/server/client_test.go b/server/client_test.go index 80574992..c3ee2f88 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -395,13 +395,19 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) { } } -func (c *client) parseFlushAndClose(op []byte) { +// This needs to clear any flushOutbound flags since writeLoop not running. +func (c *client) parseAndFlush(op []byte) { c.parse(op) for cp := range c.pcd { cp.mu.Lock() + cp.flags.clear(flushOutbound) cp.flushOutbound() cp.mu.Unlock() } +} + +func (c *client) parseFlushAndClose(op []byte) { + c.parseAndFlush(op) c.nc.Close() } diff --git a/server/errors.go b/server/errors.go index 2dd640c7..b1601f97 100644 --- a/server/errors.go +++ b/server/errors.go @@ -58,4 +58,6 @@ var ( // ErrMissingAccount is returned when an account does not exist. ErrMissingAccount = errors.New("Account Missing") + + ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported") ) diff --git a/server/sublist.go b/server/sublist.go index f1faa753..af553f47 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -712,6 +712,52 @@ func IsValidLiteralSubject(subject string) bool { return true } +// This will test a subject as an array of tokens against a test subject +// and determine if the tokens are matched. Both test subject and tokens +// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"], +// but not of foo.bar, etc. +func isSubsetMatch(tokens []string, test string) bool { + tsa := [32]string{} + tts := tsa[:0] + start := 0 + for i := 0; i < len(test); i++ { + if test[i] == btsep { + tts = append(tts, test[start:i]) + start = i + 1 + } + } + tts = append(tts, test[start:]) + + // Walk the target tokens + for i, t2 := range tts { + if i >= len(tokens) { + return false + } + if t2[0] == fwc && len(t2) == 1 { + return true + } + t1 := tokens[i] + if t1[0] == fwc && len(t1) == 1 { + return false + } + if t1[0] == pwc && len(t1) == 1 { + m := t2[0] == pwc && len(t2) == 1 + if !m { + return false + } + if i >= len(tts) { + return true + } else { + continue + } + } + if t2[0] != pwc && strings.Compare(t1, t2) != 0 { + return false + } + } + return len(tokens) == len(tts) +} + // matchLiteral is used to test literal subjects, those that do not have any // wildcards, with a target subject. This is used in the cache layer. func matchLiteral(literal, subject string) bool { @@ -725,7 +771,7 @@ func matchLiteral(literal, subject string) bool { // This function has been optimized for speed. // For instance, do not set b:=subject[i] here since // we may bump `i` in this loop to avoid `continue` or - // skiping common test in a particular test. + // skipping common test in a particular test. // Run Benchmark_SublistMatchLiteral before making any change. switch subject[i] { case pwc: diff --git a/server/sublist_test.go b/server/sublist_test.go index 48c8540f..5801da3b 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -23,8 +23,6 @@ import ( "testing" "time" - dbg "runtime/debug" - "github.com/nats-io/nuid" ) @@ -421,8 +419,8 @@ func TestSublistBasicQueueResults(t *testing.T) { } func checkBool(b, expected bool, t *testing.T) { + t.Helper() if b != expected { - dbg.PrintStack() t.Fatalf("Expected %v, but got %v\n", expected, b) } } From f6cb706c68e2337fd5878b25947f5d9eb6abb608 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 20 Sep 2018 18:05:59 -0700 Subject: [PATCH 3/8] First pass req/reply across accounts Signed-off-by: Derek Collison --- server/accounts_test.go | 97 +++++++++++++++++++++++++++++++ server/auth.go | 123 ++++++++++++++++++++++++++++++++++++---- server/client.go | 58 ++++++++++++------- server/client_test.go | 3 + server/errors.go | 4 ++ server/parser.go | 2 +- server/route.go | 9 ++- server/server.go | 1 + 8 files changed, 262 insertions(+), 35 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index a40e2245..b4b2bcf9 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -572,3 +572,100 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { } checkPayload(crBar, []byte("hello\r\n"), t) } + +func TestCrossAccountRequestReply(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, crFoo, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + // Add in the service import for the requests. Make it public. + if err := cfoo.acc.addService(nil, "test.request"); err != nil { + t.Fatalf("Error adding account service import to client foo: %v", err) + } + + // Test addRoute to make sure it requires accounts, and literalsubjects for both from and to subjects. + if err := cbar.acc.addRoute(nil, "foo", "test.request"); err != ErrMissingAccount { + t.Fatalf("Expected ErrMissingAccount but received %v.", err) + } + if err := cbar.acc.addRoute(fooAcc, "*", "test.request"); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject but received %v.", err) + } + if err := cbar.acc.addRoute(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject but received %v.", err) + } + + // Now add in the Route for request to be routed to the foo account. + if err := cbar.acc.addRoute(fooAcc, "foo", "test.request"); err != nil { + t.Fatalf("Error adding account route to client bar: %v", err) + } + + // Now setup the resonder under cfoo + cfoo.parse([]byte("SUB test.request 1\r\n")) + + // Now send the request. Remember we expect the request on our local foo. We added the route + // with that "from" and will map it to "test.request" + go cbar.parseAndFlush([]byte("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")) + + // Now read the request from crFoo + l, err := crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "test.request" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + if matches[REPLY_INDEX] != "bar" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crFoo, []byte("help\r\n"), t) + + go cfoo.parseAndFlush([]byte("PUB bar 2\r\n22\r\n")) + + // Now read the response from crBar + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw = msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches = mraw[0] + if matches[SUB_INDEX] != "bar" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "11" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + if matches[REPLY_INDEX] != "" { + t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("22\r\n"), t) + + // Make sure we have no routes on fooAcc. An implicit one was created + /// for the response but should be removed when the response was processed. + if nr := fooAcc.numRoutes(); nr != 0 { + t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) + } +} diff --git a/server/auth.go b/server/auth.go index 61279e98..37a80c12 100644 --- a/server/auth.go +++ b/server/auth.go @@ -46,17 +46,98 @@ type importMap struct { prefix string } -// Accounts -type Account struct { - Name string - mu sync.RWMutex - sl *Sublist - imports map[string]*importMap - exports map[string]map[string]*Account +// Route mapping struct +type routeMap struct { + acc *Account + from string + to string + ae bool } -// addImport will add in the import +// Accounts +type Account struct { + Name string + mu sync.RWMutex + sl *Sublist + imports map[string]*importMap + exports map[string]map[string]*Account + services map[string]map[string]*Account + // TODO(dlc) sync.Map may be better. + routes map[string]*routeMap +} + +func (a *Account) addService(accounts []*Account, subject string) error { + a.mu.Lock() + defer a.mu.Unlock() + if a == nil { + return ErrMissingAccount + } + if a.services == nil { + a.services = make(map[string]map[string]*Account) + } + ma := a.services[subject] + if accounts != nil && ma == nil { + ma = make(map[string]*Account) + } + for _, a := range accounts { + ma[a.Name] = a + } + a.services[subject] = ma + return nil +} + +// numRoutes returns the number of routes on this account. +func (a *Account) numRoutes() int { + a.mu.RLock() + defer a.mu.RUnlock() + return len(a.routes) +} + +// This will add a route to an account to send published messages / requests +// to the destination account. From is the local subject to map, To is the +// subject that will appear on the destination account. Destination will need +// to have an import rule to allow access via addService. +func (a *Account) addRoute(destination *Account, from, to string) error { + if destination == nil { + return ErrMissingAccount + } + if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) { + return ErrInvalidSubject + } + // First check to see if the account has authorized us to route to the "to" subject. + if !destination.checkRouteAuthorized(a, to) { + return ErrAccountRouteAuthorization + } + + return a.addImplicitRoute(destination, from, to, false) +} + +// removeRoute will remove the route by subject. +func (a *Account) removeRoute(subject string) { + a.mu.Lock() + delete(a.routes, subject) + a.mu.Unlock() +} + +// Add a route to a connect from an implicit route created for a response to a request. +// This does no checks and should be only called by the msg processing code. Use addRoute +// above if responding to user input or config, etc. +func (a *Account) addImplicitRoute(destination *Account, from, to string, autoexpire bool) error { + a.mu.Lock() + if a.routes == nil { + a.routes = make(map[string]*routeMap) + } + a.routes[from] = &routeMap{destination, from, to, autoexpire} + a.mu.Unlock() + return nil +} + +// addImport will add in the import from a specific account. func (a *Account) addImport(account *Account, from, prefix string) error { + if account == nil { + return ErrMissingAccount + } + // First check to see if the account has authorized export of the subject. if !account.checkImportAuthorized(a, from) { return ErrAccountImportAuthorization @@ -64,9 +145,6 @@ func (a *Account) addImport(account *Account, from, prefix string) error { a.mu.Lock() defer a.mu.Unlock() - if account == nil { - return ErrMissingAccount - } if a.imports == nil { a.imports = make(map[string]*importMap) } @@ -103,6 +181,29 @@ func (a *Account) addExport(subject string, accounts []*Account) error { return nil } +// Check if another account is authorized to route requests to us. +func (a *Account) checkRouteAuthorized(account *Account, subject string) bool { + // Find the subject in the services list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.services == nil || !IsValidLiteralSubject(subject) { + return false + } + // These are always literal subjects so just lookup. + am, ok := a.services[subject] + if !ok { + return false + } + // Check to see if we are public or if we need to search for the account. + if am == nil { + return true + } + // Check that we allow this account. + _, ok = am[account.Name] + return ok +} + // Check if another account is authorized to import from us. func (a *Account) checkImportAuthorized(account *Account, subject string) bool { // Find the subject in the exports list. diff --git a/server/client.go b/server/client.go index f2e72d10..30f86d79 100644 --- a/server/client.go +++ b/server/client.go @@ -238,9 +238,11 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { return &state } +// This is the main subscription struct that indicates +// interest in published messages. type subscription struct { client *client - im *importMap + im *importMap // This is for importing support. subject []byte queue []byte sid []byte @@ -1634,18 +1636,8 @@ func (c *client) pubAllowed(subject []byte) bool { return allowed } -// prepMsgHeader will prepare the message header prefix -func (c *client) prepMsgHeader() []byte { - // Use the scratch buffer.. - msgh := c.msgb[:msgHeadProtoLen] - - // msg header - msgh = append(msgh, c.pa.subject...) - return append(msgh, ' ') -} - // processMsg is called to process an inbound msg from a client. -func (c *client) processMsg(msg []byte) { +func (c *client) processInboundMsg(msg []byte) { // Snapshot server. srv := c.srv @@ -1689,9 +1681,8 @@ func (c *client) processMsg(msg []byte) { } if !ok { - subject := string(c.pa.subject) - r = c.sl.Match(subject) - c.in.results[subject] = r + r = c.sl.Match(string(c.pa.subject)) + c.in.results[string(c.pa.subject)] = r // Prune the results cache. Keeps us from unbounded growth. if len(c.in.results) > maxResultCacheSize { n := 0 @@ -1704,6 +1695,26 @@ func (c *client) processMsg(msg []byte) { } } + // Check to see if we need to route this message to + // another account via a route entry. + if c.typ == CLIENT && c.acc != nil && c.acc.routes != nil { + c.acc.mu.RLock() + rm := c.acc.routes[string(c.pa.subject)] + c.acc.mu.RUnlock() + // Get the results from the other account for the mapped "to" subject. + if rm != nil && rm.acc != nil && rm.acc.sl != nil { + if rm.ae { + c.acc.removeRoute(rm.from) + } + if c.pa.reply != nil { + rm.acc.addImplicitRoute(c.acc, string(c.pa.reply), string(c.pa.reply), true) + } + // FIXME(dlc) - Do L1 cache trick from above. + rr := rm.acc.sl.Match(rm.to) + c.processMsgResults(rr, msg, []byte(rm.to)) + } + } + // This is the fanout scale. fanout := len(r.psubs) + len(r.qsubs) @@ -1713,12 +1724,18 @@ func (c *client) processMsg(msg []byte) { } if c.typ == ROUTER { - c.processRoutedMsg(r, msg) - return + c.processRoutedMsgResults(r, msg) + } else if c.typ == CLIENT { + c.processMsgResults(r, msg, c.pa.subject) } +} - // Client connection processing here. - msgh := c.prepMsgHeader() +// This processes the sublist results for a given message. +func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { + // msg header + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, subject...) + msgh = append(msgh, ' ') si := len(msgh) // Used to only send messages once across any given route. @@ -1732,7 +1749,7 @@ func (c *client) processMsg(msg []byte) { if sub.client.typ == ROUTER { // Check to see if we have already sent it here. if rmap == nil { - rmap = make(map[string]struct{}, srv.numRoutes()) + rmap = make(map[string]struct{}, c.srv.numRoutes()) } sub.client.mu.Lock() if sub.client.nc == nil || @@ -1769,6 +1786,7 @@ func (c *client) processMsg(msg []byte) { if c.in.prand == nil { c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) } + // Process queue subs for i := 0; i < len(r.qsubs); i++ { qsubs := r.qsubs[i] diff --git a/server/client_test.go b/server/client_test.go index c3ee2f88..afa2efeb 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -57,6 +57,8 @@ func newClientForServer(s *Server) (*client, *bufio.Reader, string) { cr := bufio.NewReaderSize(cli, maxBufSize) ch := make(chan *client) createClientAsync(ch, s, srv) + // So failing tests don't just hang. + cli.SetReadDeadline(time.Now().Add(2 * time.Second)) l, _ := cr.ReadString('\n') // Grab client c := <-ch @@ -283,6 +285,7 @@ const ( ) func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) { + t.Helper() // Read in payload d := make([]byte, len(expected)) n, err := cr.Read(d) diff --git a/server/errors.go b/server/errors.go index b1601f97..1656048d 100644 --- a/server/errors.go +++ b/server/errors.go @@ -59,5 +59,9 @@ var ( // ErrMissingAccount is returned when an account does not exist. ErrMissingAccount = errors.New("Account Missing") + // ErrAccountImportAuthorization is returned when an import is not authorized. ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported") + + // ErrAccountRouteAuthorization is returned when a route is not authorized. + ErrAccountRouteAuthorization = errors.New("Account Not Authorized On Service") ) diff --git a/server/parser.go b/server/parser.go index 088894fb..6eec1acc 100644 --- a/server/parser.go +++ b/server/parser.go @@ -234,7 +234,7 @@ func (c *client) parse(buf []byte) error { if len(c.msgBuf) != c.pa.size+LEN_CR_LF { goto parseErr } - c.processMsg(c.msgBuf) + c.processInboundMsg(c.msgBuf) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START default: diff --git a/server/route.go b/server/route.go index d206f233..37c32bca 100644 --- a/server/route.go +++ b/server/route.go @@ -212,12 +212,15 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group) } -// processRoutedMsg processes messages inbound from a route. -func (c *client) processRoutedMsg(r *SublistResult, msg []byte) { +// processRoutedMsgResults processes messages inbound from a route. +func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { // Snapshot server. srv := c.srv - msgh := c.prepMsgHeader() + // msg header + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') si := len(msgh) // If we have a queue subscription, deliver direct diff --git a/server/server.go b/server/server.go index 492c8a8d..0c2f8570 100644 --- a/server/server.go +++ b/server/server.go @@ -835,6 +835,7 @@ func (s *Server) copyInfo() Info { func (s *Server) createClient(conn net.Conn) *client { // Snapshot server options. + // TODO(dlc) - This can get expensive. opts := s.getOpts() max_pay := int64(opts.MaxPayload) From c4bcbf62751cf46174de2299e40c8deffdcccbc5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 21 Sep 2018 16:49:27 -0700 Subject: [PATCH 4/8] Map anonymous reply subjects Signed-off-by: Derek Collison --- server/accounts_test.go | 57 +++++++++++++++++++++++++---------------- server/client.go | 49 +++++++++++++++++++++++++++-------- server/route.go | 8 +++--- 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index b4b2bcf9..8b44549a 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -14,6 +14,7 @@ package server import ( + "fmt" "os" "strings" "testing" @@ -69,7 +70,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Error for client 'bar' from server: %v", err) } if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) + t.Fatalf("PONG response incorrect: %q", l) } go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) @@ -83,7 +84,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crFoo, []byte("hello\r\n"), t) @@ -93,7 +94,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Error for client 'bar' from server: %v", err) } if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) + t.Fatalf("PONG response incorrect: %q", l) } } @@ -190,7 +191,7 @@ func TestAccountSimpleConfig(t *testing.T) { t.Fatalf("Received an error processing config file: %v", err) } if la := len(opts.Accounts); la != 2 { - t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) } if !accountNameExists("foo", opts.Accounts) { t.Fatal("Expected a 'foo' account") @@ -232,11 +233,11 @@ func TestAccountParseConfig(t *testing.T) { } if la := len(opts.Accounts); la != 2 { - t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) } if lu := len(opts.Users); lu != 4 { - t.Fatalf("Expected 4 total Users, got %d\n", lu) + t.Fatalf("Expected 4 total Users, got %d", lu) } var natsAcc *Account @@ -391,10 +392,10 @@ func TestSimpleMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "import.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != sid { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } } @@ -459,10 +460,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -512,10 +513,10 @@ func TestPrefixWildcardMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "pub.imports.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -565,10 +566,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "pub.imports.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -630,17 +631,19 @@ func TestCrossAccountRequestReply(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "test.request" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } - if matches[REPLY_INDEX] != "bar" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + // Make sure this looks like _INBOX + if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") { + t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX]) } checkPayload(crFoo, []byte("help\r\n"), t) - go cfoo.parseAndFlush([]byte("PUB bar 2\r\n22\r\n")) + replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX]) + go cfoo.parseAndFlush([]byte(replyOp)) // Now read the response from crBar l, err = crBar.ReadString('\n') @@ -653,13 +656,13 @@ func TestCrossAccountRequestReply(t *testing.T) { } matches = mraw[0] if matches[SUB_INDEX] != "bar" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "11" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } if matches[REPLY_INDEX] != "" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("22\r\n"), t) @@ -669,3 +672,13 @@ func TestCrossAccountRequestReply(t *testing.T) { t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) } } + +func BenchmarkNewRouteReply(b *testing.B) { + opts := defaultServerOptions + s := New(&opts) + c, _, _ := newClientForServer(s) + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.newRouteReply() + } +} diff --git a/server/client.go b/server/client.go index 30f86d79..1589ecad 100644 --- a/server/client.go +++ b/server/client.go @@ -1480,11 +1480,11 @@ func (c *client) processUnsub(arg []byte) error { return nil } -func (c *client) msgHeader(mh []byte, sub *subscription) []byte { +func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte { mh = append(mh, sub.sid...) mh = append(mh, ' ') - if c.pa.reply != nil { - mh = append(mh, c.pa.reply...) + if reply != nil { + mh = append(mh, reply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) @@ -1636,6 +1636,32 @@ func (c *client) pubAllowed(subject []byte) bool { return allowed } +// Used to mimic client like replies. +const ( + replyPrefix = "_INBOX." + replyPrefixLen = len(replyPrefix) + digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 +) + +// newRouteReply is used when rewriting replies that cross account boundaries. +// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients. +func (c *client) newRouteReply() []byte { + // Check to see if we have our own rand yet. Global rand + // has contention with lots of clients, etc. + if c.in.prand == nil { + c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + + var b = [15]byte{'_', 'I', 'N', 'B', 'O', 'X', '.'} + rn := c.in.prand.Int63() + for i, l := replyPrefixLen, rn; i < len(b); i++ { + b[i] = digits[l%base] + l /= base + } + return b[:] +} + // processMsg is called to process an inbound msg from a client. func (c *client) processInboundMsg(msg []byte) { // Snapshot server. @@ -1703,15 +1729,18 @@ func (c *client) processInboundMsg(msg []byte) { c.acc.mu.RUnlock() // Get the results from the other account for the mapped "to" subject. if rm != nil && rm.acc != nil && rm.acc.sl != nil { + var nrr []byte if rm.ae { c.acc.removeRoute(rm.from) } if c.pa.reply != nil { - rm.acc.addImplicitRoute(c.acc, string(c.pa.reply), string(c.pa.reply), true) + // We want to remap this to provide anonymity. + nrr = c.newRouteReply() + rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true) } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) - c.processMsgResults(rr, msg, []byte(rm.to)) + c.processMsgResults(rr, msg, []byte(rm.to), nrr) } } @@ -1726,12 +1755,12 @@ func (c *client) processInboundMsg(msg []byte) { if c.typ == ROUTER { c.processRoutedMsgResults(r, msg) } else if c.typ == CLIENT { - c.processMsgResults(r, msg, c.pa.subject) + c.processMsgResults(r, msg, c.pa.subject, c.pa.reply) } } // This processes the sublist results for a given message. -func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { +func (c *client) processMsgResults(r *SublistResult, msg, subject, reply []byte) { // msg header msgh := c.msgb[:msgHeadProtoLen] msgh = append(msgh, subject...) @@ -1767,7 +1796,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() } - // Check for mapped subs + // Check for import mapped subs if sub.im != nil && sub.im.prefix != "" { // Redo the subject here on the fly. msgh := c.msgb[:msgHeadProtoLen] @@ -1777,7 +1806,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { si = len(msgh) } // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, reply) c.deliverMsg(sub, mh, msg) } @@ -1806,7 +1835,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { msgh = append(msgh, ' ') si = len(msgh) } - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, reply) if c.deliverMsg(sub, mh, msg) { break } diff --git a/server/route.go b/server/route.go index 37c32bca..bbfa4bf2 100644 --- a/server/route.go +++ b/server/route.go @@ -194,7 +194,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { rsub = sub continue } - mh := c.msgHeader(msgh[:], sub) + mh := c.msgHeader(msgh[:], sub, c.pa.reply) if c.deliverMsg(sub, mh, msg) { c.Debugf("Redelivery succeeded for message on group '%q'", group) return @@ -203,7 +203,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { // If we are here we failed to find a local, see if we snapshotted a // remote sub, and if so deliver to that. if rsub != nil { - mh := c.msgHeader(msgh[:], rsub) + mh := c.msgHeader(msgh[:], rsub, c.pa.reply) if c.deliverMsg(rsub, mh, msg) { c.Debugf("Re-routing message on group '%q' to remote server", group) return @@ -236,7 +236,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { } didDeliver := false if sub != nil { - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) didDeliver = c.deliverMsg(sub, mh, msg) } if !didDeliver && c.srv != nil { @@ -261,7 +261,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { sub.client.mu.Unlock() // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) c.deliverMsg(sub, mh, msg) } } From ae21fa22b7c3fbdaf947474e4bb312d284383ec4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 22 Sep 2018 16:53:59 -0700 Subject: [PATCH 5/8] API changes to match config for account mappings Signed-off-by: Derek Collison --- server/accounts_test.go | 116 ++++++++++++++--------------- server/auth.go | 157 ++++++++++++++++++++++------------------ server/client.go | 22 +++--- server/errors.go | 8 +- 4 files changed, 158 insertions(+), 145 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 8b44549a..c70661e2 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -287,60 +287,60 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) { func TestImportAuthorized(t *testing.T) { _, foo, bar := simpleAccountServer(t) - checkBool(foo.checkImportAuthorized(bar, "foo"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), false, t) - checkBool(foo.checkImportAuthorized(bar, ">"), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.>"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t) - foo.addExport("foo", isPublicExport) - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), false, t) + foo.addStreamExport("foo", isPublicExport) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) - foo.addExport("*", []*Account{bar}) - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar"), false, t) - checkBool(foo.checkImportAuthorized(bar, ">"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*.>"), false, t) + foo.addStreamExport("*", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), false, t) // Reset and test '>' public export _, foo, bar = simpleAccountServer(t) - foo.addExport(">", nil) + foo.addStreamExport(">", nil) // Everything should work. - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, ">"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*.>"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), true, t) // Reset and test pwc and fwc s, foo, bar := simpleAccountServer(t) - foo.addExport("foo.*.baz.>", []*Account{bar}) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.1"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*.baz.1.1"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.22.baz.22"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz"), false, t) - checkBool(foo.checkImportAuthorized(bar, ""), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.*.*"), false, t) + foo.addStreamExport("foo.*.baz.>", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ""), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*"), false, t) // Make sure we match the account as well fb, _ := s.RegisterAccount("foobar") bz, _ := s.RegisterAccount("baz") - checkBool(foo.checkImportAuthorized(fb, "foo.bar.baz.1"), false, t) - checkBool(foo.checkImportAuthorized(bz, "foo.bar.baz.1"), false, t) + checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1"), false, t) + checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1"), false, t) } func TestSimpleMapping(t *testing.T) { @@ -361,16 +361,16 @@ func TestSimpleMapping(t *testing.T) { } // Test first that trying to import with no matching export permission returns an error. - if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != ErrAccountImportAuthorization { + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization { t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err) } // Now map the subject space between foo and bar. // Need to do export first. - if err := cfoo.acc.addExport("foo", nil); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -432,10 +432,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", ""); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -485,10 +485,10 @@ func TestPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -538,10 +538,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -592,23 +592,23 @@ func TestCrossAccountRequestReply(t *testing.T) { } // Add in the service import for the requests. Make it public. - if err := cfoo.acc.addService(nil, "test.request"); err != nil { + if err := cfoo.acc.addServiceExport(nil, "test.request"); err != nil { t.Fatalf("Error adding account service import to client foo: %v", err) } - // Test addRoute to make sure it requires accounts, and literalsubjects for both from and to subjects. - if err := cbar.acc.addRoute(nil, "foo", "test.request"); err != ErrMissingAccount { + // Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects. + if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount { t.Fatalf("Expected ErrMissingAccount but received %v.", err) } - if err := cbar.acc.addRoute(fooAcc, "*", "test.request"); err != ErrInvalidSubject { + if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject but received %v.", err) } - if err := cbar.acc.addRoute(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject but received %v.", err) } // Now add in the Route for request to be routed to the foo account. - if err := cbar.acc.addRoute(fooAcc, "foo", "test.request"); err != nil { + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil { t.Fatalf("Error adding account route to client bar: %v", err) } @@ -666,9 +666,9 @@ func TestCrossAccountRequestReply(t *testing.T) { } checkPayload(crBar, []byte("22\r\n"), t) - // Make sure we have no routes on fooAcc. An implicit one was created - /// for the response but should be removed when the response was processed. - if nr := fooAcc.numRoutes(); nr != 0 { + // Make sure we have no service imports on fooAcc. An implicit one was created + // for the response but should be removed when the response was processed. + if nr := fooAcc.numServiceRoutes(); nr != 0 { t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) } } @@ -679,6 +679,6 @@ func BenchmarkNewRouteReply(b *testing.B) { c, _, _ := newClientForServer(s) b.ResetTimer() for i := 0; i < b.N; i++ { - c.newRouteReply() + c.newServiceReply() } } diff --git a/server/auth.go b/server/auth.go index 37a80c12..38db688d 100644 --- a/server/auth.go +++ b/server/auth.go @@ -39,65 +39,78 @@ type ClientAuthentication interface { RegisterUser(*User) } -// Import mapping struct -type importMap struct { +// Import stream mapping struct +type streamImport struct { acc *Account from string prefix string } -// Route mapping struct -type routeMap struct { +// Import service mapping struct +type serviceImport struct { acc *Account from string to string ae bool } -// Accounts -type Account struct { - Name string - mu sync.RWMutex - sl *Sublist - imports map[string]*importMap - exports map[string]map[string]*Account - services map[string]map[string]*Account - // TODO(dlc) sync.Map may be better. - routes map[string]*routeMap +type importMap struct { + streams map[string]*streamImport + services map[string]*serviceImport // TODO(dlc) sync.Map may be better. } -func (a *Account) addService(accounts []*Account, subject string) error { +type exportMap struct { + streams map[string]map[string]*Account + services map[string]map[string]*Account +} + +// Accounts +type Account struct { + Name string + mu sync.RWMutex + sl *Sublist + imports importMap + exports exportMap + /* + imports map[string]*importMap + exports map[string]map[string]*Account + services map[string]map[string]*Account + routes map[string]*routeMap // TODO(dlc) sync.Map may be better. + */ +} + +func (a *Account) addServiceExport(accounts []*Account, subject string) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { return ErrMissingAccount } - if a.services == nil { - a.services = make(map[string]map[string]*Account) + if a.exports.services == nil { + a.exports.services = make(map[string]map[string]*Account) } - ma := a.services[subject] + ma := a.exports.services[subject] if accounts != nil && ma == nil { ma = make(map[string]*Account) } for _, a := range accounts { ma[a.Name] = a } - a.services[subject] = ma + a.exports.services[subject] = ma return nil } -// numRoutes returns the number of routes on this account. -func (a *Account) numRoutes() int { +// numServiceRoutes returns the number of service routes on this account. +func (a *Account) numServiceRoutes() int { a.mu.RLock() defer a.mu.RUnlock() - return len(a.routes) + return len(a.imports.services) } // This will add a route to an account to send published messages / requests // to the destination account. From is the local subject to map, To is the // subject that will appear on the destination account. Destination will need // to have an import rule to allow access via addService. -func (a *Account) addRoute(destination *Account, from, to string) error { +func (a *Account) addServiceImport(destination *Account, from, to string) error { if destination == nil { return ErrMissingAccount } @@ -105,54 +118,54 @@ func (a *Account) addRoute(destination *Account, from, to string) error { return ErrInvalidSubject } // First check to see if the account has authorized us to route to the "to" subject. - if !destination.checkRouteAuthorized(a, to) { - return ErrAccountRouteAuthorization + if !destination.checkServiceImportAuthorized(a, to) { + return ErrServiceImportAuthorization } - return a.addImplicitRoute(destination, from, to, false) + return a.addImplicitServiceImport(destination, from, to, false) } -// removeRoute will remove the route by subject. -func (a *Account) removeRoute(subject string) { +// removeServiceImport will remove the route by subject. +func (a *Account) removeServiceImport(subject string) { a.mu.Lock() - delete(a.routes, subject) + delete(a.imports.services, subject) a.mu.Unlock() } -// Add a route to a connect from an implicit route created for a response to a request. +// Add a route to connect from an implicit route created for a response to a request. // This does no checks and should be only called by the msg processing code. Use addRoute // above if responding to user input or config, etc. -func (a *Account) addImplicitRoute(destination *Account, from, to string, autoexpire bool) error { +func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error { a.mu.Lock() - if a.routes == nil { - a.routes = make(map[string]*routeMap) + if a.imports.services == nil { + a.imports.services = make(map[string]*serviceImport) } - a.routes[from] = &routeMap{destination, from, to, autoexpire} + a.imports.services[from] = &serviceImport{destination, from, to, autoexpire} a.mu.Unlock() return nil } -// addImport will add in the import from a specific account. -func (a *Account) addImport(account *Account, from, prefix string) error { +// addStreamImport will add in the stream import from a specific account. +func (a *Account) addStreamImport(account *Account, from, prefix string) error { if account == nil { return ErrMissingAccount } // First check to see if the account has authorized export of the subject. - if !account.checkImportAuthorized(a, from) { - return ErrAccountImportAuthorization + if !account.checkStreamImportAuthorized(a, from) { + return ErrStreamImportAuthorization } a.mu.Lock() defer a.mu.Unlock() - if a.imports == nil { - a.imports = make(map[string]*importMap) + if a.imports.streams == nil { + a.imports.streams = make(map[string]*streamImport) } if prefix != "" && prefix[len(prefix)-1] != btsep { prefix = prefix + string(btsep) } // TODO(dlc) - collisions, etc. - a.imports[from] = &importMap{account, from, prefix} + a.imports.streams[from] = &streamImport{account, from, prefix} return nil } @@ -161,14 +174,14 @@ var isPublicExport = []*Account(nil) // addExport will add an export to the account. If accounts is nil // it will signify a public export, meaning anyone can impoort. -func (a *Account) addExport(subject string, accounts []*Account) error { +func (a *Account) addStreamExport(subject string, accounts []*Account) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { return ErrMissingAccount } - if a.exports == nil { - a.exports = make(map[string]map[string]*Account) + if a.exports.streams == nil { + a.exports.streams = make(map[string]map[string]*Account) } var ma map[string]*Account for _, aa := range accounts { @@ -177,45 +190,22 @@ func (a *Account) addExport(subject string, accounts []*Account) error { } ma[aa.Name] = aa } - a.exports[subject] = ma + a.exports.streams[subject] = ma return nil } -// Check if another account is authorized to route requests to us. -func (a *Account) checkRouteAuthorized(account *Account, subject string) bool { - // Find the subject in the services list. - a.mu.RLock() - defer a.mu.RUnlock() - - if a.services == nil || !IsValidLiteralSubject(subject) { - return false - } - // These are always literal subjects so just lookup. - am, ok := a.services[subject] - if !ok { - return false - } - // Check to see if we are public or if we need to search for the account. - if am == nil { - return true - } - // Check that we allow this account. - _, ok = am[account.Name] - return ok -} - // Check if another account is authorized to import from us. -func (a *Account) checkImportAuthorized(account *Account, subject string) bool { +func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool { // Find the subject in the exports list. a.mu.RLock() defer a.mu.RUnlock() - if a.exports == nil || !IsValidSubject(subject) { + if a.exports.streams == nil || !IsValidSubject(subject) { return false } // Check direct match of subject first - am, ok := a.exports[subject] + am, ok := a.exports.streams[subject] if ok { // if am is nil that denotes a public export if am == nil { @@ -231,7 +221,7 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool { // exact matches above. tokens := strings.Split(subject, tsep) - for subj, am := range a.exports { + for subj, am := range a.exports.streams { if isSubsetMatch(tokens, subj) { if am == nil { return true @@ -243,6 +233,29 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool { return false } +// Check if another account is authorized to route requests to this service. +func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool { + // Find the subject in the services list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.exports.services == nil || !IsValidLiteralSubject(subject) { + return false + } + // These are always literal subjects so just lookup. + am, ok := a.exports.services[subject] + if !ok { + return false + } + // Check to see if we are public or if we need to search for the account. + if am == nil { + return true + } + // Check that we allow this account. + _, ok = am[account.Name] + return ok +} + // Nkey is for multiple nkey based users type NkeyUser struct { Nkey string `json:"user"` diff --git a/server/client.go b/server/client.go index 1589ecad..9ef65bf6 100644 --- a/server/client.go +++ b/server/client.go @@ -242,7 +242,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { // interest in published messages. type subscription struct { client *client - im *importMap // This is for importing support. + im *streamImport // This is for importing support. subject []byte queue []byte sid []byte @@ -1352,10 +1352,10 @@ func (c *client) checkAccountImports(sub *subscription) error { subject := string(sub.subject) tokens := strings.Split(subject, tsep) - var rims [32]*importMap + var rims [32]*streamImport var ims = rims[:0] acc.mu.RLock() - for _, im := range acc.imports { + for _, im := range acc.imports.streams { if isSubsetMatch(tokens, im.prefix+im.from) { ims = append(ims, im) } @@ -1644,9 +1644,9 @@ const ( base = 62 ) -// newRouteReply is used when rewriting replies that cross account boundaries. +// newServiceReply is used when rewriting replies that cross account boundaries. // These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients. -func (c *client) newRouteReply() []byte { +func (c *client) newServiceReply() []byte { // Check to see if we have our own rand yet. Global rand // has contention with lots of clients, etc. if c.in.prand == nil { @@ -1722,21 +1722,21 @@ func (c *client) processInboundMsg(msg []byte) { } // Check to see if we need to route this message to - // another account via a route entry. - if c.typ == CLIENT && c.acc != nil && c.acc.routes != nil { + // another account. + if c.typ == CLIENT && c.acc != nil && c.acc.imports.services != nil { c.acc.mu.RLock() - rm := c.acc.routes[string(c.pa.subject)] + rm := c.acc.imports.services[string(c.pa.subject)] c.acc.mu.RUnlock() // Get the results from the other account for the mapped "to" subject. if rm != nil && rm.acc != nil && rm.acc.sl != nil { var nrr []byte if rm.ae { - c.acc.removeRoute(rm.from) + c.acc.removeServiceImport(rm.from) } if c.pa.reply != nil { // We want to remap this to provide anonymity. - nrr = c.newRouteReply() - rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true) + nrr = c.newServiceReply() + rm.acc.addImplicitServiceImport(c.acc, string(nrr), string(c.pa.reply), true) } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) diff --git a/server/errors.go b/server/errors.go index 1656048d..003b9880 100644 --- a/server/errors.go +++ b/server/errors.go @@ -59,9 +59,9 @@ var ( // ErrMissingAccount is returned when an account does not exist. ErrMissingAccount = errors.New("Account Missing") - // ErrAccountImportAuthorization is returned when an import is not authorized. - ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported") + // ErrStreamImportAuthorization is returned when a stream import is not authorized. + ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized") - // ErrAccountRouteAuthorization is returned when a route is not authorized. - ErrAccountRouteAuthorization = errors.New("Account Not Authorized On Service") + // ErrServiceImportAuthorization is returned when a service import is not authorized. + ErrServiceImportAuthorization = errors.New("Service Import Not Authorized") ) From 9f8330bcc98a4a15198d0a7abba835dd62244ef6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 23 Sep 2018 12:11:42 -0700 Subject: [PATCH 6/8] Added import and export parsing for configs Signed-off-by: Derek Collison --- conf/lex.go | 2 + server/accounts_test.go | 148 +++++++++++++- server/auth.go | 31 ++- server/configs/accounts.conf | 47 +++++ server/opts.go | 384 ++++++++++++++++++++++++++++++++--- 5 files changed, 571 insertions(+), 41 deletions(-) create mode 100644 server/configs/accounts.conf diff --git a/conf/lex.go b/conf/lex.go index f9603a99..3eedaa89 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -594,6 +594,8 @@ func lexMapKeyStart(lx *lexer) stateFn { switch { case isKeySeparator(r): return lx.errorf("Unexpected key separator '%v'.", r) + case r == arrayEnd: + return lx.errorf("Unexpected array end '%v' processing map.", r) case unicode.IsSpace(r): lx.next() return lexSkip(lx, lexMapKeyStart) diff --git a/server/accounts_test.go b/server/accounts_test.go index c70661e2..fca206c8 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -284,6 +284,152 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) { } } +func TestAccountParseConfigImportsExports(t *testing.T) { + opts, err := ProcessConfigFile("./configs/accounts.conf") + if err != nil { + t.Fatal(err) + } + if la := len(opts.Accounts); la != 3 { + t.Fatalf("Expected to see 3 accounts in opts, got %d", la) + } + if lu := len(opts.Nkeys); lu != 4 { + t.Fatalf("Expected 4 total Nkey users, got %d", lu) + } + if lu := len(opts.Users); lu != 0 { + t.Fatalf("Expected no Users, got %d", lu) + } + var natsAcc, synAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + } + if acc.Name == "synadia" { + synAcc = acc + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" { + t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey) + } + // Check user assigned to the correct account. + for _, nk := range opts.Nkeys { + if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" { + if nk.Account != natsAcc { + t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name) + } + break + } + } + + // Now check for the imports and exports of streams and services. + if lis := len(natsAcc.imports.streams); lis != 2 { + t.Fatalf("Expected 2 imported streams, got %d\n", lis) + } + if lis := len(natsAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(natsAcc.exports.services); les != 1 { + t.Fatalf("Expected 1 exported service, got %d\n", les) + } + if les := len(natsAcc.exports.streams); les != 0 { + t.Fatalf("Expected no exported streams, got %d\n", les) + } + + if synAcc == nil { + t.Fatalf("Error retrieving account for 'synadia'") + } + + if lis := len(synAcc.imports.streams); lis != 0 { + t.Fatalf("Expected no imported streams, got %d\n", lis) + } + if lis := len(synAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(synAcc.exports.services); les != 2 { + t.Fatalf("Expected 2 exported service, got %d\n", les) + } + if les := len(synAcc.exports.streams); les != 2 { + t.Fatalf("Expected 2 exported streams, got %d\n", les) + } +} + +func TestImportExportConfigFailures(t *testing.T) { + // Import from unknow account + cf := createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: "synadia", subject:"foo"}}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import from unknown account") + } + // Import a service with no account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with no account") + } + // Import a service with a wildcard subject. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: {account: "nats.io", subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with wildcard subject") + } + // Export with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: "foo.*", wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with unknown keyword") + } + // Import with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import with unknown keyword") + } + // Export with an account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: {account: nats.io, subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with account") + } +} + func TestImportAuthorized(t *testing.T) { _, foo, bar := simpleAccountServer(t) @@ -592,7 +738,7 @@ func TestCrossAccountRequestReply(t *testing.T) { } // Add in the service import for the requests. Make it public. - if err := cfoo.acc.addServiceExport(nil, "test.request"); err != nil { + if err := cfoo.acc.addServiceExport("test.request", nil); err != nil { t.Fatalf("Error adding account service import to client foo: %v", err) } diff --git a/server/auth.go b/server/auth.go index 38db688d..a08c051a 100644 --- a/server/auth.go +++ b/server/auth.go @@ -39,6 +39,16 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Accounts +type Account struct { + Name string + Nkey string + mu sync.RWMutex + sl *Sublist + imports importMap + exports exportMap +} + // Import stream mapping struct type streamImport struct { acc *Account @@ -64,22 +74,7 @@ type exportMap struct { services map[string]map[string]*Account } -// Accounts -type Account struct { - Name string - mu sync.RWMutex - sl *Sublist - imports importMap - exports exportMap - /* - imports map[string]*importMap - exports map[string]map[string]*Account - services map[string]map[string]*Account - routes map[string]*routeMap // TODO(dlc) sync.Map may be better. - */ -} - -func (a *Account) addServiceExport(accounts []*Account, subject string) error { +func (a *Account) addServiceExport(subject string, accounts []*Account) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { @@ -114,6 +109,10 @@ func (a *Account) addServiceImport(destination *Account, from, to string) error if destination == nil { return ErrMissingAccount } + // Empty means use from. + if to == "" { + to = from + } if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) { return ErrInvalidSubject } diff --git a/server/configs/accounts.conf b/server/configs/accounts.conf new file mode 100644 index 00000000..2554cb2b --- /dev/null +++ b/server/configs/accounts.conf @@ -0,0 +1,47 @@ + +accounts: { + synadia: { + nkey: ADMHMDX2LEUJRZQHGVSVRWZEJ2CPNHYO6TB4ZCZ37LXAX5SYNEW252GF + + users = [ + # Bob + {nkey : UC6NLCN7AS34YOJVCYD4PJ3QB7QGLYG5B5IMBT25VW5K4TNUJODM7BOX} + # Alice + {nkey : UBAAQWTW6CG2G6ANGNKB5U2B7HRWHSGMZEZX3AQSAJOQDAUGJD46LD2E} + ] + + exports = [ + {stream: "public.>"} # No accounts means public. + {stream: "synadia.private.>", accounts: [cncf, nats.io]} + {service: "pub.request"} # No accounts means public. + {service: "pub.special.request", accounts: [nats.io]} + ] + + imports = [ + {service: {account: "nats.io", subject: "nats.time"}} + ] + } + + nats.io: { + nkey: AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE + + users = [ + # Ivan + {nkey : UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW} + # Derek + {nkey : UDEREK22W43P2NFQCSKGM6BWD23OVWEDR7JE7LSNCD232MZIC4X2MEKZ} + ] + + imports = [ + {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} + {stream: {account: "synadia", subject:"synadia.private.*"}} + {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"} + ] + + exports = [ + {service: "nats.time"} + ] + } + + cncf: { nkey: ABDAYEV6KZVLW3GSJ3V7IWC542676TFYILXF2C7Z56LCPSMVHJE5BVYO} +} diff --git a/server/opts.go b/server/opts.go index 0b758c93..b73f793d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -309,11 +309,7 @@ func (o *Options) ProcessConfigFile(configFile string) error { case "logtime": o.Logtime = v.(bool) case "accounts": - if pedantic { - err = parseAccounts(tk, o) - } else { - err = parseAccounts(v, o) - } + err = parseAccounts(v, o) if err != nil { return err } @@ -606,14 +602,40 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { } } +// Temp structures to hold account import and export defintions since they need +// to be processed after being parsed. +type export struct { + acc *Account + sub string + accs []string +} + +type importStream struct { + acc *Account + an string + sub string + pre string +} + +type importService struct { + acc *Account + an string + sub string + to string +} + // parseAccounts will parse the different accounts syntax. func parseAccounts(v interface{}, opts *Options) error { - var pedantic = opts.CheckConfig - var tk token + var ( + pedantic = opts.CheckConfig + importStreams []*importStream + importServices []*importService + exportStreams []*export + exportServices []*export + ) _, v = unwrapValue(v) - uorn := make(map[string]struct{}) - switch v.(type) { + // Simple array of account names. case []interface{}, []string: m := make(map[string]struct{}, len(v.([]interface{}))) for _, name := range v.([]interface{}) { @@ -621,27 +643,47 @@ func parseAccounts(v interface{}, opts *Options) error { if _, ok := m[ns]; ok { return fmt.Errorf("Duplicate Account Entry: %s", ns) } - opts.Accounts = append(opts.Accounts, &Account{Name: name.(string)}) + opts.Accounts = append(opts.Accounts, &Account{Name: ns}) m[ns] = struct{}{} } + // More common map entry case map[string]interface{}: - m := make(map[string]struct{}, len(v.(map[string]interface{}))) - for name, mv := range v.(map[string]interface{}) { - _, mv = unwrapValue(mv) - if _, ok := m[name]; ok { - return fmt.Errorf("Duplicate Account Entry: %s", name) - } - uv, ok := mv.(map[string]interface{}) + // Track users across accounts, must be unique across + // accounts and nkeys vs users. + uorn := make(map[string]struct{}) + for aname, mv := range v.(map[string]interface{}) { + _, amv := unwrapValue(mv) + // These should be maps. + mv, ok := amv.(map[string]interface{}) if !ok { - return fmt.Errorf("Expected map entry for users") + return fmt.Errorf("Expected map entries for accounts") } - acc := &Account{Name: name} + acc := &Account{Name: aname} opts.Accounts = append(opts.Accounts, acc) - m[name] = struct{}{} - for k, v := range uv { - tk, mv = unwrapValue(v) + for k, v := range mv { + tk, mv := unwrapValue(v) switch strings.ToLower(k) { + case "nkey": + nk, ok := mv.(string) + if !ok || !nkeys.IsValidPublicAccountKey(nk) { + return fmt.Errorf("Not a valid public nkey for an account: %q", v) + } + acc.Nkey = nk + case "imports": + streams, services, err := parseAccountImports(mv, acc, pedantic) + if err != nil { + return err + } + importStreams = append(importStreams, streams...) + importServices = append(importServices, services...) + case "exports": + streams, services, err := parseAccountExports(mv, acc, pedantic) + if err != nil { + return err + } + exportStreams = append(exportStreams, streams...) + exportServices = append(exportServices, services...) case "users": var ( users []*User @@ -673,15 +715,309 @@ func parseAccounts(v interface{}, opts *Options) error { u.Account = acc } opts.Nkeys = append(opts.Nkeys, nkeys...) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return &unknownConfigFieldErr{ + field: k, + token: tk, + configFile: tk.SourceFile(), + } + } } } } - default: - return fmt.Errorf("Expected an array or map of account entries, got %T", v) } + + // Parse Imports and Exports here after all accounts defined. + // Do exports first since they need to be defined for imports to succeed + // since we do permissions checks. + + // Create a lookup map for accounts lookups. + am := make(map[string]*Account, len(opts.Accounts)) + for _, a := range opts.Accounts { + am[a.Name] = a + } + // Do stream exports + for _, stream := range exportStreams { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range stream.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream export", an) + } + accounts = append(accounts, ta) + } + if err := stream.acc.addStreamExport(stream.sub, accounts); err != nil { + return fmt.Errorf("Error adding stream export %q: %v", stream.sub, err) + } + } + for _, service := range exportServices { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range service.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for service export", an) + } + accounts = append(accounts, ta) + } + if err := service.acc.addServiceExport(service.sub, accounts); err != nil { + return fmt.Errorf("Error adding service export %q: %v", service.sub, err) + } + } + for _, stream := range importStreams { + ta := am[stream.an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream import", stream.an) + } + if err := stream.acc.addStreamImport(ta, stream.sub, stream.pre); err != nil { + return fmt.Errorf("Error adding stream import %q: %v", stream.sub, err) + } + } + for _, service := range importServices { + ta := am[service.an] + if ta == nil { + return fmt.Errorf("%q account not defined for service import", service.an) + } + if service.to == "" { + service.to = service.sub + } + if err := service.acc.addServiceImport(ta, service.to, service.sub); err != nil { + return fmt.Errorf("Error adding service import %q: %v", service.sub, err) + } + } + return nil } +// Parse the account imports +func parseAccountExports(v interface{}, acc *Account, pedantic bool) ([]*export, []*export, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Exports should be an array, got %T", v) + } + + var services []*export + var streams []*export + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Export Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseExportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Parse the account imports +func parseAccountImports(v interface{}, acc *Account, pedantic bool) ([]*importStream, []*importService, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Imports should be an array, got %T", v) + } + + var services []*importService + var streams []*importStream + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Import Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseImportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Helper to parse an embedded account description for imported services or streams. +func parseAccount(v map[string]interface{}, pedantic bool) (string, string, error) { + var accountName, subject string + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "account": + accountName = mv.(string) + case "subject": + subject = mv.(string) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return "", "", &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + } + } + if accountName == "" || subject == "" { + return "", "", fmt.Errorf("Expect an account name and a subject") + } + return accountName, subject, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: "public.>"} # No accounts means public. +// {stream: "synadia.private.>", accounts: [cncf, natsio]} +// {service: "pub.request"} # No accounts means public. +// {service: "pub.special.request", accounts: [nats.io]} +func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*export, *export, error) { + var ( + curStream *export + curService *export + accounts []string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + curStream = &export{sub: mv.(string)} + if accounts != nil { + curStream.accs = accounts + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + curService = &export{sub: mv.(string)} + if accounts != nil { + curService.accs = accounts + } + case "accounts": + for _, iv := range mv.([]interface{}) { + _, mv := unwrapValue(iv) + accounts = append(accounts, mv.(string)) + } + if curStream != nil { + curStream.accs = accounts + } else if curService != nil { + curService.accs = accounts + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing export service or stream", mk) + } + + } + return curStream, curService, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} +// {stream: {account: "synadia", subject:"synadia.private.*"}} +// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"} +func parseImportStreamOrService(v map[string]interface{}, pedantic bool) (*importStream, *importService, error) { + var ( + curStream *importStream + curService *importService + pre, to string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Stream entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curStream = &importStream{an: accountName, sub: subject} + if pre != "" { + curStream.pre = pre + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Service entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curService = &importService{an: accountName, sub: subject} + if to != "" { + curService.to = to + } + case "prefix": + pre = mv.(string) + if curStream != nil { + curStream.pre = pre + } + case "to": + to = mv.(string) + if curService != nil { + curService.to = to + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing import service or stream", mk) + } + + } + return curStream, curService, nil +} + // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options) (*authorization, error) { var ( From 14cdda8cd469b08751efa73d45482e9bbc2c0e45 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 26 Sep 2018 19:22:34 +0200 Subject: [PATCH 7/8] Updates from comments Signed-off-by: Derek Collison --- server/accounts_test.go | 174 +++++++++++++++++++++++++++++++++++----- server/auth.go | 5 +- server/client.go | 76 +++++++++++++----- server/opts.go | 6 +- server/reload.go | 6 +- server/route.go | 2 +- server/server.go | 4 +- 7 files changed, 228 insertions(+), 45 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index fca206c8..4628ba4a 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -14,10 +14,14 @@ package server import ( + "encoding/base64" + "encoding/json" "fmt" "os" "strings" "testing" + + "github.com/nats-io/nkeys" ) func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { @@ -45,9 +49,6 @@ func TestRegisterDuplicateAccounts(t *testing.T) { func TestAccountIsolation(t *testing.T) { s, fooAcc, barAcc := simpleAccountServer(t) - if fooAcc == nil || barAcc == nil { - t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") - } cfoo, crFoo, _ := newClientForServer(s) if err := cfoo.registerWithAccount(fooAcc); err != nil { t.Fatalf("Error register client with 'foo' account: %v", err) @@ -135,10 +136,18 @@ func TestNewAccountsFromClients(t *testing.T) { opts.AllowNewAccounts = true s = New(&opts) - c, _, _ = newClientForServer(s) + c, cr, _ = newClientForServer(s) err := c.parse(connectOp) if err != nil { - t.Fatalf("Received an error trying to create an account: %v", err) + t.Fatalf("Received an error trying to connect: %v", err) + } + go c.parse([]byte("PING\r\n")) + l, err = cr.ReadString('\n') + if err != nil { + t.Fatalf("Error reading response for client from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q", l) } } @@ -255,7 +264,6 @@ func TestAccountParseConfig(t *testing.T) { if u.Username == "derek" { if u.Account != natsAcc { t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account) - break } } } @@ -302,8 +310,7 @@ func TestAccountParseConfigImportsExports(t *testing.T) { for _, acc := range opts.Accounts { if acc.Name == "nats.io" { natsAcc = acc - } - if acc.Name == "synadia" { + } else if acc.Name == "synadia" { synAcc = acc } } @@ -420,7 +427,7 @@ func TestImportExportConfigFailures(t *testing.T) { cf = createConfFile(t, []byte(` accounts { nats.io { - exports = [{service: {account: nats.io, subject:"foo.*"}] + exports = [{service: {account: nats.io, subject:"foo.*"}}] } } `)) @@ -490,6 +497,7 @@ func TestImportAuthorized(t *testing.T) { } func TestSimpleMapping(t *testing.T) { + t.Helper() s, fooAcc, barAcc := simpleAccountServer(t) defer s.Shutdown() @@ -555,7 +563,7 @@ func TestSimpleMapping(t *testing.T) { l, err = crBar.ReadString('\n') if err != nil { - t.Fatalf("Error reading from client 'baz': %v", err) + t.Fatalf("Error reading from client 'bar': %v", err) } checkMsg(l, "2") checkPayload(crBar, []byte("hello\r\n"), t) @@ -578,11 +586,11 @@ func TestNoPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for literal "foo". @@ -631,11 +639,12 @@ func TestPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } + // Checking that trailing '.' is accepted, tested that it is auto added above. if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for wildcard. @@ -684,11 +693,11 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for wildcard. @@ -819,6 +828,133 @@ func TestCrossAccountRequestReply(t *testing.T) { } } +func TestAccountMapsUsers(t *testing.T) { + // Used for the nkey users to properly sign. + seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM" + seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE" + + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: derek, password: foo}, + {nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR} + ] + } + nats { + users = [ + {user: ivan, password: bar}, + {nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Unexpected error parsing config file: %v", err) + } + s := New(opts) + synadia := s.LookupAccount("synadia") + nats := s.LookupAccount("nats") + + if synadia == nil || nats == nil { + t.Fatalf("Expected non nil accounts during lookup") + } + + // Make sure a normal log in maps the accounts correctly. + c, _, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n") + c.parse(connectOp) + if c.acc != synadia { + t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + c, _, _ = newClientForServer(s) + connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n") + c.parse(connectOp) + if c.acc != nats { + t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } + + // Now test nkeys as well. + kp, _ := nkeys.FromSeed(seed1) + pubKey, _ := kp.PublicKey() + + c, cr, l := newClientForServer(s) + // Check for Nonce + var info nonceInfo + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err := kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig := base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != synadia { + t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + // Now nats account nkey user. + kp, _ = nkeys.FromSeed(seed2) + pubKey, _ = kp.PublicKey() + + c, cr, l = newClientForServer(s) + // Check for Nonce + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err = kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig = base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != nats { + t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } +} + func BenchmarkNewRouteReply(b *testing.B) { opts := defaultServerOptions s := New(&opts) diff --git a/server/auth.go b/server/auth.go index a08c051a..c9435728 100644 --- a/server/auth.go +++ b/server/auth.go @@ -64,11 +64,13 @@ type serviceImport struct { ae bool } +// importMap tracks the imported streams and services. type importMap struct { streams map[string]*streamImport services map[string]*serviceImport // TODO(dlc) sync.Map may be better. } +// exportMap tracks the exported streams and services. type exportMap struct { streams map[string]map[string]*Account services map[string]map[string]*Account @@ -364,7 +366,7 @@ func (s *Server) checkAuthforWarnings() { } if warn { // Warning about using plaintext passwords. - s.Warnf("Plaintext passwords detected. Use Nkeys or Bcrypt passwords in config files.") + s.Warnf("Plaintext passwords detected, use nkeys or bcrypt.") } } @@ -480,6 +482,7 @@ func (s *Server) isClientAuthorized(c *client) bool { if err := pub.Verify(c.nonce, sig); err != nil { return false } + c.RegisterNkeyUser(nkey) return true } diff --git a/server/client.go b/server/client.go index 9ef65bf6..1a0f2480 100644 --- a/server/client.go +++ b/server/client.go @@ -335,13 +335,42 @@ func (c *client) registerWithAccount(acc *Account) error { } // RegisterUser allows auth to call back into a new client -// with the authenticated user. This is used to map any permissions -// into the client. +// with the authenticated user. This is used to map +// any permissions into the client and setup accounts. func (c *client) RegisterUser(user *User) { - // Process Permissions and map into client connection structures. c.mu.Lock() defer c.mu.Unlock() + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. + if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.perms = nil + return + } + + c.setPermissions(user.Permissions) +} + +// RegisterNkey allows auth to call back into a new nkey +// client with the authenticated user. This is used to map +// any permissions into the client and setup accounts. +func (c *client) RegisterNkeyUser(user *NkeyUser) { + c.mu.Lock() + defer c.mu.Unlock() + + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. if user.Permissions == nil { // Reset perms to nil in case client previously had them. c.perms = nil @@ -1339,7 +1368,6 @@ func (c *client) processSub(argo []byte) (err error) { // Check to see if we need to create a shadow subscription due to imports // in other accounts. -// Assume lock is held func (c *client) checkAccountImports(sub *subscription) error { c.mu.Lock() acc := c.acc @@ -1967,28 +1995,38 @@ func (c *client) typeString() string { // longer authorized, e.g. due to a config reload. func (c *client) removeUnauthorizedSubs() { c.mu.Lock() - if c.perms == nil { + if c.perms == nil || c.sl == nil { c.mu.Unlock() return } srv := c.srv - subs := make(map[string]*subscription, len(c.subs)) - for sid, sub := range c.subs { - subs[sid] = sub + + var subsa [32]*subscription + subs := subsa[:0] + for _, sub := range c.subs { + subs = append(subs, sub) + } + + var removedSubs [32]*subscription + removed := removedSubs[:0] + + for _, sub := range subs { + if !c.canSubscribe(sub.subject) { + removed = append(removed, sub) + delete(c.subs, string(sub.sid)) + } } c.mu.Unlock() - for sid, sub := range subs { - if c.sl != nil && !c.canSubscribe(sub.subject) { - _ = c.sl.Remove(sub) - c.mu.Lock() - delete(c.subs, sid) - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", - sub.subject, sub.sid)) - srv.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) - } + // Remove unauthorized clients subscriptions. + c.sl.RemoveBatch(removed) + + // Report back to client and logs. + for _, sub := range removed { + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + sub.subject, sub.sid)) + srv.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) } } diff --git a/server/opts.go b/server/opts.go index b73f793d..27427ad3 100644 --- a/server/opts.go +++ b/server/opts.go @@ -915,7 +915,11 @@ func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*expor if curStream != nil { return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) } - curService = &export{sub: mv.(string)} + mvs, ok := mv.(string) + if !ok { + return nil, nil, fmt.Errorf("Expected service to be string name, got %T", mv) + } + curService = &export{sub: mvs} if accounts != nil { curService.accs = accounts } diff --git a/server/reload.go b/server/reload.go index 3e57c90b..161863bc 100644 --- a/server/reload.go +++ b/server/reload.go @@ -762,7 +762,8 @@ func (s *Server) reloadClusterPermissions() { subsNeedUNSUB []*subscription deleteRoutedSubs []*subscription ) - s.sl.localSubs(&localSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.localSubs(&localSubs) // Go through all local subscriptions for _, sub := range localSubs { @@ -810,7 +811,8 @@ func (s *Server) reloadClusterPermissions() { route.mu.Unlock() } // Remove as a batch all the subs that we have removed from each route. - s.sl.RemoveBatch(deleteRoutedSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.RemoveBatch(deleteRoutedSubs) } // validateClusterOpts ensures the new ClusterOpts does not change host or diff --git a/server/route.go b/server/route.go index bbfa4bf2..7e82d627 100644 --- a/server/route.go +++ b/server/route.go @@ -428,7 +428,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) { _localSubs [4096]*subscription localSubs = _localSubs[:0] ) - s.sl.localSubs(&localSubs) + s.gsl.localSubs(&localSubs) route.sendRouteSubProtos(localSubs, func(sub *subscription) bool { subj := sub.subject diff --git a/server/server.go b/server/server.go index 0c2f8570..a0e81aaf 100644 --- a/server/server.go +++ b/server/server.go @@ -305,7 +305,8 @@ func (s *Server) newAccountsAllowed() bool { return s.opts.AllowNewAccounts } -func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) { +// LookupOrRegisterAccount will return the given account if known or create a new entry. +func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) { s.mu.Lock() defer s.mu.Unlock() if acc, ok := s.accounts[name]; ok { @@ -835,7 +836,6 @@ func (s *Server) copyInfo() Info { func (s *Server) createClient(conn net.Conn) *client { // Snapshot server options. - // TODO(dlc) - This can get expensive. opts := s.getOpts() max_pay := int64(opts.MaxPayload) From dc745f02be0a080b5b6b67a2ff16e39c0abd051d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 1 Oct 2018 08:47:35 -0700 Subject: [PATCH 8/8] Remove helper Signed-off-by: Derek Collison --- server/accounts_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 4628ba4a..02555d92 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -497,7 +497,6 @@ func TestImportAuthorized(t *testing.T) { } func TestSimpleMapping(t *testing.T) { - t.Helper() s, fooAcc, barAcc := simpleAccountServer(t) defer s.Shutdown()