First pass at new cluster design

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-10-23 10:48:17 -07:00
parent 037acf1310
commit 47963303f8
29 changed files with 3435 additions and 1327 deletions

View File

@@ -90,6 +90,22 @@ func NewFileLogger(filename string, time, debug, trace, pid bool) *Logger {
return l
}
// NewTestLogger creates a logger with output directed to Stderr with a prefix.
// Useful for tracing in tests when multiple servers are in the same pid
func NewTestLogger(prefix string, time bool) *Logger {
flags := 0
if time {
flags = log.LstdFlags | log.Lmicroseconds
}
l := &Logger{
logger: log.New(os.Stderr, prefix, flags),
debug: true,
trace: true,
}
setColoredLabelFormats(l)
return l
}
// Close implements the io.Closer interface to clean up
// resources in the server's logger implementation.
// Caller must ensure threadsafety.

431
server/accounts.go Normal file
View File

@@ -0,0 +1,431 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sort"
"strings"
"sync"
"time"
)
// For backwards compatibility, users who are not explicitly defined into an
// account will be grouped in the default global account.
const globalAccountName = "$G"
// Route Map Entry - used for efficient interest graph propagation.
// TODO(dlc) - squeeze even more?
type rme struct {
qi int // used to index into key from map for optional queue name
n int32 // number of subscriptions directly matching, local subs only.
}
// Accounts
type Account struct {
Name string
Nkey string
mu sync.RWMutex
sl *Sublist
clients int
rm map[string]*rme
imports importMap
exports exportMap
nae int
maxnae int
maxaettl time.Duration
pruning bool
}
// Import stream mapping struct
type streamImport struct {
acc *Account
from string
prefix string
}
// Import service mapping struct
type serviceImport struct {
acc *Account
from string
to string
ae bool
ts int64
}
// importMap tracks the imported streams and services.
type importMap struct {
streams map[string]*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}
// exportMap tracks the exported streams and services.
type exportMap struct {
streams map[string]map[string]*Account
services map[string]map[string]*Account
}
// NumClients returns active number of clients for this account.
func (a *Account) NumClients() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.clients
}
// Returns how many subjects we would send across a route when first
// connected or expressing interest. Local client subs.
func (a *Account) RoutedSubs() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.rm)
}
// Returns total number of Subscriptions for this account.
func (a *Account) TotalSubs() int {
a.mu.RLock()
defer a.mu.RUnlock()
return int(a.sl.Count())
}
// addClient keeps our accounting of active clients updated.
// Call in with client but just track total for now.
// Returns previous total.
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := a.clients
a.clients++
a.mu.Unlock()
return n
}
// removeClient keeps our accounting of active clients updated.
func (a *Account) removeClient(c *client) int {
a.mu.Lock()
n := a.clients
a.clients--
a.mu.Unlock()
return n
}
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.services == nil {
a.exports.services = make(map[string]map[string]*Account)
}
ma := a.exports.services[subject]
if accounts != nil && ma == nil {
ma = make(map[string]*Account)
}
for _, a := range accounts {
ma[a.Name] = a
}
a.exports.services[subject] = ma
return nil
}
// numServiceRoutes returns the number of service routes on this account.
func (a *Account) numServiceRoutes() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.imports.services)
}
// This will add a route to an account to send published messages / requests
// to the destination account. From is the local subject to map, To is the
// subject that will appear on the destination account. Destination will need
// to have an import rule to allow access via addService.
func (a *Account) AddServiceImport(destination *Account, from, to string) error {
if destination == nil {
return ErrMissingAccount
}
// Empty means use from.
if to == "" {
to = from
}
if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) {
return ErrInvalidSubject
}
// First check to see if the account has authorized us to route to the "to" subject.
if !destination.checkServiceImportAuthorized(a, to) {
return ErrServiceImportAuthorization
}
return a.addImplicitServiceImport(destination, from, to, false)
}
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
si, ok := a.imports.services[subject]
if ok && si != nil && si.ae {
a.nae--
}
delete(a.imports.services, subject)
a.mu.Unlock()
}
// Return the number of AutoExpireResponseMaps for request/reply. These are mapped to the account that
// has the service import.
func (a *Account) numAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.nae
}
// maxAutoExpireResponseMaps return the maximum number of
// auto expire response maps we will allow.
func (a *Account) MaxAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.maxnae
}
// Set the max outstanding auto expire response maps.
func (a *Account) SetMaxAutoExpireResponseMaps(max int) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxnae = max
}
// expireTTL returns the ttl for response maps.
func (a *Account) AutoExpireTTL() time.Duration {
a.mu.RLock()
defer a.mu.RUnlock()
return a.maxaettl
}
// Set the ttl for response maps.
func (a *Account) SetAutoExpireTTL(ttl time.Duration) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxaettl = ttl
}
// Return a list of the current autoExpireResponseMaps.
func (a *Account) autoExpireResponseMaps() []*serviceImport {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.imports.services) == 0 {
return nil
}
aesis := make([]*serviceImport, 0, len(a.imports.services))
for _, si := range a.imports.services {
if si.ae {
aesis = append(aesis, si)
}
}
sort.Slice(aesis, func(i, j int) bool {
return aesis[i].ts < aesis[j].ts
})
return aesis
}
// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use
// addServiceImport from above if responding to user input or config, etc.
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error {
a.mu.Lock()
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
si := &serviceImport{destination, from, to, autoexpire, 0}
a.imports.services[from] = si
if autoexpire {
a.nae++
si.ts = time.Now().Unix()
if a.nae > a.maxnae && !a.pruning {
a.pruning = true
go a.pruneAutoExpireResponseMaps()
}
}
a.mu.Unlock()
return nil
}
// This will prune the list to below the threshold and remove all ttl'd maps.
func (a *Account) pruneAutoExpireResponseMaps() {
defer func() {
a.mu.Lock()
a.pruning = false
a.mu.Unlock()
}()
a.mu.RLock()
ttl := int64(a.maxaettl/time.Second) + 1
a.mu.RUnlock()
for {
sis := a.autoExpireResponseMaps()
// Check ttl items.
now := time.Now().Unix()
for i, si := range sis {
if now-si.ts >= ttl {
a.removeServiceImport(si.from)
} else {
sis = sis[i:]
break
}
}
a.mu.RLock()
numOver := a.nae - a.maxnae
a.mu.RUnlock()
if numOver <= 0 {
return
} else if numOver >= len(sis) {
numOver = len(sis) - 1
}
// These are in sorted order, remove at least numOver
for _, si := range sis[:numOver] {
a.removeServiceImport(si.from)
}
}
}
// addStreamImport will add in the stream import from a specific account.
func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
if account == nil {
return ErrMissingAccount
}
// First check to see if the account has authorized export of the subject.
if !account.checkStreamImportAuthorized(a, from) {
return ErrStreamImportAuthorization
}
a.mu.Lock()
defer a.mu.Unlock()
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
}
if prefix != "" && prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
// TODO(dlc) - collisions, etc.
a.imports.streams[from] = &streamImport{account, from, prefix}
return nil
}
// Placeholder to denote public export.
var IsPublicExport = []*Account(nil)
// addExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can impoort.
func (a *Account) AddStreamExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.streams == nil {
a.exports.streams = make(map[string]map[string]*Account)
}
var ma map[string]*Account
for _, aa := range accounts {
if ma == nil {
ma = make(map[string]*Account, len(accounts))
}
ma[aa.Name] = aa
}
a.exports.streams[subject] = ma
return nil
}
// Check if another account is authorized to import from us.
func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool {
// Find the subject in the exports list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.streams == nil || !IsValidSubject(subject) {
return false
}
// Check direct match of subject first
am, ok := a.exports.streams[subject]
if ok {
// if am is nil that denotes a public export
if am == nil {
return true
}
// If we have a matching account we are authorized
_, ok := am[account.Name]
return ok
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
// has to be a true subset of the import claim. We already checked for
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, am := range a.exports.streams {
if isSubsetMatch(tokens, subj) {
if am == nil {
return true
}
_, ok := am[account.Name]
return ok
}
}
return false
}
// Returns true if `a` and `b` stream imports are the same. Note that the
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this executes.
func (a *Account) checkStreamImportsEqual(b *Account) bool {
if len(a.imports.streams) != len(b.imports.streams) {
return false
}
for subj, aim := range a.imports.streams {
bim := b.imports.streams[subj]
if bim == nil {
return false
}
if aim.acc.Name != bim.acc.Name || aim.from != bim.from || aim.prefix != bim.prefix {
return false
}
}
return true
}
// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
// These are always literal subjects so just lookup.
am, ok := a.exports.services[subject]
if !ok {
return false
}
// Check to see if we are public or if we need to search for the account.
if am == nil {
return true
}
// Check that we allow this account.
_, ok = am[account.Name]
return ok
}

View File

@@ -30,11 +30,11 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
s := New(&opts)
// Now create two accounts.
f, err := s.RegisterAccount("foo")
f, err := s.RegisterAccount("$foo")
if err != nil {
t.Fatalf("Error creating account 'foo': %v", err)
}
b, err := s.RegisterAccount("bar")
b, err := s.RegisterAccount("$bar")
if err != nil {
t.Fatalf("Error creating account 'bar': %v", err)
}
@@ -43,7 +43,7 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
func TestRegisterDuplicateAccounts(t *testing.T) {
s, _, _ := simpleAccountServer(t)
if _, err := s.RegisterAccount("foo"); err == nil {
if _, err := s.RegisterAccount("$foo"); err == nil {
t.Fatal("Expected an error registering 'foo' twice")
}
}
@@ -153,6 +153,91 @@ func TestNewAccountsFromClients(t *testing.T) {
}
}
func TestActiveAccounts(t *testing.T) {
opts := defaultServerOptions
opts.AllowNewAccounts = true
opts.Cluster.Port = 22
s := New(&opts)
if s.activeAccounts != 0 {
t.Fatalf("Expected no active accounts, got %d", s.activeAccounts)
}
addClientWithAccount := func(accName string) *client {
t.Helper()
c, _, _ := newClientForServer(s)
connectOp := fmt.Sprintf("CONNECT {\"account\":\"%s\"}\r\n", accName)
err := c.parse([]byte(connectOp))
if err != nil {
t.Fatalf("Received an error trying to connect: %v", err)
}
return c
}
// Now add some clients.
cf1 := addClientWithAccount("foo")
if s.activeAccounts != 1 {
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
}
// Adding in same one should not change total.
cf2 := addClientWithAccount("foo")
if s.activeAccounts != 1 {
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
}
// Add in new one.
cb1 := addClientWithAccount("bar")
if s.activeAccounts != 2 {
t.Fatalf("Expected active accounts to be 2, got %d", s.activeAccounts)
}
// Make sure the Accounts track clients.
foo := s.LookupAccount("foo")
bar := s.LookupAccount("bar")
if foo == nil || bar == nil {
t.Fatalf("Error looking up accounts")
}
if nc := foo.NumClients(); nc != 2 {
t.Fatalf("Expected account foo to have 2 clients, got %d", nc)
}
if nc := bar.NumClients(); nc != 1 {
t.Fatalf("Expected account bar to have 1 client, got %d", nc)
}
waitTilActiveCount := func(n int) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if active := s.activeAccounts; active != n {
return fmt.Errorf("Number of active accounts is %d", active)
}
return nil
})
}
// Test Removal
cb1.closeConnection(ClientClosed)
waitTilActiveCount(1)
if nc := bar.NumClients(); nc != 0 {
t.Fatalf("Expected account bar to have 0 clients, got %d", nc)
}
// This should not change the count.
cf1.closeConnection(ClientClosed)
waitTilActiveCount(1)
if nc := foo.NumClients(); nc != 1 {
t.Fatalf("Expected account foo to have 1 client, got %d", nc)
}
cf2.closeConnection(ClientClosed)
waitTilActiveCount(0)
if nc := foo.NumClients(); nc != 0 {
t.Fatalf("Expected account bar to have 0 clients, got %d", nc)
}
}
// Clients can ask that the account be forced to be new. If it exists this is an error.
func TestNewAccountRequireNew(t *testing.T) {
// This has foo and bar accounts already.
@@ -448,12 +533,12 @@ func TestImportAuthorized(t *testing.T) {
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.>"), false, t)
foo.addStreamExport("foo", isPublicExport)
foo.AddStreamExport("foo", IsPublicExport)
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*"), false, t)
foo.addStreamExport("*", []*Account{bar})
foo.AddStreamExport("*", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz"), true, t)
@@ -466,7 +551,7 @@ func TestImportAuthorized(t *testing.T) {
// Reset and test '>' public export
_, foo, bar = simpleAccountServer(t)
foo.addStreamExport(">", nil)
foo.AddStreamExport(">", nil)
// Everything should work.
checkBool(foo.checkStreamImportAuthorized(bar, "foo"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar"), true, t)
@@ -480,7 +565,7 @@ func TestImportAuthorized(t *testing.T) {
// Reset and test pwc and fwc
s, foo, bar := simpleAccountServer(t)
foo.addStreamExport("foo.*.baz.>", []*Account{bar})
foo.AddStreamExport("foo.*.baz.>", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*"), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1"), true, t)
@@ -516,16 +601,16 @@ func TestSimpleMapping(t *testing.T) {
}
// Test first that trying to import with no matching export permission returns an error.
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
}
// Now map the subject space between foo and bar.
// Need to do export first.
if err := cfoo.acc.addStreamExport("foo", nil); err != nil { // Public with no accounts defined.
if err := cfoo.acc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "foo", "import"); err != nil {
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
@@ -608,10 +693,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil {
if err := cbar.acc.AddStreamImport(fooAcc, "*", ""); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
@@ -661,11 +746,11 @@ func TestPrefixWildcardMapping(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
// Checking that trailing '.' is accepted, tested that it is auto added above.
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
if err := cbar.acc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
@@ -715,10 +800,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
if err := fooAcc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
if err := barAcc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
@@ -769,23 +854,23 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
// Add in the service export for the requests. Make it public.
if err := cfoo.acc.addServiceExport("test.request", nil); err != nil {
if err := cfoo.acc.AddServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
// Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects.
if err := cbar.acc.addServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
if err := cbar.acc.AddServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
t.Fatalf("Expected ErrMissingAccount but received %v.", err)
}
if err := cbar.acc.addServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject {
if err := cbar.acc.AddServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
// Now add in the Route for request to be routed to the foo account.
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil {
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account service import to client bar: %v", err)
}
@@ -814,8 +899,8 @@ func TestCrossAccountRequestReply(t *testing.T) {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") {
t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX])
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
@@ -854,9 +939,18 @@ func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
// Make sure they have the correct defaults
if max := barAcc.MaxAutoExpireResponseMaps(); max != DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS {
t.Fatalf("Expected %d for max default, but got %d", DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS, max)
}
if ttl := barAcc.AutoExpireTTL(); ttl != DEFAULT_TTL_AE_RESPONSE_MAP {
t.Fatalf("Expected %v for the ttl default, got %v", DEFAULT_TTL_AE_RESPONSE_MAP, ttl)
}
ttl := 500 * time.Millisecond
barAcc.setMaxAutoExpireResponseMaps(5)
barAcc.setMaxAutoExpireTTL(ttl)
barAcc.SetMaxAutoExpireResponseMaps(5)
barAcc.SetAutoExpireTTL(ttl)
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
@@ -864,10 +958,10 @@ func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
if err := barAcc.addServiceExport("test.request", nil); err != nil {
if err := barAcc.AddServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export: %v", err)
}
if err := fooAcc.addServiceImport(barAcc, "foo", "test.request"); err != nil {
if err := fooAcc.AddServiceImport(barAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account service import: %v", err)
}
@@ -875,7 +969,7 @@ func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
cfoo.parseAndFlush([]byte("PUB foo bar 4\r\nhelp\r\n"))
}
// We should expire because max.
// We should expire because of max.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nae := barAcc.numAutoExpireResponseMaps(); nae != 5 {
return fmt.Errorf("Number of responsemaps is %d", nae)
@@ -886,7 +980,7 @@ func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
// Wait for the ttl to expire.
time.Sleep(2 * ttl)
// Now run prune and make sure we collect the timedout ones.
// Now run prune and make sure we collect the timed-out ones.
barAcc.pruneAutoExpireResponseMaps()
// We should expire because ttl.
@@ -939,10 +1033,6 @@ func TestAccountMapsUsers(t *testing.T) {
if c.acc != synadia {
t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc)
}
// Also test client sublist.
if c.sl != synadia.sl {
t.Fatalf("Expected the client's sublist to match 'synadia' account")
}
c, _, _ = newClientForServer(s)
connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
@@ -950,10 +1040,6 @@ func TestAccountMapsUsers(t *testing.T) {
if c.acc != nats {
t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc)
}
// Also test client sublist.
if c.sl != nats.sl {
t.Fatalf("Expected the client's sublist to match 'nats' account")
}
// Now test nkeys as well.
kp, _ := nkeys.FromSeed(seed1)
@@ -985,10 +1071,6 @@ func TestAccountMapsUsers(t *testing.T) {
if c.acc != synadia {
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
}
// Also test client sublist.
if c.sl != synadia.sl {
t.Fatalf("Expected the client's sublist to match 'synadia' account")
}
// Now nats account nkey user.
kp, _ = nkeys.FromSeed(seed2)
@@ -1019,10 +1101,6 @@ func TestAccountMapsUsers(t *testing.T) {
if c.acc != nats {
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
}
// Also test client sublist.
if c.sl != nats.sl {
t.Fatalf("Expected the client's sublist to match 'nats' account")
}
}
func TestAccountGlobalDefault(t *testing.T) {
@@ -1049,29 +1127,29 @@ func TestAccountGlobalDefault(t *testing.T) {
func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Create bare accounts for this test
fooAcc := &Account{Name: "foo"}
if err := fooAcc.addStreamExport(">", nil); err != nil {
if err := fooAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
barAcc := &Account{Name: "bar"}
if err := barAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
if err := barAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bazAcc := &Account{Name: "baz"}
if err := bazAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
if err := bazAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}
if err := bazAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil {
if err := bazAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be different")
}
if err := barAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil {
if err := barAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
@@ -1081,14 +1159,14 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Create another account that is named "foo". We want to make sure
// that the comparison still works (based on account name, not pointer)
newFooAcc := &Account{Name: "foo"}
if err := newFooAcc.addStreamExport(">", nil); err != nil {
if err := newFooAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
batAcc := &Account{Name: "bat"}
if err := batAcc.addStreamImport(newFooAcc, "foo", "myPrefix"); err != nil {
if err := batAcc.AddStreamImport(newFooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if err := batAcc.addStreamImport(newFooAcc, "foo.>", ""); err != nil {
if err := batAcc.AddStreamImport(newFooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !batAcc.checkStreamImportsEqual(barAcc) {
@@ -1100,15 +1178,15 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Test with account with different "from"
expAcc := &Account{Name: "new_acc"}
if err := expAcc.addStreamExport(">", nil); err != nil {
if err := expAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
aAcc := &Account{Name: "a"}
if err := aAcc.addStreamImport(expAcc, "bar", ""); err != nil {
if err := aAcc.AddStreamImport(expAcc, "bar", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc := &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "baz", ""); err != nil {
if err := bAcc.AddStreamImport(expAcc, "baz", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
@@ -1117,11 +1195,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Test with account with different "prefix"
aAcc = &Account{Name: "a"}
if err := aAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil {
if err := aAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc = &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "bar", "diff_prefix"); err != nil {
if err := bAcc.AddStreamImport(expAcc, "bar", "diff_prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
@@ -1130,11 +1208,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Test with account with different "name"
expAcc = &Account{Name: "diff_name"}
if err := expAcc.addStreamExport(">", nil); err != nil {
if err := expAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
bAcc = &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil {
if err := bAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {

View File

@@ -16,10 +16,7 @@ package server
import (
"crypto/tls"
"encoding/base64"
"sort"
"strings"
"sync"
"time"
"github.com/nats-io/nkeys"
"golang.org/x/crypto/bcrypt"
@@ -41,350 +38,6 @@ type ClientAuthentication interface {
RegisterUser(*User)
}
// For backwards compatibility, users who are not explicitly defined into an
// account will be grouped in the default global account.
const globalAccountName = "$G"
// Accounts
type Account struct {
Name string
Nkey string
mu sync.RWMutex
sl *Sublist
imports importMap
exports exportMap
nae int
maxnae int
maxaettl time.Duration
pruning bool
}
// Import stream mapping struct
type streamImport struct {
acc *Account
from string
prefix string
}
// Import service mapping struct
type serviceImport struct {
acc *Account
from string
to string
ae bool
ts int64
}
// importMap tracks the imported streams and services.
type importMap struct {
streams map[string]*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}
// exportMap tracks the exported streams and services.
type exportMap struct {
streams map[string]map[string]*Account
services map[string]map[string]*Account
}
func (a *Account) addServiceExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.services == nil {
a.exports.services = make(map[string]map[string]*Account)
}
ma := a.exports.services[subject]
if accounts != nil && ma == nil {
ma = make(map[string]*Account)
}
for _, a := range accounts {
ma[a.Name] = a
}
a.exports.services[subject] = ma
return nil
}
// numServiceRoutes returns the number of service routes on this account.
func (a *Account) numServiceRoutes() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.imports.services)
}
// This will add a route to an account to send published messages / requests
// to the destination account. From is the local subject to map, To is the
// subject that will appear on the destination account. Destination will need
// to have an import rule to allow access via addService.
func (a *Account) addServiceImport(destination *Account, from, to string) error {
if destination == nil {
return ErrMissingAccount
}
// Empty means use from.
if to == "" {
to = from
}
if !IsValidLiteralSubject(from) || !IsValidLiteralSubject(to) {
return ErrInvalidSubject
}
// First check to see if the account has authorized us to route to the "to" subject.
if !destination.checkServiceImportAuthorized(a, to) {
return ErrServiceImportAuthorization
}
return a.addImplicitServiceImport(destination, from, to, false)
}
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
si, ok := a.imports.services[subject]
if ok && si != nil && si.ae {
a.nae--
}
delete(a.imports.services, subject)
a.mu.Unlock()
}
// Return the number of AutoExpireResponseMaps for request/reply. These are mapped to the account that
func (a *Account) numAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.nae
}
// Set the max outstanding auto expire response maps.
func (a *Account) setMaxAutoExpireResponseMaps(max int) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxnae = max
}
// Set the max ttl for response maps.
func (a *Account) setMaxAutoExpireTTL(ttl time.Duration) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxaettl = ttl
}
// Return a list of the current autoExpireResponseMaps.
func (a *Account) autoExpireResponseMaps() []*serviceImport {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.imports.services) == 0 {
return nil
}
aesis := make([]*serviceImport, 0, len(a.imports.services))
for _, si := range a.imports.services {
if si.ae {
aesis = append(aesis, si)
}
}
sort.Slice(aesis, func(i, j int) bool {
return aesis[i].ts < aesis[j].ts
})
return aesis
}
// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use addRoute
// above if responding to user input or config, etc.
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool) error {
a.mu.Lock()
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
si := &serviceImport{destination, from, to, autoexpire, 0}
a.imports.services[from] = si
if autoexpire {
a.nae++
si.ts = time.Now().Unix()
if a.nae > a.maxnae && !a.pruning {
a.pruning = true
go a.pruneAutoExpireResponseMaps()
}
}
a.mu.Unlock()
return nil
}
// This will prune the list to below the threshold and remove all ttl'd maps.
func (a *Account) pruneAutoExpireResponseMaps() {
defer func() {
a.mu.Lock()
a.pruning = false
a.mu.Unlock()
}()
a.mu.RLock()
ttl := int64(a.maxaettl/time.Second) + 1
a.mu.RUnlock()
for {
sis := a.autoExpireResponseMaps()
// Check ttl items.
now := time.Now().Unix()
for i, si := range sis {
if now-si.ts >= ttl {
a.removeServiceImport(si.from)
} else {
sis = sis[i:]
break
}
}
a.mu.RLock()
numOver := a.nae - a.maxnae
a.mu.RUnlock()
if numOver <= 0 {
return
} else if numOver >= len(sis) {
numOver = len(sis) - 1
}
// These are in sorted order, remove at least numOver
for _, si := range sis[:numOver] {
a.removeServiceImport(si.from)
}
}
}
// addStreamImport will add in the stream import from a specific account.
func (a *Account) addStreamImport(account *Account, from, prefix string) error {
if account == nil {
return ErrMissingAccount
}
// First check to see if the account has authorized export of the subject.
if !account.checkStreamImportAuthorized(a, from) {
return ErrStreamImportAuthorization
}
a.mu.Lock()
defer a.mu.Unlock()
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
}
if prefix != "" && prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
// TODO(dlc) - collisions, etc.
a.imports.streams[from] = &streamImport{account, from, prefix}
return nil
}
// Placeholder to denote public export.
var isPublicExport = []*Account(nil)
// addExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can impoort.
func (a *Account) addStreamExport(subject string, accounts []*Account) error {
a.mu.Lock()
defer a.mu.Unlock()
if a == nil {
return ErrMissingAccount
}
if a.exports.streams == nil {
a.exports.streams = make(map[string]map[string]*Account)
}
var ma map[string]*Account
for _, aa := range accounts {
if ma == nil {
ma = make(map[string]*Account, len(accounts))
}
ma[aa.Name] = aa
}
a.exports.streams[subject] = ma
return nil
}
// Check if another account is authorized to import from us.
func (a *Account) checkStreamImportAuthorized(account *Account, subject string) bool {
// Find the subject in the exports list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.streams == nil || !IsValidSubject(subject) {
return false
}
// Check direct match of subject first
am, ok := a.exports.streams[subject]
if ok {
// if am is nil that denotes a public export
if am == nil {
return true
}
// If we have a matching account we are authorized
_, ok := am[account.Name]
return ok
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
// has to be a true subset of the import claim. We already checked for
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, am := range a.exports.streams {
if isSubsetMatch(tokens, subj) {
if am == nil {
return true
}
_, ok := am[account.Name]
return ok
}
}
return false
}
// Returns true if `a` and `b` stream imports are the same. Note that the
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this executes.
func (a *Account) checkStreamImportsEqual(b *Account) bool {
if len(a.imports.streams) != len(b.imports.streams) {
return false
}
for subj, aim := range a.imports.streams {
bim := b.imports.streams[subj]
if bim == nil {
return false
}
if aim.acc.Name != bim.acc.Name || aim.from != bim.from || aim.prefix != bim.prefix {
return false
}
}
return true
}
// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
// These are always literal subjects so just lookup.
am, ok := a.exports.services[subject]
if !ok {
return false
}
// Check to see if we are public or if we need to search for the account.
if am == nil {
return true
}
// Check that we allow this account.
_, ok = am[account.Name]
return ok
}
// Nkey is for multiple nkey based users
type NkeyUser struct {
Nkey string `json:"user"`

File diff suppressed because it is too large Load Diff

View File

@@ -663,12 +663,12 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) {
}()
<-ch
if c.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", c.sl.Count())
if s.NumSubscriptions() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", s.NumSubscriptions())
}
c.closeConnection(ClientClosed)
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.gsl.Count())
if s.NumSubscriptions() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.NumSubscriptions())
}
}
@@ -684,8 +684,8 @@ func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
}()
<-ch
if c.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", c.sl.Count())
if c.acc.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", c.acc.sl.Count())
}
}

View File

@@ -38,7 +38,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "1.3.1"
VERSION = "2.0-alpha"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -52,6 +52,10 @@ func (s *Server) ConfigureLogger() {
opts = s.getOpts()
)
if opts.NoLog {
return
}
syslog := opts.Syslog
if isWindowsService() && opts.LogFile == "" {
// Enable syslog if no log file is specified and we're running as a

View File

@@ -711,14 +711,14 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
// FIXME(dlc) - Make account aware.
sz := &Subsz{s.gsl.Stats(), 0, offset, limit, nil}
sz := &Subsz{s.gacc.sl.Stats(), 0, offset, limit, nil}
if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.gsl.localSubs(&subs)
s.gacc.sl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
@@ -940,7 +940,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.MaxPending = opts.MaxPending
v.WriteDeadline = opts.WriteDeadline
v.Subscriptions = s.gsl.Count()
v.Subscriptions = s.gacc.sl.Count()
v.ConfigLoadTime = s.configTime
// Need a copy here since s.httpReqStats can change while doing
// the marshaling down below.

View File

@@ -93,7 +93,7 @@ type Options struct {
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"` // Deprecated
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
@@ -772,7 +772,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
}
accounts = append(accounts, ta)
}
if err := stream.acc.addStreamExport(stream.sub, accounts); err != nil {
if err := stream.acc.AddStreamExport(stream.sub, accounts); err != nil {
msg := fmt.Sprintf("Error adding stream export %q: %v", stream.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
@@ -790,7 +790,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
}
accounts = append(accounts, ta)
}
if err := service.acc.addServiceExport(service.sub, accounts); err != nil {
if err := service.acc.AddServiceExport(service.sub, accounts); err != nil {
msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
@@ -803,7 +803,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
*errors = append(*errors, &configErr{tk, msg})
continue
}
if err := stream.acc.addStreamImport(ta, stream.sub, stream.pre); err != nil {
if err := stream.acc.AddStreamImport(ta, stream.sub, stream.pre); err != nil {
msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
@@ -819,7 +819,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
if service.to == "" {
service.to = service.sub
}
if err := service.acc.addServiceImport(ta, service.to, service.sub); err != nil {
if err := service.acc.AddServiceImport(ta, service.to, service.sub); err != nil {
msg := fmt.Sprintf("Error adding service import %q: %v", service.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue

View File

@@ -18,11 +18,15 @@ import (
)
type pubArg struct {
arg []byte
account []byte
queues [][]byte
//cacheKey []byte
subject []byte
reply []byte
sid []byte
szb []byte
size int
// sid []byte
szb []byte
size int
}
type parseState struct {
@@ -73,6 +77,15 @@ const (
OP_SUB
OP_SUB_SPC
SUB_ARG
OP_A
OP_ASUB
OP_ASUB_SPC
ASUB_ARG
OP_AUSUB
OP_AUSUB_SPC
AUSUB_ARG
OP_R
OP_RS
OP_U
OP_UN
OP_UNS
@@ -121,11 +134,17 @@ func (c *client) parse(buf []byte) error {
c.state = OP_S
case 'U', 'u':
c.state = OP_U
case 'M', 'm':
case 'R', 'r':
if c.typ == CLIENT {
goto parseErr
} else {
c.state = OP_M
c.state = OP_R
}
case 'A', 'a':
if c.typ == CLIENT {
goto parseErr
} else {
c.state = OP_A
}
case 'C', 'c':
c.state = OP_C
@@ -243,6 +262,85 @@ func (c *client) parse(buf []byte) error {
}
continue
}
case OP_A:
switch b {
case '+':
c.state = OP_ASUB
case '-', 'u':
c.state = OP_AUSUB
default:
goto parseErr
}
case OP_ASUB:
switch b {
case ' ', '\t':
c.state = OP_ASUB_SPC
default:
goto parseErr
}
case OP_ASUB_SPC:
switch b {
case ' ', '\t':
continue
default:
c.state = ASUB_ARG
c.as = i
}
case ASUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processAccountSub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_AUSUB:
switch b {
case ' ', '\t':
c.state = OP_AUSUB_SPC
default:
goto parseErr
}
case OP_AUSUB_SPC:
switch b {
case ' ', '\t':
continue
default:
c.state = AUSUB_ARG
c.as = i
}
case AUSUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
c.processAccountUnsub(arg)
c.drop, c.as, c.state = 0, i+1, OP_START
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_S:
switch b {
case 'U', 'u':
@@ -284,7 +382,13 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processSub(arg); err != nil {
var err error
if c.typ == CLIENT {
err = c.processSub(arg)
} else {
err = c.processRemoteSub(arg)
}
if err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
@@ -293,6 +397,24 @@ func (c *client) parse(buf []byte) error {
c.argBuf = append(c.argBuf, b)
}
}
case OP_R:
switch b {
case 'S', 's':
c.state = OP_RS
case 'M', 'm':
c.state = OP_M
default:
goto parseErr
}
case OP_RS:
switch b {
case '+':
c.state = OP_SUB
case '-':
c.state = OP_UNSUB
default:
goto parseErr
}
case OP_U:
switch b {
case 'N', 'n':
@@ -348,7 +470,13 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processUnsub(arg); err != nil {
var err error
if c.typ == CLIENT {
err = c.processUnsub(arg)
} else {
err = c.processRemoteUnsub(arg)
}
if err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
@@ -510,7 +638,7 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processMsgArgs(arg); err != nil {
if err := c.processRoutedMsgArgs(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
@@ -655,6 +783,7 @@ func (c *client) parse(buf []byte) error {
// Check for split buffer scenarios for any ARG state.
if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG ||
c.state == ASUB_ARG || c.state == AUSUB_ARG ||
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
c.state == CONNECT_ARG || c.state == INFO_ARG {
// Setup a holder buffer to deal with split buffer scenario.
@@ -729,21 +858,14 @@ func protoSnippet(start int, buf []byte) string {
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
// we need to hold onto it into the next read.
func (c *client) clonePubArg() {
// Just copy and re-process original arg buffer.
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, c.pa.subject...)
c.argBuf = append(c.argBuf, c.pa.reply...)
c.argBuf = append(c.argBuf, c.pa.sid...)
c.argBuf = append(c.argBuf, c.pa.szb...)
c.argBuf = append(c.argBuf, c.pa.arg...)
c.pa.subject = c.argBuf[:len(c.pa.subject)]
if c.pa.reply != nil {
c.pa.reply = c.argBuf[len(c.pa.subject) : len(c.pa.subject)+len(c.pa.reply)]
// This is a routed msg
if c.pa.account != nil {
c.processRoutedMsgArgs(c.argBuf)
} else {
c.processPub(c.argBuf)
}
if c.pa.sid != nil {
c.pa.sid = c.argBuf[len(c.pa.subject)+len(c.pa.reply) : len(c.pa.subject)+len(c.pa.reply)+len(c.pa.sid)]
}
c.pa.szb = c.argBuf[len(c.pa.subject)+len(c.pa.reply)+len(c.pa.sid):]
}

View File

@@ -316,14 +316,22 @@ func TestParsePubBadSize(t *testing.T) {
}
}
func TestParseMsg(t *testing.T) {
func TestParseRouteMsg(t *testing.T) {
c := dummyRouteClient()
pub := []byte("MSG foo RSID:1:2 5\r\nhello\r")
pub := []byte("MSG $foo foo 5\r\nhello\r")
err := c.parse(pub)
if err == nil {
t.Fatalf("Expected an error, got none")
}
pub = []byte("RMSG $foo foo 5\r\nhello\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$foo")) {
t.Fatalf("Did not parse account correctly: '$foo' vs '%s'\n", c.pa.account)
}
if !bytes.Equal(c.pa.subject, []byte("foo")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
}
@@ -333,18 +341,18 @@ func TestParseMsg(t *testing.T) {
if c.pa.size != 5 {
t.Fatalf("Did not parse msg size correctly: 5 vs %d\n", c.pa.size)
}
if !bytes.Equal(c.pa.sid, []byte("RSID:1:2")) {
t.Fatalf("Did not parse sid correctly: 'RSID:1:2' vs '%s'\n", c.pa.sid)
}
// Clear snapshots
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
pub = []byte("MSG foo.bar RSID:1:2 INBOX.22 11\r\nhello world\r")
pub = []byte("RMSG $G foo.bar INBOX.22 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$G")) {
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
}
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
}
@@ -354,44 +362,54 @@ func TestParseMsg(t *testing.T) {
if c.pa.size != 11 {
t.Fatalf("Did not parse msg size correctly: 11 vs %d\n", c.pa.size)
}
}
func testMsgArg(c *client, t *testing.T) {
if !bytes.Equal(c.pa.subject, []byte("foobar")) {
t.Fatalf("Mismatched subject: '%s'\n", c.pa.subject)
}
if !bytes.Equal(c.pa.szb, []byte("22")) {
t.Fatalf("Bad size buf: '%s'\n", c.pa.szb)
}
if c.pa.size != 22 {
t.Fatalf("Bad size: %d\n", c.pa.size)
}
if !bytes.Equal(c.pa.sid, []byte("RSID:22:1")) {
t.Fatalf("Bad sid: '%s'\n", c.pa.sid)
}
}
// Clear snapshots
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
func TestParseMsgArg(t *testing.T) {
c := dummyClient()
if err := c.processMsgArgs([]byte("foobar RSID:22:1 22")); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
pub = []byte("RMSG $G foo.bar + reply baz 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
testMsgArg(c, t)
if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22")); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
if !bytes.Equal(c.pa.account, []byte("$G")) {
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
}
testMsgArg(c, t)
if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22 ")); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
}
testMsgArg(c, t)
if err := c.processMsgArgs([]byte("foobar RSID:22:1 \t22")); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
if !bytes.Equal(c.pa.reply, []byte("reply")) {
t.Fatalf("Did not parse reply correctly: 'reply' vs '%s'\n", c.pa.reply)
}
if err := c.processMsgArgs([]byte("foobar\t\tRSID:22:1\t22\r")); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
if len(c.pa.queues) != 1 {
t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues))
}
if !bytes.Equal(c.pa.queues[0], []byte("baz")) {
t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0])
}
// Clear snapshots
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
pub = []byte("RMSG $G foo.bar | baz 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$G")) {
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
}
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
}
if !bytes.Equal(c.pa.reply, []byte("")) {
t.Fatalf("Did not parse reply correctly: '' vs '%s'\n", c.pa.reply)
}
if len(c.pa.queues) != 1 {
t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues))
}
if !bytes.Equal(c.pa.queues[0], []byte("baz")) {
t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0])
}
testMsgArg(c, t)
}
func TestParseMsgSpace(t *testing.T) {

View File

@@ -818,7 +818,7 @@ func (s *Server) reloadClusterPermissions() {
for i, route := range s.routes {
// Count the number of routes that can understand receiving INFO updates.
route.mu.Lock()
if route.opts.Protocol >= routeProtoInfo {
if route.opts.Protocol >= RouteProtoInfo {
withNewProto++
}
route.mu.Unlock()
@@ -865,7 +865,7 @@ func (s *Server) reloadClusterPermissions() {
deleteRoutedSubs []*subscription
)
// FIXME(dlc) - Change for accounts.
s.gsl.localSubs(&localSubs)
s.gacc.sl.localSubs(&localSubs)
// Go through all local subscriptions
for _, sub := range localSubs {
@@ -887,7 +887,7 @@ func (s *Server) reloadClusterPermissions() {
for _, route := range routes {
route.mu.Lock()
// If route is to older server, simply close connection.
if route.opts.Protocol < routeProtoInfo {
if route.opts.Protocol < RouteProtoInfo {
route.mu.Unlock()
route.closeConnection(RouteRemoved)
continue
@@ -906,15 +906,15 @@ func (s *Server) reloadClusterPermissions() {
// that we now possibly allow with a change of Export permissions.
route.sendInfo(infoJSON)
// Now send SUB and UNSUB protocols as needed.
closed := route.sendRouteSubProtos(subsNeedSUB, nil)
closed := route.sendRouteSubProtos(subsNeedSUB, false, nil)
if !closed {
route.sendRouteUnSubProtos(subsNeedUNSUB, nil)
route.sendRouteUnSubProtos(subsNeedUNSUB, false, nil)
}
route.mu.Unlock()
}
// Remove as a batch all the subs that we have removed from each route.
// FIXME(dlc) - Change for accounts.
s.gsl.RemoveBatch(deleteRoutedSubs)
s.gacc.sl.RemoveBatch(deleteRoutedSubs)
}
// validateClusterOpts ensures the new ClusterOpts does not change host or

View File

@@ -17,7 +17,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/nats-io/nkeys"
"io/ioutil"
"net"
"os"
@@ -28,6 +27,8 @@ import (
"testing"
"time"
"github.com/nats-io/nkeys"
"github.com/nats-io/go-nats"
)
@@ -1861,12 +1862,16 @@ func TestConfigReloadRotateFiles(t *testing.T) {
server, _, config := runReloadServerWithConfig(t, "./configs/reload/file_rotate.conf")
defer func() {
os.Remove(config)
os.Remove("log.txt")
os.Remove("gnatsd.pid")
os.Remove("log1.txt")
os.Remove("gnatsd1.pid")
}()
defer server.Shutdown()
// Configure the logger to enable actual logging
opts := server.getOpts()
opts.NoLog = false
server.ConfigureLogger()
// Load a config that renames the files.
@@ -2379,11 +2384,11 @@ func TestConfigReloadClusterPermsOldServer(t *testing.T) {
optsB := DefaultOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srva.ClusterAddr().Port))
// Make server B behave like an old server
testRouteProto = routeProtoZero
defer func() { testRouteProto = routeProtoInfo }()
testRouteProto = RouteProtoZero
defer func() { testRouteProto = RouteProtoInfo }()
srvb := RunServer(optsB)
defer srvb.Shutdown()
testRouteProto = routeProtoInfo
testRouteProto = RouteProtoInfo
checkClusterFormed(t, srva, srvb)
@@ -2683,10 +2688,6 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
if c.acc != synadia {
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
}
// Also test client sublist.
if c.sl != synadia.sl {
t.Fatalf("Expected the client's sublist to match 'synadia' account")
}
// Now nats account nkey user.
kp, _ = nkeys.FromSeed(seed2)
@@ -2717,10 +2718,6 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
if c.acc != nats {
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
}
// Also test client sublist.
if c.sl != nats.sl {
t.Fatalf("Expected the client's sublist to match 'nats' account")
}
// Remove user from account and whole account
reloadUpdateConfig(t, s, conf, `
@@ -2860,7 +2857,7 @@ func TestConfigReloadAccountStreamsImportExport(t *testing.T) {
t.Helper()
dcli := s.getClient(1)
dcli.mu.Lock()
r := dcli.sl.Match(subject)
r := dcli.acc.sl.Match(subject)
dcli.mu.Unlock()
if shouldBeThere && len(r.psubs) != 1 {
t.Fatalf("%s should have 1 match in derek's sublist, got %v", subject, len(r.psubs))

File diff suppressed because it is too large Load Diff

View File

@@ -863,7 +863,6 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
optsA.RQSubsSweep = 500 * time.Millisecond
srvA := RunServer(optsA)
defer srvA.Shutdown()
@@ -919,9 +918,22 @@ func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
}
c.Flush()
c.Subscribe("TEST.COMPLETE", func(m *nats.Msg) {})
}
// We coelasce now so for each server we will have all local (250) plus
// two from the remote side for each queue group. We also create one more
// and will wait til each server has 254 subscriptions, that will make sure
// that we have everything setup.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
subsA := srvA.NumSubscriptions()
subsB := srvB.NumSubscriptions()
if subsA != 254 || subsB != 254 {
return fmt.Errorf("Not all subs processed yet: %d and %d", subsA, subsB)
}
return nil
})
expected := int32(250)
// Now send messages from each server
for i := int32(0); i < expected; i++ {
@@ -937,17 +949,6 @@ func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
time.Sleep(500 * time.Millisecond)
// Now check all mappings are gone.
srvA.rqsMu.RLock()
nrqsa := len(srvA.rqsubs)
srvA.rqsMu.RUnlock()
srvB.rqsMu.RLock()
nrqsb := len(srvB.rqsubs)
srvB.rqsMu.RUnlock()
if nrqsa != 0 || nrqsb != 0 {
return fmt.Errorf("Expected rqs mappings to have cleared, but got A:%d, B:%d",
nrqsa, nrqsb)
}
return nil
}
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
@@ -1106,7 +1107,8 @@ func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) {
defer nc.Close()
numSubs := 1000
for i := 0; i < numSubs; i++ {
nc.Subscribe("foo.bar", func(_ *nats.Msg) {})
subj := fmt.Sprintf("fo.bar.%d", i)
nc.Subscribe(subj, func(_ *nats.Msg) {})
}
checkExpectedSubs(t, numSubs, srvA)

View File

@@ -66,39 +66,35 @@ type Info struct {
type Server struct {
gcid uint64
stats
mu sync.Mutex
prand *rand.Rand
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gsl *Sublist
accounts map[string]*Account
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
quitCh chan struct{}
// Tracking for remote QRSID tags.
rqsMu sync.RWMutex
rqsubs map[string]rqsub
rqsubsTimer *time.Timer
mu sync.Mutex
prand *rand.Rand
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gacc *Account
accounts map[string]*Account
activeAccounts int
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
quitCh chan struct{}
// Tracking Go routines
grMu sync.Mutex
@@ -178,7 +174,6 @@ func New(opts *Options) *Server {
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
gsl: NewSublist(),
opts: opts,
done: make(chan bool, 1),
start: now,
@@ -201,7 +196,8 @@ func New(opts *Options) *Server {
s.accounts = make(map[string]*Account)
// Create global account.
s.registerAccount(&Account{Name: globalAccountName, sl: s.gsl})
s.gacc = &Account{Name: globalAccountName}
s.registerAccount(s.gacc)
// For tracking clients
s.clients = make(map[uint64]*client)
@@ -254,6 +250,11 @@ func (s *Server) configureAccounts() {
}
func (s *Server) generateRouteInfoJSON() {
// New proto wants a nonce.
var raw [nonceLen]byte
nonce := raw[:]
s.generateNonce(nonce)
s.routeInfo.Nonce = string(nonce)
b, _ := json.Marshal(s.routeInfo)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
s.routeInfoJSON = bytes.Join(pcs, []byte(" "))
@@ -303,7 +304,7 @@ func (s *Server) logPid() error {
}
// newAccountsAllowed returns whether or not new accounts can be created on the fly.
func (s *Server) newAccountsAllowed() bool {
func (s *Server) NewAccountsAllowed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.opts.AllowNewAccounts
@@ -315,6 +316,13 @@ func (s *Server) numReservedAccounts() int {
return 1
}
// NumActiveAccounts reports number of active accounts on this server.
func (s *Server) NumActiveAccounts() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.activeAccounts
}
// LookupOrRegisterAccount will return the given account if known or create a new entry.
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
s.mu.Lock()
@@ -326,7 +334,7 @@ func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew b
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
s.registerAccount(acc)
return acc, true
}
@@ -355,6 +363,11 @@ func (s *Server) registerAccount(acc *Account) {
if acc.maxaettl == 0 {
acc.maxaettl = DEFAULT_TTL_AE_RESPONSE_MAP
}
// If we are capable of routing we will track subscription
// information for efficient interest propagation.
if s.opts != nil && s.opts.Cluster.Port != 0 {
acc.rm = make(map[string]*rme, 256)
}
s.accounts[acc.Name] = acc
}
@@ -497,8 +510,6 @@ func (s *Server) Shutdown() {
s.profiler.Close()
}
// Clear any remote qsub mappings
s.clearRemoteQSubs()
s.mu.Unlock()
// Release go routines that wait on that channel
@@ -870,7 +881,9 @@ func (s *Server) createClient(conn net.Conn) *client {
max_subs := opts.MaxSubs
now := time.Now()
c := &client{srv: s, sl: s.gsl, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
c.registerWithAccount(s.gacc)
// Grab JSON info string
s.mu.Lock()
@@ -1180,14 +1193,14 @@ func (s *Server) getClient(cid uint64) *client {
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
var subs uint32
var subs int
for _, acc := range s.accounts {
if acc.sl != nil {
subs += acc.sl.Count()
subs += acc.TotalSubs()
}
}
s.mu.Unlock()
return subs
return uint32(subs)
}
// NumSlowConsumers will report the number of slow consumers.

View File

@@ -24,8 +24,9 @@ func TestSplitBufferSubOp(t *testing.T) {
defer cli.Close()
defer trash.Close()
s := &Server{gsl: NewSublist()}
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription), nc: cli}
s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account)}
s.registerAccount(s.gacc)
c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription), nc: cli}
subop := []byte("SUB foo 1\r\n")
subop1 := subop[:6]
@@ -43,7 +44,7 @@ func TestSplitBufferSubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.gsl.Match("foo")
r := s.gacc.sl.Match("foo")
if r == nil || len(r.psubs) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
@@ -60,8 +61,9 @@ func TestSplitBufferSubOp(t *testing.T) {
}
func TestSplitBufferUnsubOp(t *testing.T) {
s := &Server{gsl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription)}
s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account)}
s.registerAccount(s.gacc)
c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)}
subop := []byte("SUB foo 1024\r\n")
if err := c.parse(subop); err != nil {
@@ -87,7 +89,7 @@ func TestSplitBufferUnsubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.gsl.Match("foo")
r := s.gacc.sl.Match("foo")
if r != nil && len(r.psubs) != 0 {
t.Fatalf("Should be no subscriptions in results: %+v\n", r)
}
@@ -300,7 +302,7 @@ func TestSplitConnectArg(t *testing.T) {
func TestSplitDanglingArgBuf(t *testing.T) {
s := New(&defaultServerOptions)
c := &client{srv: s, sl: s.gsl, subs: make(map[string]*subscription)}
c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)}
// We test to make sure we do not dangle any argBufs after processing
// since that could lead to performance issues.
@@ -360,14 +362,14 @@ func TestSplitDanglingArgBuf(t *testing.T) {
// MSG (the client has to be a ROUTE)
c = &client{subs: make(map[string]*subscription), typ: ROUTER}
msgop := []byte("MSG foo RSID:2:1 5\r\nhello\r\n")
msgop := []byte("RMSG $foo foo 5\r\nhello\r\n")
c.parse(msgop[:5])
c.parse(msgop[5:10])
if c.argBuf == nil {
t.Fatal("Expected a non-nil argBuf")
}
if string(c.argBuf) != "foo RS" {
t.Fatalf("Expected argBuf to be \"foo 1 \", got %q", string(c.argBuf))
if string(c.argBuf) != "$foo " {
t.Fatalf("Expected argBuf to be \"$foo \", got %q", string(c.argBuf))
}
c.parse(msgop[10:])
if c.argBuf != nil {
@@ -384,21 +386,21 @@ func TestSplitDanglingArgBuf(t *testing.T) {
if c.argBuf == nil {
t.Fatal("Expected a non-nil argBuf")
}
if string(c.pa.account) != "$foo" {
t.Fatalf("Expected account to be \"$foo\", got %q", c.pa.account)
}
if string(c.pa.subject) != "foo" {
t.Fatalf("Expected subject to be \"foo\", got %q", c.pa.subject)
}
if string(c.pa.reply) != "" {
t.Fatalf("Expected reply to be \"\", got %q", c.pa.reply)
}
if string(c.pa.sid) != "RSID:2:1" {
t.Fatalf("Expected sid to \"RSID:2:1\", got %q", c.pa.sid)
}
if c.pa.size != 5 {
t.Fatalf("Expected sid to 5, got %v", c.pa.size)
}
// msg buffer should be
if c.msgBuf == nil || string(c.msgBuf) != "hel" {
t.Fatalf("Expected msgBuf to be \"hel\", got %q", c.msgBuf)
if c.msgBuf == nil || string(c.msgBuf) != "hello\r" {
t.Fatalf("Expected msgBuf to be \"hello\r\", got %q", c.msgBuf)
}
c.parse(msgop[23:])
// At the end, we should have cleaned-up both arg and msg buffers.
@@ -410,31 +412,29 @@ func TestSplitDanglingArgBuf(t *testing.T) {
}
}
func TestSplitMsgArg(t *testing.T) {
func TestSplitRoutedMsgArg(t *testing.T) {
_, c, _ := setupClient()
// Allow parser to process MSG
// Allow parser to process RMSG
c.typ = ROUTER
b := make([]byte, 1024)
copy(b, []byte("MSG hello.world RSID:14:8 6040\r\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"))
copy(b, []byte("RMSG $G hello.world 6040\r\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"))
c.parse(b)
copy(b, []byte("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\r\n"))
c.parse(b)
wantAccount := "$G"
wantSubject := "hello.world"
wantSid := "RSID:14:8"
wantSzb := "6040"
if string(c.pa.account) != wantAccount {
t.Fatalf("Incorrect account: want %q, got %q", wantAccount, c.pa.account)
}
if string(c.pa.subject) != wantSubject {
t.Fatalf("Incorrect subject: want %q, got %q", wantSubject, c.pa.subject)
}
if string(c.pa.sid) != wantSid {
t.Fatalf("Incorrect sid: want %q, got %q", wantSid, c.pa.sid)
}
if string(c.pa.szb) != wantSzb {
t.Fatalf("Incorrect szb: want %q, got %q", wantSzb, c.pa.szb)
}
@@ -442,21 +442,21 @@ func TestSplitMsgArg(t *testing.T) {
func TestSplitBufferMsgOp(t *testing.T) {
c := &client{subs: make(map[string]*subscription), typ: ROUTER}
msg := []byte("MSG foo.bar QRSID:15:3 _INBOX.22 11\r\nhello world\r")
msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r")
msg1 := msg[:2]
msg2 := msg[2:9]
msg3 := msg[9:15]
msg4 := msg[15:22]
msg5 := msg[22:25]
msg6 := msg[25:37]
msg7 := msg[37:42]
msg8 := msg[42:]
msg7 := msg[37:40]
msg8 := msg[40:]
if err := c.parse(msg1); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != OP_MS {
t.Fatalf("Expected OP_MS state vs %d\n", c.state)
if c.state != OP_M {
t.Fatalf("Expected OP_M state vs %d\n", c.state)
}
if err := c.parse(msg2); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
@@ -493,9 +493,6 @@ func TestSplitBufferMsgOp(t *testing.T) {
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("MSG arg subject incorrect: '%s'\n", c.pa.subject)
}
if !bytes.Equal(c.pa.sid, []byte("QRSID:15:3")) {
t.Fatalf("MSG arg sid incorrect: '%s'\n", c.pa.sid)
}
if !bytes.Equal(c.pa.reply, []byte("_INBOX.22")) {
t.Fatalf("MSG arg reply subject incorrect: '%s'\n", c.pa.reply)
}

View File

@@ -213,7 +213,7 @@ func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult {
if sub.queue == nil {
nr.psubs = append(nr.psubs, sub)
} else {
if i := findQSliceForSub(sub, nr.qsubs); i >= 0 {
if i := findQSlot(sub.queue, nr.qsubs); i >= 0 {
nr.qsubs[i] = append(nr.qsubs[i], sub)
} else {
nr.qsubs = append(nr.qsubs, []*subscription{sub})
@@ -320,6 +320,23 @@ func (s *Sublist) reduceCacheCount() {
})
}
// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.typ == ROUTER
}
// This should be called when we update the weight of an existing
// remote queue sub.
func (s *Sublist) UpdateRemoteQSub(sub *subscription) {
// We could search to make sure we find it, but probably not worth
// it unless we are thrashing the cache. Just remove from our L2 and update
// the genid so L1 will be flushed.
s.Lock()
s.removeFromCache(string(sub.subject), sub)
atomic.AddUint64(&s.genid, 1)
s.Unlock()
}
// This will add in a node's results to the total results.
func addNodeToResults(n *node, results *SublistResult) {
// Normal subscriptions
@@ -335,18 +352,23 @@ func addNodeToResults(n *node, results *SublistResult) {
if len(qr) == 0 {
continue
}
tsub := &subscription{subject: nil, queue: []byte(qname)}
// Need to find matching list in results
if i := findQSliceForSub(tsub, results.qsubs); i >= 0 {
for _, sub := range qr {
var i int
if i = findQSlot([]byte(qname), results.qsubs); i < 0 {
i = len(results.qsubs)
nqsub := make([]*subscription, 0, len(qr))
results.qsubs = append(results.qsubs, nqsub)
}
for _, sub := range qr {
if isRemoteQSub(sub) {
ns := atomic.LoadInt32(&sub.qw)
// Shadow these subscriptions
for n := 0; n < int(ns); n++ {
results.qsubs[i] = append(results.qsubs[i], sub)
}
} else {
results.qsubs[i] = append(results.qsubs[i], sub)
}
} else {
var nqsub []*subscription
for _, sub := range qr {
nqsub = append(nqsub, sub)
}
results.qsubs = append(results.qsubs, nqsub)
}
}
}
@@ -355,12 +377,12 @@ func addNodeToResults(n *node, results *SublistResult) {
// processing publishes in L1 on client. So we need to walk sequentially
// for now. Keep an eye on this in case we start getting large number of
// different queue subscribers for the same subject.
func findQSliceForSub(sub *subscription, qsl [][]*subscription) int {
if sub.queue == nil {
func findQSlot(queue []byte, qsl [][]*subscription) int {
if queue == nil {
return -1
}
for i, qr := range qsl {
if len(qr) > 0 && bytes.Equal(sub.queue, qr[0].queue) {
if len(qr) > 0 && bytes.Equal(queue, qr[0].queue) {
return i
}
}
@@ -494,6 +516,56 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
return nil
}
func (s *Sublist) checkNodeForClientSubs(n *node, c *client) {
var removed uint32
for _, sub := range n.psubs {
if sub.client == c {
if s.removeFromNode(n, sub) {
s.removeFromCache(string(sub.subject), sub)
removed++
}
}
}
// Queue subscriptions
for _, qr := range n.qsubs {
for _, sub := range qr {
if sub.client == c {
if s.removeFromNode(n, sub) {
s.removeFromCache(string(sub.subject), sub)
removed++
}
}
}
}
s.count -= removed
s.removes += uint64(removed)
}
func (s *Sublist) removeClientSubs(l *level, c *client) {
for _, n := range l.nodes {
s.checkNodeForClientSubs(n, c)
s.removeClientSubs(n.next, c)
}
if l.pwc != nil {
s.checkNodeForClientSubs(l.pwc, c)
s.removeClientSubs(l.pwc.next, c)
}
if l.fwc != nil {
s.checkNodeForClientSubs(l.fwc, c)
s.removeClientSubs(l.fwc.next, c)
}
}
func (s *Sublist) RemoveAllForClient(c *client) {
s.Lock()
removes := s.removes
s.removeClientSubs(s.root, c)
if s.removes != removes {
atomic.AddUint64(&s.genid, 1)
}
s.Unlock()
}
// pruneNode is used to prune an empty node from the tree.
func (l *level) pruneNode(n *node, t string) {
if n == nil {
@@ -541,7 +613,7 @@ func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
delete(n.psubs, sub)
if found && n.plist != nil {
// This will brute force remove the plist to perform
// correct behavior. Will get repopulated on a call
// correct behavior. Will get re-populated on a call
// to Match as needed.
n.plist = nil
}
@@ -549,12 +621,11 @@ func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
}
// We have a queue group subscription here
qname := string(sub.queue)
qsub := n.qsubs[qname]
qsub := n.qsubs[string(sub.queue)]
_, found = qsub[sub]
delete(qsub, sub)
if len(qsub) == 0 {
delete(n.qsubs, qname)
delete(n.qsubs, string(sub.queue))
}
return found
}
@@ -830,7 +901,7 @@ func matchLiteral(literal, subject string) bool {
}
func addLocalSub(sub *subscription, subs *[]*subscription) {
if sub != nil && sub.client != nil && sub.client.typ == CLIENT {
if sub != nil && sub.client != nil && sub.client.typ == CLIENT && sub.im == nil {
*subs = append(*subs, sub)
}
}
@@ -855,11 +926,9 @@ func (s *Sublist) addNodeToSubs(n *node, subs *[]*subscription) {
}
func (s *Sublist) collectLocalSubs(l *level, subs *[]*subscription) {
if len(l.nodes) > 0 {
for _, n := range l.nodes {
s.addNodeToSubs(n, subs)
s.collectLocalSubs(n.next, subs)
}
for _, n := range l.nodes {
s.addNodeToSubs(n, subs)
s.collectLocalSubs(n.next, subs)
}
if l.pwc != nil {
s.addNodeToSubs(l.pwc, subs)

View File

@@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -69,7 +70,7 @@ func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
}
func verifyQMember(qsubs [][]*subscription, val *subscription, t *testing.T) {
verifyMember(qsubs[findQSliceForSub(val, qsubs)], val, t)
verifyMember(qsubs[findQSlot(val.queue, qsubs)], val, t)
}
func verifyMember(r []*subscription, val *subscription, t *testing.T) {
@@ -86,7 +87,8 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) {
// Helpers to generate test subscriptions.
func newSub(subject string) *subscription {
return &subscription{subject: []byte(subject)}
c := &client{typ: CLIENT}
return &subscription{client: c, subject: []byte(subject)}
}
func newQSub(subject, queue string) *subscription {
@@ -96,6 +98,14 @@ func newQSub(subject, queue string) *subscription {
return newSub(subject)
}
func newRemoteQSub(subject, queue string, num int32) *subscription {
if queue != "" {
c := &client{typ: ROUTER}
return &subscription{client: c, subject: []byte(subject), queue: []byte(queue), qw: num}
}
return newSub(subject)
}
func TestSublistInit(t *testing.T) {
s := NewSublist()
verifyCount(s, 0, t)
@@ -247,6 +257,34 @@ func TestSublistRemoveWithLargeSubs(t *testing.T) {
verifyLen(r.psubs, plistMin*2-3, t)
}
func TestSublistRemoveByClient(t *testing.T) {
s := NewSublist()
c := &client{}
for i := 0; i < 10; i++ {
subject := fmt.Sprintf("a.b.c.d.e.f.%d", i)
sub := &subscription{client: c, subject: []byte(subject)}
s.Insert(sub)
}
verifyCount(s, 10, t)
s.Insert(&subscription{client: c, subject: []byte(">")})
s.Insert(&subscription{client: c, subject: []byte("foo.*")})
s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")})
s.Insert(&subscription{client: c, subject: []byte("foo"), queue: []byte("bar")})
s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")})
s.Insert(&subscription{client: c, subject: []byte("foo.bar"), queue: []byte("baz")})
verifyCount(s, 16, t)
genid := atomic.LoadUint64(&s.genid)
s.RemoveAllForClient(c)
verifyCount(s, 0, t)
// genid should be different
if genid == atomic.LoadUint64(&s.genid) {
t.Fatalf("GenId should have been changed after removal of subs")
}
if cc := s.CacheCount(); cc != 0 {
t.Fatalf("Cache should be zero, got %d", cc)
}
}
func TestSublistInvalidSubjectsInsert(t *testing.T) {
s := NewSublist()
@@ -398,8 +436,8 @@ func TestSublistBasicQueueResults(t *testing.T) {
r = s.Match(subject)
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 2, t)
verifyLen(r.qsubs[findQSliceForSub(sub1, r.qsubs)], 1, t)
verifyLen(r.qsubs[findQSliceForSub(sub2, r.qsubs)], 2, t)
verifyLen(r.qsubs[findQSlot(sub1.queue, r.qsubs)], 1, t)
verifyLen(r.qsubs[findQSlot(sub2.queue, r.qsubs)], 2, t)
verifyQMember(r.qsubs, sub2, t)
verifyQMember(r.qsubs, sub3, t)
verifyQMember(r.qsubs, sub4, t)
@@ -757,6 +795,52 @@ func TestSublistRaceOnMatch(t *testing.T) {
}
}
// Remote subscriptions for queue subscribers will be weighted such that a single subscription
// is received, but represents all of the queue subscribers on the remote side.
func TestSublistRemoteQueueSubscriptions(t *testing.T) {
s := NewSublist()
// Normals
s1 := newQSub("foo", "bar")
s2 := newQSub("foo", "bar")
s.Insert(s1)
s.Insert(s2)
// Now do weighted remotes.
rs1 := newRemoteQSub("foo", "bar", 10)
s.Insert(rs1)
rs2 := newRemoteQSub("foo", "bar", 10)
s.Insert(rs2)
// These are just shadowed in results, so should appear as 4 subs.
verifyCount(s, 4, t)
r := s.Match("foo")
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 22, t)
s.Remove(s1)
s.Remove(rs1)
verifyCount(s, 2, t)
// Now make sure our shadowed results are correct after a removal.
r = s.Match("foo")
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 11, t)
// Now do an update to an existing remote sub to update its weight.
rs2.qw = 1
s.UpdateRemoteQSub(rs2)
// Results should reflect new weight.
r = s.Match("foo")
verifyLen(r.psubs, 0, t)
verifyQLen(r.qsubs, 1, t)
verifyLen(r.qsubs[0], 2, t)
}
// -- Benchmarks Setup --
var subs []*subscription

View File

@@ -23,12 +23,14 @@ import (
"strings"
"time"
"github.com/nats-io/nuid"
"github.com/nats-io/nkeys"
)
// Use nuid.
// Use nkeys and the public key.
func genID() string {
return nuid.Next()
kp, _ := nkeys.CreateServer()
pub, _ := kp.PublicKey()
return pub
}
// Ascii numbers 0-9

View File

@@ -128,7 +128,7 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
// Client options
opts := nats.GetDefaultOptions()
opts.Timeout = (5 * time.Second)
opts.ReconnectWait = (50 * time.Millisecond)
opts.ReconnectWait = (20 * time.Millisecond)
opts.MaxReconnect = 1000
opts.NoRandomize = true
@@ -267,7 +267,12 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
checkClusterFormed(t, srvA, srvB)
// Make sure subscriptions are propagated in the cluster
if err := checkExpectedSubs(4, srvA, srvB); err != nil {
// Clients will be connected to srvA, so that will be 4,
// but srvB will only have 2 now since we coaelsce.
if err := checkExpectedSubs(4, srvA); err != nil {
t.Fatalf("%v", err)
}
if err := checkExpectedSubs(2, srvB); err != nil {
t.Fatalf("%v", err)
}

View File

@@ -72,6 +72,15 @@ func checkExpectedSubs(expected int, servers ...*server.Server) error {
return nil
}
func runThreeServers(t *testing.T) (srvA, srvB, srvC *server.Server, optsA, optsB, optsC *server.Options) {
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
srvC, optsC = RunServerWithConfig("./configs/srv_c.conf")
checkClusterFormed(t, srvA, srvB, srvC)
return
}
func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) {
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
@@ -161,7 +170,8 @@ func TestClusterQueueSubs(t *testing.T) {
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
// New cluster proto this will only be 1.
if err := checkExpectedSubs(1, srvB); err != nil {
t.Fatalf("%v", err)
}
@@ -188,7 +198,8 @@ func TestClusterQueueSubs(t *testing.T) {
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids), srvB); err != nil {
// Normal foo and the queue group will be one a piece, so 2 + wc == 3
if err := checkExpectedSubs(3, srvB); err != nil {
t.Fatalf("%v", err)
}
@@ -219,7 +230,8 @@ func TestClusterQueueSubs(t *testing.T) {
expectB(pongRe)
// Make sure the subs have propagated to srvA before continuing
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids)+len(qg2SidsB), srvA); err != nil {
// This will be all the subs on A and just 1 from B that gets coalesced.
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids)+1, srvA); err != nil {
t.Fatalf("%v", err)
}
@@ -243,7 +255,7 @@ func TestClusterQueueSubs(t *testing.T) {
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(pSids)+len(qg2SidsB), srvB); err != nil {
if err := checkExpectedSubs(1+1+len(qg2SidsB), srvB); err != nil {
t.Fatalf("%v", err)
}
@@ -302,7 +314,7 @@ func TestClusterDoubleMsgs(t *testing.T) {
expectA1(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
if err := checkExpectedSubs(1, srvB); err != nil {
t.Fatalf("%v", err)
}
@@ -323,7 +335,7 @@ func TestClusterDoubleMsgs(t *testing.T) {
pSids := []string{"1", "2"}
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1SidsA)+2, srvB); err != nil {
if err := checkExpectedSubs(1+2, srvB); err != nil {
t.Fatalf("%v", err)
}

View File

@@ -0,0 +1,17 @@
# New Cluster config file
listen: 127.0.0.1:5343
cluster {
#nkey: CBSMNSOLVGFSP62Q2VD24KQIQXIVG2XVKSHE4DL7KKNN55MUYQKMDCHZ
listen: 127.0.0.1:5344
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
nats-route://127.0.0.1:5345
nats-route://127.0.0.1:5346
]
}

21
test/configs/srv_c.conf Normal file
View File

@@ -0,0 +1,21 @@
# Cluster Server C
listen: 127.0.0.1:5226
cluster {
listen: 127.0.0.1:5248
authorization {
user: ruser
password: top_secret
timeout: 0.5
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
nats-route://ruser:top_secret@127.0.0.1:5244
]
}

1227
test/new_routes_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,6 @@ import (
"os"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -148,25 +147,14 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
// Send SUB via client connection
send("SUB foo 22\r\n")
// Make sure the SUB is broadcast via the route
buf := expectResult(t, rc, subRe)
matches := subRe.FindAllSubmatch(buf, -1)
rsid := string(matches[0][5])
if !strings.HasPrefix(rsid, "RSID:") {
t.Fatalf("Got wrong RSID: %s\n", rsid)
}
// Make sure the RS+ is broadcast via the route
expectResult(t, rc, rsubRe)
// Send UNSUB via client connection
send("UNSUB 22\r\n")
// Make sure the SUB is broadcast via the route
buf = expectResult(t, rc, unsubRe)
matches = unsubRe.FindAllSubmatch(buf, -1)
rsid2 := string(matches[0][1])
if rsid2 != rsid {
t.Fatalf("Expected rsid's to match. %q vs %q\n", rsid, rsid2)
}
// Make sure the RS- is broadcast via the route
expectResult(t, rc, runsubRe)
// Explicitly shutdown the server, otherwise this test would
// cause following test to fail.
@@ -217,18 +205,16 @@ func TestRouteForwardsMsgFromClients(t *testing.T) {
routeExpect(infoRe)
}
// Send SUB via route connection
routeSend("SUB foo RSID:2:22\r\n")
routeSend("PING\r\n")
// Send SUB via route connection, RS+
routeSend("RS+ $G foo\r\nPING\r\n")
routeExpect(pongRe)
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
clientSend("PING\r\n")
clientSend("PUB foo 2\r\nok\r\nPING\r\n")
clientExpect(pongRe)
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "RSID:2:22", "", "2", "ok")
checkRmsg(t, matches[0], "$G", "foo", "", "2", "ok")
}
func TestRouteForwardsMsgToClients(t *testing.T) {
@@ -247,13 +233,12 @@ func TestRouteForwardsMsgToClients(t *testing.T) {
routeSend, _ := setupRoute(t, route, opts)
// Subscribe to foo
clientSend("SUB foo 1\r\n")
clientSend("SUB foo 1\r\nPING\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
// Send MSG proto via route connection
routeSend("MSG foo 1 2\r\nok\r\n")
// Send RMSG proto via route connection
routeSend("RMSG $G foo 2\r\nok\r\n")
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
@@ -270,10 +255,10 @@ func TestRouteOneHopSemantics(t *testing.T) {
routeSend, _ := setupRoute(t, route, opts)
// Express interest on this route for foo.
routeSend("SUB foo RSID:2:2\r\n")
routeSend("RS+ $G foo\r\n")
// Send MSG proto via route connection
routeSend("MSG foo 1 2\r\nok\r\n")
routeSend("RMSG foo 2\r\nok\r\n")
// Make sure it does not come back!
expectNothing(t, route)
@@ -296,14 +281,13 @@ func TestRouteOnlySendOnce(t *testing.T) {
expectMsgs := expectMsgsCommand(t, routeExpect)
// Express multiple interest on this route for foo.
routeSend("SUB foo RSID:2:1\r\n")
routeSend("SUB foo RSID:2:2\r\n")
routeSend("RS+ $G foo\r\n")
routeSend("RS+ $G foo\r\n")
routeSend("PING\r\n")
routeExpect(pongRe)
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
clientSend("PING\r\n")
clientSend("PUB foo 2\r\nok\r\nPING\r\n")
clientExpect(pongRe)
expectMsgs(1)
@@ -327,13 +311,11 @@ func TestRouteQueueSemantics(t *testing.T) {
expectAuthRequired(t, route)
routeSend, routeExpect := setupRouteEx(t, route, opts, "ROUTER:xyz")
routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n")
expectMsgs := expectMsgsCommand(t, routeExpect)
expectMsgs := expectRmsgsCommand(t, routeExpect)
// Express multiple interest on this route for foo, queue group bar.
qrsid1 := "QRSID:1:1"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1))
qrsid2 := "QRSID:1:2"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2))
routeSend("RS+ $G foo bar 1\r\n")
routeSend("RS+ $G foo bar 2\r\n")
// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
@@ -347,75 +329,48 @@ func TestRouteQueueSemantics(t *testing.T) {
// Only 1
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
checkRmsg(t, matches[0], "$G", "foo", "| bar", "2", "ok")
// Add normal Interest as well to route interest.
routeSend("SUB foo RSID:1:4\r\n")
routeSend("RS+ $G foo\r\n")
// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
routeExpect(pongRe)
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
clientSend("PUB foo 2\r\nok\r\nPING\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
// Should be 2 now, 1 for all normal, and one for specific queue subscriber.
matches = expectMsgs(2)
// Expect first to be the normal subscriber, next will be the queue one.
if string(matches[0][sidIndex]) != "RSID:1:4" &&
string(matches[1][sidIndex]) != "RSID:1:4" {
t.Fatalf("Did not received routed sid\n")
}
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
checkMsg(t, matches[1], "foo", "", "", "2", "ok")
// Check the rsid to verify it is one of the queue group subscribers.
var rsid string
if matches[0][sidIndex][0] == 'Q' {
rsid = string(matches[0][sidIndex])
} else {
rsid = string(matches[1][sidIndex])
}
if rsid != qrsid1 && rsid != qrsid2 {
t.Fatalf("Expected a queue group rsid, got %s\n", rsid)
}
// Should be 1 now for everything. Always receive 1 message.
matches = expectMsgs(1)
checkRmsg(t, matches[0], "$G", "foo", "| bar", "2", "ok")
// Now create a queue subscription for the client as well as a normal one.
clientSend("SUB foo 1\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
routeExpect(subRe)
routeExpect(rsubRe)
clientSend("SUB foo bar 2\r\n")
clientSend("SUB foo bar 2\r\nPING\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
routeExpect(subRe)
routeExpect(rsubRe)
// Deliver a MSG from the route itself, make sure the client receives both.
routeSend("MSG foo RSID:1:1 2\r\nok\r\n")
// Queue group one.
routeSend("MSG foo QRSID:1:2 2\r\nok\r\n")
// Invlaid queue sid.
routeSend("MSG foo QRSID 2\r\nok\r\n") // cid and sid missing
routeSend("MSG foo QRSID:1 2\r\nok\r\n") // cid not terminated with ':'
routeSend("MSG foo QRSID:1: 2\r\nok\r\n") // cid==1 but sid missing. It needs to be at least one character.
routeSend("RMSG $G foo | bar 2\r\nok\r\n")
// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
routeExpect(pongRe)
// Should be 2 now, 1 for all normal, and one for specific queue subscriber.
// Should get 2 msgs.
matches = clientExpectMsgs(2)
// Expect first to be the normal subscriber, next will be the queue one.
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
checkMsg(t, matches[1], "foo", "2", "", "2", "ok")
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
}
func TestSolicitRouteReconnect(t *testing.T) {
@@ -442,22 +397,24 @@ func TestMultipleRoutesSameId(t *testing.T) {
defer route1.Close()
expectAuthRequired(t, route1)
route1Send, _ := setupRouteEx(t, route1, opts, "ROUTE:2222")
route1Send, route1Expect := setupRouteEx(t, route1, opts, "ROUTE:2222")
route2 := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer route2.Close()
expectAuthRequired(t, route2)
route2Send, _ := setupRouteEx(t, route2, opts, "ROUTE:2222")
route2Send, route2Expect := setupRouteEx(t, route2, opts, "ROUTE:2222")
// Send SUB via route connections
sub := "SUB foo RSID:2:22\r\n"
sub := "RS+ $G foo\r\nPING\r\n"
route1Send(sub)
route2Send(sub)
route1Expect(pongRe)
route2Expect(pongRe)
// Make sure we do not get anything on a MSG send to a router.
// Send MSG proto via route connection
route1Send("MSG foo 1 2\r\nok\r\n")
// Make sure we do not get anything on a RMSG send to a router.
// Send RMSG proto via route connection
route1Send("RMSG $G foo 2\r\nok\r\n")
expectNothing(t, route1)
expectNothing(t, route2)
@@ -468,8 +425,7 @@ func TestMultipleRoutesSameId(t *testing.T) {
defer client.Close()
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
clientSend("PING\r\n")
clientSend("PUB foo 2\r\nok\r\nPING\r\n")
clientExpect(pongRe)
// We should only receive on one route, not both.
@@ -486,11 +442,11 @@ func TestMultipleRoutesSameId(t *testing.T) {
}
}
matches := msgRe.FindAllSubmatch(buf, -1)
matches := rmsgRe.FindAllSubmatch(buf, -1)
if len(matches) != 1 {
t.Fatalf("Expected 1 msg, got %d\n", len(matches))
}
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
checkRmsg(t, matches[0], "$G", "foo", "", "2", "ok")
}
func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
@@ -530,7 +486,7 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
// Trigger the send of local subs.
routeSend(infoJSON)
routeExpect(subRe)
routeExpect(rsubRe)
// Close and then re-open
route.Close()
@@ -543,7 +499,7 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
routeExpect(infoRe)
routeSend(infoJSON)
routeExpect(subRe)
routeExpect(rsubRe)
}
type ignoreLogger struct{}
@@ -990,8 +946,8 @@ func TestRouteBasicPermissions(t *testing.T) {
if err := checkExpectedSubs(5, srvA); err != nil {
t.Fatal(err.Error())
}
// B should have 4
if err := checkExpectedSubs(4, srvB); err != nil {
// B should have 3 since we coalesce te two for 'foo'
if err := checkExpectedSubs(3, srvB); err != nil {
t.Fatal(err.Error())
}
// Send a message from B and check that it is received.
@@ -1011,7 +967,7 @@ func TestRouteBasicPermissions(t *testing.T) {
ncb.Close()
srvB.Shutdown()
// Since B had 2 local subs, A should go from 5 to 3
// Since B had 2 local subs, A should still only go from 4 to 3
if err := checkExpectedSubs(3, srvA); err != nil {
t.Fatal(err.Error())
}
@@ -1020,8 +976,8 @@ func TestRouteBasicPermissions(t *testing.T) {
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()
// Check that subs from A that can be sent to B are sent.
// That would be 2 (the 2 subscriptions on foo).
if err := checkExpectedSubs(2, srvB); err != nil {
// That would be 2 (the 2 subscriptions on foo) as one.
if err := checkExpectedSubs(1, srvB); err != nil {
t.Fatal(err.Error())
}

View File

@@ -69,13 +69,12 @@ func RunServer(opts *server.Options) *server.Server {
}
// LoadConfig loads a configuration from a filename
func LoadConfig(configFile string) (opts *server.Options) {
func LoadConfig(configFile string) *server.Options {
opts, err := server.ProcessConfigFile(configFile)
if err != nil {
panic(fmt.Sprintf("Error processing configuration file: %v", err))
}
opts.NoSigs, opts.NoLog = true, true
return
return opts
}
// RunServerWithConfig starts a new Go routine based server with a configuration file.
@@ -215,6 +214,13 @@ func setupConnWithProto(t tLogger, c net.Conn, proto int) (sendFun, expectFun) {
return sendCommand(t, c), expectCommand(t, c)
}
func setupConnWithAccount(t tLogger, c net.Conn, account string) (sendFun, expectFun) {
checkInfoMsg(t, c)
cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"account\":%q}\r\n", false, false, false, account)
sendProto(t, c, cs)
return sendCommand(t, c), expectCommand(t, c)
}
type sendFun func(string)
type expectFun func(*regexp.Regexp) []byte
@@ -250,24 +256,31 @@ var (
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
rsubRe = regexp.MustCompile(`RS\+\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`)
runsubRe = regexp.MustCompile(`RS\-\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\r\n`)
rmsgRe = regexp.MustCompile(`(?:(?:RMSG\s+([^\s]+)\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
)
const (
// Regular Messages
subIndex = 1
sidIndex = 2
replyIndex = 4
lenIndex = 5
msgIndex = 6
// Routed Messages
accIndex = 1
rsubIndex = 2
replyAndQueueIndex = 3
)
// Test result from server against regexp
func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
expBuf := make([]byte, 32768)
// Wait for commands to be processed and results queued for read
c.SetReadDeadline(time.Now().Add(5 * time.Second))
c.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := c.Read(expBuf)
c.SetReadDeadline(time.Time{})
@@ -277,11 +290,22 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
buf := expBuf[:n]
if !re.Match(buf) {
stackFatalf(t, "Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, re)
stackFatalf(t, "Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'", buf, re)
}
return buf
}
func peek(c net.Conn) []byte {
expBuf := make([]byte, 32768)
c.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
n, err := c.Read(expBuf)
c.SetReadDeadline(time.Time{})
if err != nil || n <= 0 {
return nil
}
return expBuf
}
func expectNothing(t tLogger, c net.Conn) {
expBuf := make([]byte, 32)
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
@@ -292,7 +316,7 @@ func expectNothing(t tLogger, c net.Conn) {
}
}
// This will check that we got what we expected.
// This will check that we got what we expected from a normal message.
func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) {
if string(m[subIndex]) != subject {
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[subIndex])
@@ -311,6 +335,33 @@ func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) {
}
}
func checkRmsg(t tLogger, m [][]byte, account, subject, replyAndQueues, len, msg string) {
if string(m[accIndex]) != account {
stackFatalf(t, "Did not get correct account: expected '%s' got '%s'\n", account, m[accIndex])
}
if string(m[rsubIndex]) != subject {
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[rsubIndex])
}
if string(m[lenIndex]) != len {
stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[lenIndex])
}
if string(m[replyAndQueueIndex]) != replyAndQueues {
stackFatalf(t, "Did not get correct reply/queues: expected '%s' got '%s'\n", replyAndQueues, m[replyAndQueueIndex])
}
}
// Closure for expectMsgs
func expectRmsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte {
return func(expected int) [][][]byte {
buf := ef(rmsgRe)
matches := rmsgRe.FindAllSubmatch(buf, -1)
if len(matches) != expected {
stackFatalf(t, "Did not get correct # routed msgs: %d vs %d\n", len(matches), expected)
}
return matches
}
}
// Closure for expectMsgs
func expectMsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte {
return func(expected int) [][][]byte {

View File

@@ -37,10 +37,6 @@ func TestUserAuthorizationProto(t *testing.T) {
sendProto(t, c, "SUB foo 1\r\n")
expectResult(t, c, okRe)
// Check that we now reserve _SYS.> though for internal, so no clients.
sendProto(t, c, "PUB _SYS.HB 2\r\nok\r\n")
expectResult(t, c, permErrRe)
// Check that _ is ok
sendProto(t, c, "PUB _ 2\r\nok\r\n")
expectResult(t, c, okRe)