Merge pull request #755 from nats-io/accounts

[ADDED] Account Support
This commit is contained in:
Derek Collison
2018-10-01 17:57:55 +02:00
committed by GitHub
17 changed files with 2152 additions and 124 deletions

View File

@@ -594,6 +594,8 @@ func lexMapKeyStart(lx *lexer) stateFn {
switch {
case isKeySeparator(r):
return lx.errorf("Unexpected key separator '%v'.", r)
case r == arrayEnd:
return lx.errorf("Unexpected array end '%v' processing map.", r)
case unicode.IsSpace(r):
lx.next()
return lexSkip(lx, lexMapKeyStart)

965
server/accounts_test.go Normal file
View File

@@ -0,0 +1,965 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"strings"
"testing"
"github.com/nats-io/nkeys"
)
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
opts := defaultServerOptions
s := New(&opts)
// Now create two accounts.
f, err := s.RegisterAccount("foo")
if err != nil {
t.Fatalf("Error creating account 'foo': %v", err)
}
b, err := s.RegisterAccount("bar")
if err != nil {
t.Fatalf("Error creating account 'bar': %v", err)
}
return s, f, b
}
func TestRegisterDuplicateAccounts(t *testing.T) {
s, _, _ := simpleAccountServer(t)
if _, err := s.RegisterAccount("foo"); err == nil {
t.Fatal("Expected an error registering 'foo' twice")
}
}
func TestAccountIsolation(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
cfoo, crFoo, _ := newClientForServer(s)
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error register client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error register client with 'bar' account: %v", err)
}
// Make sure they are different accounts/sl.
if cfoo.acc == cbar.acc {
t.Fatalf("Error, accounts the same for both clients")
}
// Now do quick test that makes sure messages do not cross over.
// setup bar as a foo subscriber.
go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n"))
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
l, err = crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'foo' from server: %v", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crFoo, []byte("hello\r\n"), t)
// Now make sure nothing shows up on bar.
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
}
func TestAccountFromOptions(t *testing.T) {
opts := defaultServerOptions
opts.Accounts = []*Account{
&Account{Name: "foo"},
&Account{Name: "bar"},
}
s := New(&opts)
if la := len(s.accounts); la != 2 {
t.Fatalf("Expected to have a server with two accounts, got %v", la)
}
// Check that sl is filled in.
fooAcc := s.LookupAccount("foo")
barAcc := s.LookupAccount("bar")
if fooAcc == nil || barAcc == nil {
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
}
if fooAcc.sl == nil || barAcc.sl == nil {
t.Fatal("Expected Sublists to be filled in on Opts.Accounts")
}
}
func TestNewAccountsFromClients(t *testing.T) {
opts := defaultServerOptions
s := New(&opts)
c, cr, _ := newClientForServer(s)
connectOp := []byte("CONNECT {\"account\":\"foo\"}\r\n")
go c.parse(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
opts.AllowNewAccounts = true
s = New(&opts)
c, cr, _ = newClientForServer(s)
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received an error trying to connect: %v", err)
}
go c.parse([]byte("PING\r\n"))
l, err = cr.ReadString('\n')
if err != nil {
t.Fatalf("Error reading response for client from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
}
// Clients can ask that the account be forced to be new. If it exists this is an error.
func TestNewAccountRequireNew(t *testing.T) {
// This has foo and bar accounts already.
s, _, _ := simpleAccountServer(t)
c, cr, _ := newClientForServer(s)
connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n")
go c.parse(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
// Now allow new accounts on the fly, make sure second time does not work.
opts := defaultServerOptions
opts.AllowNewAccounts = true
s = New(&opts)
c, _, _ = newClientForServer(s)
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received an error trying to create an account: %v", err)
}
c, cr, _ = newClientForServer(s)
go c.parse(connectOp)
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
}
func accountNameExists(name string, accounts []*Account) bool {
for _, acc := range accounts {
if strings.Compare(acc.Name, name) == 0 {
return true
}
}
return false
}
func TestAccountSimpleConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`))
defer os.Remove(confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if !accountNameExists("foo", opts.Accounts) {
t.Fatal("Expected a 'foo' account")
}
if !accountNameExists("bar", opts.Accounts) {
t.Fatal("Expected a 'bar' account")
}
// Make sure double entries is an error.
confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`))
defer os.Remove(confFileName)
_, err = ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double account entries")
}
}
func TestAccountParseConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: derek, password: foo}
{user: ivan, password: bar}
]
}
}
`))
defer os.Remove(confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if lu := len(opts.Users); lu != 4 {
t.Fatalf("Expected 4 total Users, got %d", lu)
}
var natsAcc *Account
for _, acc := range opts.Accounts {
if acc.Name == "nats.io" {
natsAcc = acc
break
}
}
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
for _, u := range opts.Users {
if u.Username == "derek" {
if u.Account != natsAcc {
t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account)
}
}
}
}
func TestAccountParseConfigDuplicateUsers(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: alice, password: bar}
]
}
}
`))
defer os.Remove(confFileName)
_, err := ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double user entries")
}
}
func TestAccountParseConfigImportsExports(t *testing.T) {
opts, err := ProcessConfigFile("./configs/accounts.conf")
if err != nil {
t.Fatal(err)
}
if la := len(opts.Accounts); la != 3 {
t.Fatalf("Expected to see 3 accounts in opts, got %d", la)
}
if lu := len(opts.Nkeys); lu != 4 {
t.Fatalf("Expected 4 total Nkey users, got %d", lu)
}
if lu := len(opts.Users); lu != 0 {
t.Fatalf("Expected no Users, got %d", lu)
}
var natsAcc, synAcc *Account
for _, acc := range opts.Accounts {
if acc.Name == "nats.io" {
natsAcc = acc
} else if acc.Name == "synadia" {
synAcc = acc
}
}
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" {
t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey)
}
// Check user assigned to the correct account.
for _, nk := range opts.Nkeys {
if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" {
if nk.Account != natsAcc {
t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name)
}
break
}
}
// Now check for the imports and exports of streams and services.
if lis := len(natsAcc.imports.streams); lis != 2 {
t.Fatalf("Expected 2 imported streams, got %d\n", lis)
}
if lis := len(natsAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
if les := len(natsAcc.exports.services); les != 1 {
t.Fatalf("Expected 1 exported service, got %d\n", les)
}
if les := len(natsAcc.exports.streams); les != 0 {
t.Fatalf("Expected no exported streams, got %d\n", les)
}
if synAcc == nil {
t.Fatalf("Error retrieving account for 'synadia'")
}
if lis := len(synAcc.imports.streams); lis != 0 {
t.Fatalf("Expected no imported streams, got %d\n", lis)
}
if lis := len(synAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
if les := len(synAcc.exports.services); les != 2 {
t.Fatalf("Expected 2 exported service, got %d\n", les)
}
if les := len(synAcc.exports.streams); les != 2 {
t.Fatalf("Expected 2 exported streams, got %d\n", les)
}
}
func TestImportExportConfigFailures(t *testing.T) {
// Import from unknow account
cf := createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{stream: {account: "synadia", subject:"foo"}}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import from unknown account")
}
// Import a service with no account.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{service: subject:"foo.*"}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import of a service with no account")
}
// Import a service with a wildcard subject.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{service: {account: "nats.io", subject:"foo.*"}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import of a service with wildcard subject")
}
// Export with unknown keyword.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
exports = [{service: "foo.*", wat:true}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with export with unknown keyword")
}
// Import with unknown keyword.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import with unknown keyword")
}
// Export with an account.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
exports = [{service: {account: nats.io, subject:"foo.*"}}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with export with account")
}
}
func TestImportAuthorized(t *testing.T) {
_, foo, bar := simpleAccountServer(t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t)
foo.addStreamExport("foo", isPublicExport)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t)
foo.addStreamExport("*", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), false, t)
// Reset and test '>' public export
_, foo, bar = simpleAccountServer(t)
foo.addStreamExport(">", nil)
// Everything should work.
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>"), true, t)
// Reset and test pwc and fwc
s, foo, bar := simpleAccountServer(t)
foo.addStreamExport("foo.*.baz.>", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ""), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*"), false, t)
// Make sure we match the account as well
fb, _ := s.RegisterAccount("foobar")
bz, _ := s.RegisterAccount("baz")
checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1"), false, t)
checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1"), false, t)
}
func TestSimpleMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Test first that trying to import with no matching export permission returns an error.
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
}
// Now map the subject space between foo and bar.
// Need to do export first.
if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
// Normal Subscription on bar client.
go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
checkMsg := func(l, sid string) {
t.Helper()
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "import.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != sid {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
}
// Now check we got the message from normal subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
checkMsg(l, "1")
checkPayload(crBar, []byte("hello\r\n"), t)
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
checkMsg(l, "2")
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestNoPrefixWildcardMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for literal "foo".
go cbar.parse([]byte("SUB foo 1\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
// Now check we got the message from normal subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestPrefixWildcardMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
// Checking that trailing '.' is accepted, tested that it is auto added above.
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for wildcard.
go cbar.parse([]byte("SUB pub.imports.* 1\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
// Now check we got the messages from wildcard subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for wildcard.
go cbar.parse([]byte("SUB pub.imports.foo 1\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
// Now check we got the messages from wildcard subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestCrossAccountRequestReply(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Add in the service import for the requests. Make it public.
if err := cfoo.acc.addServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service import to client foo: %v", err)
}
// Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects.
if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
t.Fatalf("Expected ErrMissingAccount but received %v.", err)
}
if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
// Now add in the Route for request to be routed to the foo account.
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account route to client bar: %v", err)
}
// Now setup the resonder under cfoo
cfoo.parse([]byte("SUB test.request 1\r\n"))
// Now send the request. Remember we expect the request on our local foo. We added the route
// with that "from" and will map it to "test.request"
go cbar.parseAndFlush([]byte("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n"))
// Now read the request from crFoo
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "test.request" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") {
t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
go cfoo.parseAndFlush([]byte(replyOp))
// Now read the response from crBar
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
if matches[SUB_INDEX] != "bar" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "11" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("22\r\n"), t)
// Make sure we have no service imports on fooAcc. An implicit one was created
// for the response but should be removed when the response was processed.
if nr := fooAcc.numServiceRoutes(); nr != 0 {
t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr)
}
}
func TestAccountMapsUsers(t *testing.T) {
// Used for the nkey users to properly sign.
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE"
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: derek, password: foo},
{nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR}
]
}
nats {
users = [
{user: ivan, password: bar},
{nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH}
]
}
}
`))
defer os.Remove(confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Unexpected error parsing config file: %v", err)
}
s := New(opts)
synadia := s.LookupAccount("synadia")
nats := s.LookupAccount("nats")
if synadia == nil || nats == nil {
t.Fatalf("Expected non nil accounts during lookup")
}
// Make sure a normal log in maps the accounts correctly.
c, _, _ := newClientForServer(s)
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
c.parse(connectOp)
if c.acc != synadia {
t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc)
}
// Also test client sublist.
if c.sl != synadia.sl {
t.Fatalf("Expected the client's sublist to match 'synadia' account")
}
c, _, _ = newClientForServer(s)
connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
c.parse(connectOp)
if c.acc != nats {
t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc)
}
// Also test client sublist.
if c.sl != nats.sl {
t.Fatalf("Expected the client's sublist to match 'nats' account")
}
// Now test nkeys as well.
kp, _ := nkeys.FromSeed(seed1)
pubKey, _ := kp.PublicKey()
c, cr, l := newClientForServer(s)
// Check for Nonce
var info nonceInfo
err = json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if info.Nonce == "" {
t.Fatalf("Expected a non-empty nonce with nkeys defined")
}
sigraw, err := kp.Sign([]byte(info.Nonce))
if err != nil {
t.Fatalf("Failed signing nonce: %v", err)
}
sig := base64.StdEncoding.EncodeToString(sigraw)
// PING needed to flush the +OK to us.
cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
go c.parse([]byte(cs))
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "+OK") {
t.Fatalf("Expected an OK, got: %v", l)
}
if c.acc != synadia {
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
}
// Also test client sublist.
if c.sl != synadia.sl {
t.Fatalf("Expected the client's sublist to match 'synadia' account")
}
// Now nats account nkey user.
kp, _ = nkeys.FromSeed(seed2)
pubKey, _ = kp.PublicKey()
c, cr, l = newClientForServer(s)
// Check for Nonce
err = json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if info.Nonce == "" {
t.Fatalf("Expected a non-empty nonce with nkeys defined")
}
sigraw, err = kp.Sign([]byte(info.Nonce))
if err != nil {
t.Fatalf("Failed signing nonce: %v", err)
}
sig = base64.StdEncoding.EncodeToString(sigraw)
// PING needed to flush the +OK to us.
cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
go c.parse([]byte(cs))
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "+OK") {
t.Fatalf("Expected an OK, got: %v", l)
}
if c.acc != nats {
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
}
// Also test client sublist.
if c.sl != nats.sl {
t.Fatalf("Expected the client's sublist to match 'nats' account")
}
}
func BenchmarkNewRouteReply(b *testing.B) {
opts := defaultServerOptions
s := New(&opts)
c, _, _ := newClientForServer(s)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.newServiceReply()
}
}

View File

@@ -16,8 +16,8 @@ package server
import (
"crypto/tls"
"encoding/base64"
"fmt"
"strings"
"sync"
"github.com/nats-io/nkeys"
"golang.org/x/crypto/bcrypt"
@@ -39,17 +39,237 @@ type ClientAuthentication interface {
RegisterUser(*User)
}
// Accounts
type Account struct {
Name string
Nkey string
mu sync.RWMutex
sl *Sublist
imports importMap
exports exportMap
}
// Import stream mapping struct
type streamImport struct {
acc *Account
from string
prefix string
}
// Import service mapping struct
type serviceImport struct {
acc *Account
from string
to string
ae bool
}
// importMap tracks the imported streams and services.
type importMap struct {
streams map[string]*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}
// exportMap tracks the exported streams and services.
type exportMap struct {
streams map[string]map[string]*Account
services map[string]map[string]*Account
}
func (a *Account) addServiceExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.services == nil {
a.exports.services = make(map[string]map[string]*Account)
}
ma := a.exports.services[subject]
if accounts != nil && ma == nil {
ma = make(map[string]*Account)
}
for _, a := range accounts {
ma[a.Name] = a
}
a.exports.services[subject] = ma
return nil
}
// numServiceRoutes returns the number of service routes on this account.
func (a *Account) numServiceRoutes() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.imports.services)
}
// This will add a route to an account to send published messages / requests
// to the destination account. From is the local subject to map, To is the
// subject that will appear on the destination account. Destination will need
// to have an import rule to allow access via addService.
func (a *Account) addServiceImport(destination *Account, from, to string) error {
if destination == nil {
return ErrMissingAccount
}
// Empty means use from.
if to == "" {
to = from
}
if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) {
return ErrInvalidSubject
}
// First check to see if the account has authorized us to route to the "to" subject.
if !destination.checkServiceImportAuthorized(a, to) {
return ErrServiceImportAuthorization
}
return a.addImplicitServiceImport(destination, from, to, false)
}
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
delete(a.imports.services, subject)
a.mu.Unlock()
}
// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use addRoute
// above if responding to user input or config, etc.
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error {
a.mu.Lock()
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
a.imports.services[from] = &serviceImport{destination, from, to, autoexpire}
a.mu.Unlock()
return nil
}
// addStreamImport will add in the stream import from a specific account.
func (a *Account) addStreamImport(account *Account, from, prefix string) error {
if account == nil {
return ErrMissingAccount
}
// First check to see if the account has authorized export of the subject.
if !account.checkStreamImportAuthorized(a, from) {
return ErrStreamImportAuthorization
}
a.mu.Lock()
defer a.mu.Unlock()
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
}
if prefix != "" && prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
// TODO(dlc) - collisions, etc.
a.imports.streams[from] = &streamImport{account, from, prefix}
return nil
}
// Placeholder to denote public export.
var isPublicExport = []*Account(nil)
// addExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can impoort.
func (a *Account) addStreamExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.streams == nil {
a.exports.streams = make(map[string]map[string]*Account)
}
var ma map[string]*Account
for _, aa := range accounts {
if ma == nil {
ma = make(map[string]*Account, len(accounts))
}
ma[aa.Name] = aa
}
a.exports.streams[subject] = ma
return nil
}
// Check if another account is authorized to import from us.
func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool {
// Find the subject in the exports list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.streams == nil || !IsValidSubject(subject) {
return false
}
// Check direct match of subject first
am, ok := a.exports.streams[subject]
if ok {
// if am is nil that denotes a public export
if am == nil {
return true
}
// If we have a matching account we are authorized
_, ok := am[account.Name]
return ok
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
// has to be a true subset of the import claim. We already checked for
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, am := range a.exports.streams {
if isSubsetMatch(tokens, subj) {
if am == nil {
return true
}
_, ok := am[account.Name]
return ok
}
}
return false
}
// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
// These are always literal subjects so just lookup.
am, ok := a.exports.services[subject]
if !ok {
return false
}
// Check to see if we are public or if we need to search for the account.
if am == nil {
return true
}
// Check that we allow this account.
_, ok = am[account.Name]
return ok
}
// Nkey is for multiple nkey based users
type NkeyUser struct {
Nkey string `json:"user"`
Permissions *Permissions `json:"permissions"`
Permissions *Permissions `json:"permissions,omitempty"`
Account *Account `json:"account,omitempty"`
}
// User is for multiple accounts/users.
type User struct {
Username string `json:"user"`
Password string `json:"password"`
Permissions *Permissions `json:"permissions"`
Permissions *Permissions `json:"permissions,omitempty"`
Account *Account `json:"account,omitempty"`
}
// clone performs a deep copy of the User struct, returning a new clone with
@@ -146,7 +366,7 @@ func (s *Server) checkAuthforWarnings() {
}
if warn {
// Warning about using plaintext passwords.
s.Warnf("Plaintext passwords detected. Use Nkeys or Bcrypt passwords in config files.")
s.Warnf("Plaintext passwords detected, use nkeys or bcrypt.")
}
}
@@ -262,6 +482,7 @@ func (s *Server) isClientAuthorized(c *client) bool {
if err := pub.Verify(c.nonce, sig); err != nil {
return false
}
c.RegisterNkeyUser(nkey)
return true
}
@@ -309,35 +530,6 @@ func (s *Server) isRouterAuthorized(c *client) bool {
return true
}
// removeUnauthorizedSubs removes any subscriptions the client has that are no
// longer authorized, e.g. due to a config reload.
func (s *Server) removeUnauthorizedSubs(c *client) {
c.mu.Lock()
if c.perms == nil {
c.mu.Unlock()
return
}
subs := make(map[string]*subscription, len(c.subs))
for sid, sub := range c.subs {
subs[sid] = sub
}
c.mu.Unlock()
for sid, sub := range subs {
if !c.canSubscribe(sub.subject) {
_ = s.sl.Remove(sub)
c.mu.Lock()
delete(c.subs, sid)
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
sub.subject, sub.sid))
s.Noticef("Removed sub %q for user %q - not authorized",
string(sub.subject), c.opts.Username)
}
}
}
// Support for bcrypt stored passwords and tokens.
const bcryptPrefix = "$2a$"

View File

@@ -22,6 +22,7 @@ import (
"math/rand"
"net"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
@@ -144,6 +145,8 @@ type client struct {
ncs string
out outbound
srv *Server
acc *Account
sl *Sublist
subs map[string]*subscription
perms *permissions
in readCache
@@ -235,8 +238,11 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
return &state
}
// This is the main subscription struct that indicates
// interest in published messages.
type subscription struct {
client *client
im *streamImport // This is for importing support.
subject []byte
queue []byte
sid []byte
@@ -258,6 +264,8 @@ type clientOpts struct {
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Account string `json:"account,omitempty"`
AccountNew bool `json:"new_account,omitempty"`
// Routes only
Import *SubjectPermission `json:"import,omitempty"`
@@ -313,22 +321,62 @@ func (c *client) initClient() {
}
}
// RegisterWithAccount will register the given user with a specific
// account. This will change the subject namespace.
func (c *client) registerWithAccount(acc *Account) error {
if acc == nil || acc.sl == nil {
return ErrBadAccount
}
c.mu.Lock()
c.acc = acc
c.sl = acc.sl
c.mu.Unlock()
return nil
}
// RegisterUser allows auth to call back into a new client
// with the authenticated user. This is used to map any permissions
// into the client.
// with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterUser(user *User) {
c.mu.Lock()
defer c.mu.Unlock()
// Register with proper account and sublist.
if user.Account != nil {
c.acc = user.Account
c.sl = c.acc.sl
}
// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.mu.Lock()
c.perms = nil
c.mu.Unlock()
return
}
// Process Permissions and map into client connection structures.
c.setPermissions(user.Permissions)
}
// RegisterNkey allows auth to call back into a new nkey
// client with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterNkeyUser(user *NkeyUser) {
c.mu.Lock()
defer c.mu.Unlock()
// Register with proper account and sublist.
if user.Account != nil {
c.acc = user.Account
c.sl = c.acc.sl
}
// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.perms = nil
return
}
c.setPermissions(user.Permissions)
}
@@ -770,6 +818,8 @@ func (c *client) processConnect(arg []byte) error {
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
account := c.opts.Account
accountNew := c.opts.AccountNew
c.mu.Unlock()
if srv != nil {
@@ -788,6 +838,38 @@ func (c *client) processConnect(arg []byte) error {
c.authViolation()
return ErrAuthorization
}
// Check for Account designation
if account != "" {
var acc *Account
var wasNew bool
if !srv.newAccountsAllowed() {
acc = srv.LookupAccount(account)
if acc == nil {
c.Errorf(ErrMissingAccount.Error())
c.sendErr("Account Not Found")
return ErrMissingAccount
} else if accountNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
} else {
// We can create this one on the fly.
acc, wasNew = srv.LookupOrRegisterAccount(account)
if accountNew && !wasNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
}
// If we are here we can register ourselves with the new account.
if err := c.registerWithAccount(acc); err != nil {
c.Errorf("Problem registering with account [%s]", account)
c.sendErr("Account Failed Registration")
return ErrBadAccount
}
}
}
// Check client protocol request if it exists.
@@ -828,7 +910,18 @@ func (c *client) authTimeout() {
}
func (c *client) authViolation() {
if c.srv != nil && c.srv.getOpts().Users != nil {
var hasNkeys, hasUsers bool
if s := c.srv; s != nil {
s.mu.Lock()
hasNkeys = s.nkeys != nil
hasUsers = s.users != nil
s.mu.Unlock()
}
if hasNkeys {
c.Errorf("%s - Nkey %q",
ErrAuthorization.Error(),
c.opts.Nkey)
} else if hasUsers {
c.Errorf("%s - User %q",
ErrAuthorization.Error(),
c.opts.Username)
@@ -902,7 +995,7 @@ func (c *client) queueOutbound(data []byte) bool {
c.out.p = nil
}
// Check for a big message, and if found place directly on nb
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) <
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize
if len(data) > maxBufSize {
c.out.nb = append(c.out.nb, data)
referenced = true
@@ -1227,34 +1320,45 @@ func (c *client) processSub(argo []byte) (err error) {
return nil
}
// Check if we have a maximum on the number of subscriptions.
if c.msubs > 0 && len(c.subs) >= c.msubs {
c.mu.Unlock()
c.maxSubsExceeded()
return nil
}
// Check if we have a maximum on the number of subscriptions.
// We can have two SUB protocols coming from a route due to some
// race conditions. We should make sure that we process only one.
sid := string(sub.sid)
var chkImports bool
if c.subs[sid] == nil {
c.subs[sid] = sub
if c.srv != nil {
err = c.srv.sl.Insert(sub)
if c.sl != nil {
err = c.sl.Insert(sub)
if err != nil {
delete(c.subs, sid)
} else {
if c.acc != nil {
chkImports = true
}
shouldForward = c.typ != ROUTER
}
}
}
c.mu.Unlock()
if err != nil {
c.sendErr("Invalid Subject")
return nil
} else if c.opts.Verbose {
c.sendOK()
}
if chkImports {
if err := c.checkAccountImports(sub); err != nil {
c.Errorf(err.Error())
}
}
if shouldForward {
c.srv.broadcastSubscribe(sub)
}
@@ -1262,6 +1366,48 @@ func (c *client) processSub(argo []byte) (err error) {
return nil
}
// Check to see if we need to create a shadow subscription due to imports
// in other accounts.
func (c *client) checkAccountImports(sub *subscription) error {
c.mu.Lock()
acc := c.acc
c.mu.Unlock()
if acc == nil {
return ErrMissingAccount
}
subject := string(sub.subject)
tokens := strings.Split(subject, tsep)
var rims [32]*streamImport
var ims = rims[:0]
acc.mu.RLock()
for _, im := range acc.imports.streams {
if isSubsetMatch(tokens, im.prefix+im.from) {
ims = append(ims, im)
}
}
acc.mu.RUnlock()
// Now walk through collected importMaps
for _, im := range ims {
// We have a match for a local subscription with an import from another account.
// We will create a shadow subscription.
nsub := *sub // copy
nsub.im = im
if im.prefix != "" {
// redo subject here to match subject in the publisher account space.
// Just remove prefix from what they gave us. That maps into other space.
nsub.subject = sub.subject[len(im.prefix):]
}
if err := im.acc.sl.Insert(&nsub); err != nil {
return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name)
}
}
return nil
}
// canSubscribe determines if the client is authorized to subscribe to the
// given subject. Assumes caller is holding lock.
func (c *client) canSubscribe(subject []byte) bool {
@@ -1297,8 +1443,8 @@ func (c *client) unsubscribe(sub *subscription) {
c.traceOp("<-> %s", "DELSUB", sub.sid)
delete(c.subs, string(sub.sid))
if c.srv != nil {
c.srv.sl.Remove(sub)
if c.sl != nil {
c.sl.Remove(sub)
}
// If we are a queue subscriber on a client connection and we have routes,
@@ -1362,11 +1508,11 @@ func (c *client) processUnsub(arg []byte) error {
return nil
}
func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
mh = append(mh, sub.sid...)
mh = append(mh, ' ')
if c.pa.reply != nil {
mh = append(mh, c.pa.reply...)
if reply != nil {
mh = append(mh, reply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
@@ -1518,18 +1664,34 @@ func (c *client) pubAllowed(subject []byte) bool {
return allowed
}
// prepMsgHeader will prepare the message header prefix
func (c *client) prepMsgHeader() []byte {
// Use the scratch buffer..
msgh := c.msgb[:msgHeadProtoLen]
// Used to mimic client like replies.
const (
replyPrefix = "_INBOX."
replyPrefixLen = len(replyPrefix)
digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
)
// msg header
msgh = append(msgh, c.pa.subject...)
return append(msgh, ' ')
// newServiceReply is used when rewriting replies that cross account boundaries.
// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients.
func (c *client) newServiceReply() []byte {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var b = [15]byte{'_', 'I', 'N', 'B', 'O', 'X', '.'}
rn := c.in.prand.Int63()
for i, l := replyPrefixLen, rn; i < len(b); i++ {
b[i] = digits[l%base]
l /= base
}
return b[:]
}
// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
func (c *client) processInboundMsg(msg []byte) {
// Snapshot server.
srv := c.srv
@@ -1562,20 +1724,19 @@ func (c *client) processMsg(msg []byte) {
var r *SublistResult
var ok bool
genid := atomic.LoadUint64(&srv.sl.genid)
genid := atomic.LoadUint64(&c.sl.genid)
if genid == c.in.genid && c.in.results != nil {
r, ok = c.in.results[string(c.pa.subject)]
} else {
// reset our L1 completely.
// Reset our L1 completely.
c.in.results = make(map[string]*SublistResult)
c.in.genid = genid
}
if !ok {
subject := string(c.pa.subject)
r = srv.sl.Match(subject)
c.in.results[subject] = r
r = c.sl.Match(string(c.pa.subject))
c.in.results[string(c.pa.subject)] = r
// Prune the results cache. Keeps us from unbounded growth.
if len(c.in.results) > maxResultCacheSize {
n := 0
@@ -1588,6 +1749,29 @@ func (c *client) processMsg(msg []byte) {
}
}
// Check to see if we need to route this message to
// another account.
if c.typ == CLIENT && c.acc != nil && c.acc.imports.services != nil {
c.acc.mu.RLock()
rm := c.acc.imports.services[string(c.pa.subject)]
c.acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
if rm != nil && rm.acc != nil && rm.acc.sl != nil {
var nrr []byte
if rm.ae {
c.acc.removeServiceImport(rm.from)
}
if c.pa.reply != nil {
// We want to remap this to provide anonymity.
nrr = c.newServiceReply()
rm.acc.addImplicitServiceImport(c.acc, string(nrr), string(c.pa.reply), true)
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
c.processMsgResults(rr, msg, []byte(rm.to), nrr)
}
}
// This is the fanout scale.
fanout := len(r.psubs) + len(r.qsubs)
@@ -1597,12 +1781,18 @@ func (c *client) processMsg(msg []byte) {
}
if c.typ == ROUTER {
c.processRoutedMsg(r, msg)
return
c.processRoutedMsgResults(r, msg)
} else if c.typ == CLIENT {
c.processMsgResults(r, msg, c.pa.subject, c.pa.reply)
}
}
// Client connection processing here.
msgh := c.prepMsgHeader()
// This processes the sublist results for a given message.
func (c *client) processMsgResults(r *SublistResult, msg, subject, reply []byte) {
// msg header
msgh := c.msgb[:msgHeadProtoLen]
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si := len(msgh)
// Used to only send messages once across any given route.
@@ -1616,7 +1806,7 @@ func (c *client) processMsg(msg []byte) {
if sub.client.typ == ROUTER {
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
rmap = make(map[string]struct{}, c.srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil ||
@@ -1634,8 +1824,17 @@ func (c *client) processMsg(msg []byte) {
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Check for import mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh := c.msgb[:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, reply)
c.deliverMsg(sub, mh, msg)
}
@@ -1644,6 +1843,7 @@ func (c *client) processMsg(msg []byte) {
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
@@ -1654,7 +1854,16 @@ func (c *client) processMsg(msg []byte) {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
// Check for mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh := c.msgb[:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
mh := c.msgHeader(msgh[:si], sub, reply)
if c.deliverMsg(sub, mh, msg) {
break
}
@@ -1782,6 +1991,45 @@ func (c *client) typeString() string {
return "Unknown Type"
}
// removeUnauthorizedSubs removes any subscriptions the client has that are no
// longer authorized, e.g. due to a config reload.
func (c *client) removeUnauthorizedSubs() {
c.mu.Lock()
if c.perms == nil || c.sl == nil {
c.mu.Unlock()
return
}
srv := c.srv
var subsa [32]*subscription
subs := subsa[:0]
for _, sub := range c.subs {
subs = append(subs, sub)
}
var removedSubs [32]*subscription
removed := removedSubs[:0]
for _, sub := range subs {
if !c.canSubscribe(sub.subject) {
removed = append(removed, sub)
delete(c.subs, string(sub.sid))
}
}
c.mu.Unlock()
// Remove unauthorized clients subscriptions.
c.sl.RemoveBatch(removed)
// Report back to client and logs.
for _, sub := range removed {
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
sub.subject, sub.sid))
srv.Noticef("Removed sub %q for user %q - not authorized",
string(sub.subject), c.opts.Username)
}
}
func (c *client) closeConnection(reason ClosedState) {
c.mu.Lock()
if c.nc == nil {
@@ -1820,10 +2068,13 @@ func (c *client) closeConnection(reason ClosedState) {
c.mu.Unlock()
// Remove clients subscriptions.
c.sl.RemoveBatch(subs)
if srv != nil {
// This is a route that disconnected...
if len(connectURLs) > 0 {
// Unless disabled, possibly update the server's INFO protcol
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {
srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
@@ -1833,9 +2084,8 @@ func (c *client) closeConnection(reason ClosedState) {
// Unregister
srv.removeClient(c)
// Remove clients subscriptions.
srv.sl.RemoveBatch(subs)
if c.typ == CLIENT {
// Remove remote subscriptions.
if c.typ != ROUTER {
// Forward UNSUBs protocols to all routes
srv.broadcastUnSubscribeBatch(subs)
}

View File

@@ -52,6 +52,19 @@ func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
}()
}
func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, maxBufSize)
ch := make(chan *client)
createClientAsync(ch, s, srv)
// So failing tests don't just hang.
cli.SetReadDeadline(time.Now().Add(2 * time.Second))
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
return c, cr, l
}
var defaultServerOptions = Options{
Trace: false,
Debug: false,
@@ -272,6 +285,7 @@ const (
)
func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) {
t.Helper()
// Read in payload
d := make([]byte, len(expected))
n, err := cr.Read(d)
@@ -384,13 +398,19 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) {
}
}
func (c *client) parseFlushAndClose(op []byte) {
// This needs to clear any flushOutbound flags since writeLoop not running.
func (c *client) parseAndFlush(op []byte) {
c.parse(op)
for cp := range c.pcd {
cp.mu.Lock()
cp.flags.clear(flushOutbound)
cp.flushOutbound()
cp.mu.Unlock()
}
}
func (c *client) parseFlushAndClose(op []byte) {
c.parseAndFlush(op)
c.nc.Close()
}
@@ -643,17 +663,17 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) {
}()
<-ch
if s.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count())
if c.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count())
}
c.closeConnection(ClientClosed)
if s.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count())
}
}
func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
s, c, _ := setupClient()
_, c, _ := setupClient()
c.closeConnection(ClientClosed)
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
@@ -664,8 +684,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
}()
<-ch
if s.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count())
}
}

View File

@@ -0,0 +1,47 @@
accounts: {
synadia: {
nkey: ADMHMDX2LEUJRZQHGVSVRWZEJ2CPNHYO6TB4ZCZ37LXAX5SYNEW252GF
users = [
# Bob
{nkey : UC6NLCN7AS34YOJVCYD4PJ3QB7QGLYG5B5IMBT25VW5K4TNUJODM7BOX}
# Alice
{nkey : UBAAQWTW6CG2G6ANGNKB5U2B7HRWHSGMZEZX3AQSAJOQDAUGJD46LD2E}
]
exports = [
{stream: "public.>"} # No accounts means public.
{stream: "synadia.private.>", accounts: [cncf, nats.io]}
{service: "pub.request"} # No accounts means public.
{service: "pub.special.request", accounts: [nats.io]}
]
imports = [
{service: {account: "nats.io", subject: "nats.time"}}
]
}
nats.io: {
nkey: AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE
users = [
# Ivan
{nkey : UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW}
# Derek
{nkey : UDEREK22W43P2NFQCSKGM6BWD23OVWEDR7JE7LSNCD232MZIC4X2MEKZ}
]
imports = [
{stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"}
{stream: {account: "synadia", subject:"synadia.private.*"}}
{service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"}
]
exports = [
{service: "nats.time"}
]
}
cncf: { nkey: ABDAYEV6KZVLW3GSJ3V7IWC542676TFYILXF2C7Z56LCPSMVHJE5BVYO}
}

View File

@@ -48,4 +48,20 @@ var (
// ErrClientConnectedToRoutePort represents an error condition when a client
// attempted to connect to the route listen port.
ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port")
// ErrAccountExists is returned when an account is attempted to be registered
// but already exists.
ErrAccountExists = errors.New("Account Exists")
// ErrBadAccount represents a malformed or incorrect account.
ErrBadAccount = errors.New("Bad Account")
// ErrMissingAccount is returned when an account does not exist.
ErrMissingAccount = errors.New("Account Missing")
// ErrStreamImportAuthorization is returned when a stream import is not authorized.
ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized")
// ErrServiceImportAuthorization is returned when a service import is not authorized.
ErrServiceImportAuthorization = errors.New("Service Import Not Authorized")
)

View File

@@ -709,14 +709,15 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
}
sz := &Subsz{s.sl.Stats(), 0, offset, limit, nil}
// FIXME(dlc) - Make account aware.
sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil}
if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.sl.localSubs(&subs)
s.gsl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
@@ -938,7 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.MaxPending = opts.MaxPending
v.WriteDeadline = opts.WriteDeadline
v.Subscriptions = s.sl.Count()
v.Subscriptions = s.gsl.Count()
v.ConfigLoadTime = s.configTime
// Need a copy here since s.httpReqStats can change while doing
// the marshaling down below.

View File

@@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
mrand "math/rand"
"net"
"os"
"strings"
"testing"
@@ -56,17 +55,6 @@ func mixedSetup() (*Server, *client, *bufio.Reader, string) {
return rawSetup(opts)
}
func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, maxBufSize)
ch := make(chan *client)
createClientAsync(ch, s, srv)
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
return c, cr, l
}
func TestServerInfoNonce(t *testing.T) {
_, l := setUpClientWithResponse()
if !strings.HasPrefix(l, "INFO ") {

View File

@@ -62,6 +62,8 @@ type Options struct {
MaxSubs int `json:"max_subscriptions,omitempty"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
AllowNewAccounts bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
@@ -227,7 +229,7 @@ func (e *configWarningErr) Error() string {
}
// ProcessConfigFile processes a configuration file.
// FIXME(dlc): Hacky
// FIXME(dlc): A bit hacky
func ProcessConfigFile(configFile string) (*Options, error) {
opts := &Options{}
if err := opts.ProcessConfigFile(configFile); err != nil {
@@ -306,6 +308,11 @@ func (o *Options) ProcessConfigFile(configFile string) error {
o.Trace = v.(bool)
case "logtime":
o.Logtime = v.(bool)
case "accounts":
err = parseAccounts(v, o)
if err != nil {
return err
}
case "authorization":
var auth *authorization
if pedantic {
@@ -595,6 +602,426 @@ func setClusterPermissions(opts *ClusterOpts, perms *Permissions) {
}
}
// Temp structures to hold account import and export defintions since they need
// to be processed after being parsed.
type export struct {
acc *Account
sub string
accs []string
}
type importStream struct {
acc *Account
an string
sub string
pre string
}
type importService struct {
acc *Account
an string
sub string
to string
}
// parseAccounts will parse the different accounts syntax.
func parseAccounts(v interface{}, opts *Options) error {
var (
pedantic = opts.CheckConfig
importStreams []*importStream
importServices []*importService
exportStreams []*export
exportServices []*export
)
_, v = unwrapValue(v)
switch v.(type) {
// Simple array of account names.
case []interface{}, []string:
m := make(map[string]struct{}, len(v.([]interface{})))
for _, name := range v.([]interface{}) {
ns := name.(string)
if _, ok := m[ns]; ok {
return fmt.Errorf("Duplicate Account Entry: %s", ns)
}
opts.Accounts = append(opts.Accounts, &Account{Name: ns})
m[ns] = struct{}{}
}
// More common map entry
case map[string]interface{}:
// Track users across accounts, must be unique across
// accounts and nkeys vs users.
uorn := make(map[string]struct{})
for aname, mv := range v.(map[string]interface{}) {
_, amv := unwrapValue(mv)
// These should be maps.
mv, ok := amv.(map[string]interface{})
if !ok {
return fmt.Errorf("Expected map entries for accounts")
}
acc := &Account{Name: aname}
opts.Accounts = append(opts.Accounts, acc)
for k, v := range mv {
tk, mv := unwrapValue(v)
switch strings.ToLower(k) {
case "nkey":
nk, ok := mv.(string)
if !ok || !nkeys.IsValidPublicAccountKey(nk) {
return fmt.Errorf("Not a valid public nkey for an account: %q", v)
}
acc.Nkey = nk
case "imports":
streams, services, err := parseAccountImports(mv, acc, pedantic)
if err != nil {
return err
}
importStreams = append(importStreams, streams...)
importServices = append(importServices, services...)
case "exports":
streams, services, err := parseAccountExports(mv, acc, pedantic)
if err != nil {
return err
}
exportStreams = append(exportStreams, streams...)
exportServices = append(exportServices, services...)
case "users":
var (
users []*User
err error
nkeys []*NkeyUser
)
if pedantic {
nkeys, users, err = parseUsers(tk, opts)
} else {
nkeys, users, err = parseUsers(mv, opts)
}
if err != nil {
return err
}
for _, u := range users {
if _, ok := uorn[u.Username]; ok {
return fmt.Errorf("Duplicate user %q detected", u.Username)
}
uorn[u.Username] = struct{}{}
u.Account = acc
}
opts.Users = append(opts.Users, users...)
for _, u := range nkeys {
if _, ok := uorn[u.Nkey]; ok {
return fmt.Errorf("Duplicate nkey %q detected", u.Nkey)
}
uorn[u.Nkey] = struct{}{}
u.Account = acc
}
opts.Nkeys = append(opts.Nkeys, nkeys...)
default:
if pedantic && tk != nil && !tk.IsUsedVariable() {
return &unknownConfigFieldErr{
field: k,
token: tk,
configFile: tk.SourceFile(),
}
}
}
}
}
}
// Parse Imports and Exports here after all accounts defined.
// Do exports first since they need to be defined for imports to succeed
// since we do permissions checks.
// Create a lookup map for accounts lookups.
am := make(map[string]*Account, len(opts.Accounts))
for _, a := range opts.Accounts {
am[a.Name] = a
}
// Do stream exports
for _, stream := range exportStreams {
// Make array of accounts if applicable.
var accounts []*Account
for _, an := range stream.accs {
ta := am[an]
if ta == nil {
return fmt.Errorf("%q account not defined for stream export", an)
}
accounts = append(accounts, ta)
}
if err := stream.acc.addStreamExport(stream.sub, accounts); err != nil {
return fmt.Errorf("Error adding stream export %q: %v", stream.sub, err)
}
}
for _, service := range exportServices {
// Make array of accounts if applicable.
var accounts []*Account
for _, an := range service.accs {
ta := am[an]
if ta == nil {
return fmt.Errorf("%q account not defined for service export", an)
}
accounts = append(accounts, ta)
}
if err := service.acc.addServiceExport(service.sub, accounts); err != nil {
return fmt.Errorf("Error adding service export %q: %v", service.sub, err)
}
}
for _, stream := range importStreams {
ta := am[stream.an]
if ta == nil {
return fmt.Errorf("%q account not defined for stream import", stream.an)
}
if err := stream.acc.addStreamImport(ta, stream.sub, stream.pre); err != nil {
return fmt.Errorf("Error adding stream import %q: %v", stream.sub, err)
}
}
for _, service := range importServices {
ta := am[service.an]
if ta == nil {
return fmt.Errorf("%q account not defined for service import", service.an)
}
if service.to == "" {
service.to = service.sub
}
if err := service.acc.addServiceImport(ta, service.to, service.sub); err != nil {
return fmt.Errorf("Error adding service import %q: %v", service.sub, err)
}
}
return nil
}
// Parse the account imports
func parseAccountExports(v interface{}, acc *Account, pedantic bool) ([]*export, []*export, error) {
// This should be an array of objects/maps.
_, v = unwrapValue(v)
ims, ok := v.([]interface{})
if !ok {
return nil, nil, fmt.Errorf("Exports should be an array, got %T", v)
}
var services []*export
var streams []*export
for _, v := range ims {
_, mv := unwrapValue(v)
io, ok := mv.(map[string]interface{})
if !ok {
return nil, nil, fmt.Errorf("Export Items should be a map with type entry, got %T", mv)
}
// Should have stream or service
stream, service, err := parseExportStreamOrService(io, pedantic)
if err != nil {
return nil, nil, err
}
if service != nil {
service.acc = acc
services = append(services, service)
}
if stream != nil {
stream.acc = acc
streams = append(streams, stream)
}
}
return streams, services, nil
}
// Parse the account imports
func parseAccountImports(v interface{}, acc *Account, pedantic bool) ([]*importStream, []*importService, error) {
// This should be an array of objects/maps.
_, v = unwrapValue(v)
ims, ok := v.([]interface{})
if !ok {
return nil, nil, fmt.Errorf("Imports should be an array, got %T", v)
}
var services []*importService
var streams []*importStream
for _, v := range ims {
_, mv := unwrapValue(v)
io, ok := mv.(map[string]interface{})
if !ok {
return nil, nil, fmt.Errorf("Import Items should be a map with type entry, got %T", mv)
}
// Should have stream or service
stream, service, err := parseImportStreamOrService(io, pedantic)
if err != nil {
return nil, nil, err
}
if service != nil {
service.acc = acc
services = append(services, service)
}
if stream != nil {
stream.acc = acc
streams = append(streams, stream)
}
}
return streams, services, nil
}
// Helper to parse an embedded account description for imported services or streams.
func parseAccount(v map[string]interface{}, pedantic bool) (string, string, error) {
var accountName, subject string
for mk, mv := range v {
tk, mv := unwrapValue(mv)
switch strings.ToLower(mk) {
case "account":
accountName = mv.(string)
case "subject":
subject = mv.(string)
default:
if pedantic && tk != nil && !tk.IsUsedVariable() {
return "", "", &unknownConfigFieldErr{
field: mk,
token: tk,
configFile: tk.SourceFile(),
}
}
}
}
if accountName == "" || subject == "" {
return "", "", fmt.Errorf("Expect an account name and a subject")
}
return accountName, subject, nil
}
// Parse an import stream or service.
// e.g.
// {stream: "public.>"} # No accounts means public.
// {stream: "synadia.private.>", accounts: [cncf, natsio]}
// {service: "pub.request"} # No accounts means public.
// {service: "pub.special.request", accounts: [nats.io]}
func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*export, *export, error) {
var (
curStream *export
curService *export
accounts []string
)
for mk, mv := range v {
tk, mv := unwrapValue(mv)
switch strings.ToLower(mk) {
case "stream":
if curService != nil {
return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv)
}
curStream = &export{sub: mv.(string)}
if accounts != nil {
curStream.accs = accounts
}
case "service":
if curStream != nil {
return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv)
}
mvs, ok := mv.(string)
if !ok {
return nil, nil, fmt.Errorf("Expected service to be string name, got %T", mv)
}
curService = &export{sub: mvs}
if accounts != nil {
curService.accs = accounts
}
case "accounts":
for _, iv := range mv.([]interface{}) {
_, mv := unwrapValue(iv)
accounts = append(accounts, mv.(string))
}
if curStream != nil {
curStream.accs = accounts
} else if curService != nil {
curService.accs = accounts
}
default:
if pedantic && tk != nil && !tk.IsUsedVariable() {
return nil, nil, &unknownConfigFieldErr{
field: mk,
token: tk,
configFile: tk.SourceFile(),
}
}
return nil, nil, fmt.Errorf("Unknown field %q parsing export service or stream", mk)
}
}
return curStream, curService, nil
}
// Parse an import stream or service.
// e.g.
// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"}
// {stream: {account: "synadia", subject:"synadia.private.*"}}
// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"}
func parseImportStreamOrService(v map[string]interface{}, pedantic bool) (*importStream, *importService, error) {
var (
curStream *importStream
curService *importService
pre, to string
)
for mk, mv := range v {
tk, mv := unwrapValue(mv)
switch strings.ToLower(mk) {
case "stream":
if curService != nil {
return nil, nil, fmt.Errorf("Detected stream but already saw a service: %+v", mv)
}
ac, ok := mv.(map[string]interface{})
if !ok {
return nil, nil, fmt.Errorf("Stream entry should be an account map, got %T", mv)
}
// Make sure this is a map with account and subject
accountName, subject, err := parseAccount(ac, pedantic)
if err != nil {
return nil, nil, err
}
curStream = &importStream{an: accountName, sub: subject}
if pre != "" {
curStream.pre = pre
}
case "service":
if curStream != nil {
return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv)
}
ac, ok := mv.(map[string]interface{})
if !ok {
return nil, nil, fmt.Errorf("Service entry should be an account map, got %T", mv)
}
// Make sure this is a map with account and subject
accountName, subject, err := parseAccount(ac, pedantic)
if err != nil {
return nil, nil, err
}
curService = &importService{an: accountName, sub: subject}
if to != "" {
curService.to = to
}
case "prefix":
pre = mv.(string)
if curStream != nil {
curStream.pre = pre
}
case "to":
to = mv.(string)
if curService != nil {
curService.to = to
}
default:
if pedantic && tk != nil && !tk.IsUsedVariable() {
return nil, nil, &unknownConfigFieldErr{
field: mk,
token: tk,
configFile: tk.SourceFile(),
}
}
return nil, nil, fmt.Errorf("Unknown field %q parsing import service or stream", mk)
}
}
return curStream, curService, nil
}
// Helper function to parse Authorization configs.
func parseAuthorization(v interface{}, opts *Options) (*authorization, error) {
var (

View File

@@ -234,7 +234,7 @@ func (c *client) parse(buf []byte) error {
if len(c.msgBuf) != c.pa.size+LEN_CR_LF {
goto parseErr
}
c.processMsg(c.msgBuf)
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf = nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
default:

View File

@@ -681,7 +681,7 @@ func (s *Server) reloadAuthorization() {
}
// Remove any unauthorized subscriptions.
s.removeUnauthorizedSubs(client)
client.removeUnauthorizedSubs()
}
for _, route := range routes {
@@ -762,7 +762,8 @@ func (s *Server) reloadClusterPermissions() {
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
s.sl.localSubs(&localSubs)
// FIXME(dlc) - Change for accounts.
s.gsl.localSubs(&localSubs)
// Go through all local subscriptions
for _, sub := range localSubs {
@@ -810,7 +811,8 @@ func (s *Server) reloadClusterPermissions() {
route.mu.Unlock()
}
// Remove as a batch all the subs that we have removed from each route.
s.sl.RemoveBatch(deleteRoutedSubs)
// FIXME(dlc) - Change for accounts.
s.gsl.RemoveBatch(deleteRoutedSubs)
}
// validateClusterOpts ensures the new ClusterOpts does not change host or

View File

@@ -194,7 +194,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
rsub = sub
continue
}
mh := c.msgHeader(msgh[:], sub)
mh := c.msgHeader(msgh[:], sub, c.pa.reply)
if c.deliverMsg(sub, mh, msg) {
c.Debugf("Redelivery succeeded for message on group '%q'", group)
return
@@ -203,7 +203,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
// If we are here we failed to find a local, see if we snapshotted a
// remote sub, and if so deliver to that.
if rsub != nil {
mh := c.msgHeader(msgh[:], rsub)
mh := c.msgHeader(msgh[:], rsub, c.pa.reply)
if c.deliverMsg(rsub, mh, msg) {
c.Debugf("Re-routing message on group '%q' to remote server", group)
return
@@ -212,12 +212,15 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group)
}
// processRoutedMsg processes messages inbound from a route.
func (c *client) processRoutedMsg(r *SublistResult, msg []byte) {
// processRoutedMsgResults processes messages inbound from a route.
func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) {
// Snapshot server.
srv := c.srv
msgh := c.prepMsgHeader()
// msg header
msgh := c.msgb[:msgHeadProtoLen]
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, ' ')
si := len(msgh)
// If we have a queue subscription, deliver direct
@@ -233,7 +236,7 @@ func (c *client) processRoutedMsg(r *SublistResult, msg []byte) {
}
didDeliver := false
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, c.pa.reply)
didDeliver = c.deliverMsg(sub, mh, msg)
}
if !didDeliver && c.srv != nil {
@@ -258,7 +261,7 @@ func (c *client) processRoutedMsg(r *SublistResult, msg []byte) {
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, c.pa.reply)
c.deliverMsg(sub, mh, msg)
}
}
@@ -425,7 +428,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) {
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
)
s.sl.localSubs(&localSubs)
s.gsl.localSubs(&localSubs)
route.sendRouteSubProtos(localSubs, func(sub *subscription) bool {
subj := sub.subject
@@ -578,7 +581,8 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
var raw [4096]*subscription
subs := raw[:0]
s.sl.localSubs(&subs)
// FIXME(dlc) this needs to be scoped per account when cluster proto changes.
s.gsl.localSubs(&subs)
route.mu.Lock()
closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool {
@@ -691,7 +695,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
}
}
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
c := &client{srv: s, sl: s.gsl, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
// Grab server variables
s.mu.Lock()

View File

@@ -69,13 +69,14 @@ type Server struct {
mu sync.Mutex
prand *rand.Rand
info Info
sl *Sublist
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gsl *Sublist
accounts map[string]*Account
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
@@ -173,7 +174,7 @@ func New(opts *Options) *Server {
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
sl: NewSublist(),
gsl: NewSublist(),
opts: opts,
done: make(chan bool, 1),
start: now,
@@ -192,6 +193,9 @@ func New(opts *Options) *Server {
// Used internally for quick look-ups.
s.clientConnectURLsMap = make(map[string]struct{})
// For tracking accounts
s.accounts = make(map[string]*Account)
// For tracking clients
s.clients = make(map[uint64]*client)
@@ -210,6 +214,9 @@ func New(opts *Options) *Server {
// to shutdown.
s.quitCh = make(chan struct{})
// Used to setup Accounts.
s.configureAccounts()
// Used to setup Authorization.
s.configureAuthorization()
@@ -232,6 +239,16 @@ func (s *Server) setOpts(opts *Options) {
s.optsMu.Unlock()
}
func (s *Server) configureAccounts() {
// Check opts and walk through them. Making sure to create SLs.
for _, acc := range s.opts.Accounts {
if acc.sl == nil {
acc.sl = NewSublist()
}
s.accounts[acc.Name] = acc
}
}
func (s *Server) generateRouteInfoJSON() {
b, _ := json.Marshal(s.routeInfo)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
@@ -281,6 +298,53 @@ func (s *Server) logPid() error {
return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
}
// newAccountsAllowed returns whether or not new accounts can be created on the fly.
func (s *Server) newAccountsAllowed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.opts.AllowNewAccounts
}
// LookupOrRegisterAccount will return the given account if known or create a new entry.
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
s.mu.Lock()
defer s.mu.Unlock()
if acc, ok := s.accounts[name]; ok {
return acc, false
}
acc := &Account{
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
return acc, true
}
// RegisterAccount will register an account. The account must be new
// or this call will fail.
func (s *Server) RegisterAccount(name string) (*Account, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.accounts[name]; ok {
return nil, ErrAccountExists
}
acc := &Account{
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
return acc, nil
}
// LookupAccount is a public function to return the account structure
// associated with name.
func (s *Server) LookupAccount(name string) *Account {
s.mu.Lock()
defer s.mu.Unlock()
return s.accounts[name]
}
// Start up the server, this will block.
// Start via a Go routine if needed.
func (s *Server) Start() {
@@ -778,7 +842,7 @@ func (s *Server) createClient(conn net.Conn) *client {
max_subs := opts.MaxSubs
now := time.Now()
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
// Grab JSON info string
s.mu.Lock()
@@ -1088,7 +1152,13 @@ func (s *Server) getClient(cid uint64) *client {
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
subs := s.sl.Count()
var subs uint32
for _, acc := range s.accounts {
if acc.sl != nil {
subs += acc.sl.Count()
}
}
subs += s.gsl.Count()
s.mu.Unlock()
return subs
}

View File

@@ -24,8 +24,8 @@ func TestSplitBufferSubOp(t *testing.T) {
defer cli.Close()
defer trash.Close()
s := &Server{sl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription), nc: cli}
s := &Server{gsl: NewSublist()}
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli}
subop := []byte("SUB foo 1\r\n")
subop1 := subop[:6]
@@ -43,7 +43,7 @@ func TestSplitBufferSubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
r := s.gsl.Match("foo")
if r == nil || len(r.psubs) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
@@ -60,7 +60,7 @@ func TestSplitBufferSubOp(t *testing.T) {
}
func TestSplitBufferUnsubOp(t *testing.T) {
s := &Server{sl: NewSublist()}
s := &Server{gsl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription)}
subop := []byte("SUB foo 1024\r\n")
@@ -87,7 +87,7 @@ func TestSplitBufferUnsubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match("foo")
r := s.gsl.Match("foo")
if r != nil && len(r.psubs) != 0 {
t.Fatalf("Should be no subscriptions in results: %+v\n", r)
}
@@ -300,7 +300,7 @@ func TestSplitConnectArg(t *testing.T) {
func TestSplitDanglingArgBuf(t *testing.T) {
s := New(&defaultServerOptions)
c := &client{srv: s, subs: make(map[string]*subscription)}
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)}
// We test to make sure we do not dangle any argBufs after processing
// since that could lead to performance issues.

View File

@@ -712,6 +712,52 @@ func IsValidLiteralSubject(subject string) bool {
return true
}
// This will test a subject as an array of tokens against a test subject
// and determine if the tokens are matched. Both test subject and tokens
// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],
// but not of foo.bar, etc.
func isSubsetMatch(tokens []string, test string) bool {
tsa := [32]string{}
tts := tsa[:0]
start := 0
for i := 0; i < len(test); i++ {
if test[i] == btsep {
tts = append(tts, test[start:i])
start = i + 1
}
}
tts = append(tts, test[start:])
// Walk the target tokens
for i, t2 := range tts {
if i >= len(tokens) {
return false
}
if t2[0] == fwc && len(t2) == 1 {
return true
}
t1 := tokens[i]
if t1[0] == fwc && len(t1) == 1 {
return false
}
if t1[0] == pwc && len(t1) == 1 {
m := t2[0] == pwc && len(t2) == 1
if !m {
return false
}
if i >= len(tts) {
return true
} else {
continue
}
}
if t2[0] != pwc && strings.Compare(t1, t2) != 0 {
return false
}
}
return len(tokens) == len(tts)
}
// matchLiteral is used to test literal subjects, those that do not have any
// wildcards, with a target subject. This is used in the cache layer.
func matchLiteral(literal, subject string) bool {
@@ -725,7 +771,7 @@ func matchLiteral(literal, subject string) bool {
// This function has been optimized for speed.
// For instance, do not set b:=subject[i] here since
// we may bump `i` in this loop to avoid `continue` or
// skiping common test in a particular test.
// skipping common test in a particular test.
// Run Benchmark_SublistMatchLiteral before making any change.
switch subject[i] {
case pwc:

View File

@@ -23,8 +23,6 @@ import (
"testing"
"time"
dbg "runtime/debug"
"github.com/nats-io/nuid"
)
@@ -421,8 +419,8 @@ func TestSublistBasicQueueResults(t *testing.T) {
}
func checkBool(b, expected bool, t *testing.T) {
t.Helper()
if b != expected {
dbg.PrintStack()
t.Fatalf("Expected %v, but got %v\n", expected, b)
}
}