mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Currently, we use ReadyForConnections in server tests to wait for the server to be ready. However, when this fails we don't get a clue about why it failed. This change adds a new unexported method called readyForConnections that returns an error describing which check failed. The exported ReadyForConnections version works exactly as before. The unexported version gets used in internal tests only.
3297 lines
97 KiB
Go
3297 lines
97 KiB
Go
// Copyright 2018-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nkeys"
|
|
)
|
|
|
|
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
|
|
opts := defaultServerOptions
|
|
s := New(&opts)
|
|
|
|
// Now create two accounts.
|
|
f, err := s.RegisterAccount("$foo")
|
|
if err != nil {
|
|
t.Fatalf("Error creating account 'foo': %v", err)
|
|
}
|
|
b, err := s.RegisterAccount("$bar")
|
|
if err != nil {
|
|
t.Fatalf("Error creating account 'bar': %v", err)
|
|
}
|
|
return s, f, b
|
|
}
|
|
|
|
func TestRegisterDuplicateAccounts(t *testing.T) {
|
|
s, _, _ := simpleAccountServer(t)
|
|
if _, err := s.RegisterAccount("$foo"); err == nil {
|
|
t.Fatal("Expected an error registering 'foo' twice")
|
|
}
|
|
}
|
|
|
|
func TestAccountIsolation(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error register client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error register client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Make sure they are different accounts/sl.
|
|
if cfoo.acc == cbar.acc {
|
|
t.Fatalf("Error, accounts the same for both clients")
|
|
}
|
|
|
|
// Now do quick test that makes sure messages do not cross over.
|
|
// setup bar as a foo subscriber.
|
|
cbar.parseAsync("SUB foo 1\r\nPING\r\nPING\r\n")
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %q", l)
|
|
}
|
|
|
|
cfoo.parseAsync("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")
|
|
l, err = crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'foo' from server: %v", err)
|
|
}
|
|
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if matches[SUB_INDEX] != "foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crFoo, []byte("hello\r\n"), t)
|
|
|
|
// Now make sure nothing shows up on bar.
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %q", l)
|
|
}
|
|
}
|
|
|
|
func TestAccountIsolationExportImport(t *testing.T) {
|
|
checkIsolation := func(t *testing.T, pubSubj string, ncExp, ncImp *nats.Conn) {
|
|
// We keep track of 2 subjects.
|
|
// One subject (pubSubj) is based off the stream import.
|
|
// The other subject "fizz" is not imported and should be isolated.
|
|
|
|
gotSubjs := map[string]int{
|
|
pubSubj: 0,
|
|
"fizz": 0,
|
|
}
|
|
count := int32(0)
|
|
ch := make(chan struct{}, 1)
|
|
if _, err := ncImp.Subscribe(">", func(m *nats.Msg) {
|
|
gotSubjs[m.Subject] += 1
|
|
if n := atomic.AddInt32(&count, 1); n == 3 {
|
|
ch <- struct{}{}
|
|
}
|
|
}); err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
// Since both prod and cons use same server, flushing here will ensure
|
|
// that the interest is registered and known at the time we publish.
|
|
ncImp.Flush()
|
|
|
|
if err := ncExp.Publish(pubSubj, []byte(fmt.Sprintf("ncExp pub %s", pubSubj))); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := ncImp.Publish(pubSubj, []byte(fmt.Sprintf("ncImp pub %s", pubSubj))); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := ncExp.Publish("fizz", []byte("ncExp pub fizz")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := ncImp.Publish("fizz", []byte("ncImp pub fizz")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
wantSubjs := map[string]int{
|
|
// Subscriber ncImp should receive publishes from ncExp and ncImp.
|
|
pubSubj: 2,
|
|
// Subscriber ncImp should only receive the publish from ncImp.
|
|
"fizz": 1,
|
|
}
|
|
|
|
// Wait for at least the 3 expected messages
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Expected 3 messages, got %v", atomic.LoadInt32(&count))
|
|
}
|
|
// But now wait a bit to see if subscription receives more than expected.
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
if got, want := len(gotSubjs), len(wantSubjs); got != want {
|
|
t.Fatalf("unexpected subjs len, got=%d; want=%d", got, want)
|
|
}
|
|
|
|
for key, gotCnt := range gotSubjs {
|
|
if wantCnt := wantSubjs[key]; gotCnt != wantCnt {
|
|
t.Errorf("unexpected receive count for subject %q, got=%d, want=%d", key, gotCnt, wantCnt)
|
|
}
|
|
}
|
|
}
|
|
|
|
cases := []struct {
|
|
name string
|
|
exp, imp string
|
|
pubSubj string
|
|
}{
|
|
{
|
|
name: "export literal, import literal",
|
|
exp: "foo", imp: "foo",
|
|
pubSubj: "foo",
|
|
},
|
|
{
|
|
name: "export full wildcard, import literal",
|
|
exp: "foo.>", imp: "foo.bar",
|
|
pubSubj: "foo.bar",
|
|
},
|
|
{
|
|
name: "export full wildcard, import sublevel full wildcard",
|
|
exp: "foo.>", imp: "foo.bar.>",
|
|
pubSubj: "foo.bar.whizz",
|
|
},
|
|
{
|
|
name: "export full wildcard, import full wildcard",
|
|
exp: "foo.>", imp: "foo.>",
|
|
pubSubj: "foo.bar",
|
|
},
|
|
{
|
|
name: "export partial wildcard, import partial wildcard",
|
|
exp: "foo.*", imp: "foo.*",
|
|
pubSubj: "foo.bar",
|
|
},
|
|
{
|
|
name: "export mid partial wildcard, import mid partial wildcard",
|
|
exp: "foo.*.bar", imp: "foo.*.bar",
|
|
pubSubj: "foo.whizz.bar",
|
|
},
|
|
}
|
|
for _, c := range cases {
|
|
t.Run(fmt.Sprintf("%s jwt", c.name), func(t *testing.T) {
|
|
// Setup NATS server.
|
|
s := opTrustBasicSetup()
|
|
defer s.Shutdown()
|
|
go s.Start()
|
|
if err := s.readyForConnections(5 * time.Second); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
buildMemAccResolver(s)
|
|
|
|
// Setup exporter account.
|
|
accExpPair, accExpPub := createKey(t)
|
|
accExpClaims := jwt.NewAccountClaims(accExpPub)
|
|
if c.exp != "" {
|
|
accExpClaims.Limits.WildcardExports = true
|
|
accExpClaims.Exports.Add(&jwt.Export{
|
|
Name: fmt.Sprintf("%s-stream-export", c.exp),
|
|
Subject: jwt.Subject(c.exp),
|
|
Type: jwt.Stream,
|
|
})
|
|
}
|
|
accExpJWT, err := accExpClaims.Encode(oKp)
|
|
require_NoError(t, err)
|
|
addAccountToMemResolver(s, accExpPub, accExpJWT)
|
|
|
|
// Setup importer account.
|
|
accImpPair, accImpPub := createKey(t)
|
|
accImpClaims := jwt.NewAccountClaims(accImpPub)
|
|
if c.imp != "" {
|
|
accImpClaims.Imports.Add(&jwt.Import{
|
|
Name: fmt.Sprintf("%s-stream-import", c.imp),
|
|
Subject: jwt.Subject(c.imp),
|
|
Account: accExpPub,
|
|
Type: jwt.Stream,
|
|
})
|
|
}
|
|
accImpJWT, err := accImpClaims.Encode(oKp)
|
|
require_NoError(t, err)
|
|
addAccountToMemResolver(s, accImpPub, accImpJWT)
|
|
|
|
// Connect with different accounts.
|
|
ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accExpPair),
|
|
nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp)))
|
|
ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accImpPair),
|
|
nats.Name(fmt.Sprintf("nc-importer-%s", c.imp)))
|
|
defer ncExp.Close()
|
|
defer ncImp.Close()
|
|
|
|
checkIsolation(t, c.pubSubj, ncExp, ncImp)
|
|
if t.Failed() {
|
|
t.Logf("exported=%q; imported=%q", c.exp, c.imp)
|
|
}
|
|
})
|
|
|
|
t.Run(fmt.Sprintf("%s conf", c.name), func(t *testing.T) {
|
|
// Setup NATS server.
|
|
cf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
port: -1
|
|
|
|
accounts: {
|
|
accExp: {
|
|
users: [{user: accExp, password: accExp}]
|
|
exports: [{stream: %q}]
|
|
}
|
|
accImp: {
|
|
users: [{user: accImp, password: accImp}]
|
|
imports: [{stream: {account: accExp, subject: %q}}]
|
|
}
|
|
}
|
|
`,
|
|
c.exp, c.imp,
|
|
)))
|
|
defer removeFile(t, cf)
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
// Connect with different accounts.
|
|
ncExp := natsConnect(t, s.ClientURL(), nats.UserInfo("accExp", "accExp"),
|
|
nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp)))
|
|
ncImp := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"),
|
|
nats.Name(fmt.Sprintf("nc-importer-%s", c.imp)))
|
|
defer ncExp.Close()
|
|
defer ncImp.Close()
|
|
|
|
checkIsolation(t, c.pubSubj, ncExp, ncImp)
|
|
if t.Failed() {
|
|
t.Logf("exported=%q; imported=%q", c.exp, c.imp)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAccountFromOptions(t *testing.T) {
|
|
opts := defaultServerOptions
|
|
opts.Accounts = []*Account{NewAccount("foo"), NewAccount("bar")}
|
|
s := New(&opts)
|
|
defer s.Shutdown()
|
|
|
|
ta := s.numReservedAccounts() + 2
|
|
if la := s.numAccounts(); la != ta {
|
|
t.Fatalf("Expected to have a server with %d active accounts, got %v", ta, la)
|
|
}
|
|
// Check that sl is filled in.
|
|
fooAcc, _ := s.LookupAccount("foo")
|
|
barAcc, _ := s.LookupAccount("bar")
|
|
if fooAcc == nil || barAcc == nil {
|
|
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
|
|
}
|
|
if fooAcc.sl == nil || barAcc.sl == nil {
|
|
t.Fatal("Expected Sublists to be filled in on Opts.Accounts")
|
|
}
|
|
}
|
|
|
|
func TestNewAccountsFromClients(t *testing.T) {
|
|
opts := defaultServerOptions
|
|
s := New(&opts)
|
|
defer s.Shutdown()
|
|
|
|
c, cr, _ := newClientForServer(s)
|
|
defer c.close()
|
|
connectOp := "CONNECT {\"account\":\"foo\"}\r\n"
|
|
c.parseAsync(connectOp)
|
|
l, _ := cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "-ERR ") {
|
|
t.Fatalf("Expected an error")
|
|
}
|
|
|
|
opts.AllowNewAccounts = true
|
|
s = New(&opts)
|
|
defer s.Shutdown()
|
|
|
|
c, cr, _ = newClientForServer(s)
|
|
defer c.close()
|
|
err := c.parse([]byte(connectOp))
|
|
if err != nil {
|
|
t.Fatalf("Received an error trying to connect: %v", err)
|
|
}
|
|
c.parseAsync("PING\r\n")
|
|
l, err = cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading response for client from server: %v", err)
|
|
}
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %q", l)
|
|
}
|
|
}
|
|
|
|
func TestActiveAccounts(t *testing.T) {
|
|
opts := defaultServerOptions
|
|
opts.AllowNewAccounts = true
|
|
opts.Cluster.Port = 22
|
|
|
|
s := New(&opts)
|
|
defer s.Shutdown()
|
|
|
|
if s.NumActiveAccounts() != 0 {
|
|
t.Fatalf("Expected no active account, got %d", s.NumActiveAccounts())
|
|
}
|
|
|
|
addClientWithAccount := func(accName string) *testAsyncClient {
|
|
t.Helper()
|
|
c, _, _ := newClientForServer(s)
|
|
connectOp := fmt.Sprintf("CONNECT {\"account\":\"%s\"}\r\n", accName)
|
|
err := c.parse([]byte(connectOp))
|
|
if err != nil {
|
|
t.Fatalf("Received an error trying to connect: %v", err)
|
|
}
|
|
return c
|
|
}
|
|
|
|
// Now add some clients.
|
|
cf1 := addClientWithAccount("foo")
|
|
defer cf1.close()
|
|
if s.activeAccounts != 1 {
|
|
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
|
|
}
|
|
// Adding in same one should not change total.
|
|
cf2 := addClientWithAccount("foo")
|
|
defer cf2.close()
|
|
if s.activeAccounts != 1 {
|
|
t.Fatalf("Expected active accounts to be 1, got %d", s.activeAccounts)
|
|
}
|
|
// Add in new one.
|
|
cb1 := addClientWithAccount("bar")
|
|
defer cb1.close()
|
|
if s.activeAccounts != 2 {
|
|
t.Fatalf("Expected active accounts to be 2, got %d", s.activeAccounts)
|
|
}
|
|
|
|
// Make sure the Accounts track clients.
|
|
foo, _ := s.LookupAccount("foo")
|
|
bar, _ := s.LookupAccount("bar")
|
|
if foo == nil || bar == nil {
|
|
t.Fatalf("Error looking up accounts")
|
|
}
|
|
if nc := foo.NumConnections(); nc != 2 {
|
|
t.Fatalf("Expected account foo to have 2 clients, got %d", nc)
|
|
}
|
|
if nc := bar.NumConnections(); nc != 1 {
|
|
t.Fatalf("Expected account bar to have 1 client, got %d", nc)
|
|
}
|
|
|
|
waitTilActiveCount := func(n int32) {
|
|
t.Helper()
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if active := s.NumActiveAccounts(); active != n {
|
|
return fmt.Errorf("Number of active accounts is %d", active)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Test Removal
|
|
cb1.closeConnection(ClientClosed)
|
|
waitTilActiveCount(1)
|
|
|
|
checkAccClientsCount(t, bar, 0)
|
|
|
|
// This should not change the count.
|
|
cf1.closeConnection(ClientClosed)
|
|
waitTilActiveCount(1)
|
|
|
|
checkAccClientsCount(t, foo, 1)
|
|
|
|
cf2.closeConnection(ClientClosed)
|
|
waitTilActiveCount(0)
|
|
|
|
checkAccClientsCount(t, foo, 0)
|
|
}
|
|
|
|
// Clients can ask that the account be forced to be new. If it exists this is an error.
|
|
func TestNewAccountRequireNew(t *testing.T) {
|
|
// This has foo and bar accounts already.
|
|
s, _, _ := simpleAccountServer(t)
|
|
|
|
c, cr, _ := newClientForServer(s)
|
|
defer c.close()
|
|
connectOp := "CONNECT {\"account\":\"foo\",\"new_account\":true}\r\n"
|
|
c.parseAsync(connectOp)
|
|
l, _ := cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "-ERR ") {
|
|
t.Fatalf("Expected an error")
|
|
}
|
|
|
|
// Now allow new accounts on the fly, make sure second time does not work.
|
|
opts := defaultServerOptions
|
|
opts.AllowNewAccounts = true
|
|
s = New(&opts)
|
|
|
|
c, _, _ = newClientForServer(s)
|
|
defer c.close()
|
|
err := c.parse([]byte(connectOp))
|
|
if err != nil {
|
|
t.Fatalf("Received an error trying to create an account: %v", err)
|
|
}
|
|
|
|
c, cr, _ = newClientForServer(s)
|
|
defer c.close()
|
|
c.parseAsync(connectOp)
|
|
l, _ = cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "-ERR ") {
|
|
t.Fatalf("Expected an error")
|
|
}
|
|
}
|
|
|
|
func accountNameExists(name string, accounts []*Account) bool {
|
|
for _, acc := range accounts {
|
|
if strings.Compare(acc.Name, name) == 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func TestAccountSimpleConfig(t *testing.T) {
|
|
confFileName := createConfFile(t, []byte(`accounts = [foo, bar]`))
|
|
defer removeFile(t, confFileName)
|
|
opts, err := ProcessConfigFile(confFileName)
|
|
if err != nil {
|
|
t.Fatalf("Received an error processing config file: %v", err)
|
|
}
|
|
if la := len(opts.Accounts); la != 2 {
|
|
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
|
|
}
|
|
if !accountNameExists("foo", opts.Accounts) {
|
|
t.Fatal("Expected a 'foo' account")
|
|
}
|
|
if !accountNameExists("bar", opts.Accounts) {
|
|
t.Fatal("Expected a 'bar' account")
|
|
}
|
|
|
|
// Make sure double entries is an error.
|
|
confFileName = createConfFile(t, []byte(`accounts = [foo, foo]`))
|
|
defer removeFile(t, confFileName)
|
|
_, err = ProcessConfigFile(confFileName)
|
|
if err == nil {
|
|
t.Fatalf("Expected an error with double account entries")
|
|
}
|
|
}
|
|
|
|
func TestAccountParseConfig(t *testing.T) {
|
|
confFileName := createConfFile(t, []byte(`
|
|
accounts {
|
|
synadia {
|
|
users = [
|
|
{user: alice, password: foo}
|
|
{user: bob, password: bar}
|
|
]
|
|
}
|
|
nats.io {
|
|
users = [
|
|
{user: derek, password: foo}
|
|
{user: ivan, password: bar}
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, confFileName)
|
|
opts, err := ProcessConfigFile(confFileName)
|
|
if err != nil {
|
|
t.Fatalf("Received an error processing config file: %v", err)
|
|
}
|
|
|
|
if la := len(opts.Accounts); la != 2 {
|
|
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
|
|
}
|
|
|
|
if lu := len(opts.Users); lu != 4 {
|
|
t.Fatalf("Expected 4 total Users, got %d", lu)
|
|
}
|
|
|
|
var natsAcc *Account
|
|
for _, acc := range opts.Accounts {
|
|
if acc.Name == "nats.io" {
|
|
natsAcc = acc
|
|
break
|
|
}
|
|
}
|
|
if natsAcc == nil {
|
|
t.Fatalf("Error retrieving account for 'nats.io'")
|
|
}
|
|
|
|
for _, u := range opts.Users {
|
|
if u.Username == "derek" {
|
|
if u.Account != natsAcc {
|
|
t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestAccountParseConfigDuplicateUsers(t *testing.T) {
|
|
confFileName := createConfFile(t, []byte(`
|
|
accounts {
|
|
synadia {
|
|
users = [
|
|
{user: alice, password: foo}
|
|
{user: bob, password: bar}
|
|
]
|
|
}
|
|
nats.io {
|
|
users = [
|
|
{user: alice, password: bar}
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, confFileName)
|
|
_, err := ProcessConfigFile(confFileName)
|
|
if err == nil {
|
|
t.Fatalf("Expected an error with double user entries")
|
|
}
|
|
}
|
|
|
|
func TestAccountParseConfigImportsExports(t *testing.T) {
|
|
opts, err := ProcessConfigFile("./configs/accounts.conf")
|
|
if err != nil {
|
|
t.Fatal("parsing failed: ", err)
|
|
}
|
|
if la := len(opts.Accounts); la != 3 {
|
|
t.Fatalf("Expected to see 3 accounts in opts, got %d", la)
|
|
}
|
|
if lu := len(opts.Nkeys); lu != 4 {
|
|
t.Fatalf("Expected 4 total Nkey users, got %d", lu)
|
|
}
|
|
if lu := len(opts.Users); lu != 0 {
|
|
t.Fatalf("Expected no Users, got %d", lu)
|
|
}
|
|
var natsAcc, synAcc *Account
|
|
for _, acc := range opts.Accounts {
|
|
if acc.Name == "nats.io" {
|
|
natsAcc = acc
|
|
} else if acc.Name == "synadia" {
|
|
synAcc = acc
|
|
}
|
|
}
|
|
if natsAcc == nil {
|
|
t.Fatalf("Error retrieving account for 'nats.io'")
|
|
}
|
|
if natsAcc.Nkey != "AB5UKNPVHDWBP5WODG742274I3OGY5FM3CBIFCYI4OFEH7Y23GNZPXFE" {
|
|
t.Fatalf("Expected nats account to have an nkey, got %q\n", natsAcc.Nkey)
|
|
}
|
|
// Check user assigned to the correct account.
|
|
for _, nk := range opts.Nkeys {
|
|
if nk.Nkey == "UBRYMDSRTC6AVJL6USKKS3FIOE466GMEU67PZDGOWYSYHWA7GSKO42VW" {
|
|
if nk.Account != natsAcc {
|
|
t.Fatalf("Expected user to be associated with natsAcc, got %q\n", nk.Account.Name)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// Now check for the imports and exports of streams and services.
|
|
if lis := len(natsAcc.imports.streams); lis != 2 {
|
|
t.Fatalf("Expected 2 imported streams, got %d\n", lis)
|
|
}
|
|
if lis := len(natsAcc.imports.services); lis != 1 {
|
|
t.Fatalf("Expected 1 imported service, got %d\n", lis)
|
|
}
|
|
if les := len(natsAcc.exports.services); les != 4 {
|
|
t.Fatalf("Expected 4 exported services, got %d\n", les)
|
|
}
|
|
if les := len(natsAcc.exports.streams); les != 0 {
|
|
t.Fatalf("Expected no exported streams, got %d\n", les)
|
|
}
|
|
|
|
ea := natsAcc.exports.services["nats.time"]
|
|
if ea == nil {
|
|
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
|
}
|
|
if ea.respType != Streamed {
|
|
t.Fatalf("Expected to get a Streamed response type, got %q", ea.respType)
|
|
}
|
|
ea = natsAcc.exports.services["nats.photo"]
|
|
if ea == nil {
|
|
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
|
}
|
|
if ea.respType != Chunked {
|
|
t.Fatalf("Expected to get a Chunked response type, got %q", ea.respType)
|
|
}
|
|
ea = natsAcc.exports.services["nats.add"]
|
|
if ea == nil {
|
|
t.Fatalf("Expected to get a non-nil exportAuth for service")
|
|
}
|
|
if ea.respType != Singleton {
|
|
t.Fatalf("Expected to get a Singleton response type, got %q", ea.respType)
|
|
}
|
|
|
|
if synAcc == nil {
|
|
t.Fatalf("Error retrieving account for 'synadia'")
|
|
}
|
|
|
|
if lis := len(synAcc.imports.streams); lis != 0 {
|
|
t.Fatalf("Expected no imported streams, got %d\n", lis)
|
|
}
|
|
if lis := len(synAcc.imports.services); lis != 1 {
|
|
t.Fatalf("Expected 1 imported service, got %d\n", lis)
|
|
}
|
|
if les := len(synAcc.exports.services); les != 2 {
|
|
t.Fatalf("Expected 2 exported service, got %d\n", les)
|
|
}
|
|
if les := len(synAcc.exports.streams); les != 2 {
|
|
t.Fatalf("Expected 2 exported streams, got %d\n", les)
|
|
}
|
|
}
|
|
|
|
func TestImportExportConfigFailures(t *testing.T) {
|
|
// Import from unknow account
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
imports = [{stream: {account: "synadia", subject:"foo"}}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with import from unknown account")
|
|
}
|
|
// Import a service with no account.
|
|
cf = createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
imports = [{service: subject:"foo.*"}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with import of a service with no account")
|
|
}
|
|
// Import a service with a wildcard subject.
|
|
cf = createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
imports = [{service: {account: "nats.io", subject:"foo.*"}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with import of a service with wildcard subject")
|
|
}
|
|
// Export with unknown keyword.
|
|
cf = createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
exports = [{service: "foo.*", wat:true}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with export with unknown keyword")
|
|
}
|
|
// Import with unknown keyword.
|
|
cf = createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
imports = [{stream: {account: nats.io, subject: "foo.*"}, wat:true}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with import with unknown keyword")
|
|
}
|
|
// Export with an account.
|
|
cf = createConfFile(t, []byte(`
|
|
accounts {
|
|
nats.io {
|
|
exports = [{service: {account: nats.io, subject:"foo.*"}}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with export with account")
|
|
}
|
|
}
|
|
|
|
func TestImportAuthorized(t *testing.T) {
|
|
_, foo, bar := simpleAccountServer(t)
|
|
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.>", nil), false, t)
|
|
|
|
foo.AddStreamExport("foo", IsPublicExport)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), false, t)
|
|
|
|
foo.AddStreamExport("*", []*Account{bar})
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "baz", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), false, t)
|
|
|
|
// Reset and test '>' public export
|
|
_, foo, bar = simpleAccountServer(t)
|
|
foo.AddStreamExport(">", nil)
|
|
// Everything should work.
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "bar", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "baz", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, ">", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), true, t)
|
|
|
|
_, foo, bar = simpleAccountServer(t)
|
|
foo.addStreamExportWithAccountPos("foo.*", []*Account{}, 2)
|
|
foo.addStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2)
|
|
if err := foo.addStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil {
|
|
t.Fatal("expected error")
|
|
}
|
|
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("foo.%s", bar.Name), nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("bar.%s.foo", bar.Name), nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("baz.foo.%s", bar.Name), nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.X", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "bar.X.foo", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "baz.foo.X", nil), false, t)
|
|
|
|
foo.addServiceExportWithAccountPos("a.*", []*Account{}, 2)
|
|
foo.addServiceExportWithAccountPos("b.*.a", []*Account{}, 2)
|
|
if err := foo.addServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil {
|
|
t.Fatal("expected error")
|
|
}
|
|
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("a.%s", bar.Name), nil), true, t)
|
|
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("b.%s.a", bar.Name), nil), true, t)
|
|
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("c.a.%s", bar.Name), nil), false, t)
|
|
checkBool(foo.checkServiceImportAuthorized(bar, "a.X", nil), false, t)
|
|
checkBool(foo.checkServiceImportAuthorized(bar, "b.X.a", nil), false, t)
|
|
checkBool(foo.checkServiceImportAuthorized(bar, "c.a.X", nil), false, t)
|
|
|
|
// Reset and test pwc and fwc
|
|
s, foo, bar := simpleAccountServer(t)
|
|
foo.AddStreamExport("foo.*.baz.>", []*Account{bar})
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.1", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz.*", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.*.baz.1.1", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.22.baz.22", nil), true, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.baz", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bar, "foo.bar.*.*", nil), false, t)
|
|
|
|
// Make sure we match the account as well
|
|
|
|
fb, _ := s.RegisterAccount("foobar")
|
|
bz, _ := s.RegisterAccount("baz")
|
|
|
|
checkBool(foo.checkStreamImportAuthorized(fb, "foo.bar.baz.1", nil), false, t)
|
|
checkBool(foo.checkStreamImportAuthorized(bz, "foo.bar.baz.1", nil), false, t)
|
|
}
|
|
|
|
func TestSimpleMapping(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Test first that trying to import with no matching export permission returns an error.
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != ErrStreamImportAuthorization {
|
|
t.Fatalf("Expected error of ErrAccountImportAuthorization but got %v", err)
|
|
}
|
|
|
|
// Now map the subject space between foo and bar.
|
|
// Need to do export first.
|
|
if err := cfoo.acc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
|
|
t.Fatalf("Error adding account export to client foo: %v", err)
|
|
}
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
|
|
t.Fatalf("Error adding account import to client bar: %v", err)
|
|
}
|
|
|
|
// Normal and Queue Subscription on bar client.
|
|
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// Now publish our message.
|
|
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
|
|
|
|
checkMsg := func(l, sid string) {
|
|
t.Helper()
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "import.foo" {
|
|
t.Fatalf("Did not get correct subject: wanted %q, got %q", "import.foo", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != sid {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
}
|
|
|
|
// Now check we got the message from normal subscription.
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
checkMsg(l, "1")
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
checkMsg(l, "2")
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
|
|
// We should have 2 subscriptions in both. Normal and Queue Subscriber
|
|
// for barAcc which are local, and 2 that are shadowed in fooAcc.
|
|
// Now make sure that when we unsubscribe we clean up properly for both.
|
|
if bslc := barAcc.sl.Count(); bslc != 2 {
|
|
t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc)
|
|
}
|
|
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
|
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
|
}
|
|
|
|
// Now unsubscribe.
|
|
if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// We should have zero on both.
|
|
if bslc := barAcc.sl.Count(); bslc != 0 {
|
|
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
|
|
}
|
|
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
|
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
|
|
}
|
|
}
|
|
|
|
// https://github.com/nats-io/nats-server/issues/1159
|
|
func TestStreamImportLengthBug(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, _, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
if err := cfoo.acc.AddStreamExport("client.>", nil); err != nil {
|
|
t.Fatalf("Error adding account export to client foo: %v", err)
|
|
}
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events.>"); err == nil {
|
|
t.Fatalf("Expected an error when using a stream import prefix with a wildcard")
|
|
}
|
|
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events"); err != nil {
|
|
t.Fatalf("Error adding account import to client bar: %v", err)
|
|
}
|
|
|
|
if err := cbar.parse([]byte("SUB events.> 1\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// Also make sure that we will get an error from a config version.
|
|
// JWT will be updated separately.
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
foo {
|
|
exports = [{stream: "client.>"}]
|
|
}
|
|
bar {
|
|
imports = [{stream: {account: "foo", subject:"client.>"}, prefix:"events.>"}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
if _, err := ProcessConfigFile(cf); err == nil {
|
|
t.Fatalf("Expected an error with import with wildcard prefix")
|
|
}
|
|
}
|
|
|
|
func TestShadowSubsCleanupOnClientClose(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
// Now map the subject space between foo and bar.
|
|
// Need to do export first.
|
|
if err := fooAcc.AddStreamExport("foo", nil); err != nil { // Public with no accounts defined.
|
|
t.Fatalf("Error adding account export to client foo: %v", err)
|
|
}
|
|
if err := barAcc.AddStreamImport(fooAcc, "foo", "import"); err != nil {
|
|
t.Fatalf("Error adding account import to client bar: %v", err)
|
|
}
|
|
|
|
cbar, _, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Normal and Queue Subscription on bar client.
|
|
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
|
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
|
}
|
|
|
|
// Now close cbar and make sure we remove shadows.
|
|
cbar.closeConnection(ClientClosed)
|
|
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
|
return fmt.Errorf("Number of shadow subscriptions is %d", fslc)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestNoPrefixWildcardMapping(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding stream export to client foo: %v", err)
|
|
}
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "*", ""); err != nil {
|
|
t.Fatalf("Error adding stream import to client bar: %v", err)
|
|
}
|
|
|
|
// Normal Subscription on bar client for literal "foo".
|
|
cbar.parseAsync("SUB foo 1\r\nPING\r\n")
|
|
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// Now publish our message.
|
|
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
|
|
|
|
// Now check we got the message from normal subscription.
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
}
|
|
|
|
func TestPrefixWildcardMapping(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
if err := cfoo.acc.AddStreamExport(">", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding stream export to client foo: %v", err)
|
|
}
|
|
// Checking that trailing '.' is accepted, tested that it is auto added above.
|
|
if err := cbar.acc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
|
|
t.Fatalf("Error adding stream import to client bar: %v", err)
|
|
}
|
|
|
|
// Normal Subscription on bar client for wildcard.
|
|
cbar.parseAsync("SUB pub.imports.* 1\r\nPING\r\n")
|
|
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// Now publish our message.
|
|
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
|
|
|
|
// Now check we got the messages from wildcard subscription.
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "pub.imports.foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
}
|
|
|
|
func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
if err := fooAcc.AddStreamExport(">", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding stream export to client foo: %v", err)
|
|
}
|
|
if err := barAcc.AddStreamImport(fooAcc, "*", "pub.imports."); err != nil {
|
|
t.Fatalf("Error adding stream import to client bar: %v", err)
|
|
}
|
|
|
|
// Normal Subscription on bar client for wildcard.
|
|
cbar.parseAsync("SUB pub.imports.foo 1\r\nPING\r\n")
|
|
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
|
if err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
|
|
// Now publish our message.
|
|
cfoo.parseAsync("PUB foo 5\r\nhello\r\n")
|
|
|
|
// Now check we got the messages from wildcard subscription.
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "pub.imports.foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
}
|
|
|
|
func TestMultipleImportsAndSingleWCSub(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
if err := fooAcc.AddStreamExport("foo", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding stream export to account foo: %v", err)
|
|
}
|
|
if err := fooAcc.AddStreamExport("bar", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding stream export to account foo: %v", err)
|
|
}
|
|
|
|
if err := barAcc.AddStreamImport(fooAcc, "foo", "pub."); err != nil {
|
|
t.Fatalf("Error adding stream import to account bar: %v", err)
|
|
}
|
|
if err := barAcc.AddStreamImport(fooAcc, "bar", "pub."); err != nil {
|
|
t.Fatalf("Error adding stream import to account bar: %v", err)
|
|
}
|
|
|
|
// Wildcard Subscription on bar client for both imports.
|
|
cbar.parse([]byte("SUB pub.* 1\r\n"))
|
|
|
|
// Now publish a message on 'foo' and 'bar'
|
|
cfoo.parseAsync("PUB foo 5\r\nhello\r\nPUB bar 5\r\nworld\r\n")
|
|
|
|
// Now check we got the messages from the wildcard subscription.
|
|
l, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "pub.foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("hello\r\n"), t)
|
|
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches = mraw[0]
|
|
if matches[SUB_INDEX] != "pub.bar" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("world\r\n"), t)
|
|
|
|
// Check subscription count.
|
|
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
|
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
|
}
|
|
if bslc := barAcc.sl.Count(); bslc != 1 {
|
|
t.Fatalf("Expected 1 normal subscriptions on barAcc, got %d", bslc)
|
|
}
|
|
|
|
// Now unsubscribe.
|
|
if err := cbar.parse([]byte("UNSUB 1\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'bar' from server: %v", err)
|
|
}
|
|
// We should have zero on both.
|
|
if bslc := barAcc.sl.Count(); bslc != 0 {
|
|
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
|
|
}
|
|
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
|
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
|
|
}
|
|
}
|
|
|
|
// Make sure the AddServiceExport function is additive if called multiple times.
|
|
func TestAddServiceExport(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
bazAcc, err := s.RegisterAccount("$baz")
|
|
if err != nil {
|
|
t.Fatalf("Error creating account 'baz': %v", err)
|
|
}
|
|
defer s.Shutdown()
|
|
|
|
if err := fooAcc.AddServiceExport("test.request", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr := fooAcc.exports.services["test.request"]
|
|
if len(tr.approved) != 0 {
|
|
t.Fatalf("Expected no authorized accounts, got %d", len(tr.approved))
|
|
}
|
|
if err := fooAcc.AddServiceExport("test.request", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr = fooAcc.exports.services["test.request"]
|
|
if tr == nil {
|
|
t.Fatalf("Expected authorized accounts, got nil")
|
|
}
|
|
if ls := len(tr.approved); ls != 1 {
|
|
t.Fatalf("Expected 1 authorized accounts, got %d", ls)
|
|
}
|
|
if err := fooAcc.AddServiceExport("test.request", []*Account{bazAcc}); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr = fooAcc.exports.services["test.request"]
|
|
if tr == nil {
|
|
t.Fatalf("Expected authorized accounts, got nil")
|
|
}
|
|
if ls := len(tr.approved); ls != 2 {
|
|
t.Fatalf("Expected 2 authorized accounts, got %d", ls)
|
|
}
|
|
}
|
|
|
|
func TestServiceExportWithWildcards(t *testing.T) {
|
|
for _, test := range []struct {
|
|
name string
|
|
public bool
|
|
}{
|
|
{
|
|
name: "public",
|
|
public: true,
|
|
},
|
|
{
|
|
name: "private",
|
|
public: false,
|
|
},
|
|
} {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
var accs []*Account
|
|
if !test.public {
|
|
accs = []*Account{barAcc}
|
|
}
|
|
// Add service export with a wildcard
|
|
if err := fooAcc.AddServiceExport("ngs.update.*", accs); err != nil {
|
|
t.Fatalf("Error adding account service export: %v", err)
|
|
}
|
|
// Import on bar account
|
|
if err := barAcc.AddServiceImport(fooAcc, "ngs.update", "ngs.update.$bar"); err != nil {
|
|
t.Fatalf("Error adding account service import: %v", err)
|
|
}
|
|
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Now setup the responder under cfoo
|
|
cfoo.parse([]byte("SUB ngs.update.* 1\r\n"))
|
|
|
|
// Now send the request. Remember we expect the request on our local ngs.update.
|
|
// We added the route with that "from" and will map it to "ngs.update.$bar"
|
|
cbar.parseAsync("SUB reply 11\r\nPUB ngs.update reply 4\r\nhelp\r\n")
|
|
|
|
// Now read the request from crFoo
|
|
l, err := crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "ngs.update.$bar" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
// Make sure this looks like _INBOX
|
|
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
|
|
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
|
|
}
|
|
checkPayload(crFoo, []byte("help\r\n"), t)
|
|
|
|
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
|
|
cfoo.parseAsync(replyOp)
|
|
|
|
// Now read the response from crBar
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches = mraw[0]
|
|
if matches[SUB_INDEX] != "reply" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "11" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
if matches[REPLY_INDEX] != "" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("22\r\n"), t)
|
|
|
|
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
|
|
t.Fatalf("Expected no responses on barAcc, got %d", nr)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAccountAddServiceImportRace(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
if err := fooAcc.AddServiceExport("foo.*", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
|
|
total := 100
|
|
errCh := make(chan error, total)
|
|
for i := 0; i < 100; i++ {
|
|
go func(i int) {
|
|
err := barAcc.AddServiceImport(fooAcc, fmt.Sprintf("foo.%d", i), "")
|
|
errCh <- err // nil is a valid value.
|
|
}(i)
|
|
}
|
|
|
|
for i := 0; i < 100; i++ {
|
|
err := <-errCh
|
|
if err != nil {
|
|
t.Fatalf("Error adding account service import: %v", err)
|
|
}
|
|
}
|
|
|
|
barAcc.mu.Lock()
|
|
lens := len(barAcc.imports.services)
|
|
c := barAcc.internalClient()
|
|
barAcc.mu.Unlock()
|
|
if lens != total {
|
|
t.Fatalf("Expected %d imported services, got %d", total, lens)
|
|
}
|
|
c.mu.Lock()
|
|
lens = len(c.subs)
|
|
c.mu.Unlock()
|
|
if lens != total {
|
|
t.Fatalf("Expected %d subscriptions in internal client, got %d", total, lens)
|
|
}
|
|
}
|
|
|
|
func TestServiceImportWithWildcards(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
if err := fooAcc.AddServiceExport("test.*", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
// We can not map wildcards atm, so if we supply a to mapping and a wildcard we should fail.
|
|
if err := barAcc.AddServiceImport(fooAcc, "test.*", "foo"); err == nil {
|
|
t.Fatalf("Expected error adding account service import with wildcard and mapping, got none")
|
|
}
|
|
if err := barAcc.AddServiceImport(fooAcc, "test.>", ""); err == nil {
|
|
t.Fatalf("Expected error adding account service import with broader wildcard, got none")
|
|
}
|
|
// This should work.
|
|
if err := barAcc.AddServiceImport(fooAcc, "test.*", ""); err != nil {
|
|
t.Fatalf("Error adding account service import: %v", err)
|
|
}
|
|
// Make sure we can send and receive.
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
// Now setup the resonder under cfoo
|
|
cfoo.parse([]byte("SUB test.* 1\r\n"))
|
|
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Now send the request.
|
|
go cbar.parse([]byte("SUB bar 11\r\nPUB test.22 bar 4\r\nhelp\r\n"))
|
|
|
|
// Now read the request from crFoo
|
|
l, err := crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "test.22" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
// Make sure this looks like _INBOX
|
|
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
|
|
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
|
|
}
|
|
checkPayload(crFoo, []byte("help\r\n"), t)
|
|
|
|
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
|
|
go cfoo.parse([]byte(replyOp))
|
|
|
|
// Now read the response from crBar
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches = mraw[0]
|
|
if matches[SUB_INDEX] != "bar" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "11" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
if matches[REPLY_INDEX] != "" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("22\r\n"), t)
|
|
|
|
// Remove the service import with the wildcard and make sure hasWC is cleared.
|
|
barAcc.removeServiceImport("test.*")
|
|
|
|
barAcc.mu.Lock()
|
|
defer barAcc.mu.Unlock()
|
|
if len(barAcc.imports.services) != 0 {
|
|
t.Fatalf("Expected no imported services, got %d", len(barAcc.imports.services))
|
|
}
|
|
}
|
|
|
|
// Make sure the AddStreamExport function is additive if called multiple times.
|
|
func TestAddStreamExport(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
bazAcc, err := s.RegisterAccount("$baz")
|
|
if err != nil {
|
|
t.Fatalf("Error creating account 'baz': %v", err)
|
|
}
|
|
defer s.Shutdown()
|
|
|
|
if err := fooAcc.AddStreamExport("test.request", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr := fooAcc.exports.streams["test.request"]
|
|
if tr != nil {
|
|
t.Fatalf("Expected no authorized accounts, got %d", len(tr.approved))
|
|
}
|
|
if err := fooAcc.AddStreamExport("test.request", []*Account{barAcc}); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr = fooAcc.exports.streams["test.request"]
|
|
if tr == nil {
|
|
t.Fatalf("Expected authorized accounts, got nil")
|
|
}
|
|
if ls := len(tr.approved); ls != 1 {
|
|
t.Fatalf("Expected 1 authorized accounts, got %d", ls)
|
|
}
|
|
if err := fooAcc.AddStreamExport("test.request", []*Account{bazAcc}); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
tr = fooAcc.exports.streams["test.request"]
|
|
if tr == nil {
|
|
t.Fatalf("Expected authorized accounts, got nil")
|
|
}
|
|
if ls := len(tr.approved); ls != 2 {
|
|
t.Fatalf("Expected 2 authorized accounts, got %d", ls)
|
|
}
|
|
}
|
|
|
|
func TestCrossAccountRequestReply(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Add in the service export for the requests. Make it public.
|
|
if err := cfoo.acc.AddServiceExport("test.request", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
|
|
// Test addServiceImport to make sure it requires accounts.
|
|
if err := cbar.acc.AddServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount {
|
|
t.Fatalf("Expected ErrMissingAccount but received %v.", err)
|
|
}
|
|
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject {
|
|
t.Fatalf("Expected ErrInvalidSubject but received %v.", err)
|
|
}
|
|
|
|
// Now add in the route mapping for request to be routed to the foo account.
|
|
if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
|
|
t.Fatalf("Error adding account service import to client bar: %v", err)
|
|
}
|
|
|
|
// Now setup the resonder under cfoo
|
|
cfoo.parse([]byte("SUB test.request 1\r\n"))
|
|
|
|
// Now send the request. Remember we expect the request on our local foo. We added the route
|
|
// with that "from" and will map it to "test.request"
|
|
cbar.parseAsync("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")
|
|
|
|
// Now read the request from crFoo
|
|
l, err := crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
if matches[SUB_INDEX] != "test.request" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
// Make sure this looks like _INBOX
|
|
if !strings.HasPrefix(matches[REPLY_INDEX], "_R_.") {
|
|
t.Fatalf("Expected an _R_.* like reply, got '%s'", matches[REPLY_INDEX])
|
|
}
|
|
checkPayload(crFoo, []byte("help\r\n"), t)
|
|
|
|
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
|
|
cfoo.parseAsync(replyOp)
|
|
|
|
// Now read the response from crBar
|
|
l, err = crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches = mraw[0]
|
|
if matches[SUB_INDEX] != "bar" {
|
|
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "11" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
if matches[REPLY_INDEX] != "" {
|
|
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
|
}
|
|
checkPayload(crBar, []byte("22\r\n"), t)
|
|
|
|
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
|
|
t.Fatalf("Expected no responses on barAcc, got %d", nr)
|
|
}
|
|
}
|
|
|
|
func TestAccountRequestReplyTrackLatency(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
// Run server in Go routine. We need this one running for internal sending of msgs.
|
|
go s.Start()
|
|
// Wait for accept loop(s) to be started
|
|
if err := s.readyForConnections(10 * time.Second); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Add in the service export for the requests. Make it public.
|
|
if err := fooAcc.AddServiceExport("track.service", nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
|
|
// Now let's add in tracking
|
|
|
|
// First check we get an error if service does not exist.
|
|
if err := fooAcc.TrackServiceExport("track.wrong", "results"); err != ErrMissingService {
|
|
t.Fatalf("Expected error enabling tracking latency for wrong service")
|
|
}
|
|
// Check results should be a valid subject
|
|
if err := fooAcc.TrackServiceExport("track.service", "results.*"); err != ErrBadPublishSubject {
|
|
t.Fatalf("Expected error enabling tracking latency for bad results subject")
|
|
}
|
|
// Make sure we can not loop around on ourselves..
|
|
if err := fooAcc.TrackServiceExport("track.service", "track.service"); err != ErrBadPublishSubject {
|
|
t.Fatalf("Expected error enabling tracking latency for same subject")
|
|
}
|
|
// Check bad sampling
|
|
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", -1); err != ErrBadSampling {
|
|
t.Fatalf("Expected error enabling tracking latency for bad sampling")
|
|
}
|
|
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", 101); err != ErrBadSampling {
|
|
t.Fatalf("Expected error enabling tracking latency for bad sampling")
|
|
}
|
|
|
|
// Now let's add in tracking for real. This will be 100%
|
|
if err := fooAcc.TrackServiceExport("track.service", "results"); err != nil {
|
|
t.Fatalf("Error enabling tracking latency: %v", err)
|
|
}
|
|
|
|
// Now add in the route mapping for request to be routed to the foo account.
|
|
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
|
|
t.Fatalf("Error adding account service import to client bar: %v", err)
|
|
}
|
|
|
|
// Now setup the responder under cfoo and the listener for the results
|
|
cfoo.parse([]byte("SUB track.service 1\r\nSUB results 2\r\n"))
|
|
|
|
readFooMsg := func() ([]byte, string) {
|
|
t.Helper()
|
|
l, err := crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'foo': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
msg := mraw[0]
|
|
msgSize, _ := strconv.Atoi(msg[LEN_INDEX])
|
|
return grabPayload(crFoo, msgSize), msg[REPLY_INDEX]
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
// Now send the request. Remember we expect the request on our local foo. We added the route
|
|
// with that "from" and will map it to "test.request"
|
|
cbar.parseAsync("SUB resp 11\r\nPUB req resp 4\r\nhelp\r\n")
|
|
|
|
// Now read the request from crFoo
|
|
_, reply := readFooMsg()
|
|
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", reply)
|
|
|
|
serviceTime := 25 * time.Millisecond
|
|
|
|
// We will wait a bit to check latency results
|
|
go func() {
|
|
time.Sleep(serviceTime)
|
|
cfoo.parseAsync(replyOp)
|
|
}()
|
|
|
|
// Now read the response from crBar
|
|
_, err := crBar.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
|
|
// Now let's check that we got the sampling results
|
|
rMsg, _ := readFooMsg()
|
|
|
|
// Unmarshal and check it.
|
|
var sl ServiceLatency
|
|
err = json.Unmarshal(rMsg, &sl)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse latency json: %v\n", err)
|
|
}
|
|
startDelta := sl.RequestStart.Sub(start)
|
|
if startDelta > 5*time.Millisecond {
|
|
t.Fatalf("Bad start delta %v", startDelta)
|
|
}
|
|
if sl.ServiceLatency < serviceTime {
|
|
t.Fatalf("Bad service latency: %v", sl.ServiceLatency)
|
|
}
|
|
if sl.TotalLatency < sl.ServiceLatency {
|
|
t.Fatalf("Bad total latency: %v", sl.ServiceLatency)
|
|
}
|
|
}
|
|
|
|
// This will test for leaks in the remote latency tracking via client.rrTracking
|
|
func TestAccountTrackLatencyRemoteLeaks(t *testing.T) {
|
|
optsA, err := ProcessConfigFile("./configs/seed.conf")
|
|
require_NoError(t, err)
|
|
optsA.NoSigs, optsA.NoLog = true, true
|
|
optsA.ServerName = "A"
|
|
srvA := RunServer(optsA)
|
|
defer srvA.Shutdown()
|
|
optsB := nextServerOpts(optsA)
|
|
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
|
optsB.ServerName = "B"
|
|
srvB := RunServer(optsB)
|
|
defer srvB.Shutdown()
|
|
|
|
checkClusterFormed(t, srvA, srvB)
|
|
srvs := []*Server{srvA, srvB}
|
|
|
|
// Now add in the accounts and setup tracking.
|
|
for _, s := range srvs {
|
|
s.SetSystemAccount(globalAccountName)
|
|
fooAcc, _ := s.RegisterAccount("$foo")
|
|
fooAcc.AddServiceExport("track.service", nil)
|
|
fooAcc.TrackServiceExport("track.service", "results")
|
|
barAcc, _ := s.RegisterAccount("$bar")
|
|
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
|
|
t.Fatalf("Failed to import: %v", err)
|
|
}
|
|
}
|
|
|
|
getClient := func(s *Server, name string) *client {
|
|
t.Helper()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for _, c := range s.clients {
|
|
c.mu.Lock()
|
|
n := c.opts.Name
|
|
c.mu.Unlock()
|
|
if n == name {
|
|
return c
|
|
}
|
|
}
|
|
t.Fatalf("Did not find client %q on server %q", name, s.info.ID)
|
|
return nil
|
|
}
|
|
|
|
// Test with a responder on second server, srvB. but they will not respond.
|
|
cfooNC := natsConnect(t, srvB.ClientURL(), nats.Name("foo"))
|
|
defer cfooNC.Close()
|
|
cfoo := getClient(srvB, "foo")
|
|
fooAcc, _ := srvB.LookupAccount("$foo")
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
// Set new limits
|
|
for _, srv := range srvs {
|
|
fooAcc, _ := srv.LookupAccount("$foo")
|
|
err := fooAcc.SetServiceExportResponseThreshold("track.service", 5*time.Millisecond)
|
|
if err != nil {
|
|
t.Fatalf("Error setting response threshold: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now setup the responder under cfoo and the listener for the results
|
|
time.Sleep(50 * time.Millisecond)
|
|
baseSubs := int(srvA.NumSubscriptions())
|
|
fooSub := natsSubSync(t, cfooNC, "track.service")
|
|
natsFlush(t, cfooNC)
|
|
// Wait for it to propagate.
|
|
checkExpectedSubs(t, baseSubs+1, srvA)
|
|
|
|
cbarNC := natsConnect(t, srvA.ClientURL(), nats.Name("bar"))
|
|
defer cbarNC.Close()
|
|
cbar := getClient(srvA, "bar")
|
|
|
|
barAcc, _ := srvA.LookupAccount("$bar")
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
readFooMsg := func() {
|
|
t.Helper()
|
|
if _, err := fooSub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Did not receive foo msg: %v", err)
|
|
}
|
|
}
|
|
|
|
// Send 2 requests
|
|
natsSubSync(t, cbarNC, "resp")
|
|
|
|
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
|
|
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
|
|
|
|
readFooMsg()
|
|
readFooMsg()
|
|
|
|
var rc *client
|
|
// Pull out first client
|
|
srvB.mu.Lock()
|
|
for _, rc = range srvB.clients {
|
|
if rc != nil {
|
|
break
|
|
}
|
|
}
|
|
srvB.mu.Unlock()
|
|
|
|
tracking := func() int {
|
|
rc.mu.Lock()
|
|
var nt int
|
|
if rc.rrTracking != nil {
|
|
nt = len(rc.rrTracking.rmap)
|
|
}
|
|
rc.mu.Unlock()
|
|
return nt
|
|
}
|
|
|
|
expectTracking := func(expected int) {
|
|
t.Helper()
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if numTracking := tracking(); numTracking != expected {
|
|
return fmt.Errorf("Expected to have %d tracking replies, got %d", expected, numTracking)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
expectTracking(2)
|
|
// Make sure these remote tracking replies honor the current respThresh for a service export.
|
|
time.Sleep(10 * time.Millisecond)
|
|
expectTracking(0)
|
|
// Also make sure tracking is removed
|
|
rc.mu.Lock()
|
|
removed := rc.rrTracking == nil
|
|
rc.mu.Unlock()
|
|
if !removed {
|
|
t.Fatalf("Expected the rrTracking to be removed")
|
|
}
|
|
|
|
// Now let's test that a lower response threshold is picked up.
|
|
fSub := natsSubSync(t, cfooNC, "foo")
|
|
natsFlush(t, cfooNC)
|
|
|
|
// Wait for it to propagate.
|
|
checkExpectedSubs(t, baseSubs+4, srvA)
|
|
|
|
// queue up some first. We want to test changing when rrTracking exists.
|
|
natsPubReq(t, cbarNC, "req", "resp", []byte("help"))
|
|
readFooMsg()
|
|
expectTracking(1)
|
|
|
|
for _, s := range srvs {
|
|
fooAcc, _ := s.LookupAccount("$foo")
|
|
barAcc, _ := s.LookupAccount("$bar")
|
|
fooAcc.AddServiceExport("foo", nil)
|
|
fooAcc.TrackServiceExport("foo", "foo.results")
|
|
fooAcc.SetServiceExportResponseThreshold("foo", time.Millisecond)
|
|
barAcc.AddServiceImport(fooAcc, "foo", "foo")
|
|
}
|
|
|
|
natsSubSync(t, cbarNC, "reply")
|
|
natsPubReq(t, cbarNC, "foo", "reply", []byte("help"))
|
|
if _, err := fSub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Did not receive foo msg: %v", err)
|
|
}
|
|
expectTracking(2)
|
|
|
|
rc.mu.Lock()
|
|
lrt := rc.rrTracking.lrt
|
|
rc.mu.Unlock()
|
|
if lrt != time.Millisecond {
|
|
t.Fatalf("Expected lrt of %v, got %v", time.Millisecond, lrt)
|
|
}
|
|
|
|
// Now make sure we clear on close.
|
|
rc.closeConnection(ClientClosed)
|
|
|
|
// Actual tear down will be not inline.
|
|
checkFor(t, time.Second, 5*time.Millisecond, func() error {
|
|
rc.mu.Lock()
|
|
removed = rc.rrTracking == nil
|
|
rc.mu.Unlock()
|
|
if !removed {
|
|
return fmt.Errorf("Expected the rrTracking to be removed after client close")
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestCrossAccountServiceResponseTypes(t *testing.T) {
|
|
s, fooAcc, barAcc := simpleAccountServer(t)
|
|
defer s.Shutdown()
|
|
|
|
cfoo, crFoo, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
cbar, crBar, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Add in the service export for the requests. Make it public.
|
|
if err := fooAcc.AddServiceExportWithResponse("test.request", Streamed, nil); err != nil {
|
|
t.Fatalf("Error adding account service export to client foo: %v", err)
|
|
}
|
|
// Now add in the route mapping for request to be routed to the foo account.
|
|
if err := barAcc.AddServiceImport(fooAcc, "foo", "test.request"); err != nil {
|
|
t.Fatalf("Error adding account service import to client bar: %v", err)
|
|
}
|
|
|
|
// Now setup the resonder under cfoo
|
|
cfoo.parse([]byte("SUB test.request 1\r\n"))
|
|
|
|
// Now send the request. Remember we expect the request on our local foo. We added the route
|
|
// with that "from" and will map it to "test.request"
|
|
cbar.parseAsync("SUB bar 11\r\nPUB foo bar 4\r\nhelp\r\n")
|
|
|
|
// Now read the request from crFoo
|
|
l, err := crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches := mraw[0]
|
|
reply := matches[REPLY_INDEX]
|
|
if !strings.HasPrefix(reply, "_R_.") {
|
|
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
|
|
}
|
|
crFoo.ReadString('\n')
|
|
|
|
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
|
|
var mReply []byte
|
|
for i := 0; i < 10; i++ {
|
|
mReply = append(mReply, replyOp...)
|
|
}
|
|
|
|
cfoo.parseAsync(string(mReply))
|
|
|
|
var b [256]byte
|
|
n, err := crBar.Read(b[:])
|
|
if err != nil {
|
|
t.Fatalf("Error reading response: %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1)
|
|
if len(mraw) != 10 {
|
|
t.Fatalf("Expected a response but got %d", len(mraw))
|
|
}
|
|
|
|
// Also make sure the response map gets cleaned up when interest goes away.
|
|
cbar.closeConnection(ClientClosed)
|
|
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if nr := barAcc.NumPendingAllResponses(); nr != 0 {
|
|
return fmt.Errorf("Number of responses is %d", nr)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now test bogus reply subjects are handled and do not accumulate the response maps.
|
|
cbar, _, _ = newClientForServer(s)
|
|
defer cbar.close()
|
|
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
// Do not create any interest in the reply subject 'bar'. Just send a request.
|
|
cbar.parseAsync("PUB foo bar 4\r\nhelp\r\n")
|
|
|
|
// Now read the request from crFoo
|
|
l, err = crFoo.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading from client 'bar': %v", err)
|
|
}
|
|
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
matches = mraw[0]
|
|
reply = matches[REPLY_INDEX]
|
|
if !strings.HasPrefix(reply, "_R_.") {
|
|
t.Fatalf("Expected an _R_.* like reply, got '%s'", reply)
|
|
}
|
|
crFoo.ReadString('\n')
|
|
|
|
replyOp = fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
|
|
|
|
cfoo.parseAsync(replyOp)
|
|
|
|
// Now wait for a bit, the reply should trip a no interest condition
|
|
// which should clean this up.
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if nr := fooAcc.NumPendingAllResponses(); nr != 0 {
|
|
return fmt.Errorf("Number of responses is %d", nr)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Also make sure the response map entry is gone as well.
|
|
fooAcc.mu.RLock()
|
|
lrm := len(fooAcc.exports.responses)
|
|
fooAcc.mu.RUnlock()
|
|
|
|
if lrm != 0 {
|
|
t.Fatalf("Expected the responses to be cleared, got %d entries", lrm)
|
|
}
|
|
}
|
|
|
|
func TestAccountMapsUsers(t *testing.T) {
|
|
// Used for the nkey users to properly sign.
|
|
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
|
|
seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE"
|
|
|
|
confFileName := createConfFile(t, []byte(`
|
|
accounts {
|
|
synadia {
|
|
users = [
|
|
{user: derek, password: foo},
|
|
{nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR}
|
|
]
|
|
}
|
|
nats {
|
|
users = [
|
|
{user: ivan, password: bar},
|
|
{nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH}
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, confFileName)
|
|
opts, err := ProcessConfigFile(confFileName)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error parsing config file: %v", err)
|
|
}
|
|
opts.NoSigs = true
|
|
s := New(opts)
|
|
defer s.Shutdown()
|
|
synadia, _ := s.LookupAccount("synadia")
|
|
nats, _ := s.LookupAccount("nats")
|
|
|
|
if synadia == nil || nats == nil {
|
|
t.Fatalf("Expected non nil accounts during lookup")
|
|
}
|
|
|
|
// Make sure a normal log in maps the accounts correctly.
|
|
c, _, _ := newClientForServer(s)
|
|
defer c.close()
|
|
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
|
|
c.parse(connectOp)
|
|
if c.acc != synadia {
|
|
t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc)
|
|
}
|
|
|
|
c, _, _ = newClientForServer(s)
|
|
defer c.close()
|
|
connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
|
|
c.parse(connectOp)
|
|
if c.acc != nats {
|
|
t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc)
|
|
}
|
|
|
|
// Now test nkeys as well.
|
|
kp, _ := nkeys.FromSeed([]byte(seed1))
|
|
pubKey, _ := kp.PublicKey()
|
|
|
|
c, cr, l := newClientForServer(s)
|
|
defer c.close()
|
|
// Check for Nonce
|
|
var info nonceInfo
|
|
err = json.Unmarshal([]byte(l[5:]), &info)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse INFO json: %v\n", err)
|
|
}
|
|
if info.Nonce == "" {
|
|
t.Fatalf("Expected a non-empty nonce with nkeys defined")
|
|
}
|
|
sigraw, err := kp.Sign([]byte(info.Nonce))
|
|
if err != nil {
|
|
t.Fatalf("Failed signing nonce: %v", err)
|
|
}
|
|
sig := base64.RawURLEncoding.EncodeToString(sigraw)
|
|
|
|
// PING needed to flush the +OK to us.
|
|
cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
|
|
c.parseAsync(cs)
|
|
l, _ = cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "+OK") {
|
|
t.Fatalf("Expected an OK, got: %v", l)
|
|
}
|
|
if c.acc != synadia {
|
|
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
|
|
}
|
|
|
|
// Now nats account nkey user.
|
|
kp, _ = nkeys.FromSeed([]byte(seed2))
|
|
pubKey, _ = kp.PublicKey()
|
|
|
|
c, cr, l = newClientForServer(s)
|
|
defer c.close()
|
|
// Check for Nonce
|
|
err = json.Unmarshal([]byte(l[5:]), &info)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse INFO json: %v\n", err)
|
|
}
|
|
if info.Nonce == "" {
|
|
t.Fatalf("Expected a non-empty nonce with nkeys defined")
|
|
}
|
|
sigraw, err = kp.Sign([]byte(info.Nonce))
|
|
if err != nil {
|
|
t.Fatalf("Failed signing nonce: %v", err)
|
|
}
|
|
sig = base64.RawURLEncoding.EncodeToString(sigraw)
|
|
|
|
// PING needed to flush the +OK to us.
|
|
cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
|
|
c.parseAsync(cs)
|
|
l, _ = cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "+OK") {
|
|
t.Fatalf("Expected an OK, got: %v", l)
|
|
}
|
|
if c.acc != nats {
|
|
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
|
|
}
|
|
}
|
|
|
|
func TestAccountGlobalDefault(t *testing.T) {
|
|
opts := defaultServerOptions
|
|
s := New(&opts)
|
|
|
|
if acc, _ := s.LookupAccount(globalAccountName); acc == nil {
|
|
t.Fatalf("Expected a global default account on a new server, got none.")
|
|
}
|
|
// Make sure we can not create one with same name..
|
|
if _, err := s.RegisterAccount(globalAccountName); err == nil {
|
|
t.Fatalf("Expected error trying to create a new reserved account")
|
|
}
|
|
|
|
// Make sure we can not define one in a config file either.
|
|
confFileName := createConfFile(t, []byte(`accounts { $G {} }`))
|
|
defer removeFile(t, confFileName)
|
|
|
|
if _, err := ProcessConfigFile(confFileName); err == nil {
|
|
t.Fatalf("Expected an error parsing config file with reserved account")
|
|
}
|
|
}
|
|
|
|
func TestAccountCheckStreamImportsEqual(t *testing.T) {
|
|
// Create bare accounts for this test
|
|
fooAcc := NewAccount("foo")
|
|
if err := fooAcc.AddStreamExport(">", nil); err != nil {
|
|
t.Fatalf("Error adding stream export: %v", err)
|
|
}
|
|
|
|
barAcc := NewAccount("bar")
|
|
if err := barAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
bazAcc := NewAccount("baz")
|
|
if err := bazAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if !barAcc.checkStreamImportsEqual(bazAcc) {
|
|
t.Fatal("Expected stream imports to be the same")
|
|
}
|
|
|
|
if err := bazAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if barAcc.checkStreamImportsEqual(bazAcc) {
|
|
t.Fatal("Expected stream imports to be different")
|
|
}
|
|
if err := barAcc.AddStreamImport(fooAcc, "foo.>", ""); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if !barAcc.checkStreamImportsEqual(bazAcc) {
|
|
t.Fatal("Expected stream imports to be the same")
|
|
}
|
|
|
|
// Create another account that is named "foo". We want to make sure
|
|
// that the comparison still works (based on account name, not pointer)
|
|
newFooAcc := NewAccount("foo")
|
|
if err := newFooAcc.AddStreamExport(">", nil); err != nil {
|
|
t.Fatalf("Error adding stream export: %v", err)
|
|
}
|
|
batAcc := NewAccount("bat")
|
|
if err := batAcc.AddStreamImport(newFooAcc, "foo", "myPrefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if err := batAcc.AddStreamImport(newFooAcc, "foo.>", ""); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if !batAcc.checkStreamImportsEqual(barAcc) {
|
|
t.Fatal("Expected stream imports to be the same")
|
|
}
|
|
if !batAcc.checkStreamImportsEqual(bazAcc) {
|
|
t.Fatal("Expected stream imports to be the same")
|
|
}
|
|
|
|
// Test with account with different "from"
|
|
expAcc := NewAccount("new_acc")
|
|
if err := expAcc.AddStreamExport(">", nil); err != nil {
|
|
t.Fatalf("Error adding stream export: %v", err)
|
|
}
|
|
aAcc := NewAccount("a")
|
|
if err := aAcc.AddStreamImport(expAcc, "bar", ""); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
bAcc := NewAccount("b")
|
|
if err := bAcc.AddStreamImport(expAcc, "baz", ""); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if aAcc.checkStreamImportsEqual(bAcc) {
|
|
t.Fatal("Expected stream imports to be different")
|
|
}
|
|
|
|
// Test with account with different "prefix"
|
|
aAcc = NewAccount("a")
|
|
if err := aAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
bAcc = NewAccount("b")
|
|
if err := bAcc.AddStreamImport(expAcc, "bar", "diff_prefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if aAcc.checkStreamImportsEqual(bAcc) {
|
|
t.Fatal("Expected stream imports to be different")
|
|
}
|
|
|
|
// Test with account with different "name"
|
|
expAcc = NewAccount("diff_name")
|
|
if err := expAcc.AddStreamExport(">", nil); err != nil {
|
|
t.Fatalf("Error adding stream export: %v", err)
|
|
}
|
|
bAcc = NewAccount("b")
|
|
if err := bAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil {
|
|
t.Fatalf("Error adding stream import: %v", err)
|
|
}
|
|
if aAcc.checkStreamImportsEqual(bAcc) {
|
|
t.Fatal("Expected stream imports to be different")
|
|
}
|
|
}
|
|
|
|
func TestAccountNoDeadlockOnQueueSubRouteMapUpdate(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
nc.QueueSubscribeSync("foo", "bar")
|
|
|
|
var accs []*Account
|
|
for i := 0; i < 10; i++ {
|
|
acc, _ := s.RegisterAccount(fmt.Sprintf("acc%d", i))
|
|
acc.mu.Lock()
|
|
accs = append(accs, acc)
|
|
}
|
|
|
|
opts2 := DefaultOptions()
|
|
opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port))
|
|
s2 := RunServer(opts2)
|
|
defer s2.Shutdown()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
for _, acc := range accs {
|
|
acc.mu.Unlock()
|
|
}
|
|
wg.Done()
|
|
}()
|
|
|
|
nc.QueueSubscribeSync("foo", "bar")
|
|
nc.Flush()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestAccountDuplicateServiceImportSubject(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
fooAcc, _ := s.RegisterAccount("foo")
|
|
fooAcc.AddServiceExport("remote1", nil)
|
|
fooAcc.AddServiceExport("remote2", nil)
|
|
|
|
barAcc, _ := s.RegisterAccount("bar")
|
|
if err := barAcc.AddServiceImport(fooAcc, "foo", "remote1"); err != nil {
|
|
t.Fatalf("Error adding service import: %v", err)
|
|
}
|
|
if err := barAcc.AddServiceImport(fooAcc, "foo", "remote2"); err == nil || !strings.Contains(err.Error(), "duplicate") {
|
|
t.Fatalf("Expected an error about duplicate service import subject, got %q", err)
|
|
}
|
|
}
|
|
|
|
func TestMultipleStreamImportsWithSameSubjectDifferentPrefix(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
fooAcc, _ := s.RegisterAccount("foo")
|
|
fooAcc.AddStreamExport("test", nil)
|
|
|
|
barAcc, _ := s.RegisterAccount("bar")
|
|
barAcc.AddStreamExport("test", nil)
|
|
|
|
importAcc, _ := s.RegisterAccount("import")
|
|
|
|
if err := importAcc.AddStreamImport(fooAcc, "test", "foo"); err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if err := importAcc.AddStreamImport(barAcc, "test", "bar"); err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Now make sure we can see messages from both.
|
|
cimport, crImport, _ := newClientForServer(s)
|
|
defer cimport.close()
|
|
if err := cimport.registerWithAccount(importAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'import' account: %v", err)
|
|
}
|
|
if err := cimport.parse([]byte("SUB *.test 1\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'import' from server: %v", err)
|
|
}
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
cbar, _, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
readMsg := func() {
|
|
t.Helper()
|
|
l, err := crImport.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading msg header from client 'import': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
// Consume msg body too.
|
|
if _, err = crImport.ReadString('\n'); err != nil {
|
|
t.Fatalf("Error reading msg body from client 'import': %v", err)
|
|
}
|
|
}
|
|
|
|
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
|
|
readMsg()
|
|
|
|
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
|
|
readMsg()
|
|
}
|
|
|
|
// This should work with prefixes that are different but we also want it to just work with same subject
|
|
// being imported from multiple accounts.
|
|
func TestMultipleStreamImportsWithSameSubject(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
fooAcc, _ := s.RegisterAccount("foo")
|
|
fooAcc.AddStreamExport("test", nil)
|
|
|
|
barAcc, _ := s.RegisterAccount("bar")
|
|
barAcc.AddStreamExport("test", nil)
|
|
|
|
importAcc, _ := s.RegisterAccount("import")
|
|
|
|
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
// Since we allow this now, make sure we do detect a duplicate import from same account etc.
|
|
// That should be not allowed.
|
|
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != ErrStreamImportDuplicate {
|
|
t.Fatalf("Expected ErrStreamImportDuplicate but got %v", err)
|
|
}
|
|
|
|
if err := importAcc.AddStreamImport(barAcc, "test", ""); err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Now make sure we can see messages from both.
|
|
cimport, crImport, _ := newClientForServer(s)
|
|
defer cimport.close()
|
|
if err := cimport.registerWithAccount(importAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'import' account: %v", err)
|
|
}
|
|
if err := cimport.parse([]byte("SUB test 1\r\n")); err != nil {
|
|
t.Fatalf("Error for client 'import' from server: %v", err)
|
|
}
|
|
|
|
cfoo, _, _ := newClientForServer(s)
|
|
defer cfoo.close()
|
|
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
|
}
|
|
|
|
cbar, _, _ := newClientForServer(s)
|
|
defer cbar.close()
|
|
if err := cbar.registerWithAccount(barAcc); err != nil {
|
|
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
|
}
|
|
|
|
readMsg := func() {
|
|
t.Helper()
|
|
l, err := crImport.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error reading msg header from client 'import': %v", err)
|
|
}
|
|
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
|
if len(mraw) == 0 {
|
|
t.Fatalf("No message received")
|
|
}
|
|
// Consume msg body too.
|
|
if _, err = crImport.ReadString('\n'); err != nil {
|
|
t.Fatalf("Error reading msg body from client 'import': %v", err)
|
|
}
|
|
}
|
|
|
|
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
|
|
readMsg()
|
|
|
|
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
|
|
readMsg()
|
|
}
|
|
|
|
func TestAccountBasicRouteMapping(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
|
|
acc.AddMapping("foo", "bar")
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
fsub, _ := nc.SubscribeSync("foo")
|
|
bsub, _ := nc.SubscribeSync("bar")
|
|
nc.Publish("foo", nil)
|
|
nc.Flush()
|
|
|
|
checkPending := func(sub *nats.Subscription, expected int) {
|
|
t.Helper()
|
|
if n, _, _ := sub.Pending(); n != expected {
|
|
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
|
|
}
|
|
}
|
|
|
|
checkPending(fsub, 0)
|
|
checkPending(bsub, 1)
|
|
|
|
acc.RemoveMapping("foo")
|
|
|
|
nc.Publish("foo", nil)
|
|
nc.Flush()
|
|
|
|
checkPending(fsub, 1)
|
|
checkPending(bsub, 1)
|
|
}
|
|
|
|
func TestAccountWildcardRouteMapping(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
|
|
|
|
addMap := func(src, dest string) {
|
|
t.Helper()
|
|
if err := acc.AddMapping(src, dest); err != nil {
|
|
t.Fatalf("Error adding mapping: %v", err)
|
|
}
|
|
}
|
|
|
|
addMap("foo.*.*", "bar.$2.$1")
|
|
addMap("bar.*.>", "baz.$1.>")
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
pub := func(subj string) {
|
|
t.Helper()
|
|
err := nc.Publish(subj, nil)
|
|
if err == nil {
|
|
err = nc.Flush()
|
|
}
|
|
if err != nil {
|
|
t.Fatalf("Error publishing: %v", err)
|
|
}
|
|
}
|
|
|
|
fsub, _ := nc.SubscribeSync("foo.>")
|
|
bsub, _ := nc.SubscribeSync("bar.>")
|
|
zsub, _ := nc.SubscribeSync("baz.>")
|
|
|
|
checkPending := func(sub *nats.Subscription, expected int) {
|
|
t.Helper()
|
|
if n, _, _ := sub.Pending(); n != expected {
|
|
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
|
|
}
|
|
}
|
|
|
|
pub("foo.1.2")
|
|
|
|
checkPending(fsub, 0)
|
|
checkPending(bsub, 1)
|
|
checkPending(zsub, 0)
|
|
}
|
|
|
|
func TestAccountRouteMappingChangesAfterClientStart(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
// Create the client first then add in mapping.
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
nc.Flush()
|
|
|
|
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
|
|
acc.AddMapping("foo", "bar")
|
|
|
|
fsub, _ := nc.SubscribeSync("foo")
|
|
bsub, _ := nc.SubscribeSync("bar")
|
|
nc.Publish("foo", nil)
|
|
nc.Flush()
|
|
|
|
checkPending := func(sub *nats.Subscription, expected int) {
|
|
t.Helper()
|
|
if n, _, _ := sub.Pending(); n != expected {
|
|
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
|
|
}
|
|
}
|
|
|
|
checkPending(fsub, 0)
|
|
checkPending(bsub, 1)
|
|
|
|
acc.RemoveMapping("foo")
|
|
|
|
nc.Publish("foo", nil)
|
|
nc.Flush()
|
|
|
|
checkPending(fsub, 1)
|
|
checkPending(bsub, 1)
|
|
}
|
|
|
|
func TestAccountSimpleWeightedRouteMapping(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
|
|
acc.AddWeightedMappings("foo", NewMapDest("bar", 50))
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
fsub, _ := nc.SubscribeSync("foo")
|
|
bsub, _ := nc.SubscribeSync("bar")
|
|
|
|
total := 500
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
fpending, _, _ := fsub.Pending()
|
|
bpending, _, _ := bsub.Pending()
|
|
|
|
h := total / 2
|
|
tp := h / 5
|
|
min, max := h-tp, h+tp
|
|
if fpending < min || fpending > max {
|
|
t.Fatalf("Expected about %d msgs, got %d and %d", h, fpending, bpending)
|
|
}
|
|
}
|
|
|
|
func TestAccountMultiWeightedRouteMappings(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
|
|
|
|
// Check failures for bad weights.
|
|
shouldErr := func(rds ...*MapDest) {
|
|
t.Helper()
|
|
if acc.AddWeightedMappings("foo", rds...) == nil {
|
|
t.Fatalf("Expected an error, got none")
|
|
}
|
|
}
|
|
shouldNotErr := func(rds ...*MapDest) {
|
|
t.Helper()
|
|
if err := acc.AddWeightedMappings("foo", rds...); err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
shouldErr(NewMapDest("bar", 150))
|
|
shouldNotErr(NewMapDest("bar", 50))
|
|
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 50))
|
|
// Same dest duplicated should error.
|
|
shouldErr(NewMapDest("bar", 50), NewMapDest("bar", 50))
|
|
// total over 100
|
|
shouldErr(NewMapDest("bar", 50), NewMapDest("baz", 60))
|
|
|
|
acc.RemoveMapping("foo")
|
|
|
|
// 20 for original, you can leave it off will be auto-added.
|
|
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 30))
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
fsub, _ := nc.SubscribeSync("foo")
|
|
bsub, _ := nc.SubscribeSync("bar")
|
|
zsub, _ := nc.SubscribeSync("baz")
|
|
|
|
// For checking later.
|
|
rds := []struct {
|
|
sub *nats.Subscription
|
|
w uint8
|
|
}{
|
|
{fsub, 20},
|
|
{bsub, 50},
|
|
{zsub, 30},
|
|
}
|
|
|
|
total := 5000
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
for _, rd := range rds {
|
|
pending, _, _ := rd.sub.Pending()
|
|
expected := total / int(100/rd.w)
|
|
tp := expected / 5 // 20%
|
|
min, max := expected-tp, expected+tp
|
|
if pending < min || pending > max {
|
|
t.Fatalf("Expected about %d msgs for %q, got %d", expected, rd.sub.Subject, pending)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestGlobalAccountRouteMappingsConfiguration(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
mappings = {
|
|
foo: bar
|
|
foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ]
|
|
bar.*.*: RAB.$2.$1
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
bsub, _ := nc.SubscribeSync("bar")
|
|
fsub1, _ := nc.SubscribeSync("bar.v1.>")
|
|
fsub2, _ := nc.SubscribeSync("baz.v2.>")
|
|
zsub, _ := nc.SubscribeSync("RAB.>")
|
|
f22sub, _ := nc.SubscribeSync("foo.*")
|
|
|
|
checkPending := func(sub *nats.Subscription, expected int) {
|
|
t.Helper()
|
|
if n, _, _ := sub.Pending(); n != expected {
|
|
t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
|
|
}
|
|
}
|
|
|
|
nc.Publish("foo", nil)
|
|
nc.Publish("bar.11.22", nil)
|
|
|
|
total := 500
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo.22", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
checkPending(bsub, 1)
|
|
checkPending(zsub, 1)
|
|
|
|
fpending, _, _ := f22sub.Pending()
|
|
fpending1, _, _ := fsub1.Pending()
|
|
fpending2, _, _ := fsub2.Pending()
|
|
|
|
if fpending1 < fpending2 || fpending < fpending2 {
|
|
t.Fatalf("Loadbalancing seems off for the foo.* mappings: %d and %d and %d", fpending, fpending1, fpending2)
|
|
}
|
|
}
|
|
|
|
func TestAccountRouteMappingsConfiguration(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
synadia {
|
|
users = [{user: derek, password: foo}]
|
|
mappings = {
|
|
foo: bar
|
|
foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ]
|
|
bar.*.*: RAB.$2.$1
|
|
}
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
// We test functionality above, so for this one just make sure we have mappings for the account.
|
|
acc, _ := s.LookupAccount("synadia")
|
|
if !acc.hasMappings() {
|
|
t.Fatalf("Account %q does not have mappings", "synadia")
|
|
}
|
|
|
|
az, err := s.Accountz(&AccountzOptions{"synadia"})
|
|
if err != nil {
|
|
t.Fatalf("Error getting Accountz: %v", err)
|
|
}
|
|
if az.Account == nil {
|
|
t.Fatalf("Expected an Account")
|
|
}
|
|
if len(az.Account.Mappings) != 3 {
|
|
t.Fatalf("Expected %d mappings, saw %d", 3, len(az.Account.Mappings))
|
|
}
|
|
}
|
|
|
|
func TestAccountRouteMappingsWithLossInjection(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
mappings = {
|
|
foo: { dest: foo, weight: 80% }
|
|
bar: { dest: bar, weight: 0% }
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
|
|
total := 1000
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
if pending, _, _ := sub.Pending(); pending == total {
|
|
t.Fatalf("Expected some loss and pending to not be same as sent")
|
|
}
|
|
|
|
sub, _ = nc.SubscribeSync("bar")
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("bar", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
if pending, _, _ := sub.Pending(); pending != 0 {
|
|
t.Fatalf("Expected all messages to be dropped and pending to be 0, got %d", pending)
|
|
}
|
|
}
|
|
|
|
func TestAccountRouteMappingsWithOriginClusterFilter(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
mappings = {
|
|
foo: { dest: bar, cluster: SYN, weight: 100% }
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
nc := natsConnect(t, s.ClientURL())
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
|
|
total := 1000
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
if pending, _, _ := sub.Pending(); pending != total {
|
|
t.Fatalf("Expected pending to be %d, got %d", total, pending)
|
|
}
|
|
|
|
s.setClusterName("SYN")
|
|
sub, _ = nc.SubscribeSync("bar")
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", nil)
|
|
}
|
|
nc.Flush()
|
|
|
|
if pending, _, _ := sub.Pending(); pending != total {
|
|
t.Fatalf("Expected pending to be %d, got %d", total, pending)
|
|
}
|
|
}
|
|
|
|
func TestAccountServiceImportWithRouteMappings(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
foo {
|
|
users = [{user: derek, password: foo}]
|
|
exports = [{service: "request"}]
|
|
}
|
|
bar {
|
|
users = [{user: ivan, password: bar}]
|
|
imports = [{service: {account: "foo", subject:"request"}}]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, opts := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := s.LookupAccount("foo")
|
|
acc.AddMapping("request", "request.v2")
|
|
|
|
// Create the service client first.
|
|
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
|
|
defer ncFoo.Close()
|
|
|
|
fooSub := natsSubSync(t, ncFoo, "request.v2")
|
|
ncFoo.Flush()
|
|
|
|
// Requestor
|
|
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
|
|
defer ncBar.Close()
|
|
|
|
ncBar.Publish("request", nil)
|
|
ncBar.Flush()
|
|
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if n, _, _ := fooSub.Pending(); n != 1 {
|
|
return fmt.Errorf("Expected a request for %q, but got %d", fooSub.Subject, n)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestAccountImportsWithWildcardSupport(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
foo {
|
|
users = [{user: derek, password: foo}]
|
|
exports = [
|
|
{ service: "request.*" }
|
|
{ stream: "events.>" }
|
|
{ stream: "info.*.*.>" }
|
|
]
|
|
}
|
|
bar {
|
|
users = [{user: ivan, password: bar}]
|
|
imports = [
|
|
{ service: {account: "foo", subject:"request.*"}, to:"my.request.*"}
|
|
{ stream: {account: "foo", subject:"events.>"}, to:"foo.events.>"}
|
|
{ stream: {account: "foo", subject:"info.*.*.>"}, to:"foo.info.$2.$1.>"}
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, opts := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
|
|
defer ncFoo.Close()
|
|
|
|
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
|
|
defer ncBar.Close()
|
|
|
|
// Create subscriber for the service endpoint in foo.
|
|
_, err := ncFoo.QueueSubscribe("request.*", "t22", func(m *nats.Msg) {
|
|
if m.Subject != "request.22" {
|
|
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
|
|
}
|
|
m.Respond([]byte("yes!"))
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
ncFoo.Flush()
|
|
|
|
// Now test service import.
|
|
resp, err := ncBar.Request("my.request.22", []byte("yes?"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
if string(resp.Data) != "yes!" {
|
|
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
|
|
}
|
|
|
|
// Now test stream imports.
|
|
esub, _ := ncBar.SubscribeSync("foo.events.*") // subset
|
|
isub, _ := ncBar.SubscribeSync("foo.info.>")
|
|
ncBar.Flush()
|
|
|
|
// Now publish some stream events.
|
|
ncFoo.Publish("events.22", nil)
|
|
ncFoo.Publish("info.11.22.bar", nil)
|
|
ncFoo.Flush()
|
|
|
|
checkPending := func(sub *nats.Subscription, expected int) {
|
|
t.Helper()
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if n, _, _ := sub.Pending(); n != expected {
|
|
return fmt.Errorf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
checkPending(esub, 1)
|
|
checkPending(isub, 1)
|
|
|
|
// Now check to make sure the subjects are correct etc.
|
|
m, err := esub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if m.Subject != "foo.events.22" {
|
|
t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.events.22", m.Subject)
|
|
}
|
|
|
|
m, err = isub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if m.Subject != "foo.info.22.11.bar" {
|
|
t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.info.22.11.bar", m.Subject)
|
|
}
|
|
}
|
|
|
|
// duplicates TestJWTAccountImportsWithWildcardSupport (jwt_test.go) in config
|
|
func TestAccountImportsWithWildcardSupportStreamAndService(t *testing.T) {
|
|
cf := createConfFile(t, []byte(`
|
|
accounts {
|
|
foo {
|
|
users = [{user: derek, password: foo}]
|
|
exports = [
|
|
{ service: "$request.*.$in.*.>" }
|
|
{ stream: "$events.*.$in.*.>" }
|
|
]
|
|
}
|
|
bar {
|
|
users = [{user: ivan, password: bar}]
|
|
imports = [
|
|
{ service: {account: "foo", subject:"$request.*.$in.*.>"}, to:"my.request.$2.$1.>"}
|
|
{ stream: {account: "foo", subject:"$events.*.$in.*.>"}, to:"my.events.$2.$1.>"}
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, cf)
|
|
|
|
s, opts := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
|
|
ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port))
|
|
defer ncFoo.Close()
|
|
|
|
ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port))
|
|
defer ncBar.Close()
|
|
|
|
// Create subscriber for the service endpoint in foo.
|
|
_, err := ncFoo.Subscribe("$request.>", func(m *nats.Msg) {
|
|
if m.Subject != "$request.2.$in.1.bar" {
|
|
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
|
|
}
|
|
m.Respond([]byte("yes!"))
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
ncFoo.Flush()
|
|
|
|
// Now test service import.
|
|
if resp, err := ncBar.Request("my.request.1.2.bar", []byte("yes?"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
} else if string(resp.Data) != "yes!" {
|
|
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
|
|
}
|
|
subBar, err := ncBar.SubscribeSync("my.events.>")
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
ncBar.Flush()
|
|
|
|
ncFoo.Publish("$events.1.$in.2.bar", nil)
|
|
|
|
m, err := subBar.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
if m.Subject != "my.events.2.1.bar" {
|
|
t.Fatalf("Expected literal subject for request, got %q", m.Subject)
|
|
}
|
|
}
|
|
|
|
func BenchmarkNewRouteReply(b *testing.B) {
|
|
opts := defaultServerOptions
|
|
s := New(&opts)
|
|
g := s.globalAccount()
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
g.newServiceReply(false)
|
|
}
|
|
}
|
|
|
|
func TestSamplingHeader(t *testing.T) {
|
|
test := func(expectSampling bool, h http.Header) {
|
|
t.Helper()
|
|
b := strings.Builder{}
|
|
b.WriteString("\r\n") // simulate status line
|
|
h.Write(&b)
|
|
b.WriteString("\r\n")
|
|
hdrString := b.String()
|
|
c := &client{parseState: parseState{msgBuf: []byte(hdrString), pa: pubArg{hdr: len(hdrString)}}}
|
|
sample, hdr := shouldSample(&serviceLatency{0, "foo"}, c)
|
|
if expectSampling {
|
|
if !sample {
|
|
t.Fatal("Expected to sample")
|
|
} else if hdr == nil {
|
|
t.Fatal("Expected a header")
|
|
}
|
|
for k, v := range h {
|
|
if hdr.Get(k) != v[0] {
|
|
t.Fatal("Expect header to match")
|
|
}
|
|
}
|
|
} else {
|
|
if sample {
|
|
t.Fatal("Expected not to sample")
|
|
} else if hdr != nil {
|
|
t.Fatal("Expected no header")
|
|
}
|
|
}
|
|
}
|
|
|
|
test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:0"}})
|
|
test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:00"}}) // one byte encoded as two hex digits
|
|
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:1"}})
|
|
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:01"}})
|
|
test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:5"}}) // debug and sample
|
|
test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1"}})
|
|
test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}})
|
|
test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:0"}})
|
|
test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:0"}})
|
|
|
|
test(true, http.Header{"X-B3-Sampled": []string{"1"}})
|
|
test(false, http.Header{"X-B3-Sampled": []string{"0"}})
|
|
test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}) // decision left to recipient
|
|
test(false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"0"}})
|
|
test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"1"}})
|
|
|
|
test(false, http.Header{"B3": []string{"0"}}) // deny only
|
|
test(false, http.Header{"B3": []string{"0-0-0-0"}})
|
|
test(false, http.Header{"B3": []string{"0-0-0"}})
|
|
test(true, http.Header{"B3": []string{"0-0-1-0"}})
|
|
test(true, http.Header{"B3": []string{"0-0-1"}})
|
|
test(true, http.Header{"B3": []string{"0-0-d"}}) // debug is not a deny
|
|
test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"}})
|
|
test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}})
|
|
test(false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-0-05e3ac9a4f6e3b90"}})
|
|
|
|
test(true, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}})
|
|
test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}})
|
|
}
|
|
|
|
func TestSubjectTransforms(t *testing.T) {
|
|
shouldErr := func(src, dest string) {
|
|
t.Helper()
|
|
if _, err := newTransform(src, dest); err != ErrBadSubject {
|
|
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
|
|
}
|
|
}
|
|
|
|
// Must be valid subjects.
|
|
shouldErr("foo", "")
|
|
shouldErr("foo..", "bar")
|
|
|
|
// Wildcards are allowed in src, but must be matched by token placements on the other side.
|
|
// e.g. foo.* -> bar.$1.
|
|
// Need to have as many pwcs as placements on other side.
|
|
shouldErr("foo.*", "bar.*")
|
|
shouldErr("foo.*", "bar.$2") // Bad pwc token identifier
|
|
shouldErr("foo.*", "bar.$1.>") // fwcs have to match.
|
|
shouldErr("foo.>", "bar.baz") // fwcs have to match.
|
|
shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.
|
|
|
|
shouldBeOK := func(src, dest string) *transform {
|
|
t.Helper()
|
|
tr, err := newTransform(src, dest)
|
|
if err != nil {
|
|
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
|
|
}
|
|
return tr
|
|
}
|
|
|
|
shouldBeOK("foo", "bar")
|
|
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1")
|
|
shouldBeOK("baz.>", "mybaz.>")
|
|
|
|
shouldMatch := func(src, dest, sample, expected string) {
|
|
t.Helper()
|
|
tr := shouldBeOK(src, dest)
|
|
s, err := tr.match(sample)
|
|
if err != nil {
|
|
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
|
|
}
|
|
if s != expected {
|
|
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
|
|
}
|
|
}
|
|
|
|
shouldMatch("foo", "bar", "foo", "bar")
|
|
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
|
|
shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3")
|
|
shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3")
|
|
shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo")
|
|
}
|
|
|
|
func TestAccountSystemPermsWithGlobalAccess(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
listen: 127.0.0.1:-1
|
|
accounts {
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`))
|
|
defer removeFile(t, conf)
|
|
|
|
s, _ := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
// Make sure we can connect with no auth to global account as normal.
|
|
nc, err := nats.Connect(s.ClientURL())
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Make sure we can connect to the system account with correct credentials.
|
|
sc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create system client: %v", err)
|
|
}
|
|
defer sc.Close()
|
|
}
|