Files
nats-server/server/accounts_test.go
Jaime Piña e12181cb83 Return not ready for connection reason
Currently, we use ReadyForConnections in server tests to wait for the
server to be ready. However, when this fails we don't get a clue about
why it failed.

This change adds a new unexported method called readyForConnections that
returns an error describing which check failed. The exported
ReadyForConnections version works exactly as before. The unexported
version gets used in internal tests only.
2021-04-20 11:45:08 -07:00

3297 lines
97 KiB
Go

// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
opts := defaultServerOptions
s := New(&opts)
// Now create two accounts.
f, err := s.RegisterAccount("$foo")
if err != nil {
t.Fatalf("Error creating account 'foo': %v", err)
}
b, err := s.RegisterAccount("$bar")
if err != nil {
t.Fatalf("Error creating account 'bar': %v", err)
}
return s, f, b
}
func TestRegisterDuplicateAccounts(t *testing.T) {
s, _, _ := simpleAccountServer(t)
if _, err := s.RegisterAccount("$foo"); err == nil {
t.Fatal("Expected an error registering 'foo' twice")
}
}
func TestAccountIsolation(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error register client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error register client with 'bar' account: %v", err)
}
// Make sure they are different accounts/sl.
if cfoo.acc == cbar.acc {
t.Fatalf("Error, accounts the same for both clients")
}
// Now do quick test that makes sure messages do not cross over.
// setup bar as a foo subscriber.
cbar.parseAsync("SUB foo 1\r\nPING\r\nPING\r\n")
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
cfoo.parseAsync("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")
l, err = crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'foo' from server: %v", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crFoo, []byte("hello\r\n"), t)
// Now make sure nothing shows up on bar.
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
}
func TestAccountIsolationExportImport(t *testing.T) {
checkIsolation := func(t *testing.T, pubSubj string, ncExp, ncImp *nats.Conn) {
// We keep track of 2 subjects.
// One subject (pubSubj) is based off the stream import.
// The other subject "fizz" is not imported and should be isolated.
gotSubjs := map[string]int{
pubSubj: 0,
"fizz": 0,
}
count := int32(0)
ch := make(chan struct{}, 1)
if _, err := ncImp.Subscribe(">", func(m *nats.Msg) {
gotSubjs[m.Subject] += 1
if n := atomic.AddInt32(&count, 1); n == 3 {
ch <- struct{}{}
}
}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Since both prod and cons use same server, flushing here will ensure
// that the interest is registered and known at the time we publish.
ncImp.Flush()
if err := ncExp.Publish(pubSubj, []byte(fmt.Sprintf("ncExp pub %s", pubSubj))); err != nil {
t.Fatal(err)
}
if err := ncImp.Publish(pubSubj, []byte(fmt.Sprintf("ncImp pub %s", pubSubj))); err != nil {
t.Fatal(err)
}
if err := ncExp.Publish("fizz", []byte("ncExp pub fizz")); err != nil {
t.Fatal(err)
}
if err := ncImp.Publish("fizz", []byte("ncImp pub fizz")); err != nil {
t.Fatal(err)
}
wantSubjs := map[string]int{
// Subscriber ncImp should receive publishes from ncExp and ncImp.
pubSubj: 2,
// Subscriber ncImp should only receive the publish from ncImp.
"fizz": 1,
}
// Wait for at least the 3 expected messages
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Expected 3 messages, got %v", atomic.LoadInt32(&count))
}
// But now wait a bit to see if subscription receives more than expected.
time.Sleep(50 * time.Millisecond)
if got, want := len(gotSubjs), len(wantSubjs); got != want {
t.Fatalf("unexpected subjs len, got=%d; want=%d", got, want)
}
for key, gotCnt := range gotSubjs {
if wantCnt := wantSubjs[key]; gotCnt != wantCnt {
t.Errorf("unexpected receive count for subject %q, got=%d, want=%d", key, gotCnt, wantCnt)
}
}
}
cases := []struct {
name string
exp, imp string
pubSubj string
}{
{
name: "export literal, import literal",
exp: "foo", imp: "foo",
pubSubj: "foo",
},
{
name: "export full wildcard, import literal",
exp: "foo.>", imp: "foo.bar",
pubSubj: "foo.bar",
},
{
name: "export full wildcard, import sublevel full wildcard",
exp: "foo.>", imp: "foo.bar.>",
pubSubj: "foo.bar.whizz",
},
{
name: "export full wildcard, import full wildcard",
exp: "foo.>", imp: "foo.>",
pubSubj: "foo.bar",
},
{
name: "export partial wildcard, import partial wildcard",
exp: "foo.*", imp: "foo.*",
pubSubj: "foo.bar",
},
{
name: "export mid partial wildcard, import mid partial wildcard",
exp: "foo.*.bar", imp: "foo.*.bar",
pubSubj: "foo.whizz.bar",
},
}
for _, c := range cases {
t.Run(fmt.Sprintf("%s jwt", c.name), func(t *testing.T) {
// Setup NATS server.
s := opTrustBasicSetup()
defer s.Shutdown()
go s.Start()
if err := s.readyForConnections(5 * time.Second); err != nil {
t.Fatal(err)
}
buildMemAccResolver(s)
// Setup exporter account.
accExpPair, accExpPub := createKey(t)
accExpClaims := jwt.NewAccountClaims(accExpPub)
if c.exp != "" {
accExpClaims.Limits.WildcardExports = true
accExpClaims.Exports.Add(&jwt.Export{
Name: fmt.Sprintf("%s-stream-export", c.exp),
Subject: jwt.Subject(c.exp),
Type: jwt.Stream,
})
}
accExpJWT, err := accExpClaims.Encode(oKp)
require_NoError(t, err)
addAccountToMemResolver(s, accExpPub, accExpJWT)
// Setup importer account.
accImpPair, accImpPub := createKey(t)
accImpClaims := jwt.NewAccountClaims(accImpPub)
if c.imp != "" {
accImpClaims.Imports.Add(&jwt.Import{
Name: fmt.Sprintf("%s-stream-import", c.imp),
Subject: jwt.Subject(c.imp),
Account: accExpPub,
Type: jwt.Stream,
})
}
accImpJWT, err := accImpClaims.Encode(oKp)
require_NoError(t, err)
addAccountToMemResolver(s, accImpPub, accImpJWT)
// Connect with different accounts.
ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accExpPair),
nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp)))
ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accImpPair),
nats.Name(fmt.Sprintf("nc-importer-%s", c.imp)))
defer ncExp.Close()
defer ncImp.Close()
checkIsolation(t, c.pubSubj, ncExp, ncImp)
if t.Failed() {
t.Logf("exported=%q; imported=%q", c.exp, c.imp)
}
})
t.Run(fmt.Sprintf("%s conf", c.name), func(t *testing.T) {
// Setup NATS server.
cf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
accounts: {
accExp: {
users: [{user: accExp, password: accExp}]
exports: [{stream: %q}]
}
accImp: {
users: [{user: accImp, password: accImp}]
imports: [{stream: {account: accExp, subject: %q}}]
}
}
`,
c.exp, c.imp,
)))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
// Connect with different accounts.
ncExp := natsConnect(t, s.ClientURL(), nats.UserInfo("accExp", "accExp"),
nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp)))
ncImp := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"),
nats.Name(fmt.Sprintf("nc-importer-%s", c.imp)))
defer ncExp.Close()
defer ncImp.Close()
checkIsolation(t, c.pubSubj, ncExp, ncImp)
if t.Failed() {
t.Logf("exported=%q; imported=%q", c.exp, c.imp)
}
})
}
}
func TestAccountFromOptions(t *testing.T) {
opts := defaultServerOptions
opts.Accounts = []*Account{NewAccount("foo"), NewAccount("bar")}
s := New(&opts)
defer s.Shutdown()
ta := s.numReservedAccounts() + 2
if la := s.numAccounts(); la != ta {
t.Fatalf("Expected to have a server with %d active accounts, got %v", ta, la)
}
// Check that sl is filled in.
fooAcc, _ := s.LookupAccount("foo")
barAcc, _ := s.LookupAccount("bar")
if fooAcc == nil || barAcc == nil {
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
}
if fooAcc.sl == nil || barAcc.sl == nil {
t.Fatal("Expected Sublists to be filled in on Opts.Accounts")
}
}
func TestNewAccountsFromClients(t *testing.T) {
opts := defaultServerOptions
s := New(&opts)
defer s.Shutdown()
c, cr, _ := newClientForServer(s)
defer c.close()
connectOp := "CONNECT {\"account\":\"foo\"}\r\n"
c.parseAsync(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
opts.AllowNewAccounts = true
s = New(&opts)
defer s.Shutdown()
c, cr, _ = newClientForServer(s)
defer c.close()
err := c.parse([]byte(connectOp))
if err != nil {
t.Fatalf("Received an error trying to connect: %v", err)
}
c.parseAsync("PING\r\n")
l, err = cr.ReadString('\n')
if err != nil {
t.Fatalf("Error reading response for client from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q", l)
}
}
func TestActiveAccounts(t *testing.T) {
opts := defaultServerOptions
opts.AllowNewAccounts = true
opts.Cluster.Port = 22
s := New(&opts)
defer s.Shutdown()
if s.NumActiveAccounts() != 0 {
t.Fatalf("Expected no active account, got %d", s.NumActiveAccounts())
}
addClientWithAccount := func(accName string) *testAsyncClient {
t.Helper()
c, _, _ := newClientForServer(s)
connectOp := fmt.Sprintf("CONNECT {\"account\":\"%s\"}\r\n", accName)
err := c.parse([]byte(connectOp))
if err != nil {
t.Fatalf("Received an error trying to connect: %v", err)
}
return c
}
// Now add some clients.
cf1 := addClientWithAccount("foo")
defer cf1.close()
if s.activeAccounts != 1 {
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
}
// Adding in same one should not change total.
cf2 := addClientWithAccount("foo")
defer cf2.close()
if s.activeAccounts != 1 {
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
}
// Add in new one.
cb1 := addClientWithAccount("bar")
defer cb1.close()
if s.activeAccounts != 2 {
t.Fatalf("Expected active accounts to be 2, got %d", s.activeAccounts)
}
// Make sure the Accounts track clients.
foo, _ := s.LookupAccount("foo")
bar, _ := s.LookupAccount("bar")
if foo == nil || bar == nil {
t.Fatalf("Error looking up accounts")
}
if nc := foo.NumConnections(); nc != 2 {
t.Fatalf("Expected account foo to have 2 clients, got %d", nc)
}
if nc := bar.NumConnections(); nc != 1 {
t.Fatalf("Expected account bar to have 1 client, got %d", nc)
}
waitTilActiveCount := func(n int32) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if active := s.NumActiveAccounts(); active != n {
return fmt.Errorf("Number of active accounts is %d", active)
}
return nil
})
}
// Test Removal
cb1.closeConnection(ClientClosed)
waitTilActiveCount(1)
checkAccClientsCount(t, bar, 0)
// This should not change the count.
cf1.closeConnection(ClientClosed)
waitTilActiveCount(1)
checkAccClientsCount(t, foo, 1)
cf2.closeConnection(ClientClosed)
waitTilActiveCount(0)
checkAccClientsCount(t, foo, 0)
}
// Clients can ask that the account be forced to be new. If it exists this is an error.
func TestNewAccountRequireNew(t *testing.T) {
// This has foo and bar accounts already.
s, _, _ := simpleAccountServer(t)
c, cr, _ := newClientForServer(s)
defer c.close()
connectOp := "CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n"
c.parseAsync(connectOp)
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
// Now allow new accounts on the fly, make sure second time does not work.
opts := defaultServerOptions
opts.AllowNewAccounts = true
s = New(&opts)
c, _, _ = newClientForServer(s)
defer c.close()
err := c.parse([]byte(connectOp))
if err != nil {
t.Fatalf("Received an error trying to create an account: %v", err)
}
c, cr, _ = newClientForServer(s)
defer c.close()
c.parseAsync(connectOp)
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "-ERR ") {
t.Fatalf("Expected an error")
}
}
func accountNameExists(name string, accounts []*Account) bool {
for _, acc := range accounts {
if strings.Compare(acc.Name, name) == 0 {
return true
}
}
return false
}
func TestAccountSimpleConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`))
defer removeFile(t, confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if !accountNameExists("foo", opts.Accounts) {
t.Fatal("Expected a 'foo' account")
}
if !accountNameExists("bar", opts.Accounts) {
t.Fatal("Expected a 'bar' account")
}
// Make sure double entries is an error.
confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`))
defer removeFile(t, confFileName)
_, err = ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double account entries")
}
}
func TestAccountParseConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: derek, password: foo}
{user: ivan, password: bar}
]
}
}
`))
defer removeFile(t, confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if lu := len(opts.Users); lu != 4 {
t.Fatalf("Expected 4 total Users, got %d", lu)
}
var natsAcc *Account
for _, acc := range opts.Accounts {
if acc.Name == "nats.io" {
natsAcc = acc
break
}
}
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
for _, u := range opts.Users {
if u.Username == "derek" {
if u.Account != natsAcc {
t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account)
}
}
}
}
func TestAccountParseConfigDuplicateUsers(t *testing.T) {
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: alice, password: foo}
{user: bob, password: bar}
]
}
nats.io {
users = [
{user: alice, password: bar}
]
}
}
`))
defer removeFile(t, confFileName)
_, err := ProcessConfigFile(confFileName)
if err == nil {
t.Fatalf("Expected an error with double user entries")
}
}
func TestAccountParseConfigImportsExports(t *testing.T) {
opts, err := ProcessConfigFile("./configs/accounts.conf")
if err != nil {
t.Fatal("parsing failed: ", err)
}
if la := len(opts.Accounts); la != 3 {
t.Fatalf("Expected to see 3 accounts in opts, got %d", la)
}
if lu := len(opts.Nkeys); lu != 4 {
t.Fatalf("Expected 4 total Nkey users, got %d", lu)
}
if lu := len(opts.Users); lu != 0 {
t.Fatalf("Expected no Users, got %d", lu)
}
var natsAcc, synAcc *Account
for _, acc := range opts.Accounts {
if acc.Name == "nats.io" {
natsAcc = acc
} else if acc.Name == "synadia" {
synAcc = acc
}
}
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" {
t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey)
}
// Check user assigned to the correct account.
for _, nk := range opts.Nkeys {
if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" {
if nk.Account != natsAcc {
t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name)
}
break
}
}
// Now check for the imports and exports of streams and services.
if lis := len(natsAcc.imports.streams); lis != 2 {
t.Fatalf("Expected 2 imported streams, got %d\n", lis)
}
if lis := len(natsAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
if les := len(natsAcc.exports.services); les != 4 {
t.Fatalf("Expected 4 exported services, got %d\n", les)
}
if les := len(natsAcc.exports.streams); les != 0 {
t.Fatalf("Expected no exported streams, got %d\n", les)
}
ea := natsAcc.exports.services["nats.time"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Streamed {
t.Fatalf("Expected to get a Streamed response type, got %q", ea.respType)
}
ea = natsAcc.exports.services["nats.photo"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Chunked {
t.Fatalf("Expected to get a Chunked response type, got %q", ea.respType)
}
ea = natsAcc.exports.services["nats.add"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Singleton {
t.Fatalf("Expected to get a Singleton response type, got %q", ea.respType)
}
if synAcc == nil {
t.Fatalf("Error retrieving account for 'synadia'")
}
if lis := len(synAcc.imports.streams); lis != 0 {
t.Fatalf("Expected no imported streams, got %d\n", lis)
}
if lis := len(synAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
if les := len(synAcc.exports.services); les != 2 {
t.Fatalf("Expected 2 exported service, got %d\n", les)
}
if les := len(synAcc.exports.streams); les != 2 {
t.Fatalf("Expected 2 exported streams, got %d\n", les)
}
}
func TestImportExportConfigFailures(t *testing.T) {
// Import from unknow account
cf := createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{stream: {account: "synadia", subject:"foo"}}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import from unknown account")
}
// Import a service with no account.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{service: subject:"foo.*"}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import of a service with no account")
}
// Import a service with a wildcard subject.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{service: {account: "nats.io", subject:"foo.*"}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import of a service with wildcard subject")
}
// Export with unknown keyword.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
exports = [{service: "foo.*", wat:true}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with export with unknown keyword")
}
// Import with unknown keyword.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import with unknown keyword")
}
// Export with an account.
cf = createConfFile(t, []byte(`
accounts {
nats.io {
exports = [{service: {account: nats.io, subject:"foo.*"}}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with export with account")
}
}
func TestImportAuthorized(t *testing.T) {
_, foo, bar := simpleAccountServer(t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.>", nil), false, t)
foo.AddStreamExport("foo", IsPublicExport)
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), false, t)
foo.AddStreamExport("*", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), false, t)
// Reset and test '>' public export
_, foo, bar = simpleAccountServer(t)
foo.AddStreamExport(">", nil)
// Everything should work.
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), true, t)
_, foo, bar = simpleAccountServer(t)
foo.addStreamExportWithAccountPos("foo.*", []*Account{}, 2)
foo.addStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2)
if err := foo.addStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil {
t.Fatal("expected error")
}
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("foo.%s", bar.Name), nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("bar.%s.foo", bar.Name), nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("baz.foo.%s", bar.Name), nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.X", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar.X.foo", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz.foo.X", nil), false, t)
foo.addServiceExportWithAccountPos("a.*", []*Account{}, 2)
foo.addServiceExportWithAccountPos("b.*.a", []*Account{}, 2)
if err := foo.addServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil {
t.Fatal("expected error")
}
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("a.%s", bar.Name), nil), true, t)
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("b.%s.a", bar.Name), nil), true, t)
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("c.a.%s", bar.Name), nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "a.X", nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "b.X.a", nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "c.a.X", nil), false, t)
// Reset and test pwc and fwc
s, foo, bar := simpleAccountServer(t)
foo.AddStreamExport("foo.*.baz.>", []*Account{bar})
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*", nil), false, t)
// Make sure we match the account as well
fb, _ := s.RegisterAccount("foobar")
bz, _ := s.RegisterAccount("baz")
checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1", nil), false, t)
}
func TestSimpleMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Test first that trying to import with no matching export permission returns an error.
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
}
// Now map the subject space between foo and bar.
// Need to do export first.
if err := cfoo.acc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
// Normal and Queue Subscription on bar client.
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
checkMsg := func(l, sid string) {
t.Helper()
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "import.foo" {
t.Fatalf("Did not get correct subject: wanted %q, got %q", "import.foo", matches[SUB_INDEX])
}
if matches[SID_INDEX] != sid {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
}
// Now check we got the message from normal subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
checkMsg(l, "1")
checkPayload(crBar, []byte("hello\r\n"), t)
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
checkMsg(l, "2")
checkPayload(crBar, []byte("hello\r\n"), t)
// We should have 2 subscriptions in both. Normal and Queue Subscriber
// for barAcc which are local, and 2 that are shadowed in fooAcc.
// Now make sure that when we unsubscribe we clean up properly for both.
if bslc := barAcc.sl.Count(); bslc != 2 {
t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 2 {
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
}
// Now unsubscribe.
if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// We should have zero on both.
if bslc := barAcc.sl.Count(); bslc != 0 {
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 0 {
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
}
}
// https://github.com/nats-io/nats-server/issues/1159
func TestStreamImportLengthBug(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.AddStreamExport("client.>", nil); err != nil {
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events.>"); err == nil {
t.Fatalf("Expected an error when using a stream import prefix with a wildcard")
}
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
if err := cbar.parse([]byte("SUB events.> 1\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Also make sure that we will get an error from a config version.
// JWT will be updated separately.
cf := createConfFile(t, []byte(`
accounts {
foo {
exports = [{stream: "client.>"}]
}
bar {
imports = [{stream: {account: "foo", subject:"client.>"}, prefix:"events.>"}]
}
}
`))
defer removeFile(t, cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import with wildcard prefix")
}
}
func TestShadowSubsCleanupOnClientClose(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
// Now map the subject space between foo and bar.
// Need to do export first.
if err := fooAcc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := barAcc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Normal and Queue Subscription on bar client.
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if fslc := fooAcc.sl.Count(); fslc != 2 {
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
}
// Now close cbar and make sure we remove shadows.
cbar.closeConnection(ClientClosed)
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if fslc := fooAcc.sl.Count(); fslc != 0 {
return fmt.Errorf("Number of shadow subscriptions is %d", fslc)
}
return nil
})
}
func TestNoPrefixWildcardMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := cbar.acc.AddStreamImport(fooAcc, "*", ""); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for literal "foo".
cbar.parseAsync("SUB foo 1\r\nPING\r\n")
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
// Now check we got the message from normal subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestPrefixWildcardMapping(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
// Checking that trailing '.' is accepted, tested that it is auto added above.
if err := cbar.acc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for wildcard.
cbar.parseAsync("SUB pub.imports.* 1\r\nPING\r\n")
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
// Now check we got the messages from wildcard subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := fooAcc.AddStreamExport(">", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to client foo: %v", err)
}
if err := barAcc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
t.Fatalf("Error adding stream import to client bar: %v", err)
}
// Normal Subscription on bar client for wildcard.
cbar.parseAsync("SUB pub.imports.foo 1\r\nPING\r\n")
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Now publish our message.
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
// Now check we got the messages from wildcard subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
func TestMultipleImportsAndSingleWCSub(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := fooAcc.AddStreamExport("foo", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to account foo: %v", err)
}
if err := fooAcc.AddStreamExport("bar", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding stream export to account foo: %v", err)
}
if err := barAcc.AddStreamImport(fooAcc, "foo", "pub."); err != nil {
t.Fatalf("Error adding stream import to account bar: %v", err)
}
if err := barAcc.AddStreamImport(fooAcc, "bar", "pub."); err != nil {
t.Fatalf("Error adding stream import to account bar: %v", err)
}
// Wildcard Subscription on bar client for both imports.
cbar.parse([]byte("SUB pub.* 1\r\n"))
// Now publish a message on 'foo' and 'bar'
cfoo.parseAsync("PUB foo 5\r\nhello\r\nPUB bar 5\r\nworld\r\n")
// Now check we got the messages from the wildcard subscription.
l, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.foo" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
if matches[SUB_INDEX] != "pub.bar" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("world\r\n"), t)
// Check subscription count.
if fslc := fooAcc.sl.Count(); fslc != 2 {
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
}
if bslc := barAcc.sl.Count(); bslc != 1 {
t.Fatalf("Expected 1 normal subscriptions on barAcc, got %d", bslc)
}
// Now unsubscribe.
if err := cbar.parse([]byte("UNSUB 1\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// We should have zero on both.
if bslc := barAcc.sl.Count(); bslc != 0 {
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 0 {
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
}
}
// Make sure the AddServiceExport function is additive if called multiple times.
func TestAddServiceExport(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
bazAcc, err := s.RegisterAccount("$baz")
if err != nil {
t.Fatalf("Error creating account 'baz': %v", err)
}
defer s.Shutdown()
if err := fooAcc.AddServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr := fooAcc.exports.services["test.request"]
if len(tr.approved) != 0 {
t.Fatalf("Expected no authorized accounts, got %d", len(tr.approved))
}
if err := fooAcc.AddServiceExport("test.request", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr = fooAcc.exports.services["test.request"]
if tr == nil {
t.Fatalf("Expected authorized accounts, got nil")
}
if ls := len(tr.approved); ls != 1 {
t.Fatalf("Expected 1 authorized accounts, got %d", ls)
}
if err := fooAcc.AddServiceExport("test.request", []*Account{bazAcc}); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr = fooAcc.exports.services["test.request"]
if tr == nil {
t.Fatalf("Expected authorized accounts, got nil")
}
if ls := len(tr.approved); ls != 2 {
t.Fatalf("Expected 2 authorized accounts, got %d", ls)
}
}
func TestServiceExportWithWildcards(t *testing.T) {
for _, test := range []struct {
name string
public bool
}{
{
name: "public",
public: true,
},
{
name: "private",
public: false,
},
} {
t.Run(test.name, func(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
var accs []*Account
if !test.public {
accs = []*Account{barAcc}
}
// Add service export with a wildcard
if err := fooAcc.AddServiceExport("ngs.update.*", accs); err != nil {
t.Fatalf("Error adding account service export: %v", err)
}
// Import on bar account
if err := barAcc.AddServiceImport(fooAcc, "ngs.update", "ngs.update.$bar"); err != nil {
t.Fatalf("Error adding account service import: %v", err)
}
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Now setup the responder under cfoo
cfoo.parse([]byte("SUB ngs.update.* 1\r\n"))
// Now send the request. Remember we expect the request on our local ngs.update.
// We added the route with that "from" and will map it to "ngs.update.$bar"
cbar.parseAsync("SUB reply 11\r\nPUB ngs.update reply 4\r\nhelp\r\n")
// Now read the request from crFoo
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "ngs.update.$bar" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
cfoo.parseAsync(replyOp)
// Now read the response from crBar
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
if matches[SUB_INDEX] != "reply" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "11" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("22\r\n"), t)
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
t.Fatalf("Expected no responses on barAcc, got %d", nr)
}
})
}
}
func TestAccountAddServiceImportRace(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
if err := fooAcc.AddServiceExport("foo.*", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
total := 100
errCh := make(chan error, total)
for i := 0; i < 100; i++ {
go func(i int) {
err := barAcc.AddServiceImport(fooAcc, fmt.Sprintf("foo.%d", i), "")
errCh <- err // nil is a valid value.
}(i)
}
for i := 0; i < 100; i++ {
err := <-errCh
if err != nil {
t.Fatalf("Error adding account service import: %v", err)
}
}
barAcc.mu.Lock()
lens := len(barAcc.imports.services)
c := barAcc.internalClient()
barAcc.mu.Unlock()
if lens != total {
t.Fatalf("Expected %d imported services, got %d", total, lens)
}
c.mu.Lock()
lens = len(c.subs)
c.mu.Unlock()
if lens != total {
t.Fatalf("Expected %d subscriptions in internal client, got %d", total, lens)
}
}
func TestServiceImportWithWildcards(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
if err := fooAcc.AddServiceExport("test.*", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
// We can not map wildcards atm, so if we supply a to mapping and a wildcard we should fail.
if err := barAcc.AddServiceImport(fooAcc, "test.*", "foo"); err == nil {
t.Fatalf("Expected error adding account service import with wildcard and mapping, got none")
}
if err := barAcc.AddServiceImport(fooAcc, "test.>", ""); err == nil {
t.Fatalf("Expected error adding account service import with broader wildcard, got none")
}
// This should work.
if err := barAcc.AddServiceImport(fooAcc, "test.*", ""); err != nil {
t.Fatalf("Error adding account service import: %v", err)
}
// Make sure we can send and receive.
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
// Now setup the resonder under cfoo
cfoo.parse([]byte("SUB test.* 1\r\n"))
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Now send the request.
go cbar.parse([]byte("SUB bar 11\r\nPUB test.22 bar 4\r\nhelp\r\n"))
// Now read the request from crFoo
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "test.22" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
go cfoo.parse([]byte(replyOp))
// Now read the response from crBar
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
if matches[SUB_INDEX] != "bar" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "11" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("22\r\n"), t)
// Remove the service import with the wildcard and make sure hasWC is cleared.
barAcc.removeServiceImport("test.*")
barAcc.mu.Lock()
defer barAcc.mu.Unlock()
if len(barAcc.imports.services) != 0 {
t.Fatalf("Expected no imported services, got %d", len(barAcc.imports.services))
}
}
// Make sure the AddStreamExport function is additive if called multiple times.
func TestAddStreamExport(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
bazAcc, err := s.RegisterAccount("$baz")
if err != nil {
t.Fatalf("Error creating account 'baz': %v", err)
}
defer s.Shutdown()
if err := fooAcc.AddStreamExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr := fooAcc.exports.streams["test.request"]
if tr != nil {
t.Fatalf("Expected no authorized accounts, got %d", len(tr.approved))
}
if err := fooAcc.AddStreamExport("test.request", []*Account{barAcc}); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr = fooAcc.exports.streams["test.request"]
if tr == nil {
t.Fatalf("Expected authorized accounts, got nil")
}
if ls := len(tr.approved); ls != 1 {
t.Fatalf("Expected 1 authorized accounts, got %d", ls)
}
if err := fooAcc.AddStreamExport("test.request", []*Account{bazAcc}); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
tr = fooAcc.exports.streams["test.request"]
if tr == nil {
t.Fatalf("Expected authorized accounts, got nil")
}
if ls := len(tr.approved); ls != 2 {
t.Fatalf("Expected 2 authorized accounts, got %d", ls)
}
}
func TestCrossAccountRequestReply(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Add in the service export for the requests. Make it public.
if err := cfoo.acc.AddServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
// Test addServiceImport to make sure it requires accounts.
if err := cbar.acc.AddServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
t.Fatalf("Expected ErrMissingAccount but received %v.", err)
}
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
}
// Now add in the route mapping for request to be routed to the foo account.
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account service import to client bar: %v", err)
}
// Now setup the resonder under cfoo
cfoo.parse([]byte("SUB test.request 1\r\n"))
// Now send the request. Remember we expect the request on our local foo. We added the route
// with that "from" and will map it to "test.request"
cbar.parseAsync("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")
// Now read the request from crFoo
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
if matches[SUB_INDEX] != "test.request" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
cfoo.parseAsync(replyOp)
// Now read the response from crBar
l, err = crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
if matches[SUB_INDEX] != "bar" {
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "11" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "" {
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("22\r\n"), t)
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
t.Fatalf("Expected no responses on barAcc, got %d", nr)
}
}
func TestAccountRequestReplyTrackLatency(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
// Run server in Go routine. We need this one running for internal sending of msgs.
go s.Start()
// Wait for accept loop(s) to be started
if err := s.readyForConnections(10 * time.Second); err != nil {
t.Fatal(err)
}
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Add in the service export for the requests. Make it public.
if err := fooAcc.AddServiceExport("track.service", nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
// Now let's add in tracking
// First check we get an error if service does not exist.
if err := fooAcc.TrackServiceExport("track.wrong", "results"); err != ErrMissingService {
t.Fatalf("Expected error enabling tracking latency for wrong service")
}
// Check results should be a valid subject
if err := fooAcc.TrackServiceExport("track.service", "results.*"); err != ErrBadPublishSubject {
t.Fatalf("Expected error enabling tracking latency for bad results subject")
}
// Make sure we can not loop around on ourselves..
if err := fooAcc.TrackServiceExport("track.service", "track.service"); err != ErrBadPublishSubject {
t.Fatalf("Expected error enabling tracking latency for same subject")
}
// Check bad sampling
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", -1); err != ErrBadSampling {
t.Fatalf("Expected error enabling tracking latency for bad sampling")
}
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", 101); err != ErrBadSampling {
t.Fatalf("Expected error enabling tracking latency for bad sampling")
}
// Now let's add in tracking for real. This will be 100%
if err := fooAcc.TrackServiceExport("track.service", "results"); err != nil {
t.Fatalf("Error enabling tracking latency: %v", err)
}
// Now add in the route mapping for request to be routed to the foo account.
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
t.Fatalf("Error adding account service import to client bar: %v", err)
}
// Now setup the responder under cfoo and the listener for the results
cfoo.parse([]byte("SUB track.service 1\r\nSUB results 2\r\n"))
readFooMsg := func() ([]byte, string) {
t.Helper()
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'foo': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
msg := mraw[0]
msgSize, _ := strconv.Atoi(msg[LEN_INDEX])
return grabPayload(crFoo, msgSize), msg[REPLY_INDEX]
}
start := time.Now()
// Now send the request. Remember we expect the request on our local foo. We added the route
// with that "from" and will map it to "test.request"
cbar.parseAsync("SUB resp 11\r\nPUB req resp 4\r\nhelp\r\n")
// Now read the request from crFoo
_, reply := readFooMsg()
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", reply)
serviceTime := 25 * time.Millisecond
// We will wait a bit to check latency results
go func() {
time.Sleep(serviceTime)
cfoo.parseAsync(replyOp)
}()
// Now read the response from crBar
_, err := crBar.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
// Now let's check that we got the sampling results
rMsg, _ := readFooMsg()
// Unmarshal and check it.
var sl ServiceLatency
err = json.Unmarshal(rMsg, &sl)
if err != nil {
t.Fatalf("Could not parse latency json: %v\n", err)
}
startDelta := sl.RequestStart.Sub(start)
if startDelta > 5*time.Millisecond {
t.Fatalf("Bad start delta %v", startDelta)
}
if sl.ServiceLatency < serviceTime {
t.Fatalf("Bad service latency: %v", sl.ServiceLatency)
}
if sl.TotalLatency < sl.ServiceLatency {
t.Fatalf("Bad total latency: %v", sl.ServiceLatency)
}
}
// This will test for leaks in the remote latency tracking via client.rrTracking
func TestAccountTrackLatencyRemoteLeaks(t *testing.T) {
optsA, err := ProcessConfigFile("./configs/seed.conf")
require_NoError(t, err)
optsA.NoSigs, optsA.NoLog = true, true
optsA.ServerName = "A"
srvA := RunServer(optsA)
defer srvA.Shutdown()
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
optsB.ServerName = "B"
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
srvs := []*Server{srvA, srvB}
// Now add in the accounts and setup tracking.
for _, s := range srvs {
s.SetSystemAccount(globalAccountName)
fooAcc, _ := s.RegisterAccount("$foo")
fooAcc.AddServiceExport("track.service", nil)
fooAcc.TrackServiceExport("track.service", "results")
barAcc, _ := s.RegisterAccount("$bar")
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
t.Fatalf("Failed to import: %v", err)
}
}
getClient := func(s *Server, name string) *client {
t.Helper()
s.mu.Lock()
defer s.mu.Unlock()
for _, c := range s.clients {
c.mu.Lock()
n := c.opts.Name
c.mu.Unlock()
if n == name {
return c
}
}
t.Fatalf("Did not find client %q on server %q", name, s.info.ID)
return nil
}
// Test with a responder on second server, srvB. but they will not respond.
cfooNC := natsConnect(t, srvB.ClientURL(), nats.Name("foo"))
defer cfooNC.Close()
cfoo := getClient(srvB, "foo")
fooAcc, _ := srvB.LookupAccount("$foo")
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
// Set new limits
for _, srv := range srvs {
fooAcc, _ := srv.LookupAccount("$foo")
err := fooAcc.SetServiceExportResponseThreshold("track.service", 5*time.Millisecond)
if err != nil {
t.Fatalf("Error setting response threshold: %v", err)
}
}
// Now setup the responder under cfoo and the listener for the results
time.Sleep(50 * time.Millisecond)
baseSubs := int(srvA.NumSubscriptions())
fooSub := natsSubSync(t, cfooNC, "track.service")
natsFlush(t, cfooNC)
// Wait for it to propagate.
checkExpectedSubs(t, baseSubs+1, srvA)
cbarNC := natsConnect(t, srvA.ClientURL(), nats.Name("bar"))
defer cbarNC.Close()
cbar := getClient(srvA, "bar")
barAcc, _ := srvA.LookupAccount("$bar")
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
readFooMsg := func() {
t.Helper()
if _, err := fooSub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive foo msg: %v", err)
}
}
// Send 2 requests
natsSubSync(t, cbarNC, "resp")
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
readFooMsg()
readFooMsg()
var rc *client
// Pull out first client
srvB.mu.Lock()
for _, rc = range srvB.clients {
if rc != nil {
break
}
}
srvB.mu.Unlock()
tracking := func() int {
rc.mu.Lock()
var nt int
if rc.rrTracking != nil {
nt = len(rc.rrTracking.rmap)
}
rc.mu.Unlock()
return nt
}
expectTracking := func(expected int) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if numTracking := tracking(); numTracking != expected {
return fmt.Errorf("Expected to have %d tracking replies, got %d", expected, numTracking)
}
return nil
})
}
expectTracking(2)
// Make sure these remote tracking replies honor the current respThresh for a service export.
time.Sleep(10 * time.Millisecond)
expectTracking(0)
// Also make sure tracking is removed
rc.mu.Lock()
removed := rc.rrTracking == nil
rc.mu.Unlock()
if !removed {
t.Fatalf("Expected the rrTracking to be removed")
}
// Now let's test that a lower response threshold is picked up.
fSub := natsSubSync(t, cfooNC, "foo")
natsFlush(t, cfooNC)
// Wait for it to propagate.
checkExpectedSubs(t, baseSubs+4, srvA)
// queue up some first. We want to test changing when rrTracking exists.
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
readFooMsg()
expectTracking(1)
for _, s := range srvs {
fooAcc, _ := s.LookupAccount("$foo")
barAcc, _ := s.LookupAccount("$bar")
fooAcc.AddServiceExport("foo", nil)
fooAcc.TrackServiceExport("foo", "foo.results")
fooAcc.SetServiceExportResponseThreshold("foo", time.Millisecond)
barAcc.AddServiceImport(fooAcc, "foo", "foo")
}
natsSubSync(t, cbarNC, "reply")
natsPubReq(t, cbarNC, "foo", "reply", []byte("help"))
if _, err := fSub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive foo msg: %v", err)
}
expectTracking(2)
rc.mu.Lock()
lrt := rc.rrTracking.lrt
rc.mu.Unlock()
if lrt != time.Millisecond {
t.Fatalf("Expected lrt of %v, got %v", time.Millisecond, lrt)
}
// Now make sure we clear on close.
rc.closeConnection(ClientClosed)
// Actual tear down will be not inline.
checkFor(t, time.Second, 5*time.Millisecond, func() error {
rc.mu.Lock()
removed = rc.rrTracking == nil
rc.mu.Unlock()
if !removed {
return fmt.Errorf("Expected the rrTracking to be removed after client close")
}
return nil
})
}
func TestCrossAccountServiceResponseTypes(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, crFoo, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, crBar, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Add in the service export for the requests. Make it public.
if err := fooAcc.AddServiceExportWithResponse("test.request", Streamed, nil); err != nil {
t.Fatalf("Error adding account service export to client foo: %v", err)
}
// Now add in the route mapping for request to be routed to the foo account.
if err := barAcc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account service import to client bar: %v", err)
}
// Now setup the resonder under cfoo
cfoo.parse([]byte("SUB test.request 1\r\n"))
// Now send the request. Remember we expect the request on our local foo. We added the route
// with that "from" and will map it to "test.request"
cbar.parseAsync("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")
// Now read the request from crFoo
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches := mraw[0]
reply := matches[REPLY_INDEX]
if !strings.HasPrefix(reply, "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
}
crFoo.ReadString('\n')
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
var mReply []byte
for i := 0; i < 10; i++ {
mReply = append(mReply, replyOp...)
}
cfoo.parseAsync(string(mReply))
var b [256]byte
n, err := crBar.Read(b[:])
if err != nil {
t.Fatalf("Error reading response: %v", err)
}
mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1)
if len(mraw) != 10 {
t.Fatalf("Expected a response but got %d", len(mraw))
}
// Also make sure the response map gets cleaned up when interest goes away.
cbar.closeConnection(ClientClosed)
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
return fmt.Errorf("Number of responses is %d", nr)
}
return nil
})
// Now test bogus reply subjects are handled and do not accumulate the response maps.
cbar, _, _ = newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Do not create any interest in the reply subject 'bar'. Just send a request.
cbar.parseAsync("PUB foo bar 4\r\nhelp\r\n")
// Now read the request from crFoo
l, err = crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw = msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
matches = mraw[0]
reply = matches[REPLY_INDEX]
if !strings.HasPrefix(reply, "_R_.") {
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
}
crFoo.ReadString('\n')
replyOp = fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
cfoo.parseAsync(replyOp)
// Now wait for a bit, the reply should trip a no interest condition
// which should clean this up.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nr := fooAcc.NumPendingAllResponses(); nr != 0 {
return fmt.Errorf("Number of responses is %d", nr)
}
return nil
})
// Also make sure the response map entry is gone as well.
fooAcc.mu.RLock()
lrm := len(fooAcc.exports.responses)
fooAcc.mu.RUnlock()
if lrm != 0 {
t.Fatalf("Expected the responses to be cleared, got %d entries", lrm)
}
}
func TestAccountMapsUsers(t *testing.T) {
// Used for the nkey users to properly sign.
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE"
confFileName := createConfFile(t, []byte(`
accounts {
synadia {
users = [
{user: derek, password: foo},
{nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR}
]
}
nats {
users = [
{user: ivan, password: bar},
{nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH}
]
}
}
`))
defer removeFile(t, confFileName)
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Unexpected error parsing config file: %v", err)
}
opts.NoSigs = true
s := New(opts)
defer s.Shutdown()
synadia, _ := s.LookupAccount("synadia")
nats, _ := s.LookupAccount("nats")
if synadia == nil || nats == nil {
t.Fatalf("Expected non nil accounts during lookup")
}
// Make sure a normal log in maps the accounts correctly.
c, _, _ := newClientForServer(s)
defer c.close()
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
c.parse(connectOp)
if c.acc != synadia {
t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc)
}
c, _, _ = newClientForServer(s)
defer c.close()
connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
c.parse(connectOp)
if c.acc != nats {
t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc)
}
// Now test nkeys as well.
kp, _ := nkeys.FromSeed([]byte(seed1))
pubKey, _ := kp.PublicKey()
c, cr, l := newClientForServer(s)
defer c.close()
// Check for Nonce
var info nonceInfo
err = json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if info.Nonce == "" {
t.Fatalf("Expected a non-empty nonce with nkeys defined")
}
sigraw, err := kp.Sign([]byte(info.Nonce))
if err != nil {
t.Fatalf("Failed signing nonce: %v", err)
}
sig := base64.RawURLEncoding.EncodeToString(sigraw)
// PING needed to flush the +OK to us.
cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
c.parseAsync(cs)
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "+OK") {
t.Fatalf("Expected an OK, got: %v", l)
}
if c.acc != synadia {
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
}
// Now nats account nkey user.
kp, _ = nkeys.FromSeed([]byte(seed2))
pubKey, _ = kp.PublicKey()
c, cr, l = newClientForServer(s)
defer c.close()
// Check for Nonce
err = json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if info.Nonce == "" {
t.Fatalf("Expected a non-empty nonce with nkeys defined")
}
sigraw, err = kp.Sign([]byte(info.Nonce))
if err != nil {
t.Fatalf("Failed signing nonce: %v", err)
}
sig = base64.RawURLEncoding.EncodeToString(sigraw)
// PING needed to flush the +OK to us.
cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
c.parseAsync(cs)
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "+OK") {
t.Fatalf("Expected an OK, got: %v", l)
}
if c.acc != nats {
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
}
}
func TestAccountGlobalDefault(t *testing.T) {
opts := defaultServerOptions
s := New(&opts)
if acc, _ := s.LookupAccount(globalAccountName); acc == nil {
t.Fatalf("Expected a global default account on a new server, got none.")
}
// Make sure we can not create one with same name..
if _, err := s.RegisterAccount(globalAccountName); err == nil {
t.Fatalf("Expected error trying to create a new reserved account")
}
// Make sure we can not define one in a config file either.
confFileName := createConfFile(t, []byte(`accounts { $G {} }`))
defer removeFile(t, confFileName)
if _, err := ProcessConfigFile(confFileName); err == nil {
t.Fatalf("Expected an error parsing config file with reserved account")
}
}
func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Create bare accounts for this test
fooAcc := NewAccount("foo")
if err := fooAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
barAcc := NewAccount("bar")
if err := barAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bazAcc := NewAccount("baz")
if err := bazAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}
if err := bazAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be different")
}
if err := barAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}
// Create another account that is named "foo". We want to make sure
// that the comparison still works (based on account name, not pointer)
newFooAcc := NewAccount("foo")
if err := newFooAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
batAcc := NewAccount("bat")
if err := batAcc.AddStreamImport(newFooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if err := batAcc.AddStreamImport(newFooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !batAcc.checkStreamImportsEqual(barAcc) {
t.Fatal("Expected stream imports to be the same")
}
if !batAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}
// Test with account with different "from"
expAcc := NewAccount("new_acc")
if err := expAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
aAcc := NewAccount("a")
if err := aAcc.AddStreamImport(expAcc, "bar", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc := NewAccount("b")
if err := bAcc.AddStreamImport(expAcc, "baz", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}
// Test with account with different "prefix"
aAcc = NewAccount("a")
if err := aAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc = NewAccount("b")
if err := bAcc.AddStreamImport(expAcc, "bar", "diff_prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}
// Test with account with different "name"
expAcc = NewAccount("diff_name")
if err := expAcc.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
bAcc = NewAccount("b")
if err := bAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}
}
func TestAccountNoDeadlockOnQueueSubRouteMapUpdate(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
nc.QueueSubscribeSync("foo", "bar")
var accs []*Account
for i := 0; i < 10; i++ {
acc, _ := s.RegisterAccount(fmt.Sprintf("acc%d", i))
acc.mu.Lock()
accs = append(accs, acc)
}
opts2 := DefaultOptions()
opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port))
s2 := RunServer(opts2)
defer s2.Shutdown()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
time.Sleep(100 * time.Millisecond)
for _, acc := range accs {
acc.mu.Unlock()
}
wg.Done()
}()
nc.QueueSubscribeSync("foo", "bar")
nc.Flush()
wg.Wait()
}
func TestAccountDuplicateServiceImportSubject(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
fooAcc, _ := s.RegisterAccount("foo")
fooAcc.AddServiceExport("remote1", nil)
fooAcc.AddServiceExport("remote2", nil)
barAcc, _ := s.RegisterAccount("bar")
if err := barAcc.AddServiceImport(fooAcc, "foo", "remote1"); err != nil {
t.Fatalf("Error adding service import: %v", err)
}
if err := barAcc.AddServiceImport(fooAcc, "foo", "remote2"); err == nil || !strings.Contains(err.Error(), "duplicate") {
t.Fatalf("Expected an error about duplicate service import subject, got %q", err)
}
}
func TestMultipleStreamImportsWithSameSubjectDifferentPrefix(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
fooAcc, _ := s.RegisterAccount("foo")
fooAcc.AddStreamExport("test", nil)
barAcc, _ := s.RegisterAccount("bar")
barAcc.AddStreamExport("test", nil)
importAcc, _ := s.RegisterAccount("import")
if err := importAcc.AddStreamImport(fooAcc, "test", "foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := importAcc.AddStreamImport(barAcc, "test", "bar"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now make sure we can see messages from both.
cimport, crImport, _ := newClientForServer(s)
defer cimport.close()
if err := cimport.registerWithAccount(importAcc); err != nil {
t.Fatalf("Error registering client with 'import' account: %v", err)
}
if err := cimport.parse([]byte("SUB *.test 1\r\n")); err != nil {
t.Fatalf("Error for client 'import' from server: %v", err)
}
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
readMsg := func() {
t.Helper()
l, err := crImport.ReadString('\n')
if err != nil {
t.Fatalf("Error reading msg header from client 'import': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
// Consume msg body too.
if _, err = crImport.ReadString('\n'); err != nil {
t.Fatalf("Error reading msg body from client 'import': %v", err)
}
}
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
readMsg()
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
readMsg()
}
// This should work with prefixes that are different but we also want it to just work with same subject
// being imported from multiple accounts.
func TestMultipleStreamImportsWithSameSubject(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
fooAcc, _ := s.RegisterAccount("foo")
fooAcc.AddStreamExport("test", nil)
barAcc, _ := s.RegisterAccount("bar")
barAcc.AddStreamExport("test", nil)
importAcc, _ := s.RegisterAccount("import")
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Since we allow this now, make sure we do detect a duplicate import from same account etc.
// That should be not allowed.
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != ErrStreamImportDuplicate {
t.Fatalf("Expected ErrStreamImportDuplicate but got %v", err)
}
if err := importAcc.AddStreamImport(barAcc, "test", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now make sure we can see messages from both.
cimport, crImport, _ := newClientForServer(s)
defer cimport.close()
if err := cimport.registerWithAccount(importAcc); err != nil {
t.Fatalf("Error registering client with 'import' account: %v", err)
}
if err := cimport.parse([]byte("SUB test 1\r\n")); err != nil {
t.Fatalf("Error for client 'import' from server: %v", err)
}
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
readMsg := func() {
t.Helper()
l, err := crImport.ReadString('\n')
if err != nil {
t.Fatalf("Error reading msg header from client 'import': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
// Consume msg body too.
if _, err = crImport.ReadString('\n'); err != nil {
t.Fatalf("Error reading msg body from client 'import': %v", err)
}
}
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
readMsg()
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
readMsg()
}
func TestAccountBasicRouteMapping(t *testing.T) {
opts := DefaultOptions()
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
acc.AddMapping("foo", "bar")
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
fsub, _ := nc.SubscribeSync("foo")
bsub, _ := nc.SubscribeSync("bar")
nc.Publish("foo", nil)
nc.Flush()
checkPending := func(sub *nats.Subscription, expected int) {
t.Helper()
if n, _, _ := sub.Pending(); n != expected {
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
}
}
checkPending(fsub, 0)
checkPending(bsub, 1)
acc.RemoveMapping("foo")
nc.Publish("foo", nil)
nc.Flush()
checkPending(fsub, 1)
checkPending(bsub, 1)
}
func TestAccountWildcardRouteMapping(t *testing.T) {
opts := DefaultOptions()
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
addMap := func(src, dest string) {
t.Helper()
if err := acc.AddMapping(src, dest); err != nil {
t.Fatalf("Error adding mapping: %v", err)
}
}
addMap("foo.*.*", "bar.$2.$1")
addMap("bar.*.>", "baz.$1.>")
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
pub := func(subj string) {
t.Helper()
err := nc.Publish(subj, nil)
if err == nil {
err = nc.Flush()
}
if err != nil {
t.Fatalf("Error publishing: %v", err)
}
}
fsub, _ := nc.SubscribeSync("foo.>")
bsub, _ := nc.SubscribeSync("bar.>")
zsub, _ := nc.SubscribeSync("baz.>")
checkPending := func(sub *nats.Subscription, expected int) {
t.Helper()
if n, _, _ := sub.Pending(); n != expected {
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
}
}
pub("foo.1.2")
checkPending(fsub, 0)
checkPending(bsub, 1)
checkPending(zsub, 0)
}
func TestAccountRouteMappingChangesAfterClientStart(t *testing.T) {
opts := DefaultOptions()
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
// Create the client first then add in mapping.
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
nc.Flush()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
acc.AddMapping("foo", "bar")
fsub, _ := nc.SubscribeSync("foo")
bsub, _ := nc.SubscribeSync("bar")
nc.Publish("foo", nil)
nc.Flush()
checkPending := func(sub *nats.Subscription, expected int) {
t.Helper()
if n, _, _ := sub.Pending(); n != expected {
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
}
}
checkPending(fsub, 0)
checkPending(bsub, 1)
acc.RemoveMapping("foo")
nc.Publish("foo", nil)
nc.Flush()
checkPending(fsub, 1)
checkPending(bsub, 1)
}
func TestAccountSimpleWeightedRouteMapping(t *testing.T) {
opts := DefaultOptions()
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
acc.AddWeightedMappings("foo", NewMapDest("bar", 50))
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
fsub, _ := nc.SubscribeSync("foo")
bsub, _ := nc.SubscribeSync("bar")
total := 500
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
fpending, _, _ := fsub.Pending()
bpending, _, _ := bsub.Pending()
h := total / 2
tp := h / 5
min, max := h-tp, h+tp
if fpending < min || fpending > max {
t.Fatalf("Expected about %d msgs, got %d and %d", h, fpending, bpending)
}
}
func TestAccountMultiWeightedRouteMappings(t *testing.T) {
opts := DefaultOptions()
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
// Check failures for bad weights.
shouldErr := func(rds ...*MapDest) {
t.Helper()
if acc.AddWeightedMappings("foo", rds...) == nil {
t.Fatalf("Expected an error, got none")
}
}
shouldNotErr := func(rds ...*MapDest) {
t.Helper()
if err := acc.AddWeightedMappings("foo", rds...); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
shouldErr(NewMapDest("bar", 150))
shouldNotErr(NewMapDest("bar", 50))
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 50))
// Same dest duplicated should error.
shouldErr(NewMapDest("bar", 50), NewMapDest("bar", 50))
// total over 100
shouldErr(NewMapDest("bar", 50), NewMapDest("baz", 60))
acc.RemoveMapping("foo")
// 20 for original, you can leave it off will be auto-added.
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 30))
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
fsub, _ := nc.SubscribeSync("foo")
bsub, _ := nc.SubscribeSync("bar")
zsub, _ := nc.SubscribeSync("baz")
// For checking later.
rds := []struct {
sub *nats.Subscription
w uint8
}{
{fsub, 20},
{bsub, 50},
{zsub, 30},
}
total := 5000
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
for _, rd := range rds {
pending, _, _ := rd.sub.Pending()
expected := total / int(100/rd.w)
tp := expected / 5 // 20%
min, max := expected-tp, expected+tp
if pending < min || pending > max {
t.Fatalf("Expected about %d msgs for %q, got %d", expected, rd.sub.Subject, pending)
}
}
}
func TestGlobalAccountRouteMappingsConfiguration(t *testing.T) {
cf := createConfFile(t, []byte(`
mappings = {
foo: bar
foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ]
bar.*.*: RAB.$2.$1
}
`))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
bsub, _ := nc.SubscribeSync("bar")
fsub1, _ := nc.SubscribeSync("bar.v1.>")
fsub2, _ := nc.SubscribeSync("baz.v2.>")
zsub, _ := nc.SubscribeSync("RAB.>")
f22sub, _ := nc.SubscribeSync("foo.*")
checkPending := func(sub *nats.Subscription, expected int) {
t.Helper()
if n, _, _ := sub.Pending(); n != expected {
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
}
}
nc.Publish("foo", nil)
nc.Publish("bar.11.22", nil)
total := 500
for i := 0; i < total; i++ {
nc.Publish("foo.22", nil)
}
nc.Flush()
checkPending(bsub, 1)
checkPending(zsub, 1)
fpending, _, _ := f22sub.Pending()
fpending1, _, _ := fsub1.Pending()
fpending2, _, _ := fsub2.Pending()
if fpending1 < fpending2 || fpending < fpending2 {
t.Fatalf("Loadbalancing seems off for the foo.* mappings: %d and %d and %d", fpending, fpending1, fpending2)
}
}
func TestAccountRouteMappingsConfiguration(t *testing.T) {
cf := createConfFile(t, []byte(`
accounts {
synadia {
users = [{user: derek, password: foo}]
mappings = {
foo: bar
foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ]
bar.*.*: RAB.$2.$1
}
}
}
`))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
// We test functionality above, so for this one just make sure we have mappings for the account.
acc, _ := s.LookupAccount("synadia")
if !acc.hasMappings() {
t.Fatalf("Account %q does not have mappings", "synadia")
}
az, err := s.Accountz(&AccountzOptions{"synadia"})
if err != nil {
t.Fatalf("Error getting Accountz: %v", err)
}
if az.Account == nil {
t.Fatalf("Expected an Account")
}
if len(az.Account.Mappings) != 3 {
t.Fatalf("Expected %d mappings, saw %d", 3, len(az.Account.Mappings))
}
}
func TestAccountRouteMappingsWithLossInjection(t *testing.T) {
cf := createConfFile(t, []byte(`
mappings = {
foo: { dest: foo, weight: 80% }
bar: { dest: bar, weight: 0% }
}
`))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
total := 1000
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending == total {
t.Fatalf("Expected some loss and pending to not be same as sent")
}
sub, _ = nc.SubscribeSync("bar")
for i := 0; i < total; i++ {
nc.Publish("bar", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending != 0 {
t.Fatalf("Expected all messages to be dropped and pending to be 0, got %d", pending)
}
}
func TestAccountRouteMappingsWithOriginClusterFilter(t *testing.T) {
cf := createConfFile(t, []byte(`
mappings = {
foo: { dest: bar, cluster: SYN, weight: 100% }
}
`))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
total := 1000
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending != total {
t.Fatalf("Expected pending to be %d, got %d", total, pending)
}
s.setClusterName("SYN")
sub, _ = nc.SubscribeSync("bar")
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending != total {
t.Fatalf("Expected pending to be %d, got %d", total, pending)
}
}
func TestAccountServiceImportWithRouteMappings(t *testing.T) {
cf := createConfFile(t, []byte(`
accounts {
foo {
users = [{user: derek, password: foo}]
exports = [{service: "request"}]
}
bar {
users = [{user: ivan, password: bar}]
imports = [{service: {account: "foo", subject:"request"}}]
}
}
`))
defer removeFile(t, cf)
s, opts := RunServerWithConfig(cf)
defer s.Shutdown()
acc, _ := s.LookupAccount("foo")
acc.AddMapping("request", "request.v2")
// Create the service client first.
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
defer ncFoo.Close()
fooSub := natsSubSync(t, ncFoo, "request.v2")
ncFoo.Flush()
// Requestor
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
defer ncBar.Close()
ncBar.Publish("request", nil)
ncBar.Flush()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if n, _, _ := fooSub.Pending(); n != 1 {
return fmt.Errorf("Expected a request for %q, but got %d", fooSub.Subject, n)
}
return nil
})
}
func TestAccountImportsWithWildcardSupport(t *testing.T) {
cf := createConfFile(t, []byte(`
accounts {
foo {
users = [{user: derek, password: foo}]
exports = [
{ service: "request.*" }
{ stream: "events.>" }
{ stream: "info.*.*.>" }
]
}
bar {
users = [{user: ivan, password: bar}]
imports = [
{ service: {account: "foo", subject:"request.*"}, to:"my.request.*"}
{ stream: {account: "foo", subject:"events.>"}, to:"foo.events.>"}
{ stream: {account: "foo", subject:"info.*.*.>"}, to:"foo.info.$2.$1.>"}
]
}
}
`))
defer removeFile(t, cf)
s, opts := RunServerWithConfig(cf)
defer s.Shutdown()
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
defer ncFoo.Close()
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
defer ncBar.Close()
// Create subscriber for the service endpoint in foo.
_, err := ncFoo.QueueSubscribe("request.*", "t22", func(m *nats.Msg) {
if m.Subject != "request.22" {
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
}
m.Respond([]byte("yes!"))
})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
ncFoo.Flush()
// Now test service import.
resp, err := ncBar.Request("my.request.22", []byte("yes?"), time.Second)
if err != nil {
t.Fatalf("Expected a response")
}
if string(resp.Data) != "yes!" {
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
}
// Now test stream imports.
esub, _ := ncBar.SubscribeSync("foo.events.*") // subset
isub, _ := ncBar.SubscribeSync("foo.info.>")
ncBar.Flush()
// Now publish some stream events.
ncFoo.Publish("events.22", nil)
ncFoo.Publish("info.11.22.bar", nil)
ncFoo.Flush()
checkPending := func(sub *nats.Subscription, expected int) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if n, _, _ := sub.Pending(); n != expected {
return fmt.Errorf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
}
return nil
})
}
checkPending(esub, 1)
checkPending(isub, 1)
// Now check to make sure the subjects are correct etc.
m, err := esub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject != "foo.events.22" {
t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.events.22", m.Subject)
}
m, err = isub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject != "foo.info.22.11.bar" {
t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.info.22.11.bar", m.Subject)
}
}
// duplicates TestJWTAccountImportsWithWildcardSupport (jwt_test.go) in config
func TestAccountImportsWithWildcardSupportStreamAndService(t *testing.T) {
cf := createConfFile(t, []byte(`
accounts {
foo {
users = [{user: derek, password: foo}]
exports = [
{ service: "$request.*.$in.*.>" }
{ stream: "$events.*.$in.*.>" }
]
}
bar {
users = [{user: ivan, password: bar}]
imports = [
{ service: {account: "foo", subject:"$request.*.$in.*.>"}, to:"my.request.$2.$1.>"}
{ stream: {account: "foo", subject:"$events.*.$in.*.>"}, to:"my.events.$2.$1.>"}
]
}
}
`))
defer removeFile(t, cf)
s, opts := RunServerWithConfig(cf)
defer s.Shutdown()
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
defer ncFoo.Close()
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
defer ncBar.Close()
// Create subscriber for the service endpoint in foo.
_, err := ncFoo.Subscribe("$request.>", func(m *nats.Msg) {
if m.Subject != "$request.2.$in.1.bar" {
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
}
m.Respond([]byte("yes!"))
})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
ncFoo.Flush()
// Now test service import.
if resp, err := ncBar.Request("my.request.1.2.bar", []byte("yes?"), time.Second); err != nil {
t.Fatalf("Expected a response")
} else if string(resp.Data) != "yes!" {
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
}
subBar, err := ncBar.SubscribeSync("my.events.>")
if err != nil {
t.Fatalf("Expected a response")
}
ncBar.Flush()
ncFoo.Publish("$events.1.$in.2.bar", nil)
m, err := subBar.NextMsg(time.Second)
if err != nil {
t.Fatalf("Expected a response")
}
if m.Subject != "my.events.2.1.bar" {
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
}
}
func BenchmarkNewRouteReply(b *testing.B) {
opts := defaultServerOptions
s := New(&opts)
g := s.globalAccount()
b.ResetTimer()
for i := 0; i < b.N; i++ {
g.newServiceReply(false)
}
}
func TestSamplingHeader(t *testing.T) {
test := func(expectSampling bool, h http.Header) {
t.Helper()
b := strings.Builder{}
b.WriteString("\r\n") // simulate status line
h.Write(&b)
b.WriteString("\r\n")
hdrString := b.String()
c := &client{parseState: parseState{msgBuf: []byte(hdrString), pa: pubArg{hdr: len(hdrString)}}}
sample, hdr := shouldSample(&serviceLatency{0, "foo"}, c)
if expectSampling {
if !sample {
t.Fatal("Expected to sample")
} else if hdr == nil {
t.Fatal("Expected a header")
}
for k, v := range h {
if hdr.Get(k) != v[0] {
t.Fatal("Expect header to match")
}
}
} else {
if sample {
t.Fatal("Expected not to sample")
} else if hdr != nil {
t.Fatal("Expected no header")
}
}
}
test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:0"}})
test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:00"}}) // one byte encoded as two hex digits
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:1"}})
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:01"}})
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:5"}}) // debug and sample
test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1"}})
test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}})
test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:0"}})
test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:0"}})
test(true, http.Header{"X-B3-Sampled": []string{"1"}})
test(false, http.Header{"X-B3-Sampled": []string{"0"}})
test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}) // decision left to recipient
test(false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"0"}})
test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"1"}})
test(false, http.Header{"B3": []string{"0"}}) // deny only
test(false, http.Header{"B3": []string{"0-0-0-0"}})
test(false, http.Header{"B3": []string{"0-0-0"}})
test(true, http.Header{"B3": []string{"0-0-1-0"}})
test(true, http.Header{"B3": []string{"0-0-1"}})
test(true, http.Header{"B3": []string{"0-0-d"}}) // debug is not a deny
test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"}})
test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}})
test(false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-0-05e3ac9a4f6e3b90"}})
test(true, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}})
test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}})
}
func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newTransform(src, dest); err != ErrBadSubject {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}
// Must be valid subjects.
shouldErr("foo", "")
shouldErr("foo..", "bar")
// Wildcards are allowed in src, but must be matched by token placements on the other side.
// e.g. foo.* -> bar.$1.
// Need to have as many pwcs as placements on other side.
shouldErr("foo.*", "bar.*")
shouldErr("foo.*", "bar.$2") // Bad pwc token identifier
shouldErr("foo.*", "bar.$1.>") // fwcs have to match.
shouldErr("foo.>", "bar.baz") // fwcs have to match.
shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.
shouldBeOK := func(src, dest string) *transform {
t.Helper()
tr, err := newTransform(src, dest)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
return tr
}
shouldBeOK("foo", "bar")
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1")
shouldBeOK("baz.>", "mybaz.>")
shouldMatch := func(src, dest, sample, expected string) {
t.Helper()
tr := shouldBeOK(src, dest)
s, err := tr.match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
}
if s != expected {
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
}
}
shouldMatch("foo", "bar", "foo", "bar")
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3")
shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3")
shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo")
}
func TestAccountSystemPermsWithGlobalAccess(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
accounts {
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`))
defer removeFile(t, conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
// Make sure we can connect with no auth to global account as normal.
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
// Make sure we can connect to the system account with correct credentials.
sc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer sc.Close()
}