From ae21fa22b7c3fbdaf947474e4bb312d284383ec4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 22 Sep 2018 16:53:59 -0700 Subject: [PATCH] API changes to match config for account mappings Signed-off-by: Derek Collison --- server/accounts_test.go | 116 ++++++++++++++--------------- server/auth.go | 157 ++++++++++++++++++++++------------------ server/client.go | 22 +++--- server/errors.go | 8 +- 4 files changed, 158 insertions(+), 145 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 8b44549a..c70661e2 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -287,60 +287,60 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) { func TestImportAuthorized(t *testing.T) { _, foo, bar := simpleAccountServer(t) - checkBool(foo.checkImportAuthorized(bar, "foo"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), false, t) - checkBool(foo.checkImportAuthorized(bar, ">"), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.>"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t) - foo.addExport("foo", isPublicExport) - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), false, t) + foo.addStreamExport("foo", isPublicExport) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) - foo.addExport("*", []*Account{bar}) - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar"), false, t) - checkBool(foo.checkImportAuthorized(bar, ">"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*.*"), false, t) - checkBool(foo.checkImportAuthorized(bar, "*.>"), false, t) + foo.addStreamExport("*", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), false, t) // Reset and test '>' public export _, foo, bar = simpleAccountServer(t) - foo.addExport(">", nil) + foo.addStreamExport(">", nil) // Everything should work. - checkBool(foo.checkImportAuthorized(bar, "foo"), true, t) - checkBool(foo.checkImportAuthorized(bar, "bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, "baz"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar"), true, t) - checkBool(foo.checkImportAuthorized(bar, ">"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "*.>"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), true, t) // Reset and test pwc and fwc s, foo, bar := simpleAccountServer(t) - foo.addExport("foo.*.baz.>", []*Account{bar}) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.1"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.*"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.*.baz.1.1"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.22.baz.22"), true, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz"), false, t) - checkBool(foo.checkImportAuthorized(bar, ""), false, t) - checkBool(foo.checkImportAuthorized(bar, "foo.bar.*.*"), false, t) + foo.addStreamExport("foo.*.baz.>", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ""), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*"), false, t) // Make sure we match the account as well fb, _ := s.RegisterAccount("foobar") bz, _ := s.RegisterAccount("baz") - checkBool(foo.checkImportAuthorized(fb, "foo.bar.baz.1"), false, t) - checkBool(foo.checkImportAuthorized(bz, "foo.bar.baz.1"), false, t) + checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1"), false, t) + checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1"), false, t) } func TestSimpleMapping(t *testing.T) { @@ -361,16 +361,16 @@ func TestSimpleMapping(t *testing.T) { } // Test first that trying to import with no matching export permission returns an error. - if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != ErrAccountImportAuthorization { + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization { t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err) } // Now map the subject space between foo and bar. // Need to do export first. - if err := cfoo.acc.addExport("foo", nil); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -432,10 +432,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", ""); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -485,10 +485,10 @@ func TestPrefixWildcardMapping(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -538,10 +538,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. t.Fatalf("Error adding account export to client foo: %v", err) } - if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil { + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { t.Fatalf("Error adding account import to client bar: %v", err) } @@ -592,23 +592,23 @@ func TestCrossAccountRequestReply(t *testing.T) { } // Add in the service import for the requests. Make it public. - if err := cfoo.acc.addService(nil, "test.request"); err != nil { + if err := cfoo.acc.addServiceExport(nil, "test.request"); err != nil { t.Fatalf("Error adding account service import to client foo: %v", err) } - // Test addRoute to make sure it requires accounts, and literalsubjects for both from and to subjects. - if err := cbar.acc.addRoute(nil, "foo", "test.request"); err != ErrMissingAccount { + // Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects. + if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount { t.Fatalf("Expected ErrMissingAccount but received %v.", err) } - if err := cbar.acc.addRoute(fooAcc, "*", "test.request"); err != ErrInvalidSubject { + if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject but received %v.", err) } - if err := cbar.acc.addRoute(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject but received %v.", err) } // Now add in the Route for request to be routed to the foo account. - if err := cbar.acc.addRoute(fooAcc, "foo", "test.request"); err != nil { + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil { t.Fatalf("Error adding account route to client bar: %v", err) } @@ -666,9 +666,9 @@ func TestCrossAccountRequestReply(t *testing.T) { } checkPayload(crBar, []byte("22\r\n"), t) - // Make sure we have no routes on fooAcc. An implicit one was created - /// for the response but should be removed when the response was processed. - if nr := fooAcc.numRoutes(); nr != 0 { + // Make sure we have no service imports on fooAcc. An implicit one was created + // for the response but should be removed when the response was processed. + if nr := fooAcc.numServiceRoutes(); nr != 0 { t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) } } @@ -679,6 +679,6 @@ func BenchmarkNewRouteReply(b *testing.B) { c, _, _ := newClientForServer(s) b.ResetTimer() for i := 0; i < b.N; i++ { - c.newRouteReply() + c.newServiceReply() } } diff --git a/server/auth.go b/server/auth.go index 37a80c12..38db688d 100644 --- a/server/auth.go +++ b/server/auth.go @@ -39,65 +39,78 @@ type ClientAuthentication interface { RegisterUser(*User) } -// Import mapping struct -type importMap struct { +// Import stream mapping struct +type streamImport struct { acc *Account from string prefix string } -// Route mapping struct -type routeMap struct { +// Import service mapping struct +type serviceImport struct { acc *Account from string to string ae bool } -// Accounts -type Account struct { - Name string - mu sync.RWMutex - sl *Sublist - imports map[string]*importMap - exports map[string]map[string]*Account - services map[string]map[string]*Account - // TODO(dlc) sync.Map may be better. - routes map[string]*routeMap +type importMap struct { + streams map[string]*streamImport + services map[string]*serviceImport // TODO(dlc) sync.Map may be better. } -func (a *Account) addService(accounts []*Account, subject string) error { +type exportMap struct { + streams map[string]map[string]*Account + services map[string]map[string]*Account +} + +// Accounts +type Account struct { + Name string + mu sync.RWMutex + sl *Sublist + imports importMap + exports exportMap + /* + imports map[string]*importMap + exports map[string]map[string]*Account + services map[string]map[string]*Account + routes map[string]*routeMap // TODO(dlc) sync.Map may be better. + */ +} + +func (a *Account) addServiceExport(accounts []*Account, subject string) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { return ErrMissingAccount } - if a.services == nil { - a.services = make(map[string]map[string]*Account) + if a.exports.services == nil { + a.exports.services = make(map[string]map[string]*Account) } - ma := a.services[subject] + ma := a.exports.services[subject] if accounts != nil && ma == nil { ma = make(map[string]*Account) } for _, a := range accounts { ma[a.Name] = a } - a.services[subject] = ma + a.exports.services[subject] = ma return nil } -// numRoutes returns the number of routes on this account. -func (a *Account) numRoutes() int { +// numServiceRoutes returns the number of service routes on this account. +func (a *Account) numServiceRoutes() int { a.mu.RLock() defer a.mu.RUnlock() - return len(a.routes) + return len(a.imports.services) } // This will add a route to an account to send published messages / requests // to the destination account. From is the local subject to map, To is the // subject that will appear on the destination account. Destination will need // to have an import rule to allow access via addService. -func (a *Account) addRoute(destination *Account, from, to string) error { +func (a *Account) addServiceImport(destination *Account, from, to string) error { if destination == nil { return ErrMissingAccount } @@ -105,54 +118,54 @@ func (a *Account) addRoute(destination *Account, from, to string) error { return ErrInvalidSubject } // First check to see if the account has authorized us to route to the "to" subject. - if !destination.checkRouteAuthorized(a, to) { - return ErrAccountRouteAuthorization + if !destination.checkServiceImportAuthorized(a, to) { + return ErrServiceImportAuthorization } - return a.addImplicitRoute(destination, from, to, false) + return a.addImplicitServiceImport(destination, from, to, false) } -// removeRoute will remove the route by subject. -func (a *Account) removeRoute(subject string) { +// removeServiceImport will remove the route by subject. +func (a *Account) removeServiceImport(subject string) { a.mu.Lock() - delete(a.routes, subject) + delete(a.imports.services, subject) a.mu.Unlock() } -// Add a route to a connect from an implicit route created for a response to a request. +// Add a route to connect from an implicit route created for a response to a request. // This does no checks and should be only called by the msg processing code. Use addRoute // above if responding to user input or config, etc. -func (a *Account) addImplicitRoute(destination *Account, from, to string, autoexpire bool) error { +func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error { a.mu.Lock() - if a.routes == nil { - a.routes = make(map[string]*routeMap) + if a.imports.services == nil { + a.imports.services = make(map[string]*serviceImport) } - a.routes[from] = &routeMap{destination, from, to, autoexpire} + a.imports.services[from] = &serviceImport{destination, from, to, autoexpire} a.mu.Unlock() return nil } -// addImport will add in the import from a specific account. -func (a *Account) addImport(account *Account, from, prefix string) error { +// addStreamImport will add in the stream import from a specific account. +func (a *Account) addStreamImport(account *Account, from, prefix string) error { if account == nil { return ErrMissingAccount } // First check to see if the account has authorized export of the subject. - if !account.checkImportAuthorized(a, from) { - return ErrAccountImportAuthorization + if !account.checkStreamImportAuthorized(a, from) { + return ErrStreamImportAuthorization } a.mu.Lock() defer a.mu.Unlock() - if a.imports == nil { - a.imports = make(map[string]*importMap) + if a.imports.streams == nil { + a.imports.streams = make(map[string]*streamImport) } if prefix != "" && prefix[len(prefix)-1] != btsep { prefix = prefix + string(btsep) } // TODO(dlc) - collisions, etc. - a.imports[from] = &importMap{account, from, prefix} + a.imports.streams[from] = &streamImport{account, from, prefix} return nil } @@ -161,14 +174,14 @@ var isPublicExport = []*Account(nil) // addExport will add an export to the account. If accounts is nil // it will signify a public export, meaning anyone can impoort. -func (a *Account) addExport(subject string, accounts []*Account) error { +func (a *Account) addStreamExport(subject string, accounts []*Account) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { return ErrMissingAccount } - if a.exports == nil { - a.exports = make(map[string]map[string]*Account) + if a.exports.streams == nil { + a.exports.streams = make(map[string]map[string]*Account) } var ma map[string]*Account for _, aa := range accounts { @@ -177,45 +190,22 @@ func (a *Account) addExport(subject string, accounts []*Account) error { } ma[aa.Name] = aa } - a.exports[subject] = ma + a.exports.streams[subject] = ma return nil } -// Check if another account is authorized to route requests to us. -func (a *Account) checkRouteAuthorized(account *Account, subject string) bool { - // Find the subject in the services list. - a.mu.RLock() - defer a.mu.RUnlock() - - if a.services == nil || !IsValidLiteralSubject(subject) { - return false - } - // These are always literal subjects so just lookup. - am, ok := a.services[subject] - if !ok { - return false - } - // Check to see if we are public or if we need to search for the account. - if am == nil { - return true - } - // Check that we allow this account. - _, ok = am[account.Name] - return ok -} - // Check if another account is authorized to import from us. -func (a *Account) checkImportAuthorized(account *Account, subject string) bool { +func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool { // Find the subject in the exports list. a.mu.RLock() defer a.mu.RUnlock() - if a.exports == nil || !IsValidSubject(subject) { + if a.exports.streams == nil || !IsValidSubject(subject) { return false } // Check direct match of subject first - am, ok := a.exports[subject] + am, ok := a.exports.streams[subject] if ok { // if am is nil that denotes a public export if am == nil { @@ -231,7 +221,7 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool { // exact matches above. tokens := strings.Split(subject, tsep) - for subj, am := range a.exports { + for subj, am := range a.exports.streams { if isSubsetMatch(tokens, subj) { if am == nil { return true @@ -243,6 +233,29 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool { return false } +// Check if another account is authorized to route requests to this service. +func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool { + // Find the subject in the services list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.exports.services == nil || !IsValidLiteralSubject(subject) { + return false + } + // These are always literal subjects so just lookup. + am, ok := a.exports.services[subject] + if !ok { + return false + } + // Check to see if we are public or if we need to search for the account. + if am == nil { + return true + } + // Check that we allow this account. + _, ok = am[account.Name] + return ok +} + // Nkey is for multiple nkey based users type NkeyUser struct { Nkey string `json:"user"` diff --git a/server/client.go b/server/client.go index 1589ecad..9ef65bf6 100644 --- a/server/client.go +++ b/server/client.go @@ -242,7 +242,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { // interest in published messages. type subscription struct { client *client - im *importMap // This is for importing support. + im *streamImport // This is for importing support. subject []byte queue []byte sid []byte @@ -1352,10 +1352,10 @@ func (c *client) checkAccountImports(sub *subscription) error { subject := string(sub.subject) tokens := strings.Split(subject, tsep) - var rims [32]*importMap + var rims [32]*streamImport var ims = rims[:0] acc.mu.RLock() - for _, im := range acc.imports { + for _, im := range acc.imports.streams { if isSubsetMatch(tokens, im.prefix+im.from) { ims = append(ims, im) } @@ -1644,9 +1644,9 @@ const ( base = 62 ) -// newRouteReply is used when rewriting replies that cross account boundaries. +// newServiceReply is used when rewriting replies that cross account boundaries. // These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients. -func (c *client) newRouteReply() []byte { +func (c *client) newServiceReply() []byte { // Check to see if we have our own rand yet. Global rand // has contention with lots of clients, etc. if c.in.prand == nil { @@ -1722,21 +1722,21 @@ func (c *client) processInboundMsg(msg []byte) { } // Check to see if we need to route this message to - // another account via a route entry. - if c.typ == CLIENT && c.acc != nil && c.acc.routes != nil { + // another account. + if c.typ == CLIENT && c.acc != nil && c.acc.imports.services != nil { c.acc.mu.RLock() - rm := c.acc.routes[string(c.pa.subject)] + rm := c.acc.imports.services[string(c.pa.subject)] c.acc.mu.RUnlock() // Get the results from the other account for the mapped "to" subject. if rm != nil && rm.acc != nil && rm.acc.sl != nil { var nrr []byte if rm.ae { - c.acc.removeRoute(rm.from) + c.acc.removeServiceImport(rm.from) } if c.pa.reply != nil { // We want to remap this to provide anonymity. - nrr = c.newRouteReply() - rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true) + nrr = c.newServiceReply() + rm.acc.addImplicitServiceImport(c.acc, string(nrr), string(c.pa.reply), true) } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) diff --git a/server/errors.go b/server/errors.go index 1656048d..003b9880 100644 --- a/server/errors.go +++ b/server/errors.go @@ -59,9 +59,9 @@ var ( // ErrMissingAccount is returned when an account does not exist. ErrMissingAccount = errors.New("Account Missing") - // ErrAccountImportAuthorization is returned when an import is not authorized. - ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported") + // ErrStreamImportAuthorization is returned when a stream import is not authorized. + ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized") - // ErrAccountRouteAuthorization is returned when a route is not authorized. - ErrAccountRouteAuthorization = errors.New("Account Not Authorized On Service") + // ErrServiceImportAuthorization is returned when a service import is not authorized. + ErrServiceImportAuthorization = errors.New("Service Import Not Authorized") )