From 14cdda8cd469b08751efa73d45482e9bbc2c0e45 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 26 Sep 2018 19:22:34 +0200 Subject: [PATCH] Updates from comments Signed-off-by: Derek Collison --- server/accounts_test.go | 174 +++++++++++++++++++++++++++++++++++----- server/auth.go | 5 +- server/client.go | 76 +++++++++++++----- server/opts.go | 6 +- server/reload.go | 6 +- server/route.go | 2 +- server/server.go | 4 +- 7 files changed, 228 insertions(+), 45 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index fca206c8..4628ba4a 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -14,10 +14,14 @@ package server import ( + "encoding/base64" + "encoding/json" "fmt" "os" "strings" "testing" + + "github.com/nats-io/nkeys" ) func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { @@ -45,9 +49,6 @@ func TestRegisterDuplicateAccounts(t *testing.T) { func TestAccountIsolation(t *testing.T) { s, fooAcc, barAcc := simpleAccountServer(t) - if fooAcc == nil || barAcc == nil { - t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") - } cfoo, crFoo, _ := newClientForServer(s) if err := cfoo.registerWithAccount(fooAcc); err != nil { t.Fatalf("Error register client with 'foo' account: %v", err) @@ -135,10 +136,18 @@ func TestNewAccountsFromClients(t *testing.T) { opts.AllowNewAccounts = true s = New(&opts) - c, _, _ = newClientForServer(s) + c, cr, _ = newClientForServer(s) err := c.parse(connectOp) if err != nil { - t.Fatalf("Received an error trying to create an account: %v", err) + t.Fatalf("Received an error trying to connect: %v", err) + } + go c.parse([]byte("PING\r\n")) + l, err = cr.ReadString('\n') + if err != nil { + t.Fatalf("Error reading response for client from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q", l) } } @@ -255,7 +264,6 @@ func TestAccountParseConfig(t *testing.T) { if u.Username == "derek" { if u.Account != natsAcc { t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account) - break } } } @@ -302,8 +310,7 @@ func TestAccountParseConfigImportsExports(t *testing.T) { for _, acc := range opts.Accounts { if acc.Name == "nats.io" { natsAcc = acc - } - if acc.Name == "synadia" { + } else if acc.Name == "synadia" { synAcc = acc } } @@ -420,7 +427,7 @@ func TestImportExportConfigFailures(t *testing.T) { cf = createConfFile(t, []byte(` accounts { nats.io { - exports = [{service: {account: nats.io, subject:"foo.*"}] + exports = [{service: {account: nats.io, subject:"foo.*"}}] } } `)) @@ -490,6 +497,7 @@ func TestImportAuthorized(t *testing.T) { } func TestSimpleMapping(t *testing.T) { + t.Helper() s, fooAcc, barAcc := simpleAccountServer(t) defer s.Shutdown() @@ -555,7 +563,7 @@ func TestSimpleMapping(t *testing.T) { l, err = crBar.ReadString('\n') if err != nil { - t.Fatalf("Error reading from client 'baz': %v", err) + t.Fatalf("Error reading from client 'bar': %v", err) } checkMsg(l, "2") checkPayload(crBar, []byte("hello\r\n"), t) @@ -578,11 +586,11 @@ func TestNoPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for literal "foo". @@ -631,11 +639,12 @@ func TestPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } + // Checking that trailing '.' is accepted, tested that it is auto added above. if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for wildcard. @@ -684,11 +693,11 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. - t.Fatalf("Error adding account export to client foo: %v", err) + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) } if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { - t.Fatalf("Error adding account import to client bar: %v", err) + t.Fatalf("Error adding stream import to client bar: %v", err) } // Normal Subscription on bar client for wildcard. @@ -819,6 +828,133 @@ func TestCrossAccountRequestReply(t *testing.T) { } } +func TestAccountMapsUsers(t *testing.T) { + // Used for the nkey users to properly sign. + seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM" + seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE" + + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: derek, password: foo}, + {nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR} + ] + } + nats { + users = [ + {user: ivan, password: bar}, + {nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Unexpected error parsing config file: %v", err) + } + s := New(opts) + synadia := s.LookupAccount("synadia") + nats := s.LookupAccount("nats") + + if synadia == nil || nats == nil { + t.Fatalf("Expected non nil accounts during lookup") + } + + // Make sure a normal log in maps the accounts correctly. + c, _, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n") + c.parse(connectOp) + if c.acc != synadia { + t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + c, _, _ = newClientForServer(s) + connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n") + c.parse(connectOp) + if c.acc != nats { + t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } + + // Now test nkeys as well. + kp, _ := nkeys.FromSeed(seed1) + pubKey, _ := kp.PublicKey() + + c, cr, l := newClientForServer(s) + // Check for Nonce + var info nonceInfo + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err := kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig := base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != synadia { + t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + // Now nats account nkey user. + kp, _ = nkeys.FromSeed(seed2) + pubKey, _ = kp.PublicKey() + + c, cr, l = newClientForServer(s) + // Check for Nonce + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err = kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig = base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != nats { + t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } +} + func BenchmarkNewRouteReply(b *testing.B) { opts := defaultServerOptions s := New(&opts) diff --git a/server/auth.go b/server/auth.go index a08c051a..c9435728 100644 --- a/server/auth.go +++ b/server/auth.go @@ -64,11 +64,13 @@ type serviceImport struct { ae bool } +// importMap tracks the imported streams and services. type importMap struct { streams map[string]*streamImport services map[string]*serviceImport // TODO(dlc) sync.Map may be better. } +// exportMap tracks the exported streams and services. type exportMap struct { streams map[string]map[string]*Account services map[string]map[string]*Account @@ -364,7 +366,7 @@ func (s *Server) checkAuthforWarnings() { } if warn { // Warning about using plaintext passwords. - s.Warnf("Plaintext passwords detected. Use Nkeys or Bcrypt passwords in config files.") + s.Warnf("Plaintext passwords detected, use nkeys or bcrypt.") } } @@ -480,6 +482,7 @@ func (s *Server) isClientAuthorized(c *client) bool { if err := pub.Verify(c.nonce, sig); err != nil { return false } + c.RegisterNkeyUser(nkey) return true } diff --git a/server/client.go b/server/client.go index 9ef65bf6..1a0f2480 100644 --- a/server/client.go +++ b/server/client.go @@ -335,13 +335,42 @@ func (c *client) registerWithAccount(acc *Account) error { } // RegisterUser allows auth to call back into a new client -// with the authenticated user. This is used to map any permissions -// into the client. +// with the authenticated user. This is used to map +// any permissions into the client and setup accounts. func (c *client) RegisterUser(user *User) { - // Process Permissions and map into client connection structures. c.mu.Lock() defer c.mu.Unlock() + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. + if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.perms = nil + return + } + + c.setPermissions(user.Permissions) +} + +// RegisterNkey allows auth to call back into a new nkey +// client with the authenticated user. This is used to map +// any permissions into the client and setup accounts. +func (c *client) RegisterNkeyUser(user *NkeyUser) { + c.mu.Lock() + defer c.mu.Unlock() + + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. if user.Permissions == nil { // Reset perms to nil in case client previously had them. c.perms = nil @@ -1339,7 +1368,6 @@ func (c *client) processSub(argo []byte) (err error) { // Check to see if we need to create a shadow subscription due to imports // in other accounts. -// Assume lock is held func (c *client) checkAccountImports(sub *subscription) error { c.mu.Lock() acc := c.acc @@ -1967,28 +1995,38 @@ func (c *client) typeString() string { // longer authorized, e.g. due to a config reload. func (c *client) removeUnauthorizedSubs() { c.mu.Lock() - if c.perms == nil { + if c.perms == nil || c.sl == nil { c.mu.Unlock() return } srv := c.srv - subs := make(map[string]*subscription, len(c.subs)) - for sid, sub := range c.subs { - subs[sid] = sub + + var subsa [32]*subscription + subs := subsa[:0] + for _, sub := range c.subs { + subs = append(subs, sub) + } + + var removedSubs [32]*subscription + removed := removedSubs[:0] + + for _, sub := range subs { + if !c.canSubscribe(sub.subject) { + removed = append(removed, sub) + delete(c.subs, string(sub.sid)) + } } c.mu.Unlock() - for sid, sub := range subs { - if c.sl != nil && !c.canSubscribe(sub.subject) { - _ = c.sl.Remove(sub) - c.mu.Lock() - delete(c.subs, sid) - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", - sub.subject, sub.sid)) - srv.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) - } + // Remove unauthorized clients subscriptions. + c.sl.RemoveBatch(removed) + + // Report back to client and logs. + for _, sub := range removed { + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + sub.subject, sub.sid)) + srv.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) } } diff --git a/server/opts.go b/server/opts.go index b73f793d..27427ad3 100644 --- a/server/opts.go +++ b/server/opts.go @@ -915,7 +915,11 @@ func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*expor if curStream != nil { return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) } - curService = &export{sub: mv.(string)} + mvs, ok := mv.(string) + if !ok { + return nil, nil, fmt.Errorf("Expected service to be string name, got %T", mv) + } + curService = &export{sub: mvs} if accounts != nil { curService.accs = accounts } diff --git a/server/reload.go b/server/reload.go index 3e57c90b..161863bc 100644 --- a/server/reload.go +++ b/server/reload.go @@ -762,7 +762,8 @@ func (s *Server) reloadClusterPermissions() { subsNeedUNSUB []*subscription deleteRoutedSubs []*subscription ) - s.sl.localSubs(&localSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.localSubs(&localSubs) // Go through all local subscriptions for _, sub := range localSubs { @@ -810,7 +811,8 @@ func (s *Server) reloadClusterPermissions() { route.mu.Unlock() } // Remove as a batch all the subs that we have removed from each route. - s.sl.RemoveBatch(deleteRoutedSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.RemoveBatch(deleteRoutedSubs) } // validateClusterOpts ensures the new ClusterOpts does not change host or diff --git a/server/route.go b/server/route.go index bbfa4bf2..7e82d627 100644 --- a/server/route.go +++ b/server/route.go @@ -428,7 +428,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) { _localSubs [4096]*subscription localSubs = _localSubs[:0] ) - s.sl.localSubs(&localSubs) + s.gsl.localSubs(&localSubs) route.sendRouteSubProtos(localSubs, func(sub *subscription) bool { subj := sub.subject diff --git a/server/server.go b/server/server.go index 0c2f8570..a0e81aaf 100644 --- a/server/server.go +++ b/server/server.go @@ -305,7 +305,8 @@ func (s *Server) newAccountsAllowed() bool { return s.opts.AllowNewAccounts } -func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) { +// LookupOrRegisterAccount will return the given account if known or create a new entry. +func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) { s.mu.Lock() defer s.mu.Unlock() if acc, ok := s.accounts[name]; ok { @@ -835,7 +836,6 @@ func (s *Server) copyInfo() Info { func (s *Server) createClient(conn net.Conn) *client { // Snapshot server options. - // TODO(dlc) - This can get expensive. opts := s.getOpts() max_pay := int64(opts.MaxPayload)