From 9f8330bcc98a4a15198d0a7abba835dd62244ef6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 23 Sep 2018 12:11:42 -0700 Subject: [PATCH] Added import and export parsing for configs Signed-off-by: Derek Collison --- conf/lex.go | 2 + server/accounts_test.go | 148 +++++++++++++- server/auth.go | 31 ++- server/configs/accounts.conf | 47 +++++ server/opts.go | 384 ++++++++++++++++++++++++++++++++--- 5 files changed, 571 insertions(+), 41 deletions(-) create mode 100644 server/configs/accounts.conf diff --git a/conf/lex.go b/conf/lex.go index f9603a99..3eedaa89 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -594,6 +594,8 @@ func lexMapKeyStart(lx *lexer) stateFn { switch { case isKeySeparator(r): return lx.errorf("Unexpected key separator '%v'.", r) + case r == arrayEnd: + return lx.errorf("Unexpected array end '%v' processing map.", r) case unicode.IsSpace(r): lx.next() return lexSkip(lx, lexMapKeyStart) diff --git a/server/accounts_test.go b/server/accounts_test.go index c70661e2..fca206c8 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -284,6 +284,152 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) { } } +func TestAccountParseConfigImportsExports(t *testing.T) { + opts, err := ProcessConfigFile("./configs/accounts.conf") + if err != nil { + t.Fatal(err) + } + if la := len(opts.Accounts); la != 3 { + t.Fatalf("Expected to see 3 accounts in opts, got %d", la) + } + if lu := len(opts.Nkeys); lu != 4 { + t.Fatalf("Expected 4 total Nkey users, got %d", lu) + } + if lu := len(opts.Users); lu != 0 { + t.Fatalf("Expected no Users, got %d", lu) + } + var natsAcc, synAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + } + if acc.Name == "synadia" { + synAcc = acc + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" { + t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey) + } + // Check user assigned to the correct account. + for _, nk := range opts.Nkeys { + if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" { + if nk.Account != natsAcc { + t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name) + } + break + } + } + + // Now check for the imports and exports of streams and services. + if lis := len(natsAcc.imports.streams); lis != 2 { + t.Fatalf("Expected 2 imported streams, got %d\n", lis) + } + if lis := len(natsAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(natsAcc.exports.services); les != 1 { + t.Fatalf("Expected 1 exported service, got %d\n", les) + } + if les := len(natsAcc.exports.streams); les != 0 { + t.Fatalf("Expected no exported streams, got %d\n", les) + } + + if synAcc == nil { + t.Fatalf("Error retrieving account for 'synadia'") + } + + if lis := len(synAcc.imports.streams); lis != 0 { + t.Fatalf("Expected no imported streams, got %d\n", lis) + } + if lis := len(synAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(synAcc.exports.services); les != 2 { + t.Fatalf("Expected 2 exported service, got %d\n", les) + } + if les := len(synAcc.exports.streams); les != 2 { + t.Fatalf("Expected 2 exported streams, got %d\n", les) + } +} + +func TestImportExportConfigFailures(t *testing.T) { + // Import from unknow account + cf := createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: "synadia", subject:"foo"}}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import from unknown account") + } + // Import a service with no account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with no account") + } + // Import a service with a wildcard subject. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: {account: "nats.io", subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with wildcard subject") + } + // Export with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: "foo.*", wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with unknown keyword") + } + // Import with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import with unknown keyword") + } + // Export with an account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: {account: nats.io, subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with account") + } +} + func TestImportAuthorized(t *testing.T) { _, foo, bar := simpleAccountServer(t) @@ -592,7 +738,7 @@ func TestCrossAccountRequestReply(t *testing.T) { } // Add in the service import for the requests. Make it public. - if err := cfoo.acc.addServiceExport(nil, "test.request"); err != nil { + if err := cfoo.acc.addServiceExport("test.request", nil); err != nil { t.Fatalf("Error adding account service import to client foo: %v", err) } diff --git a/server/auth.go b/server/auth.go index 38db688d..a08c051a 100644 --- a/server/auth.go +++ b/server/auth.go @@ -39,6 +39,16 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Accounts +type Account struct { + Name string + Nkey string + mu sync.RWMutex + sl *Sublist + imports importMap + exports exportMap +} + // Import stream mapping struct type streamImport struct { acc *Account @@ -64,22 +74,7 @@ type exportMap struct { services map[string]map[string]*Account } -// Accounts -type Account struct { - Name string - mu sync.RWMutex - sl *Sublist - imports importMap - exports exportMap - /* - imports map[string]*importMap - exports map[string]map[string]*Account - services map[string]map[string]*Account - routes map[string]*routeMap // TODO(dlc) sync.Map may be better. - */ -} - -func (a *Account) addServiceExport(accounts []*Account, subject string) error { +func (a *Account) addServiceExport(subject string, accounts []*Account) error { a.mu.Lock() defer a.mu.Unlock() if a == nil { @@ -114,6 +109,10 @@ func (a *Account) addServiceImport(destination *Account, from, to string) error if destination == nil { return ErrMissingAccount } + // Empty means use from. + if to == "" { + to = from + } if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) { return ErrInvalidSubject } diff --git a/server/configs/accounts.conf b/server/configs/accounts.conf new file mode 100644 index 00000000..2554cb2b --- /dev/null +++ b/server/configs/accounts.conf @@ -0,0 +1,47 @@ + +accounts: { + synadia: { + nkey: ADMHMDX2LEUJRZQHGVSVRWZEJ2CPNHYO6TB4ZCZ37LXAX5SYNEW252GF + + users = [ + # Bob + {nkey : UC6NLCN7AS34YOJVCYD4PJ3QB7QGLYG5B5IMBT25VW5K4TNUJODM7BOX} + # Alice + {nkey : UBAAQWTW6CG2G6ANGNKB5U2B7HRWHSGMZEZX3AQSAJOQDAUGJD46LD2E} + ] + + exports = [ + {stream: "public.>"} # No accounts means public. + {stream: "synadia.private.>", accounts: [cncf, nats.io]} + {service: "pub.request"} # No accounts means public. + {service: "pub.special.request", accounts: [nats.io]} + ] + + imports = [ + {service: {account: "nats.io", subject: "nats.time"}} + ] + } + + nats.io: { + nkey: AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE + + users = [ + # Ivan + {nkey : UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW} + # Derek + {nkey : UDEREK22W43P2NFQCSKGM6BWD23OVWEDR7JE7LSNCD232MZIC4X2MEKZ} + ] + + imports = [ + {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} + {stream: {account: "synadia", subject:"synadia.private.*"}} + {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"} + ] + + exports = [ + {service: "nats.time"} + ] + } + + cncf: { nkey: ABDAYEV6KZVLW3GSJ3V7IWC542676TFYILXF2C7Z56LCPSMVHJE5BVYO} +} diff --git a/server/opts.go b/server/opts.go index 0b758c93..b73f793d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -309,11 +309,7 @@ func (o *Options) ProcessConfigFile(configFile string) error { case "logtime": o.Logtime = v.(bool) case "accounts": - if pedantic { - err = parseAccounts(tk, o) - } else { - err = parseAccounts(v, o) - } + err = parseAccounts(v, o) if err != nil { return err } @@ -606,14 +602,40 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { } } +// Temp structures to hold account import and export defintions since they need +// to be processed after being parsed. +type export struct { + acc *Account + sub string + accs []string +} + +type importStream struct { + acc *Account + an string + sub string + pre string +} + +type importService struct { + acc *Account + an string + sub string + to string +} + // parseAccounts will parse the different accounts syntax. func parseAccounts(v interface{}, opts *Options) error { - var pedantic = opts.CheckConfig - var tk token + var ( + pedantic = opts.CheckConfig + importStreams []*importStream + importServices []*importService + exportStreams []*export + exportServices []*export + ) _, v = unwrapValue(v) - uorn := make(map[string]struct{}) - switch v.(type) { + // Simple array of account names. case []interface{}, []string: m := make(map[string]struct{}, len(v.([]interface{}))) for _, name := range v.([]interface{}) { @@ -621,27 +643,47 @@ func parseAccounts(v interface{}, opts *Options) error { if _, ok := m[ns]; ok { return fmt.Errorf("Duplicate Account Entry: %s", ns) } - opts.Accounts = append(opts.Accounts, &Account{Name: name.(string)}) + opts.Accounts = append(opts.Accounts, &Account{Name: ns}) m[ns] = struct{}{} } + // More common map entry case map[string]interface{}: - m := make(map[string]struct{}, len(v.(map[string]interface{}))) - for name, mv := range v.(map[string]interface{}) { - _, mv = unwrapValue(mv) - if _, ok := m[name]; ok { - return fmt.Errorf("Duplicate Account Entry: %s", name) - } - uv, ok := mv.(map[string]interface{}) + // Track users across accounts, must be unique across + // accounts and nkeys vs users. + uorn := make(map[string]struct{}) + for aname, mv := range v.(map[string]interface{}) { + _, amv := unwrapValue(mv) + // These should be maps. + mv, ok := amv.(map[string]interface{}) if !ok { - return fmt.Errorf("Expected map entry for users") + return fmt.Errorf("Expected map entries for accounts") } - acc := &Account{Name: name} + acc := &Account{Name: aname} opts.Accounts = append(opts.Accounts, acc) - m[name] = struct{}{} - for k, v := range uv { - tk, mv = unwrapValue(v) + for k, v := range mv { + tk, mv := unwrapValue(v) switch strings.ToLower(k) { + case "nkey": + nk, ok := mv.(string) + if !ok || !nkeys.IsValidPublicAccountKey(nk) { + return fmt.Errorf("Not a valid public nkey for an account: %q", v) + } + acc.Nkey = nk + case "imports": + streams, services, err := parseAccountImports(mv, acc, pedantic) + if err != nil { + return err + } + importStreams = append(importStreams, streams...) + importServices = append(importServices, services...) + case "exports": + streams, services, err := parseAccountExports(mv, acc, pedantic) + if err != nil { + return err + } + exportStreams = append(exportStreams, streams...) + exportServices = append(exportServices, services...) case "users": var ( users []*User @@ -673,15 +715,309 @@ func parseAccounts(v interface{}, opts *Options) error { u.Account = acc } opts.Nkeys = append(opts.Nkeys, nkeys...) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return &unknownConfigFieldErr{ + field: k, + token: tk, + configFile: tk.SourceFile(), + } + } } } } - default: - return fmt.Errorf("Expected an array or map of account entries, got %T", v) } + + // Parse Imports and Exports here after all accounts defined. + // Do exports first since they need to be defined for imports to succeed + // since we do permissions checks. + + // Create a lookup map for accounts lookups. + am := make(map[string]*Account, len(opts.Accounts)) + for _, a := range opts.Accounts { + am[a.Name] = a + } + // Do stream exports + for _, stream := range exportStreams { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range stream.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream export", an) + } + accounts = append(accounts, ta) + } + if err := stream.acc.addStreamExport(stream.sub, accounts); err != nil { + return fmt.Errorf("Error adding stream export %q: %v", stream.sub, err) + } + } + for _, service := range exportServices { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range service.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for service export", an) + } + accounts = append(accounts, ta) + } + if err := service.acc.addServiceExport(service.sub, accounts); err != nil { + return fmt.Errorf("Error adding service export %q: %v", service.sub, err) + } + } + for _, stream := range importStreams { + ta := am[stream.an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream import", stream.an) + } + if err := stream.acc.addStreamImport(ta, stream.sub, stream.pre); err != nil { + return fmt.Errorf("Error adding stream import %q: %v", stream.sub, err) + } + } + for _, service := range importServices { + ta := am[service.an] + if ta == nil { + return fmt.Errorf("%q account not defined for service import", service.an) + } + if service.to == "" { + service.to = service.sub + } + if err := service.acc.addServiceImport(ta, service.to, service.sub); err != nil { + return fmt.Errorf("Error adding service import %q: %v", service.sub, err) + } + } + return nil } +// Parse the account imports +func parseAccountExports(v interface{}, acc *Account, pedantic bool) ([]*export, []*export, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Exports should be an array, got %T", v) + } + + var services []*export + var streams []*export + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Export Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseExportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Parse the account imports +func parseAccountImports(v interface{}, acc *Account, pedantic bool) ([]*importStream, []*importService, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Imports should be an array, got %T", v) + } + + var services []*importService + var streams []*importStream + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Import Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseImportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Helper to parse an embedded account description for imported services or streams. +func parseAccount(v map[string]interface{}, pedantic bool) (string, string, error) { + var accountName, subject string + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "account": + accountName = mv.(string) + case "subject": + subject = mv.(string) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return "", "", &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + } + } + if accountName == "" || subject == "" { + return "", "", fmt.Errorf("Expect an account name and a subject") + } + return accountName, subject, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: "public.>"} # No accounts means public. +// {stream: "synadia.private.>", accounts: [cncf, natsio]} +// {service: "pub.request"} # No accounts means public. +// {service: "pub.special.request", accounts: [nats.io]} +func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*export, *export, error) { + var ( + curStream *export + curService *export + accounts []string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + curStream = &export{sub: mv.(string)} + if accounts != nil { + curStream.accs = accounts + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + curService = &export{sub: mv.(string)} + if accounts != nil { + curService.accs = accounts + } + case "accounts": + for _, iv := range mv.([]interface{}) { + _, mv := unwrapValue(iv) + accounts = append(accounts, mv.(string)) + } + if curStream != nil { + curStream.accs = accounts + } else if curService != nil { + curService.accs = accounts + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing export service or stream", mk) + } + + } + return curStream, curService, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} +// {stream: {account: "synadia", subject:"synadia.private.*"}} +// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"} +func parseImportStreamOrService(v map[string]interface{}, pedantic bool) (*importStream, *importService, error) { + var ( + curStream *importStream + curService *importService + pre, to string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Stream entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curStream = &importStream{an: accountName, sub: subject} + if pre != "" { + curStream.pre = pre + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Service entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curService = &importService{an: accountName, sub: subject} + if to != "" { + curService.to = to + } + case "prefix": + pre = mv.(string) + if curStream != nil { + curStream.pre = pre + } + case "to": + to = mv.(string) + if curService != nil { + curService.to = to + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing import service or stream", mk) + } + + } + return curStream, curService, nil +} + // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options) (*authorization, error) { var (