API changes to match config for account mappings

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-09-22 16:53:59 -07:00
parent c4bcbf6275
commit ae21fa22b7
4 changed files with 158 additions and 145 deletions

View File

@@ -287,60 +287,60 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) {
func TestImportAuthorized(t *testing.T) {
_, foo, bar := simpleAccountServer(t)
checkBool(foo.checkImportAuthorized(bar, "foo"), false, t)
checkBool(foo.checkImportAuthorized(bar, "*"), false, t)
checkBool(foo.checkImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkImportAuthorized(bar, "foo.>"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t)
foo.addExport("foo", isPublicExport)
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkImportAuthorized(bar, "bar"), false, t)
checkBool(foo.checkImportAuthorized(bar, "*"), false, t)
foo.addStreamExport("foo", isPublicExport)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t)
foo.addExport("*", []*Account{bar})
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.bar"), false, t)
checkBool(foo.checkImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkImportAuthorized(bar, "*.*"), false, t)
checkBool(foo.checkImportAuthorized(bar, "*.>"), false, t)
foo.addStreamExport("*", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), false, t)
// Reset and test '>' public export
_, foo, bar = simpleAccountServer(t)
foo.addExport(">", nil)
foo.addStreamExport(">", nil)
// Everything should work.
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.bar"), true, t)
checkBool(foo.checkImportAuthorized(bar, ">"), true, t)
checkBool(foo.checkImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.*"), true, t)
checkBool(foo.checkImportAuthorized(bar, "*.*"), true, t)
checkBool(foo.checkImportAuthorized(bar, "*.>"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), true, t)
// Reset and test pwc and fwc
s, foo, bar := simpleAccountServer(t)
foo.addExport("foo.*.baz.>", []*Account{bar})
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.1"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.*"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.*.baz.1.1"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.22.baz.22"), true, t)
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz"), false, t)
checkBool(foo.checkImportAuthorized(bar, ""), false, t)
checkBool(foo.checkImportAuthorized(bar, "foo.bar.*.*"), false, t)
foo.addStreamExport("foo.*.baz.>", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ""), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*"), false, t)
// Make sure we match the account as well
fb, _ := s.RegisterAccount("foobar")
bz, _ := s.RegisterAccount("baz")
checkBool(foo.checkImportAuthorized(fb, "foo.bar.baz.1"), false, t)
checkBool(foo.checkImportAuthorized(bz, "foo.bar.baz.1"), false, t)
checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1"), false, t)
checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1"), false, t)
}
func TestSimpleMapping(t *testing.T) {
@@ -361,16 +361,16 @@ func TestSimpleMapping(t *testing.T) {
}
// Test first that trying to import with no matching export permission returns an error.
if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != ErrAccountImportAuthorization {
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
}
// Now map the subject space between foo and bar.
// Need to do export first.
if err := cfoo.acc.addExport("foo", nil); err != nil { // Public with no accounts defined.
if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != nil {
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
@@ -432,10 +432,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addImport(fooAcc, "*", ""); err != nil {
if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
@@ -485,10 +485,10 @@ func TestPrefixWildcardMapping(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil {
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
@@ -538,10 +538,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil {
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
@@ -592,23 +592,23 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
// Add in the service import for the requests. Make it public.
if err := cfoo.acc.addService(nil, "test.request"); err != nil {
if err := cfoo.acc.addServiceExport(nil, "test.request"); err != nil {
t.Fatalf("Error adding account service import to client foo: %v", err)
}
// Test addRoute to make sure it requires accounts, and literalsubjects for both from and to subjects.
if err := cbar.acc.addRoute(nil, "foo", "test.request"); err != ErrMissingAccount {
// Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects.
if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
t.Fatalf("Expected ErrMissingAccount but received %v.", err)
}
if err := cbar.acc.addRoute(fooAcc, "*", "test.request"); err != ErrInvalidSubject {
if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
if err := cbar.acc.addRoute(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
// Now add in the Route for request to be routed to the foo account.
if err := cbar.acc.addRoute(fooAcc, "foo", "test.request"); err != nil {
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account route to client bar: %v", err)
}
@@ -666,9 +666,9 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
checkPayload(crBar, []byte("22\r\n"), t)
// Make sure we have no routes on fooAcc. An implicit one was created
/// for the response but should be removed when the response was processed.
if nr := fooAcc.numRoutes(); nr != 0 {
// Make sure we have no service imports on fooAcc. An implicit one was created
// for the response but should be removed when the response was processed.
if nr := fooAcc.numServiceRoutes(); nr != 0 {
t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr)
}
}
@@ -679,6 +679,6 @@ func BenchmarkNewRouteReply(b *testing.B) {
c, _, _ := newClientForServer(s)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.newRouteReply()
c.newServiceReply()
}
}

View File

@@ -39,65 +39,78 @@ type ClientAuthentication interface {
RegisterUser(*User)
}
// Import mapping struct
type importMap struct {
// Import stream mapping struct
type streamImport struct {
acc *Account
from string
prefix string
}
// Route mapping struct
type routeMap struct {
// Import service mapping struct
type serviceImport struct {
acc *Account
from string
to string
ae bool
}
// Accounts
type Account struct {
Name string
mu sync.RWMutex
sl *Sublist
imports map[string]*importMap
exports map[string]map[string]*Account
services map[string]map[string]*Account
// TODO(dlc) sync.Map may be better.
routes map[string]*routeMap
type importMap struct {
streams map[string]*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}
func (a *Account) addService(accounts []*Account, subject string) error {
type exportMap struct {
streams map[string]map[string]*Account
services map[string]map[string]*Account
}
// Accounts
type Account struct {
Name string
mu sync.RWMutex
sl *Sublist
imports importMap
exports exportMap
/*
imports map[string]*importMap
exports map[string]map[string]*Account
services map[string]map[string]*Account
routes map[string]*routeMap // TODO(dlc) sync.Map may be better.
*/
}
func (a *Account) addServiceExport(accounts []*Account, subject string) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.services == nil {
a.services = make(map[string]map[string]*Account)
if a.exports.services == nil {
a.exports.services = make(map[string]map[string]*Account)
}
ma := a.services[subject]
ma := a.exports.services[subject]
if accounts != nil && ma == nil {
ma = make(map[string]*Account)
}
for _, a := range accounts {
ma[a.Name] = a
}
a.services[subject] = ma
a.exports.services[subject] = ma
return nil
}
// numRoutes returns the number of routes on this account.
func (a *Account) numRoutes() int {
// numServiceRoutes returns the number of service routes on this account.
func (a *Account) numServiceRoutes() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.routes)
return len(a.imports.services)
}
// This will add a route to an account to send published messages / requests
// to the destination account. From is the local subject to map, To is the
// subject that will appear on the destination account. Destination will need
// to have an import rule to allow access via addService.
func (a *Account) addRoute(destination *Account, from, to string) error {
func (a *Account) addServiceImport(destination *Account, from, to string) error {
if destination == nil {
return ErrMissingAccount
}
@@ -105,54 +118,54 @@ func (a *Account) addRoute(destination *Account, from, to string) error {
return ErrInvalidSubject
}
// First check to see if the account has authorized us to route to the "to" subject.
if !destination.checkRouteAuthorized(a, to) {
return ErrAccountRouteAuthorization
if !destination.checkServiceImportAuthorized(a, to) {
return ErrServiceImportAuthorization
}
return a.addImplicitRoute(destination, from, to, false)
return a.addImplicitServiceImport(destination, from, to, false)
}
// removeRoute will remove the route by subject.
func (a *Account) removeRoute(subject string) {
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
delete(a.routes, subject)
delete(a.imports.services, subject)
a.mu.Unlock()
}
// Add a route to a connect from an implicit route created for a response to a request.
// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use addRoute
// above if responding to user input or config, etc.
func (a *Account) addImplicitRoute(destination *Account, from, to string, autoexpire bool) error {
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error {
a.mu.Lock()
if a.routes == nil {
a.routes = make(map[string]*routeMap)
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
a.routes[from] = &routeMap{destination, from, to, autoexpire}
a.imports.services[from] = &serviceImport{destination, from, to, autoexpire}
a.mu.Unlock()
return nil
}
// addImport will add in the import from a specific account.
func (a *Account) addImport(account *Account, from, prefix string) error {
// addStreamImport will add in the stream import from a specific account.
func (a *Account) addStreamImport(account *Account, from, prefix string) error {
if account == nil {
return ErrMissingAccount
}
// First check to see if the account has authorized export of the subject.
if !account.checkImportAuthorized(a, from) {
return ErrAccountImportAuthorization
if !account.checkStreamImportAuthorized(a, from) {
return ErrStreamImportAuthorization
}
a.mu.Lock()
defer a.mu.Unlock()
if a.imports == nil {
a.imports = make(map[string]*importMap)
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
}
if prefix != "" && prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
// TODO(dlc) - collisions, etc.
a.imports[from] = &importMap{account, from, prefix}
a.imports.streams[from] = &streamImport{account, from, prefix}
return nil
}
@@ -161,14 +174,14 @@ var isPublicExport = []*Account(nil)
// addExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can impoort.
func (a *Account) addExport(subject string, accounts []*Account) error {
func (a *Account) addStreamExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports == nil {
a.exports = make(map[string]map[string]*Account)
if a.exports.streams == nil {
a.exports.streams = make(map[string]map[string]*Account)
}
var ma map[string]*Account
for _, aa := range accounts {
@@ -177,45 +190,22 @@ func (a *Account) addExport(subject string, accounts []*Account) error {
}
ma[aa.Name] = aa
}
a.exports[subject] = ma
a.exports.streams[subject] = ma
return nil
}
// Check if another account is authorized to route requests to us.
func (a *Account) checkRouteAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.services == nil || !IsValidLiteralSubject(subject) {
return false
}
// These are always literal subjects so just lookup.
am, ok := a.services[subject]
if !ok {
return false
}
// Check to see if we are public or if we need to search for the account.
if am == nil {
return true
}
// Check that we allow this account.
_, ok = am[account.Name]
return ok
}
// Check if another account is authorized to import from us.
func (a *Account) checkImportAuthorized(account *Account, subject string) bool {
func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool {
// Find the subject in the exports list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports == nil || !IsValidSubject(subject) {
if a.exports.streams == nil || !IsValidSubject(subject) {
return false
}
// Check direct match of subject first
am, ok := a.exports[subject]
am, ok := a.exports.streams[subject]
if ok {
// if am is nil that denotes a public export
if am == nil {
@@ -231,7 +221,7 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool {
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, am := range a.exports {
for subj, am := range a.exports.streams {
if isSubsetMatch(tokens, subj) {
if am == nil {
return true
@@ -243,6 +233,29 @@ func (a *Account) checkImportAuthorized(account *Account, subject string) bool {
return false
}
// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
// These are always literal subjects so just lookup.
am, ok := a.exports.services[subject]
if !ok {
return false
}
// Check to see if we are public or if we need to search for the account.
if am == nil {
return true
}
// Check that we allow this account.
_, ok = am[account.Name]
return ok
}
// Nkey is for multiple nkey based users
type NkeyUser struct {
Nkey string `json:"user"`

View File

@@ -242,7 +242,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
// interest in published messages.
type subscription struct {
client *client
im *importMap // This is for importing support.
im *streamImport // This is for importing support.
subject []byte
queue []byte
sid []byte
@@ -1352,10 +1352,10 @@ func (c *client) checkAccountImports(sub *subscription) error {
subject := string(sub.subject)
tokens := strings.Split(subject, tsep)
var rims [32]*importMap
var rims [32]*streamImport
var ims = rims[:0]
acc.mu.RLock()
for _, im := range acc.imports {
for _, im := range acc.imports.streams {
if isSubsetMatch(tokens, im.prefix+im.from) {
ims = append(ims, im)
}
@@ -1644,9 +1644,9 @@ const (
base = 62
)
// newRouteReply is used when rewriting replies that cross account boundaries.
// newServiceReply is used when rewriting replies that cross account boundaries.
// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients.
func (c *client) newRouteReply() []byte {
func (c *client) newServiceReply() []byte {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
@@ -1722,21 +1722,21 @@ func (c *client) processInboundMsg(msg []byte) {
}
// Check to see if we need to route this message to
// another account via a route entry.
if c.typ == CLIENT && c.acc != nil && c.acc.routes != nil {
// another account.
if c.typ == CLIENT && c.acc != nil && c.acc.imports.services != nil {
c.acc.mu.RLock()
rm := c.acc.routes[string(c.pa.subject)]
rm := c.acc.imports.services[string(c.pa.subject)]
c.acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
if rm != nil && rm.acc != nil && rm.acc.sl != nil {
var nrr []byte
if rm.ae {
c.acc.removeRoute(rm.from)
c.acc.removeServiceImport(rm.from)
}
if c.pa.reply != nil {
// We want to remap this to provide anonymity.
nrr = c.newRouteReply()
rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true)
nrr = c.newServiceReply()
rm.acc.addImplicitServiceImport(c.acc, string(nrr), string(c.pa.reply), true)
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)

View File

@@ -59,9 +59,9 @@ var (
// ErrMissingAccount is returned when an account does not exist.
ErrMissingAccount = errors.New("Account Missing")
// ErrAccountImportAuthorization is returned when an import is not authorized.
ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported")
// ErrStreamImportAuthorization is returned when a stream import is not authorized.
ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized")
// ErrAccountRouteAuthorization is returned when a route is not authorized.
ErrAccountRouteAuthorization = errors.New("Account Not Authorized On Service")
// ErrServiceImportAuthorization is returned when a service import is not authorized.
ErrServiceImportAuthorization = errors.New("Service Import Not Authorized")
)