diff --git a/conf/lex.go b/conf/lex.go index f9603a99..3eedaa89 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -594,6 +594,8 @@ func lexMapKeyStart(lx *lexer) stateFn { switch { case isKeySeparator(r): return lx.errorf("Unexpected key separator '%v'.", r) + case r == arrayEnd: + return lx.errorf("Unexpected array end '%v' processing map.", r) case unicode.IsSpace(r): lx.next() return lexSkip(lx, lexMapKeyStart) diff --git a/server/accounts_test.go b/server/accounts_test.go new file mode 100644 index 00000000..02555d92 --- /dev/null +++ b/server/accounts_test.go @@ -0,0 +1,965 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "os" + "strings" + "testing" + + "github.com/nats-io/nkeys" +) + +func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { + opts := defaultServerOptions + s := New(&opts) + + // Now create two accounts. + f, err := s.RegisterAccount("foo") + if err != nil { + t.Fatalf("Error creating account 'foo': %v", err) + } + b, err := s.RegisterAccount("bar") + if err != nil { + t.Fatalf("Error creating account 'bar': %v", err) + } + return s, f, b +} + +func TestRegisterDuplicateAccounts(t *testing.T) { + s, _, _ := simpleAccountServer(t) + if _, err := s.RegisterAccount("foo"); err == nil { + t.Fatal("Expected an error registering 'foo' twice") + } +} + +func TestAccountIsolation(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + cfoo, crFoo, _ := newClientForServer(s) + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error register client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error register client with 'bar' account: %v", err) + } + + // Make sure they are different accounts/sl. + if cfoo.acc == cbar.acc { + t.Fatalf("Error, accounts the same for both clients") + } + + // Now do quick test that makes sure messages do not cross over. + // setup bar as a foo subscriber. + go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n")) + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q", l) + } + + go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) + l, err = crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'foo' from server: %v", err) + } + + matches := msgPat.FindAllStringSubmatch(l, -1)[0] + if matches[SUB_INDEX] != "foo" { + t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crFoo, []byte("hello\r\n"), t) + + // Now make sure nothing shows up on bar. + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q", l) + } +} + +func TestAccountFromOptions(t *testing.T) { + opts := defaultServerOptions + opts.Accounts = []*Account{ + &Account{Name: "foo"}, + &Account{Name: "bar"}, + } + s := New(&opts) + + if la := len(s.accounts); la != 2 { + t.Fatalf("Expected to have a server with two accounts, got %v", la) + } + // Check that sl is filled in. + fooAcc := s.LookupAccount("foo") + barAcc := s.LookupAccount("bar") + if fooAcc == nil || barAcc == nil { + t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") + } + if fooAcc.sl == nil || barAcc.sl == nil { + t.Fatal("Expected Sublists to be filled in on Opts.Accounts") + } +} + +func TestNewAccountsFromClients(t *testing.T) { + opts := defaultServerOptions + s := New(&opts) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\"}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + opts.AllowNewAccounts = true + s = New(&opts) + + c, cr, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to connect: %v", err) + } + go c.parse([]byte("PING\r\n")) + l, err = cr.ReadString('\n') + if err != nil { + t.Fatalf("Error reading response for client from server: %v", err) + } + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q", l) + } +} + +// Clients can ask that the account be forced to be new. If it exists this is an error. +func TestNewAccountRequireNew(t *testing.T) { + // This has foo and bar accounts already. + s, _, _ := simpleAccountServer(t) + + c, cr, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n") + go c.parse(connectOp) + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } + + // Now allow new accounts on the fly, make sure second time does not work. + opts := defaultServerOptions + opts.AllowNewAccounts = true + s = New(&opts) + + c, _, _ = newClientForServer(s) + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received an error trying to create an account: %v", err) + } + + c, cr, _ = newClientForServer(s) + go c.parse(connectOp) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "-ERR ") { + t.Fatalf("Expected an error") + } +} + +func accountNameExists(name string, accounts []*Account) bool { + for _, acc := range accounts { + if strings.Compare(acc.Name, name) == 0 { + return true + } + } + return false +} + +func TestAccountSimpleConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) + } + if !accountNameExists("foo", opts.Accounts) { + t.Fatal("Expected a 'foo' account") + } + if !accountNameExists("bar", opts.Accounts) { + t.Fatal("Expected a 'bar' account") + } + + // Make sure double entries is an error. + confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`)) + defer os.Remove(confFileName) + _, err = ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double account entries") + } +} + +func TestAccountParseConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: derek, password: foo} + {user: ivan, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Received an error processing config file: %v", err) + } + + if la := len(opts.Accounts); la != 2 { + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) + } + + if lu := len(opts.Users); lu != 4 { + t.Fatalf("Expected 4 total Users, got %d", lu) + } + + var natsAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + break + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + + for _, u := range opts.Users { + if u.Username == "derek" { + if u.Account != natsAcc { + t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account) + } + } + } +} + +func TestAccountParseConfigDuplicateUsers(t *testing.T) { + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: alice, password: foo} + {user: bob, password: bar} + ] + } + nats.io { + users = [ + {user: alice, password: bar} + ] + } + } + `)) + defer os.Remove(confFileName) + _, err := ProcessConfigFile(confFileName) + if err == nil { + t.Fatalf("Expected an error with double user entries") + } +} + +func TestAccountParseConfigImportsExports(t *testing.T) { + opts, err := ProcessConfigFile("./configs/accounts.conf") + if err != nil { + t.Fatal(err) + } + if la := len(opts.Accounts); la != 3 { + t.Fatalf("Expected to see 3 accounts in opts, got %d", la) + } + if lu := len(opts.Nkeys); lu != 4 { + t.Fatalf("Expected 4 total Nkey users, got %d", lu) + } + if lu := len(opts.Users); lu != 0 { + t.Fatalf("Expected no Users, got %d", lu) + } + var natsAcc, synAcc *Account + for _, acc := range opts.Accounts { + if acc.Name == "nats.io" { + natsAcc = acc + } else if acc.Name == "synadia" { + synAcc = acc + } + } + if natsAcc == nil { + t.Fatalf("Error retrieving account for 'nats.io'") + } + if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" { + t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey) + } + // Check user assigned to the correct account. + for _, nk := range opts.Nkeys { + if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" { + if nk.Account != natsAcc { + t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name) + } + break + } + } + + // Now check for the imports and exports of streams and services. + if lis := len(natsAcc.imports.streams); lis != 2 { + t.Fatalf("Expected 2 imported streams, got %d\n", lis) + } + if lis := len(natsAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(natsAcc.exports.services); les != 1 { + t.Fatalf("Expected 1 exported service, got %d\n", les) + } + if les := len(natsAcc.exports.streams); les != 0 { + t.Fatalf("Expected no exported streams, got %d\n", les) + } + + if synAcc == nil { + t.Fatalf("Error retrieving account for 'synadia'") + } + + if lis := len(synAcc.imports.streams); lis != 0 { + t.Fatalf("Expected no imported streams, got %d\n", lis) + } + if lis := len(synAcc.imports.services); lis != 1 { + t.Fatalf("Expected 1 imported service, got %d\n", lis) + } + if les := len(synAcc.exports.services); les != 2 { + t.Fatalf("Expected 2 exported service, got %d\n", les) + } + if les := len(synAcc.exports.streams); les != 2 { + t.Fatalf("Expected 2 exported streams, got %d\n", les) + } +} + +func TestImportExportConfigFailures(t *testing.T) { + // Import from unknow account + cf := createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: "synadia", subject:"foo"}}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import from unknown account") + } + // Import a service with no account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with no account") + } + // Import a service with a wildcard subject. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{service: {account: "nats.io", subject:"foo.*"}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import of a service with wildcard subject") + } + // Export with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: "foo.*", wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with unknown keyword") + } + // Import with unknown keyword. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with import with unknown keyword") + } + // Export with an account. + cf = createConfFile(t, []byte(` + accounts { + nats.io { + exports = [{service: {account: nats.io, subject:"foo.*"}}] + } + } + `)) + defer os.Remove(cf) + if _, err := ProcessConfigFile(cf); err == nil { + t.Fatalf("Expected an error with export with account") + } +} + +func TestImportAuthorized(t *testing.T) { + _, foo, bar := simpleAccountServer(t) + + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t) + + foo.addStreamExport("foo", isPublicExport) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t) + + foo.addStreamExport("*", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), false, t) + + // Reset and test '>' public export + _, foo, bar = simpleAccountServer(t) + foo.addStreamExport(">", nil) + // Everything should work. + checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, ">"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), true, t) + + // Reset and test pwc and fwc + s, foo, bar := simpleAccountServer(t) + foo.addStreamExport("foo.*.baz.>", []*Account{bar}) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22"), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz"), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, ""), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*"), false, t) + + // Make sure we match the account as well + + fb, _ := s.RegisterAccount("foobar") + bz, _ := s.RegisterAccount("baz") + + checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1"), false, t) + checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1"), false, t) +} + +func TestSimpleMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + // Test first that trying to import with no matching export permission returns an error. + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization { + t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err) + } + + // Now map the subject space between foo and bar. + // Need to do export first. + if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined. + t.Fatalf("Error adding account export to client foo: %v", err) + } + if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil { + t.Fatalf("Error adding account import to client bar: %v", err) + } + + // Normal Subscription on bar client. + go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + checkMsg := func(l, sid string) { + t.Helper() + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "import.foo" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != sid { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + } + + // Now check we got the message from normal subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + checkMsg(l, "1") + checkPayload(crBar, []byte("hello\r\n"), t) + + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + checkMsg(l, "2") + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestNoPrefixWildcardMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) + } + if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil { + t.Fatalf("Error adding stream import to client bar: %v", err) + } + + // Normal Subscription on bar client for literal "foo". + go cbar.parse([]byte("SUB foo 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the message from normal subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "foo" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestPrefixWildcardMapping(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) + } + // Checking that trailing '.' is accepted, tested that it is auto added above. + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { + t.Fatalf("Error adding stream import to client bar: %v", err) + } + + // Normal Subscription on bar client for wildcard. + go cbar.parse([]byte("SUB pub.imports.* 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the messages from wildcard subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "pub.imports.foo" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to client foo: %v", err) + } + if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { + t.Fatalf("Error adding stream import to client bar: %v", err) + } + + // Normal Subscription on bar client for wildcard. + go cbar.parse([]byte("SUB pub.imports.foo 1\r\nPING\r\n")) + _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. + if err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // Now publish our message. + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n")) + + // Now check we got the messages from wildcard subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "pub.imports.foo" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) +} + +func TestCrossAccountRequestReply(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, crFoo, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + // Add in the service import for the requests. Make it public. + if err := cfoo.acc.addServiceExport("test.request", nil); err != nil { + t.Fatalf("Error adding account service import to client foo: %v", err) + } + + // Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects. + if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount { + t.Fatalf("Expected ErrMissingAccount but received %v.", err) + } + if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject but received %v.", err) + } + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject but received %v.", err) + } + + // Now add in the Route for request to be routed to the foo account. + if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil { + t.Fatalf("Error adding account route to client bar: %v", err) + } + + // Now setup the resonder under cfoo + cfoo.parse([]byte("SUB test.request 1\r\n")) + + // Now send the request. Remember we expect the request on our local foo. We added the route + // with that "from" and will map it to "test.request" + go cbar.parseAndFlush([]byte("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")) + + // Now read the request from crFoo + l, err := crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "test.request" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + // Make sure this looks like _INBOX + if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") { + t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX]) + } + checkPayload(crFoo, []byte("help\r\n"), t) + + replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX]) + go cfoo.parseAndFlush([]byte(replyOp)) + + // Now read the response from crBar + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw = msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches = mraw[0] + if matches[SUB_INDEX] != "bar" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "11" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + if matches[REPLY_INDEX] != "" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("22\r\n"), t) + + // Make sure we have no service imports on fooAcc. An implicit one was created + // for the response but should be removed when the response was processed. + if nr := fooAcc.numServiceRoutes(); nr != 0 { + t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) + } +} + +func TestAccountMapsUsers(t *testing.T) { + // Used for the nkey users to properly sign. + seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM" + seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE" + + confFileName := createConfFile(t, []byte(` + accounts { + synadia { + users = [ + {user: derek, password: foo}, + {nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR} + ] + } + nats { + users = [ + {user: ivan, password: bar}, + {nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH} + ] + } + } + `)) + defer os.Remove(confFileName) + opts, err := ProcessConfigFile(confFileName) + if err != nil { + t.Fatalf("Unexpected error parsing config file: %v", err) + } + s := New(opts) + synadia := s.LookupAccount("synadia") + nats := s.LookupAccount("nats") + + if synadia == nil || nats == nil { + t.Fatalf("Expected non nil accounts during lookup") + } + + // Make sure a normal log in maps the accounts correctly. + c, _, _ := newClientForServer(s) + connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n") + c.parse(connectOp) + if c.acc != synadia { + t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + c, _, _ = newClientForServer(s) + connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n") + c.parse(connectOp) + if c.acc != nats { + t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } + + // Now test nkeys as well. + kp, _ := nkeys.FromSeed(seed1) + pubKey, _ := kp.PublicKey() + + c, cr, l := newClientForServer(s) + // Check for Nonce + var info nonceInfo + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err := kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig := base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != synadia { + t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + // Now nats account nkey user. + kp, _ = nkeys.FromSeed(seed2) + pubKey, _ = kp.PublicKey() + + c, cr, l = newClientForServer(s) + // Check for Nonce + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err = kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig = base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != nats { + t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } +} + +func BenchmarkNewRouteReply(b *testing.B) { + opts := defaultServerOptions + s := New(&opts) + c, _, _ := newClientForServer(s) + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.newServiceReply() + } +} diff --git a/server/auth.go b/server/auth.go index 252ae6a8..c9435728 100644 --- a/server/auth.go +++ b/server/auth.go @@ -16,8 +16,8 @@ package server import ( "crypto/tls" "encoding/base64" - "fmt" "strings" + "sync" "github.com/nats-io/nkeys" "golang.org/x/crypto/bcrypt" @@ -39,17 +39,237 @@ type ClientAuthentication interface { RegisterUser(*User) } +// Accounts +type Account struct { + Name string + Nkey string + mu sync.RWMutex + sl *Sublist + imports importMap + exports exportMap +} + +// Import stream mapping struct +type streamImport struct { + acc *Account + from string + prefix string +} + +// Import service mapping struct +type serviceImport struct { + acc *Account + from string + to string + ae bool +} + +// importMap tracks the imported streams and services. +type importMap struct { + streams map[string]*streamImport + services map[string]*serviceImport // TODO(dlc) sync.Map may be better. +} + +// exportMap tracks the exported streams and services. +type exportMap struct { + streams map[string]map[string]*Account + services map[string]map[string]*Account +} + +func (a *Account) addServiceExport(subject string, accounts []*Account) error { + a.mu.Lock() + defer a.mu.Unlock() + if a == nil { + return ErrMissingAccount + } + if a.exports.services == nil { + a.exports.services = make(map[string]map[string]*Account) + } + ma := a.exports.services[subject] + if accounts != nil && ma == nil { + ma = make(map[string]*Account) + } + for _, a := range accounts { + ma[a.Name] = a + } + a.exports.services[subject] = ma + return nil +} + +// numServiceRoutes returns the number of service routes on this account. +func (a *Account) numServiceRoutes() int { + a.mu.RLock() + defer a.mu.RUnlock() + return len(a.imports.services) +} + +// This will add a route to an account to send published messages / requests +// to the destination account. From is the local subject to map, To is the +// subject that will appear on the destination account. Destination will need +// to have an import rule to allow access via addService. +func (a *Account) addServiceImport(destination *Account, from, to string) error { + if destination == nil { + return ErrMissingAccount + } + // Empty means use from. + if to == "" { + to = from + } + if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) { + return ErrInvalidSubject + } + // First check to see if the account has authorized us to route to the "to" subject. + if !destination.checkServiceImportAuthorized(a, to) { + return ErrServiceImportAuthorization + } + + return a.addImplicitServiceImport(destination, from, to, false) +} + +// removeServiceImport will remove the route by subject. +func (a *Account) removeServiceImport(subject string) { + a.mu.Lock() + delete(a.imports.services, subject) + a.mu.Unlock() +} + +// Add a route to connect from an implicit route created for a response to a request. +// This does no checks and should be only called by the msg processing code. Use addRoute +// above if responding to user input or config, etc. +func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error { + a.mu.Lock() + if a.imports.services == nil { + a.imports.services = make(map[string]*serviceImport) + } + a.imports.services[from] = &serviceImport{destination, from, to, autoexpire} + a.mu.Unlock() + return nil +} + +// addStreamImport will add in the stream import from a specific account. +func (a *Account) addStreamImport(account *Account, from, prefix string) error { + if account == nil { + return ErrMissingAccount + } + + // First check to see if the account has authorized export of the subject. + if !account.checkStreamImportAuthorized(a, from) { + return ErrStreamImportAuthorization + } + + a.mu.Lock() + defer a.mu.Unlock() + if a.imports.streams == nil { + a.imports.streams = make(map[string]*streamImport) + } + if prefix != "" && prefix[len(prefix)-1] != btsep { + prefix = prefix + string(btsep) + } + // TODO(dlc) - collisions, etc. + a.imports.streams[from] = &streamImport{account, from, prefix} + return nil +} + +// Placeholder to denote public export. +var isPublicExport = []*Account(nil) + +// addExport will add an export to the account. If accounts is nil +// it will signify a public export, meaning anyone can impoort. +func (a *Account) addStreamExport(subject string, accounts []*Account) error { + a.mu.Lock() + defer a.mu.Unlock() + if a == nil { + return ErrMissingAccount + } + if a.exports.streams == nil { + a.exports.streams = make(map[string]map[string]*Account) + } + var ma map[string]*Account + for _, aa := range accounts { + if ma == nil { + ma = make(map[string]*Account, len(accounts)) + } + ma[aa.Name] = aa + } + a.exports.streams[subject] = ma + return nil +} + +// Check if another account is authorized to import from us. +func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool { + // Find the subject in the exports list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.exports.streams == nil || !IsValidSubject(subject) { + return false + } + + // Check direct match of subject first + am, ok := a.exports.streams[subject] + if ok { + // if am is nil that denotes a public export + if am == nil { + return true + } + // If we have a matching account we are authorized + _, ok := am[account.Name] + return ok + } + // ok if we are here we did not match directly so we need to test each one. + // The import subject arg has to take precedence, meaning the export + // has to be a true subset of the import claim. We already checked for + // exact matches above. + + tokens := strings.Split(subject, tsep) + for subj, am := range a.exports.streams { + if isSubsetMatch(tokens, subj) { + if am == nil { + return true + } + _, ok := am[account.Name] + return ok + } + } + return false +} + +// Check if another account is authorized to route requests to this service. +func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool { + // Find the subject in the services list. + a.mu.RLock() + defer a.mu.RUnlock() + + if a.exports.services == nil || !IsValidLiteralSubject(subject) { + return false + } + // These are always literal subjects so just lookup. + am, ok := a.exports.services[subject] + if !ok { + return false + } + // Check to see if we are public or if we need to search for the account. + if am == nil { + return true + } + // Check that we allow this account. + _, ok = am[account.Name] + return ok +} + // Nkey is for multiple nkey based users type NkeyUser struct { Nkey string `json:"user"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // User is for multiple accounts/users. type User struct { Username string `json:"user"` Password string `json:"password"` - Permissions *Permissions `json:"permissions"` + Permissions *Permissions `json:"permissions,omitempty"` + Account *Account `json:"account,omitempty"` } // clone performs a deep copy of the User struct, returning a new clone with @@ -146,7 +366,7 @@ func (s *Server) checkAuthforWarnings() { } if warn { // Warning about using plaintext passwords. - s.Warnf("Plaintext passwords detected. Use Nkeys or Bcrypt passwords in config files.") + s.Warnf("Plaintext passwords detected, use nkeys or bcrypt.") } } @@ -262,6 +482,7 @@ func (s *Server) isClientAuthorized(c *client) bool { if err := pub.Verify(c.nonce, sig); err != nil { return false } + c.RegisterNkeyUser(nkey) return true } @@ -309,35 +530,6 @@ func (s *Server) isRouterAuthorized(c *client) bool { return true } -// removeUnauthorizedSubs removes any subscriptions the client has that are no -// longer authorized, e.g. due to a config reload. -func (s *Server) removeUnauthorizedSubs(c *client) { - c.mu.Lock() - if c.perms == nil { - c.mu.Unlock() - return - } - - subs := make(map[string]*subscription, len(c.subs)) - for sid, sub := range c.subs { - subs[sid] = sub - } - c.mu.Unlock() - - for sid, sub := range subs { - if !c.canSubscribe(sub.subject) { - _ = s.sl.Remove(sub) - c.mu.Lock() - delete(c.subs, sid) - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", - sub.subject, sub.sid)) - s.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) - } - } -} - // Support for bcrypt stored passwords and tokens. const bcryptPrefix = "$2a$" diff --git a/server/client.go b/server/client.go index 73d4f453..1a0f2480 100644 --- a/server/client.go +++ b/server/client.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "regexp" + "strings" "sync" "sync/atomic" "time" @@ -144,6 +145,8 @@ type client struct { ncs string out outbound srv *Server + acc *Account + sl *Sublist subs map[string]*subscription perms *permissions in readCache @@ -235,8 +238,11 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { return &state } +// This is the main subscription struct that indicates +// interest in published messages. type subscription struct { client *client + im *streamImport // This is for importing support. subject []byte queue []byte sid []byte @@ -258,6 +264,8 @@ type clientOpts struct { Lang string `json:"lang"` Version string `json:"version"` Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -313,22 +321,62 @@ func (c *client) initClient() { } } +// RegisterWithAccount will register the given user with a specific +// account. This will change the subject namespace. +func (c *client) registerWithAccount(acc *Account) error { + if acc == nil || acc.sl == nil { + return ErrBadAccount + } + c.mu.Lock() + c.acc = acc + c.sl = acc.sl + c.mu.Unlock() + return nil +} + // RegisterUser allows auth to call back into a new client -// with the authenticated user. This is used to map any permissions -// into the client. +// with the authenticated user. This is used to map +// any permissions into the client and setup accounts. func (c *client) RegisterUser(user *User) { + c.mu.Lock() + defer c.mu.Unlock() + + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. if user.Permissions == nil { // Reset perms to nil in case client previously had them. - c.mu.Lock() c.perms = nil - c.mu.Unlock() return } - // Process Permissions and map into client connection structures. + c.setPermissions(user.Permissions) +} + +// RegisterNkey allows auth to call back into a new nkey +// client with the authenticated user. This is used to map +// any permissions into the client and setup accounts. +func (c *client) RegisterNkeyUser(user *NkeyUser) { c.mu.Lock() defer c.mu.Unlock() + // Register with proper account and sublist. + if user.Account != nil { + c.acc = user.Account + c.sl = c.acc.sl + } + + // Assign permissions. + if user.Permissions == nil { + // Reset perms to nil in case client previously had them. + c.perms = nil + return + } + c.setPermissions(user.Permissions) } @@ -770,6 +818,8 @@ func (c *client) processConnect(arg []byte) error { proto := c.opts.Protocol verbose := c.opts.Verbose lang := c.opts.Lang + account := c.opts.Account + accountNew := c.opts.AccountNew c.mu.Unlock() if srv != nil { @@ -788,6 +838,38 @@ func (c *client) processConnect(arg []byte) error { c.authViolation() return ErrAuthorization } + + // Check for Account designation + if account != "" { + var acc *Account + var wasNew bool + if !srv.newAccountsAllowed() { + acc = srv.LookupAccount(account) + if acc == nil { + c.Errorf(ErrMissingAccount.Error()) + c.sendErr("Account Not Found") + return ErrMissingAccount + } else if accountNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } else { + // We can create this one on the fly. + acc, wasNew = srv.LookupOrRegisterAccount(account) + if accountNew && !wasNew { + c.Errorf(ErrAccountExists.Error()) + c.sendErr(ErrAccountExists.Error()) + return ErrAccountExists + } + } + // If we are here we can register ourselves with the new account. + if err := c.registerWithAccount(acc); err != nil { + c.Errorf("Problem registering with account [%s]", account) + c.sendErr("Account Failed Registration") + return ErrBadAccount + } + } } // Check client protocol request if it exists. @@ -828,7 +910,18 @@ func (c *client) authTimeout() { } func (c *client) authViolation() { - if c.srv != nil && c.srv.getOpts().Users != nil { + var hasNkeys, hasUsers bool + if s := c.srv; s != nil { + s.mu.Lock() + hasNkeys = s.nkeys != nil + hasUsers = s.users != nil + s.mu.Unlock() + } + if hasNkeys { + c.Errorf("%s - Nkey %q", + ErrAuthorization.Error(), + c.opts.Nkey) + } else if hasUsers { c.Errorf("%s - User %q", ErrAuthorization.Error(), c.opts.Username) @@ -902,7 +995,7 @@ func (c *client) queueOutbound(data []byte) bool { c.out.p = nil } // Check for a big message, and if found place directly on nb - // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < + // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize if len(data) > maxBufSize { c.out.nb = append(c.out.nb, data) referenced = true @@ -1227,34 +1320,45 @@ func (c *client) processSub(argo []byte) (err error) { return nil } + // Check if we have a maximum on the number of subscriptions. if c.msubs > 0 && len(c.subs) >= c.msubs { c.mu.Unlock() c.maxSubsExceeded() return nil } - // Check if we have a maximum on the number of subscriptions. // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. sid := string(sub.sid) + var chkImports bool + if c.subs[sid] == nil { c.subs[sid] = sub - if c.srv != nil { - err = c.srv.sl.Insert(sub) + if c.sl != nil { + err = c.sl.Insert(sub) if err != nil { delete(c.subs, sid) } else { + if c.acc != nil { + chkImports = true + } shouldForward = c.typ != ROUTER } } } c.mu.Unlock() + if err != nil { c.sendErr("Invalid Subject") return nil } else if c.opts.Verbose { c.sendOK() } + if chkImports { + if err := c.checkAccountImports(sub); err != nil { + c.Errorf(err.Error()) + } + } if shouldForward { c.srv.broadcastSubscribe(sub) } @@ -1262,6 +1366,48 @@ func (c *client) processSub(argo []byte) (err error) { return nil } +// Check to see if we need to create a shadow subscription due to imports +// in other accounts. +func (c *client) checkAccountImports(sub *subscription) error { + c.mu.Lock() + acc := c.acc + c.mu.Unlock() + + if acc == nil { + return ErrMissingAccount + } + + subject := string(sub.subject) + tokens := strings.Split(subject, tsep) + + var rims [32]*streamImport + var ims = rims[:0] + acc.mu.RLock() + for _, im := range acc.imports.streams { + if isSubsetMatch(tokens, im.prefix+im.from) { + ims = append(ims, im) + } + } + acc.mu.RUnlock() + + // Now walk through collected importMaps + for _, im := range ims { + // We have a match for a local subscription with an import from another account. + // We will create a shadow subscription. + nsub := *sub // copy + nsub.im = im + if im.prefix != "" { + // redo subject here to match subject in the publisher account space. + // Just remove prefix from what they gave us. That maps into other space. + nsub.subject = sub.subject[len(im.prefix):] + } + if err := im.acc.sl.Insert(&nsub); err != nil { + return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name) + } + } + return nil +} + // canSubscribe determines if the client is authorized to subscribe to the // given subject. Assumes caller is holding lock. func (c *client) canSubscribe(subject []byte) bool { @@ -1297,8 +1443,8 @@ func (c *client) unsubscribe(sub *subscription) { c.traceOp("<-> %s", "DELSUB", sub.sid) delete(c.subs, string(sub.sid)) - if c.srv != nil { - c.srv.sl.Remove(sub) + if c.sl != nil { + c.sl.Remove(sub) } // If we are a queue subscriber on a client connection and we have routes, @@ -1362,11 +1508,11 @@ func (c *client) processUnsub(arg []byte) error { return nil } -func (c *client) msgHeader(mh []byte, sub *subscription) []byte { +func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte { mh = append(mh, sub.sid...) mh = append(mh, ' ') - if c.pa.reply != nil { - mh = append(mh, c.pa.reply...) + if reply != nil { + mh = append(mh, reply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) @@ -1518,18 +1664,34 @@ func (c *client) pubAllowed(subject []byte) bool { return allowed } -// prepMsgHeader will prepare the message header prefix -func (c *client) prepMsgHeader() []byte { - // Use the scratch buffer.. - msgh := c.msgb[:msgHeadProtoLen] +// Used to mimic client like replies. +const ( + replyPrefix = "_INBOX." + replyPrefixLen = len(replyPrefix) + digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 +) - // msg header - msgh = append(msgh, c.pa.subject...) - return append(msgh, ' ') +// newServiceReply is used when rewriting replies that cross account boundaries. +// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients. +func (c *client) newServiceReply() []byte { + // Check to see if we have our own rand yet. Global rand + // has contention with lots of clients, etc. + if c.in.prand == nil { + c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + + var b = [15]byte{'_', 'I', 'N', 'B', 'O', 'X', '.'} + rn := c.in.prand.Int63() + for i, l := replyPrefixLen, rn; i < len(b); i++ { + b[i] = digits[l%base] + l /= base + } + return b[:] } // processMsg is called to process an inbound msg from a client. -func (c *client) processMsg(msg []byte) { +func (c *client) processInboundMsg(msg []byte) { // Snapshot server. srv := c.srv @@ -1562,20 +1724,19 @@ func (c *client) processMsg(msg []byte) { var r *SublistResult var ok bool - genid := atomic.LoadUint64(&srv.sl.genid) + genid := atomic.LoadUint64(&c.sl.genid) if genid == c.in.genid && c.in.results != nil { r, ok = c.in.results[string(c.pa.subject)] } else { - // reset our L1 completely. + // Reset our L1 completely. c.in.results = make(map[string]*SublistResult) c.in.genid = genid } if !ok { - subject := string(c.pa.subject) - r = srv.sl.Match(subject) - c.in.results[subject] = r + r = c.sl.Match(string(c.pa.subject)) + c.in.results[string(c.pa.subject)] = r // Prune the results cache. Keeps us from unbounded growth. if len(c.in.results) > maxResultCacheSize { n := 0 @@ -1588,6 +1749,29 @@ func (c *client) processMsg(msg []byte) { } } + // Check to see if we need to route this message to + // another account. + if c.typ == CLIENT && c.acc != nil && c.acc.imports.services != nil { + c.acc.mu.RLock() + rm := c.acc.imports.services[string(c.pa.subject)] + c.acc.mu.RUnlock() + // Get the results from the other account for the mapped "to" subject. + if rm != nil && rm.acc != nil && rm.acc.sl != nil { + var nrr []byte + if rm.ae { + c.acc.removeServiceImport(rm.from) + } + if c.pa.reply != nil { + // We want to remap this to provide anonymity. + nrr = c.newServiceReply() + rm.acc.addImplicitServiceImport(c.acc, string(nrr), string(c.pa.reply), true) + } + // FIXME(dlc) - Do L1 cache trick from above. + rr := rm.acc.sl.Match(rm.to) + c.processMsgResults(rr, msg, []byte(rm.to), nrr) + } + } + // This is the fanout scale. fanout := len(r.psubs) + len(r.qsubs) @@ -1597,12 +1781,18 @@ func (c *client) processMsg(msg []byte) { } if c.typ == ROUTER { - c.processRoutedMsg(r, msg) - return + c.processRoutedMsgResults(r, msg) + } else if c.typ == CLIENT { + c.processMsgResults(r, msg, c.pa.subject, c.pa.reply) } +} - // Client connection processing here. - msgh := c.prepMsgHeader() +// This processes the sublist results for a given message. +func (c *client) processMsgResults(r *SublistResult, msg, subject, reply []byte) { + // msg header + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, subject...) + msgh = append(msgh, ' ') si := len(msgh) // Used to only send messages once across any given route. @@ -1616,7 +1806,7 @@ func (c *client) processMsg(msg []byte) { if sub.client.typ == ROUTER { // Check to see if we have already sent it here. if rmap == nil { - rmap = make(map[string]struct{}, srv.numRoutes()) + rmap = make(map[string]struct{}, c.srv.numRoutes()) } sub.client.mu.Lock() if sub.client.nc == nil || @@ -1634,8 +1824,17 @@ func (c *client) processMsg(msg []byte) { rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() } + // Check for import mapped subs + if sub.im != nil && sub.im.prefix != "" { + // Redo the subject here on the fly. + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, sub.im.prefix...) + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') + si = len(msgh) + } // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, reply) c.deliverMsg(sub, mh, msg) } @@ -1644,6 +1843,7 @@ func (c *client) processMsg(msg []byte) { if c.in.prand == nil { c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) } + // Process queue subs for i := 0; i < len(r.qsubs); i++ { qsubs := r.qsubs[i] @@ -1654,7 +1854,16 @@ func (c *client) processMsg(msg []byte) { index := (startIndex + i) % len(qsubs) sub := qsubs[index] if sub != nil { - mh := c.msgHeader(msgh[:si], sub) + // Check for mapped subs + if sub.im != nil && sub.im.prefix != "" { + // Redo the subject here on the fly. + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, sub.im.prefix...) + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') + si = len(msgh) + } + mh := c.msgHeader(msgh[:si], sub, reply) if c.deliverMsg(sub, mh, msg) { break } @@ -1782,6 +1991,45 @@ func (c *client) typeString() string { return "Unknown Type" } +// removeUnauthorizedSubs removes any subscriptions the client has that are no +// longer authorized, e.g. due to a config reload. +func (c *client) removeUnauthorizedSubs() { + c.mu.Lock() + if c.perms == nil || c.sl == nil { + c.mu.Unlock() + return + } + srv := c.srv + + var subsa [32]*subscription + subs := subsa[:0] + for _, sub := range c.subs { + subs = append(subs, sub) + } + + var removedSubs [32]*subscription + removed := removedSubs[:0] + + for _, sub := range subs { + if !c.canSubscribe(sub.subject) { + removed = append(removed, sub) + delete(c.subs, string(sub.sid)) + } + } + c.mu.Unlock() + + // Remove unauthorized clients subscriptions. + c.sl.RemoveBatch(removed) + + // Report back to client and logs. + for _, sub := range removed { + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + sub.subject, sub.sid)) + srv.Noticef("Removed sub %q for user %q - not authorized", + string(sub.subject), c.opts.Username) + } +} + func (c *client) closeConnection(reason ClosedState) { c.mu.Lock() if c.nc == nil { @@ -1820,10 +2068,13 @@ func (c *client) closeConnection(reason ClosedState) { c.mu.Unlock() + // Remove clients subscriptions. + c.sl.RemoveBatch(subs) + if srv != nil { // This is a route that disconnected... if len(connectURLs) > 0 { - // Unless disabled, possibly update the server's INFO protcol + // Unless disabled, possibly update the server's INFO protocol // and send to clients that know how to handle async INFOs. if !srv.getOpts().Cluster.NoAdvertise { srv.removeClientConnectURLsAndSendINFOToClients(connectURLs) @@ -1833,9 +2084,8 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - // Remove clients subscriptions. - srv.sl.RemoveBatch(subs) - if c.typ == CLIENT { + // Remove remote subscriptions. + if c.typ != ROUTER { // Forward UNSUBs protocols to all routes srv.broadcastUnSubscribeBatch(subs) } diff --git a/server/client_test.go b/server/client_test.go index 29271a34..afa2efeb 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -52,6 +52,19 @@ func createClientAsync(ch chan *client, s *Server, cli net.Conn) { }() } +func newClientForServer(s *Server) (*client, *bufio.Reader, string) { + cli, srv := net.Pipe() + cr := bufio.NewReaderSize(cli, maxBufSize) + ch := make(chan *client) + createClientAsync(ch, s, srv) + // So failing tests don't just hang. + cli.SetReadDeadline(time.Now().Add(2 * time.Second)) + l, _ := cr.ReadString('\n') + // Grab client + c := <-ch + return c, cr, l +} + var defaultServerOptions = Options{ Trace: false, Debug: false, @@ -272,6 +285,7 @@ const ( ) func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) { + t.Helper() // Read in payload d := make([]byte, len(expected)) n, err := cr.Read(d) @@ -384,13 +398,19 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) { } } -func (c *client) parseFlushAndClose(op []byte) { +// This needs to clear any flushOutbound flags since writeLoop not running. +func (c *client) parseAndFlush(op []byte) { c.parse(op) for cp := range c.pcd { cp.mu.Lock() + cp.flags.clear(flushOutbound) cp.flushOutbound() cp.mu.Unlock() } +} + +func (c *client) parseFlushAndClose(op []byte) { + c.parseAndFlush(op) c.nc.Close() } @@ -643,17 +663,17 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) { }() <-ch - if s.sl.Count() != 2 { - t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count()) + if c.sl.Count() != 2 { + t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count()) } c.closeConnection(ClientClosed) - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count()) } } func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { - s, c, _ := setupClient() + _, c, _ := setupClient() c.closeConnection(ClientClosed) subs := []byte("SUB foo 1\r\nSUB bar 2\r\n") @@ -664,8 +684,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { }() <-ch - if s.sl.Count() != 0 { - t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) + if c.sl.Count() != 0 { + t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count()) } } diff --git a/server/configs/accounts.conf b/server/configs/accounts.conf new file mode 100644 index 00000000..2554cb2b --- /dev/null +++ b/server/configs/accounts.conf @@ -0,0 +1,47 @@ + +accounts: { + synadia: { + nkey: ADMHMDX2LEUJRZQHGVSVRWZEJ2CPNHYO6TB4ZCZ37LXAX5SYNEW252GF + + users = [ + # Bob + {nkey : UC6NLCN7AS34YOJVCYD4PJ3QB7QGLYG5B5IMBT25VW5K4TNUJODM7BOX} + # Alice + {nkey : UBAAQWTW6CG2G6ANGNKB5U2B7HRWHSGMZEZX3AQSAJOQDAUGJD46LD2E} + ] + + exports = [ + {stream: "public.>"} # No accounts means public. + {stream: "synadia.private.>", accounts: [cncf, nats.io]} + {service: "pub.request"} # No accounts means public. + {service: "pub.special.request", accounts: [nats.io]} + ] + + imports = [ + {service: {account: "nats.io", subject: "nats.time"}} + ] + } + + nats.io: { + nkey: AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE + + users = [ + # Ivan + {nkey : UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW} + # Derek + {nkey : UDEREK22W43P2NFQCSKGM6BWD23OVWEDR7JE7LSNCD232MZIC4X2MEKZ} + ] + + imports = [ + {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} + {stream: {account: "synadia", subject:"synadia.private.*"}} + {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"} + ] + + exports = [ + {service: "nats.time"} + ] + } + + cncf: { nkey: ABDAYEV6KZVLW3GSJ3V7IWC542676TFYILXF2C7Z56LCPSMVHJE5BVYO} +} diff --git a/server/errors.go b/server/errors.go index c722bfc4..003b9880 100644 --- a/server/errors.go +++ b/server/errors.go @@ -48,4 +48,20 @@ var ( // ErrClientConnectedToRoutePort represents an error condition when a client // attempted to connect to the route listen port. ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port") + + // ErrAccountExists is returned when an account is attempted to be registered + // but already exists. + ErrAccountExists = errors.New("Account Exists") + + // ErrBadAccount represents a malformed or incorrect account. + ErrBadAccount = errors.New("Bad Account") + + // ErrMissingAccount is returned when an account does not exist. + ErrMissingAccount = errors.New("Account Missing") + + // ErrStreamImportAuthorization is returned when a stream import is not authorized. + ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized") + + // ErrServiceImportAuthorization is returned when a service import is not authorized. + ErrServiceImportAuthorization = errors.New("Service Import Not Authorized") ) diff --git a/server/monitor.go b/server/monitor.go index 9379b736..604f12a8 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -709,14 +709,15 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { } } - sz := &Subsz{s.sl.Stats(), 0, offset, limit, nil} + // FIXME(dlc) - Make account aware. + sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil} if subdetail { // Now add in subscription's details var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + s.gsl.localSubs(&subs) details := make([]SubDetail, len(subs)) i := 0 // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering. @@ -938,7 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) v.MaxPending = opts.MaxPending v.WriteDeadline = opts.WriteDeadline - v.Subscriptions = s.sl.Count() + v.Subscriptions = s.gsl.Count() v.ConfigLoadTime = s.configTime // Need a copy here since s.httpReqStats can change while doing // the marshaling down below. diff --git a/server/nkey_test.go b/server/nkey_test.go index f0837f2c..d3186e68 100644 --- a/server/nkey_test.go +++ b/server/nkey_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" mrand "math/rand" - "net" "os" "strings" "testing" @@ -56,17 +55,6 @@ func mixedSetup() (*Server, *client, *bufio.Reader, string) { return rawSetup(opts) } -func newClientForServer(s *Server) (*client, *bufio.Reader, string) { - cli, srv := net.Pipe() - cr := bufio.NewReaderSize(cli, maxBufSize) - ch := make(chan *client) - createClientAsync(ch, s, srv) - l, _ := cr.ReadString('\n') - // Grab client - c := <-ch - return c, cr, l -} - func TestServerInfoNonce(t *testing.T) { _, l := setUpClientWithResponse() if !strings.HasPrefix(l, "INFO ") { diff --git a/server/opts.go b/server/opts.go index a7efa8b1..27427ad3 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,6 +62,8 @@ type Options struct { MaxSubs int `json:"max_subscriptions,omitempty"` Nkeys []*NkeyUser `json:"-"` Users []*User `json:"-"` + Accounts []*Account `json:"-"` + AllowNewAccounts bool `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` @@ -227,7 +229,7 @@ func (e *configWarningErr) Error() string { } // ProcessConfigFile processes a configuration file. -// FIXME(dlc): Hacky +// FIXME(dlc): A bit hacky func ProcessConfigFile(configFile string) (*Options, error) { opts := &Options{} if err := opts.ProcessConfigFile(configFile); err != nil { @@ -306,6 +308,11 @@ func (o *Options) ProcessConfigFile(configFile string) error { o.Trace = v.(bool) case "logtime": o.Logtime = v.(bool) + case "accounts": + err = parseAccounts(v, o) + if err != nil { + return err + } case "authorization": var auth *authorization if pedantic { @@ -595,6 +602,426 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { } } +// Temp structures to hold account import and export defintions since they need +// to be processed after being parsed. +type export struct { + acc *Account + sub string + accs []string +} + +type importStream struct { + acc *Account + an string + sub string + pre string +} + +type importService struct { + acc *Account + an string + sub string + to string +} + +// parseAccounts will parse the different accounts syntax. +func parseAccounts(v interface{}, opts *Options) error { + var ( + pedantic = opts.CheckConfig + importStreams []*importStream + importServices []*importService + exportStreams []*export + exportServices []*export + ) + _, v = unwrapValue(v) + switch v.(type) { + // Simple array of account names. + case []interface{}, []string: + m := make(map[string]struct{}, len(v.([]interface{}))) + for _, name := range v.([]interface{}) { + ns := name.(string) + if _, ok := m[ns]; ok { + return fmt.Errorf("Duplicate Account Entry: %s", ns) + } + opts.Accounts = append(opts.Accounts, &Account{Name: ns}) + m[ns] = struct{}{} + } + // More common map entry + case map[string]interface{}: + // Track users across accounts, must be unique across + // accounts and nkeys vs users. + uorn := make(map[string]struct{}) + for aname, mv := range v.(map[string]interface{}) { + _, amv := unwrapValue(mv) + // These should be maps. + mv, ok := amv.(map[string]interface{}) + if !ok { + return fmt.Errorf("Expected map entries for accounts") + } + acc := &Account{Name: aname} + opts.Accounts = append(opts.Accounts, acc) + + for k, v := range mv { + tk, mv := unwrapValue(v) + switch strings.ToLower(k) { + case "nkey": + nk, ok := mv.(string) + if !ok || !nkeys.IsValidPublicAccountKey(nk) { + return fmt.Errorf("Not a valid public nkey for an account: %q", v) + } + acc.Nkey = nk + case "imports": + streams, services, err := parseAccountImports(mv, acc, pedantic) + if err != nil { + return err + } + importStreams = append(importStreams, streams...) + importServices = append(importServices, services...) + case "exports": + streams, services, err := parseAccountExports(mv, acc, pedantic) + if err != nil { + return err + } + exportStreams = append(exportStreams, streams...) + exportServices = append(exportServices, services...) + case "users": + var ( + users []*User + err error + nkeys []*NkeyUser + ) + if pedantic { + nkeys, users, err = parseUsers(tk, opts) + } else { + nkeys, users, err = parseUsers(mv, opts) + } + if err != nil { + return err + } + for _, u := range users { + if _, ok := uorn[u.Username]; ok { + return fmt.Errorf("Duplicate user %q detected", u.Username) + } + uorn[u.Username] = struct{}{} + u.Account = acc + } + opts.Users = append(opts.Users, users...) + + for _, u := range nkeys { + if _, ok := uorn[u.Nkey]; ok { + return fmt.Errorf("Duplicate nkey %q detected", u.Nkey) + } + uorn[u.Nkey] = struct{}{} + u.Account = acc + } + opts.Nkeys = append(opts.Nkeys, nkeys...) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return &unknownConfigFieldErr{ + field: k, + token: tk, + configFile: tk.SourceFile(), + } + } + } + } + } + } + + // Parse Imports and Exports here after all accounts defined. + // Do exports first since they need to be defined for imports to succeed + // since we do permissions checks. + + // Create a lookup map for accounts lookups. + am := make(map[string]*Account, len(opts.Accounts)) + for _, a := range opts.Accounts { + am[a.Name] = a + } + // Do stream exports + for _, stream := range exportStreams { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range stream.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream export", an) + } + accounts = append(accounts, ta) + } + if err := stream.acc.addStreamExport(stream.sub, accounts); err != nil { + return fmt.Errorf("Error adding stream export %q: %v", stream.sub, err) + } + } + for _, service := range exportServices { + // Make array of accounts if applicable. + var accounts []*Account + for _, an := range service.accs { + ta := am[an] + if ta == nil { + return fmt.Errorf("%q account not defined for service export", an) + } + accounts = append(accounts, ta) + } + if err := service.acc.addServiceExport(service.sub, accounts); err != nil { + return fmt.Errorf("Error adding service export %q: %v", service.sub, err) + } + } + for _, stream := range importStreams { + ta := am[stream.an] + if ta == nil { + return fmt.Errorf("%q account not defined for stream import", stream.an) + } + if err := stream.acc.addStreamImport(ta, stream.sub, stream.pre); err != nil { + return fmt.Errorf("Error adding stream import %q: %v", stream.sub, err) + } + } + for _, service := range importServices { + ta := am[service.an] + if ta == nil { + return fmt.Errorf("%q account not defined for service import", service.an) + } + if service.to == "" { + service.to = service.sub + } + if err := service.acc.addServiceImport(ta, service.to, service.sub); err != nil { + return fmt.Errorf("Error adding service import %q: %v", service.sub, err) + } + } + + return nil +} + +// Parse the account imports +func parseAccountExports(v interface{}, acc *Account, pedantic bool) ([]*export, []*export, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Exports should be an array, got %T", v) + } + + var services []*export + var streams []*export + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Export Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseExportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Parse the account imports +func parseAccountImports(v interface{}, acc *Account, pedantic bool) ([]*importStream, []*importService, error) { + // This should be an array of objects/maps. + _, v = unwrapValue(v) + ims, ok := v.([]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Imports should be an array, got %T", v) + } + + var services []*importService + var streams []*importStream + + for _, v := range ims { + _, mv := unwrapValue(v) + io, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Import Items should be a map with type entry, got %T", mv) + } + // Should have stream or service + stream, service, err := parseImportStreamOrService(io, pedantic) + if err != nil { + return nil, nil, err + } + if service != nil { + service.acc = acc + services = append(services, service) + } + if stream != nil { + stream.acc = acc + streams = append(streams, stream) + } + } + return streams, services, nil +} + +// Helper to parse an embedded account description for imported services or streams. +func parseAccount(v map[string]interface{}, pedantic bool) (string, string, error) { + var accountName, subject string + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "account": + accountName = mv.(string) + case "subject": + subject = mv.(string) + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return "", "", &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + } + } + if accountName == "" || subject == "" { + return "", "", fmt.Errorf("Expect an account name and a subject") + } + return accountName, subject, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: "public.>"} # No accounts means public. +// {stream: "synadia.private.>", accounts: [cncf, natsio]} +// {service: "pub.request"} # No accounts means public. +// {service: "pub.special.request", accounts: [nats.io]} +func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*export, *export, error) { + var ( + curStream *export + curService *export + accounts []string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + curStream = &export{sub: mv.(string)} + if accounts != nil { + curStream.accs = accounts + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + mvs, ok := mv.(string) + if !ok { + return nil, nil, fmt.Errorf("Expected service to be string name, got %T", mv) + } + curService = &export{sub: mvs} + if accounts != nil { + curService.accs = accounts + } + case "accounts": + for _, iv := range mv.([]interface{}) { + _, mv := unwrapValue(iv) + accounts = append(accounts, mv.(string)) + } + if curStream != nil { + curStream.accs = accounts + } else if curService != nil { + curService.accs = accounts + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing export service or stream", mk) + } + + } + return curStream, curService, nil +} + +// Parse an import stream or service. +// e.g. +// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} +// {stream: {account: "synadia", subject:"synadia.private.*"}} +// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"} +func parseImportStreamOrService(v map[string]interface{}, pedantic bool) (*importStream, *importService, error) { + var ( + curStream *importStream + curService *importService + pre, to string + ) + + for mk, mv := range v { + tk, mv := unwrapValue(mv) + switch strings.ToLower(mk) { + case "stream": + if curService != nil { + return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Stream entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curStream = &importStream{an: accountName, sub: subject} + if pre != "" { + curStream.pre = pre + } + case "service": + if curStream != nil { + return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv) + } + ac, ok := mv.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("Service entry should be an account map, got %T", mv) + } + // Make sure this is a map with account and subject + accountName, subject, err := parseAccount(ac, pedantic) + if err != nil { + return nil, nil, err + } + curService = &importService{an: accountName, sub: subject} + if to != "" { + curService.to = to + } + case "prefix": + pre = mv.(string) + if curStream != nil { + curStream.pre = pre + } + case "to": + to = mv.(string) + if curService != nil { + curService.to = to + } + default: + if pedantic && tk != nil && !tk.IsUsedVariable() { + return nil, nil, &unknownConfigFieldErr{ + field: mk, + token: tk, + configFile: tk.SourceFile(), + } + } + return nil, nil, fmt.Errorf("Unknown field %q parsing import service or stream", mk) + } + + } + return curStream, curService, nil +} + // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options) (*authorization, error) { var ( diff --git a/server/parser.go b/server/parser.go index 088894fb..6eec1acc 100644 --- a/server/parser.go +++ b/server/parser.go @@ -234,7 +234,7 @@ func (c *client) parse(buf []byte) error { if len(c.msgBuf) != c.pa.size+LEN_CR_LF { goto parseErr } - c.processMsg(c.msgBuf) + c.processInboundMsg(c.msgBuf) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START default: diff --git a/server/reload.go b/server/reload.go index 410ede1d..161863bc 100644 --- a/server/reload.go +++ b/server/reload.go @@ -681,7 +681,7 @@ func (s *Server) reloadAuthorization() { } // Remove any unauthorized subscriptions. - s.removeUnauthorizedSubs(client) + client.removeUnauthorizedSubs() } for _, route := range routes { @@ -762,7 +762,8 @@ func (s *Server) reloadClusterPermissions() { subsNeedUNSUB []*subscription deleteRoutedSubs []*subscription ) - s.sl.localSubs(&localSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.localSubs(&localSubs) // Go through all local subscriptions for _, sub := range localSubs { @@ -810,7 +811,8 @@ func (s *Server) reloadClusterPermissions() { route.mu.Unlock() } // Remove as a batch all the subs that we have removed from each route. - s.sl.RemoveBatch(deleteRoutedSubs) + // FIXME(dlc) - Change for accounts. + s.gsl.RemoveBatch(deleteRoutedSubs) } // validateClusterOpts ensures the new ClusterOpts does not change host or diff --git a/server/route.go b/server/route.go index 55d5fc0f..7e82d627 100644 --- a/server/route.go +++ b/server/route.go @@ -194,7 +194,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { rsub = sub continue } - mh := c.msgHeader(msgh[:], sub) + mh := c.msgHeader(msgh[:], sub, c.pa.reply) if c.deliverMsg(sub, mh, msg) { c.Debugf("Redelivery succeeded for message on group '%q'", group) return @@ -203,7 +203,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { // If we are here we failed to find a local, see if we snapshotted a // remote sub, and if so deliver to that. if rsub != nil { - mh := c.msgHeader(msgh[:], rsub) + mh := c.msgHeader(msgh[:], rsub, c.pa.reply) if c.deliverMsg(rsub, mh, msg) { c.Debugf("Re-routing message on group '%q' to remote server", group) return @@ -212,12 +212,15 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group) } -// processRoutedMsg processes messages inbound from a route. -func (c *client) processRoutedMsg(r *SublistResult, msg []byte) { +// processRoutedMsgResults processes messages inbound from a route. +func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { // Snapshot server. srv := c.srv - msgh := c.prepMsgHeader() + // msg header + msgh := c.msgb[:msgHeadProtoLen] + msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, ' ') si := len(msgh) // If we have a queue subscription, deliver direct @@ -233,7 +236,7 @@ func (c *client) processRoutedMsg(r *SublistResult, msg []byte) { } didDeliver := false if sub != nil { - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) didDeliver = c.deliverMsg(sub, mh, msg) } if !didDeliver && c.srv != nil { @@ -258,7 +261,7 @@ func (c *client) processRoutedMsg(r *SublistResult, msg []byte) { sub.client.mu.Unlock() // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) c.deliverMsg(sub, mh, msg) } } @@ -425,7 +428,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) { _localSubs [4096]*subscription localSubs = _localSubs[:0] ) - s.sl.localSubs(&localSubs) + s.gsl.localSubs(&localSubs) route.sendRouteSubProtos(localSubs, func(sub *subscription) bool { subj := sub.subject @@ -578,7 +581,8 @@ func (s *Server) sendLocalSubsToRoute(route *client) { var raw [4096]*subscription subs := raw[:0] - s.sl.localSubs(&subs) + // FIXME(dlc) this needs to be scoped per account when cluster proto changes. + s.gsl.localSubs(&subs) route.mu.Lock() closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool { @@ -691,7 +695,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} // Grab server variables s.mu.Lock() diff --git a/server/server.go b/server/server.go index 1f890d33..a0e81aaf 100644 --- a/server/server.go +++ b/server/server.go @@ -69,13 +69,14 @@ type Server struct { mu sync.Mutex prand *rand.Rand info Info - sl *Sublist configFile string optsMu sync.RWMutex opts *Options running bool shutdown bool listener net.Listener + gsl *Sublist + accounts map[string]*Account clients map[uint64]*client routes map[uint64]*client remotes map[string]*client @@ -173,7 +174,7 @@ func New(opts *Options) *Server { configFile: opts.ConfigFile, info: info, prand: rand.New(rand.NewSource(time.Now().UnixNano())), - sl: NewSublist(), + gsl: NewSublist(), opts: opts, done: make(chan bool, 1), start: now, @@ -192,6 +193,9 @@ func New(opts *Options) *Server { // Used internally for quick look-ups. s.clientConnectURLsMap = make(map[string]struct{}) + // For tracking accounts + s.accounts = make(map[string]*Account) + // For tracking clients s.clients = make(map[uint64]*client) @@ -210,6 +214,9 @@ func New(opts *Options) *Server { // to shutdown. s.quitCh = make(chan struct{}) + // Used to setup Accounts. + s.configureAccounts() + // Used to setup Authorization. s.configureAuthorization() @@ -232,6 +239,16 @@ func (s *Server) setOpts(opts *Options) { s.optsMu.Unlock() } +func (s *Server) configureAccounts() { + // Check opts and walk through them. Making sure to create SLs. + for _, acc := range s.opts.Accounts { + if acc.sl == nil { + acc.sl = NewSublist() + } + s.accounts[acc.Name] = acc + } +} + func (s *Server) generateRouteInfoJSON() { b, _ := json.Marshal(s.routeInfo) pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} @@ -281,6 +298,53 @@ func (s *Server) logPid() error { return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) } +// newAccountsAllowed returns whether or not new accounts can be created on the fly. +func (s *Server) newAccountsAllowed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.opts.AllowNewAccounts +} + +// LookupOrRegisterAccount will return the given account if known or create a new entry. +func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) { + s.mu.Lock() + defer s.mu.Unlock() + if acc, ok := s.accounts[name]; ok { + return acc, false + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, true +} + +// RegisterAccount will register an account. The account must be new +// or this call will fail. +func (s *Server) RegisterAccount(name string) (*Account, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.accounts[name]; ok { + return nil, ErrAccountExists + } + acc := &Account{ + Name: name, + sl: NewSublist(), + } + s.accounts[name] = acc + return acc, nil +} + +// LookupAccount is a public function to return the account structure +// associated with name. +func (s *Server) LookupAccount(name string) *Account { + s.mu.Lock() + defer s.mu.Unlock() + return s.accounts[name] +} + // Start up the server, this will block. // Start via a Go routine if needed. func (s *Server) Start() { @@ -778,7 +842,7 @@ func (s *Server) createClient(conn net.Conn) *client { max_subs := opts.MaxSubs now := time.Now() - c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} + c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now} // Grab JSON info string s.mu.Lock() @@ -1088,7 +1152,13 @@ func (s *Server) getClient(cid uint64) *client { // NumSubscriptions will report how many subscriptions are active. func (s *Server) NumSubscriptions() uint32 { s.mu.Lock() - subs := s.sl.Count() + var subs uint32 + for _, acc := range s.accounts { + if acc.sl != nil { + subs += acc.sl.Count() + } + } + subs += s.gsl.Count() s.mu.Unlock() return subs } diff --git a/server/split_test.go b/server/split_test.go index 77dd2efb..ff36c80e 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -24,8 +24,8 @@ func TestSplitBufferSubOp(t *testing.T) { defer cli.Close() defer trash.Close() - s := &Server{sl: NewSublist()} - c := &client{srv: s, subs: make(map[string]*subscription), nc: cli} + s := &Server{gsl: NewSublist()} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -43,7 +43,7 @@ func TestSplitBufferSubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r == nil || len(r.psubs) != 1 { t.Fatalf("Did not match subscription properly: %+v\n", r) } @@ -60,7 +60,7 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{sl: NewSublist()} + s := &Server{gsl: NewSublist()} c := &client{srv: s, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") @@ -87,7 +87,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match("foo") + r := s.gsl.Match("foo") if r != nil && len(r.psubs) != 0 { t.Fatalf("Should be no subscriptions in results: %+v\n", r) } @@ -300,7 +300,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, subs: make(map[string]*subscription)} + c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. diff --git a/server/sublist.go b/server/sublist.go index f1faa753..af553f47 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -712,6 +712,52 @@ func IsValidLiteralSubject(subject string) bool { return true } +// This will test a subject as an array of tokens against a test subject +// and determine if the tokens are matched. Both test subject and tokens +// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"], +// but not of foo.bar, etc. +func isSubsetMatch(tokens []string, test string) bool { + tsa := [32]string{} + tts := tsa[:0] + start := 0 + for i := 0; i < len(test); i++ { + if test[i] == btsep { + tts = append(tts, test[start:i]) + start = i + 1 + } + } + tts = append(tts, test[start:]) + + // Walk the target tokens + for i, t2 := range tts { + if i >= len(tokens) { + return false + } + if t2[0] == fwc && len(t2) == 1 { + return true + } + t1 := tokens[i] + if t1[0] == fwc && len(t1) == 1 { + return false + } + if t1[0] == pwc && len(t1) == 1 { + m := t2[0] == pwc && len(t2) == 1 + if !m { + return false + } + if i >= len(tts) { + return true + } else { + continue + } + } + if t2[0] != pwc && strings.Compare(t1, t2) != 0 { + return false + } + } + return len(tokens) == len(tts) +} + // matchLiteral is used to test literal subjects, those that do not have any // wildcards, with a target subject. This is used in the cache layer. func matchLiteral(literal, subject string) bool { @@ -725,7 +771,7 @@ func matchLiteral(literal, subject string) bool { // This function has been optimized for speed. // For instance, do not set b:=subject[i] here since // we may bump `i` in this loop to avoid `continue` or - // skiping common test in a particular test. + // skipping common test in a particular test. // Run Benchmark_SublistMatchLiteral before making any change. switch subject[i] { case pwc: diff --git a/server/sublist_test.go b/server/sublist_test.go index 48c8540f..5801da3b 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -23,8 +23,6 @@ import ( "testing" "time" - dbg "runtime/debug" - "github.com/nats-io/nuid" ) @@ -421,8 +419,8 @@ func TestSublistBasicQueueResults(t *testing.T) { } func checkBool(b, expected bool, t *testing.T) { + t.Helper() if b != expected { - dbg.PrintStack() t.Fatalf("Expected %v, but got %v\n", expected, b) } }