mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Basic account mapping via import and export
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -19,42 +19,40 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func simpleAccountServer(t *testing.T) *Server {
|
||||
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
|
||||
opts := defaultServerOptions
|
||||
s := New(&opts)
|
||||
|
||||
// Now create two accounts.
|
||||
_, err := s.RegisterAccount("foo")
|
||||
f, err := s.RegisterAccount("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating account 'foo': %v", err)
|
||||
}
|
||||
_, err = s.RegisterAccount("bar")
|
||||
b, err := s.RegisterAccount("bar")
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating account 'bar': %v", err)
|
||||
}
|
||||
return s
|
||||
return s, f, b
|
||||
}
|
||||
|
||||
func TestRegisterDuplicateAccounts(t *testing.T) {
|
||||
s := simpleAccountServer(t)
|
||||
s, _, _ := simpleAccountServer(t)
|
||||
if _, err := s.RegisterAccount("foo"); err == nil {
|
||||
t.Fatal("Expected an error registering 'foo' twice")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountIsolation(t *testing.T) {
|
||||
s := simpleAccountServer(t)
|
||||
fooAcc := s.LookupAccount("foo")
|
||||
barAcc := s.LookupAccount("bar")
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
if fooAcc == nil || barAcc == nil {
|
||||
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
|
||||
}
|
||||
cfoo, crFoo, _ := newClientForServer(s)
|
||||
if err := cfoo.RegisterWithAccount(fooAcc); err != nil {
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error register client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
if err := cbar.RegisterWithAccount(barAcc); err != nil {
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error register client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
@@ -146,7 +144,7 @@ func TestNewAccountsFromClients(t *testing.T) {
|
||||
// Clients can ask that the account be forced to be new. If it exists this is an error.
|
||||
func TestNewAccountRequireNew(t *testing.T) {
|
||||
// This has foo and bar accounts already.
|
||||
s := simpleAccountServer(t)
|
||||
s, _, _ := simpleAccountServer(t)
|
||||
|
||||
c, cr, _ := newClientForServer(s)
|
||||
connectOp := []byte("CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n")
|
||||
@@ -284,3 +282,293 @@ func TestAccountParseConfigDuplicateUsers(t *testing.T) {
|
||||
t.Fatalf("Expected an error with double user entries")
|
||||
}
|
||||
}
|
||||
|
||||
func TestImportAuthorized(t *testing.T) {
|
||||
_, foo, bar := simpleAccountServer(t)
|
||||
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, ">"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.>"), false, t)
|
||||
|
||||
foo.addExport("foo", isPublicExport)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "bar"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*"), false, t)
|
||||
|
||||
foo.addExport("*", []*Account{bar})
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "bar"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "baz"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, ">"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.*"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*.*"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*.>"), false, t)
|
||||
|
||||
// Reset and test '>' public export
|
||||
_, foo, bar = simpleAccountServer(t)
|
||||
foo.addExport(">", nil)
|
||||
// Everything should work.
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "bar"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "baz"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, ">"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.*"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*.*"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "*.>"), true, t)
|
||||
|
||||
// Reset and test pwc and fwc
|
||||
s, foo, bar := simpleAccountServer(t)
|
||||
foo.addExport("foo.*.baz.>", []*Account{bar})
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.1"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz.*"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.*.baz.1.1"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.22.baz.22"), true, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar.baz"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, ""), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bar, "foo.bar.*.*"), false, t)
|
||||
|
||||
// Make sure we match the account as well
|
||||
|
||||
fb, _ := s.RegisterAccount("foobar")
|
||||
bz, _ := s.RegisterAccount("baz")
|
||||
|
||||
checkBool(foo.checkImportAuthorized(fb, "foo.bar.baz.1"), false, t)
|
||||
checkBool(foo.checkImportAuthorized(bz, "foo.bar.baz.1"), false, t)
|
||||
}
|
||||
|
||||
func TestSimpleMapping(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, _, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Test first that trying to import with no matching export permission returns an error.
|
||||
if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != ErrAccountImportAuthorization {
|
||||
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
|
||||
}
|
||||
|
||||
// Now map the subject space between foo and bar.
|
||||
// Need to do export first.
|
||||
if err := cfoo.acc.addExport("foo", nil); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addImport(fooAcc, "foo", "import"); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client.
|
||||
go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n"))
|
||||
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
||||
if err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
// Now publish our message.
|
||||
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
|
||||
|
||||
checkMsg := func(l, sid string) {
|
||||
t.Helper()
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
if matches[SUB_INDEX] != "import.foo" {
|
||||
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != sid {
|
||||
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
||||
}
|
||||
}
|
||||
|
||||
// Now check we got the message from normal subscription.
|
||||
l, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
checkMsg(l, "1")
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
|
||||
l, err = crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'baz': %v", err)
|
||||
}
|
||||
checkMsg(l, "2")
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
}
|
||||
|
||||
func TestNoPrefixWildcardMapping(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, _, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addImport(fooAcc, "*", ""); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for literal "foo".
|
||||
go cbar.parse([]byte("SUB foo 1\r\nPING\r\n"))
|
||||
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
||||
if err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
// Now publish our message.
|
||||
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
|
||||
|
||||
// Now check we got the message from normal subscription.
|
||||
l, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
if matches[SUB_INDEX] != "foo" {
|
||||
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != "1" {
|
||||
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
||||
}
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
}
|
||||
|
||||
func TestPrefixWildcardMapping(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, _, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for wildcard.
|
||||
go cbar.parse([]byte("SUB pub.imports.* 1\r\nPING\r\n"))
|
||||
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
||||
if err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
// Now publish our message.
|
||||
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
|
||||
|
||||
// Now check we got the messages from wildcard subscription.
|
||||
l, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
if matches[SUB_INDEX] != "pub.imports.foo" {
|
||||
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != "1" {
|
||||
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
||||
}
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
}
|
||||
|
||||
func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, _, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addImport(fooAcc, "*", "pub.imports."); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for wildcard.
|
||||
go cbar.parse([]byte("SUB pub.imports.foo 1\r\nPING\r\n"))
|
||||
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
||||
if err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
// Now publish our message.
|
||||
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\n"))
|
||||
|
||||
// Now check we got the messages from wildcard subscription.
|
||||
l, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
if matches[SUB_INDEX] != "pub.imports.foo" {
|
||||
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != "1" {
|
||||
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
||||
}
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
}
|
||||
|
||||
102
server/auth.go
102
server/auth.go
@@ -17,6 +17,7 @@ import (
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/nats-io/nkeys"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
@@ -38,10 +39,107 @@ type ClientAuthentication interface {
|
||||
RegisterUser(*User)
|
||||
}
|
||||
|
||||
// Import mapping struct
|
||||
type importMap struct {
|
||||
acc *Account
|
||||
from string
|
||||
prefix string
|
||||
}
|
||||
|
||||
// Accounts
|
||||
type Account struct {
|
||||
Name string
|
||||
sl *Sublist
|
||||
Name string
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
imports map[string]*importMap
|
||||
exports map[string]map[string]*Account
|
||||
}
|
||||
|
||||
// addImport will add in the import
|
||||
func (a *Account) addImport(account *Account, from, prefix string) error {
|
||||
// First check to see if the account has authorized export of the subject.
|
||||
if !account.checkImportAuthorized(a, from) {
|
||||
return ErrAccountImportAuthorization
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if account == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
if a.imports == nil {
|
||||
a.imports = make(map[string]*importMap)
|
||||
}
|
||||
if prefix != "" && prefix[len(prefix)-1] != btsep {
|
||||
prefix = prefix + string(btsep)
|
||||
}
|
||||
// TODO(dlc) - collisions, etc.
|
||||
a.imports[from] = &importMap{account, from, prefix}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Placeholder to denote public export.
|
||||
var isPublicExport = []*Account(nil)
|
||||
|
||||
// addExport will add an export to the account. If accounts is nil
|
||||
// it will signify a public export, meaning anyone can impoort.
|
||||
func (a *Account) addExport(subject string, accounts []*Account) error {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
if a.exports == nil {
|
||||
a.exports = make(map[string]map[string]*Account)
|
||||
}
|
||||
var ma map[string]*Account
|
||||
for _, aa := range accounts {
|
||||
if ma == nil {
|
||||
ma = make(map[string]*Account, len(accounts))
|
||||
}
|
||||
ma[aa.Name] = aa
|
||||
}
|
||||
a.exports[subject] = ma
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if another account is authorized to import from us.
|
||||
func (a *Account) checkImportAuthorized(account *Account, subject string) bool {
|
||||
// Find the subject in the exports list.
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
|
||||
if a.exports == nil || !IsValidSubject(subject) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check direct match of subject first
|
||||
am, ok := a.exports[subject]
|
||||
if ok {
|
||||
// if am is nil that denotes a public export
|
||||
if am == nil {
|
||||
return true
|
||||
}
|
||||
// If we have a matching account we are authorized
|
||||
_, ok := am[account.Name]
|
||||
return ok
|
||||
}
|
||||
// ok if we are here we did not match directly so we need to test each one.
|
||||
// The import subject arg has to take precedence, meaning the export
|
||||
// has to be a true subset of the import claim. We already checked for
|
||||
// exact matches above.
|
||||
|
||||
tokens := strings.Split(subject, tsep)
|
||||
for subj, am := range a.exports {
|
||||
if isSubsetMatch(tokens, subj) {
|
||||
if am == nil {
|
||||
return true
|
||||
}
|
||||
_, ok := am[account.Name]
|
||||
return ok
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Nkey is for multiple nkey based users
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -239,6 +240,7 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
|
||||
|
||||
type subscription struct {
|
||||
client *client
|
||||
im *importMap
|
||||
subject []byte
|
||||
queue []byte
|
||||
sid []byte
|
||||
@@ -319,7 +321,7 @@ func (c *client) initClient() {
|
||||
|
||||
// RegisterWithAccount will register the given user with a specific
|
||||
// account. This will change the subject namespace.
|
||||
func (c *client) RegisterWithAccount(acc *Account) error {
|
||||
func (c *client) registerWithAccount(acc *Account) error {
|
||||
if acc == nil || acc.sl == nil {
|
||||
return ErrBadAccount
|
||||
}
|
||||
@@ -831,7 +833,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
}
|
||||
}
|
||||
// If we are here we can register ourselves with the new account.
|
||||
if err := c.RegisterWithAccount(acc); err != nil {
|
||||
if err := c.registerWithAccount(acc); err != nil {
|
||||
c.Errorf("Problem registering with account [%s]", account)
|
||||
c.sendErr("Account Failed Registration")
|
||||
return ErrBadAccount
|
||||
@@ -962,7 +964,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
c.out.p = nil
|
||||
}
|
||||
// Check for a big message, and if found place directly on nb
|
||||
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) <
|
||||
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize
|
||||
if len(data) > maxBufSize {
|
||||
c.out.nb = append(c.out.nb, data)
|
||||
referenced = true
|
||||
@@ -1287,16 +1289,18 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if we have a maximum on the number of subscriptions.
|
||||
if c.msubs > 0 && len(c.subs) >= c.msubs {
|
||||
c.mu.Unlock()
|
||||
c.maxSubsExceeded()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if we have a maximum on the number of subscriptions.
|
||||
// We can have two SUB protocols coming from a route due to some
|
||||
// race conditions. We should make sure that we process only one.
|
||||
sid := string(sub.sid)
|
||||
var chkImports bool
|
||||
|
||||
if c.subs[sid] == nil {
|
||||
c.subs[sid] = sub
|
||||
if c.sl != nil {
|
||||
@@ -1304,17 +1308,26 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
if err != nil {
|
||||
delete(c.subs, sid)
|
||||
} else {
|
||||
if c.acc != nil {
|
||||
chkImports = true
|
||||
}
|
||||
shouldForward = c.typ != ROUTER
|
||||
}
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
c.sendErr("Invalid Subject")
|
||||
return nil
|
||||
} else if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
}
|
||||
if chkImports {
|
||||
if err := c.checkAccountImports(sub); err != nil {
|
||||
c.Errorf(err.Error())
|
||||
}
|
||||
}
|
||||
if shouldForward {
|
||||
c.srv.broadcastSubscribe(sub)
|
||||
}
|
||||
@@ -1322,6 +1335,49 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check to see if we need to create a shadow subscription due to imports
|
||||
// in other accounts.
|
||||
// Assume lock is held
|
||||
func (c *client) checkAccountImports(sub *subscription) error {
|
||||
c.mu.Lock()
|
||||
acc := c.acc
|
||||
c.mu.Unlock()
|
||||
|
||||
if acc == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
|
||||
subject := string(sub.subject)
|
||||
tokens := strings.Split(subject, tsep)
|
||||
|
||||
var rims [32]*importMap
|
||||
var ims = rims[:0]
|
||||
acc.mu.RLock()
|
||||
for _, im := range acc.imports {
|
||||
if isSubsetMatch(tokens, im.prefix+im.from) {
|
||||
ims = append(ims, im)
|
||||
}
|
||||
}
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Now walk through collected importMaps
|
||||
for _, im := range ims {
|
||||
// We have a match for a local subscription with an import from another account.
|
||||
// We will create a shadow subscription.
|
||||
nsub := *sub // copy
|
||||
nsub.im = im
|
||||
if im.prefix != "" {
|
||||
// redo subject here to match subject in the publisher account space.
|
||||
// Just remove prefix from what they gave us. That maps into other space.
|
||||
nsub.subject = sub.subject[len(im.prefix):]
|
||||
}
|
||||
if err := im.acc.sl.Insert(&nsub); err != nil {
|
||||
return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(subject []byte) bool {
|
||||
@@ -1627,7 +1683,7 @@ func (c *client) processMsg(msg []byte) {
|
||||
if genid == c.in.genid && c.in.results != nil {
|
||||
r, ok = c.in.results[string(c.pa.subject)]
|
||||
} else {
|
||||
// reset our L1 completely.
|
||||
// Reset our L1 completely.
|
||||
c.in.results = make(map[string]*SublistResult)
|
||||
c.in.genid = genid
|
||||
}
|
||||
@@ -1694,6 +1750,15 @@ func (c *client) processMsg(msg []byte) {
|
||||
rmap[sub.client.route.remoteID] = routeSeen
|
||||
sub.client.mu.Unlock()
|
||||
}
|
||||
// Check for mapped subs
|
||||
if sub.im != nil && sub.im.prefix != "" {
|
||||
// Redo the subject here on the fly.
|
||||
msgh := c.msgb[:msgHeadProtoLen]
|
||||
msgh = append(msgh, sub.im.prefix...)
|
||||
msgh = append(msgh, c.pa.subject...)
|
||||
msgh = append(msgh, ' ')
|
||||
si = len(msgh)
|
||||
}
|
||||
// Normal delivery
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
@@ -1714,6 +1779,15 @@ func (c *client) processMsg(msg []byte) {
|
||||
index := (startIndex + i) % len(qsubs)
|
||||
sub := qsubs[index]
|
||||
if sub != nil {
|
||||
// Check for mapped subs
|
||||
if sub.im != nil && sub.im.prefix != "" {
|
||||
// Redo the subject here on the fly.
|
||||
msgh := c.msgb[:msgHeadProtoLen]
|
||||
msgh = append(msgh, sub.im.prefix...)
|
||||
msgh = append(msgh, c.pa.subject...)
|
||||
msgh = append(msgh, ' ')
|
||||
si = len(msgh)
|
||||
}
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
if c.deliverMsg(sub, mh, msg) {
|
||||
break
|
||||
|
||||
@@ -395,13 +395,19 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) parseFlushAndClose(op []byte) {
|
||||
// This needs to clear any flushOutbound flags since writeLoop not running.
|
||||
func (c *client) parseAndFlush(op []byte) {
|
||||
c.parse(op)
|
||||
for cp := range c.pcd {
|
||||
cp.mu.Lock()
|
||||
cp.flags.clear(flushOutbound)
|
||||
cp.flushOutbound()
|
||||
cp.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) parseFlushAndClose(op []byte) {
|
||||
c.parseAndFlush(op)
|
||||
c.nc.Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -58,4 +58,6 @@ var (
|
||||
|
||||
// ErrMissingAccount is returned when an account does not exist.
|
||||
ErrMissingAccount = errors.New("Account Missing")
|
||||
|
||||
ErrAccountImportAuthorization = errors.New("Account Not Authorized: Subject Not Exported")
|
||||
)
|
||||
|
||||
@@ -712,6 +712,52 @@ func IsValidLiteralSubject(subject string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// This will test a subject as an array of tokens against a test subject
|
||||
// and determine if the tokens are matched. Both test subject and tokens
|
||||
// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],
|
||||
// but not of foo.bar, etc.
|
||||
func isSubsetMatch(tokens []string, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tsa[:0]
|
||||
start := 0
|
||||
for i := 0; i < len(test); i++ {
|
||||
if test[i] == btsep {
|
||||
tts = append(tts, test[start:i])
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
tts = append(tts, test[start:])
|
||||
|
||||
// Walk the target tokens
|
||||
for i, t2 := range tts {
|
||||
if i >= len(tokens) {
|
||||
return false
|
||||
}
|
||||
if t2[0] == fwc && len(t2) == 1 {
|
||||
return true
|
||||
}
|
||||
t1 := tokens[i]
|
||||
if t1[0] == fwc && len(t1) == 1 {
|
||||
return false
|
||||
}
|
||||
if t1[0] == pwc && len(t1) == 1 {
|
||||
m := t2[0] == pwc && len(t2) == 1
|
||||
if !m {
|
||||
return false
|
||||
}
|
||||
if i >= len(tts) {
|
||||
return true
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if t2[0] != pwc && strings.Compare(t1, t2) != 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return len(tokens) == len(tts)
|
||||
}
|
||||
|
||||
// matchLiteral is used to test literal subjects, those that do not have any
|
||||
// wildcards, with a target subject. This is used in the cache layer.
|
||||
func matchLiteral(literal, subject string) bool {
|
||||
@@ -725,7 +771,7 @@ func matchLiteral(literal, subject string) bool {
|
||||
// This function has been optimized for speed.
|
||||
// For instance, do not set b:=subject[i] here since
|
||||
// we may bump `i` in this loop to avoid `continue` or
|
||||
// skiping common test in a particular test.
|
||||
// skipping common test in a particular test.
|
||||
// Run Benchmark_SublistMatchLiteral before making any change.
|
||||
switch subject[i] {
|
||||
case pwc:
|
||||
|
||||
@@ -23,8 +23,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
dbg "runtime/debug"
|
||||
|
||||
"github.com/nats-io/nuid"
|
||||
)
|
||||
|
||||
@@ -421,8 +419,8 @@ func TestSublistBasicQueueResults(t *testing.T) {
|
||||
}
|
||||
|
||||
func checkBool(b, expected bool, t *testing.T) {
|
||||
t.Helper()
|
||||
if b != expected {
|
||||
dbg.PrintStack()
|
||||
t.Fatalf("Expected %v, but got %v\n", expected, b)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user