mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
1245 lines
34 KiB
Go
1245 lines
34 KiB
Go
// Copyright 2020-2022 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
//go:build !skip_js_tests
|
|
// +build !skip_js_tests
|
|
|
|
package server
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
jwt "github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nkeys"
|
|
)
|
|
|
|
func TestJetStreamLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) {
|
|
name := "NOT-UNIQUE"
|
|
test := func(s *Server, sIdExpected string, srvs ...*Server) {
|
|
ids := map[string]string{}
|
|
for _, srv := range srvs {
|
|
checkLeafNodeConnectedCount(t, srv, 2)
|
|
ids[srv.ID()] = srv.opts.JetStreamDomain
|
|
}
|
|
// ensure that an update for every server was received
|
|
sysNc := natsConnect(t, fmt.Sprintf("nats://admin:s3cr3t!@127.0.0.1:%d", s.opts.Port))
|
|
defer sysNc.Close()
|
|
sub, err := sysNc.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*"))
|
|
require_NoError(t, err)
|
|
for {
|
|
m, err := sub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
tk := strings.Split(m.Subject, ".")
|
|
if domain, ok := ids[tk[2]]; ok {
|
|
delete(ids, tk[2])
|
|
require_Contains(t, string(m.Data), fmt.Sprintf(`"domain":"%s"`, domain))
|
|
}
|
|
if len(ids) == 0 {
|
|
break
|
|
}
|
|
}
|
|
cnt := 0
|
|
s.nodeToInfo.Range(func(key, value interface{}) bool {
|
|
cnt++
|
|
require_Equal(t, value.(nodeInfo).name, name)
|
|
require_Equal(t, value.(nodeInfo).id, sIdExpected)
|
|
return true
|
|
})
|
|
require_True(t, cnt == 1)
|
|
}
|
|
tmplA := `
|
|
listen: -1
|
|
server_name: %s
|
|
jetstream {
|
|
max_mem_store: 256MB,
|
|
max_file_store: 2GB,
|
|
store_dir: '%s',
|
|
domain: hub
|
|
}
|
|
accounts {
|
|
JSY { users = [ { user: "y", pass: "p" } ]; jetstream: true }
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
leaf {
|
|
port: -1
|
|
}
|
|
`
|
|
tmplL := `
|
|
listen: -1
|
|
server_name: %s
|
|
jetstream {
|
|
max_mem_store: 256MB,
|
|
max_file_store: 2GB,
|
|
store_dir: '%s',
|
|
domain: %s
|
|
}
|
|
accounts {
|
|
JSY { users = [ { user: "y", pass: "p" } ]; jetstream: true }
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
leaf {
|
|
remotes [
|
|
{ urls: [ %s ], account: "JSY" }
|
|
{ urls: [ %s ], account: "$SYS" }
|
|
]
|
|
}
|
|
`
|
|
t.Run("same-domain", func(t *testing.T) {
|
|
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, name, t.TempDir())))
|
|
sA, oA := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
// using same domain as sA
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, name, t.TempDir(), "hub",
|
|
fmt.Sprintf("nats://y:p@127.0.0.1:%d", oA.LeafNode.Port),
|
|
fmt.Sprintf("nats://admin:s3cr3t!@127.0.0.1:%d", oA.LeafNode.Port))))
|
|
sL, _ := RunServerWithConfig(confL)
|
|
defer sL.Shutdown()
|
|
// as server name uniqueness is violates, sL.ID() is the expected value
|
|
test(sA, sL.ID(), sA, sL)
|
|
})
|
|
t.Run("different-domain", func(t *testing.T) {
|
|
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, name, t.TempDir())))
|
|
sA, oA := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
// using different domain as sA
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, name, t.TempDir(), "spoke",
|
|
fmt.Sprintf("nats://y:p@127.0.0.1:%d", oA.LeafNode.Port),
|
|
fmt.Sprintf("nats://admin:s3cr3t!@127.0.0.1:%d", oA.LeafNode.Port))))
|
|
sL, _ := RunServerWithConfig(confL)
|
|
defer sL.Shutdown()
|
|
checkLeafNodeConnectedCount(t, sL, 2)
|
|
checkLeafNodeConnectedCount(t, sA, 2)
|
|
// ensure sA contains only sA.ID
|
|
test(sA, sA.ID(), sA, sL)
|
|
})
|
|
}
|
|
|
|
func TestJetStreamLeafNodeJwtPermsAndJSDomains(t *testing.T) {
|
|
createAcc := func(js bool) (string, string, nkeys.KeyPair) {
|
|
kp, _ := nkeys.CreateAccount()
|
|
aPub, _ := kp.PublicKey()
|
|
claim := jwt.NewAccountClaims(aPub)
|
|
if js {
|
|
claim.Limits.JetStreamLimits = jwt.JetStreamLimits{
|
|
MemoryStorage: 1024 * 1024,
|
|
DiskStorage: 1024 * 1024,
|
|
Streams: 1, Consumer: 2}
|
|
}
|
|
aJwt, err := claim.Encode(oKp)
|
|
require_NoError(t, err)
|
|
return aPub, aJwt, kp
|
|
}
|
|
sysPub, sysJwt, sysKp := createAcc(false)
|
|
accPub, accJwt, accKp := createAcc(true)
|
|
noExpiration := time.Now().Add(time.Hour)
|
|
// create user for acc to be used in leaf node.
|
|
lnCreds := createUserWithLimit(t, accKp, noExpiration, func(j *jwt.UserPermissionLimits) {
|
|
j.Sub.Deny.Add("subdeny")
|
|
j.Pub.Deny.Add("pubdeny")
|
|
})
|
|
unlimitedCreds := createUserWithLimit(t, accKp, noExpiration, nil)
|
|
|
|
sysCreds := createUserWithLimit(t, sysKp, noExpiration, nil)
|
|
|
|
tmplA := `
|
|
operator: %s
|
|
system_account: %s
|
|
resolver: MEMORY
|
|
resolver_preload: {
|
|
%s: %s
|
|
%s: %s
|
|
}
|
|
listen: 127.0.0.1:-1
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
jetstream :{
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 100Mb
|
|
max_file: 100Mb
|
|
}
|
|
`
|
|
|
|
tmplL := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account = SYS
|
|
jetstream: {
|
|
domain: ln1
|
|
store_dir: '%s'
|
|
max_mem: 50Mb
|
|
max_file: 50Mb
|
|
}
|
|
leafnodes:{
|
|
remotes:[{ url:nats://127.0.0.1:%d, account: A, credentials: '%s'},
|
|
{ url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}]
|
|
}
|
|
`
|
|
|
|
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, ojwt, sysPub,
|
|
sysPub, sysJwt, accPub, accJwt,
|
|
t.TempDir())))
|
|
sA, _ := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, t.TempDir(),
|
|
sA.opts.LeafNode.Port, lnCreds, sA.opts.LeafNode.Port, sysCreds)))
|
|
sL, _ := RunServerWithConfig(confL)
|
|
defer sL.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sA, 2)
|
|
checkLeafNodeConnectedCount(t, sL, 2)
|
|
|
|
ncA := natsConnect(t, sA.ClientURL(), nats.UserCredentials(unlimitedCreds))
|
|
defer ncA.Close()
|
|
|
|
ncL := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sL.opts.Port))
|
|
defer ncL.Close()
|
|
|
|
test := func(subject string, cSub, cPub *nats.Conn, remoteServerForSub *Server, accName string, pass bool) {
|
|
t.Helper()
|
|
sub, err := cSub.SubscribeSync(subject)
|
|
require_NoError(t, err)
|
|
require_NoError(t, cSub.Flush())
|
|
// ensure the subscription made it across, or if not sent due to sub deny, make sure it could have made it.
|
|
if remoteServerForSub == nil {
|
|
time.Sleep(200 * time.Millisecond)
|
|
} else {
|
|
checkSubInterest(t, remoteServerForSub, accName, subject, time.Second)
|
|
}
|
|
require_NoError(t, cPub.Publish(subject, []byte("hello world")))
|
|
require_NoError(t, cPub.Flush())
|
|
m, err := sub.NextMsg(500 * time.Millisecond)
|
|
if pass {
|
|
require_NoError(t, err)
|
|
require_True(t, m.Subject == subject)
|
|
require_Equal(t, string(m.Data), "hello world")
|
|
} else {
|
|
require_True(t, err == nats.ErrTimeout)
|
|
}
|
|
}
|
|
|
|
t.Run("sub-on-ln-pass", func(t *testing.T) {
|
|
test("sub", ncL, ncA, sA, accPub, true)
|
|
})
|
|
t.Run("sub-on-ln-fail", func(t *testing.T) {
|
|
test("subdeny", ncL, ncA, nil, "", false)
|
|
})
|
|
t.Run("pub-on-ln-pass", func(t *testing.T) {
|
|
test("pub", ncA, ncL, sL, "A", true)
|
|
})
|
|
t.Run("pub-on-ln-fail", func(t *testing.T) {
|
|
test("pubdeny", ncA, ncL, nil, "A", false)
|
|
})
|
|
}
|
|
|
|
func TestJetStreamLeafNodeClusterExtensionWithSystemAccount(t *testing.T) {
|
|
/*
|
|
Topologies tested here
|
|
same == true
|
|
A <-> B
|
|
^ |\
|
|
| \
|
|
| proxy
|
|
| \
|
|
LA <-> LB
|
|
|
|
same == false
|
|
A <-> B
|
|
^ ^
|
|
| |
|
|
| proxy
|
|
| |
|
|
LA <-> LB
|
|
|
|
The proxy is turned on later, such that the system account connection can be started later, in a controlled way
|
|
This explicitly tests the system state before and after this happens.
|
|
*/
|
|
|
|
tmplA := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
no_advertise: true
|
|
authorization: {
|
|
timeout: 0.5
|
|
}
|
|
}
|
|
jetstream :{
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 100Mb
|
|
max_file: 100Mb
|
|
}
|
|
server_name: A
|
|
cluster: {
|
|
name: clust1
|
|
listen: 127.0.0.1:50554
|
|
routes=[nats-route://127.0.0.1:50555]
|
|
no_advertise: true
|
|
}
|
|
`
|
|
|
|
tmplB := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
no_advertise: true
|
|
authorization: {
|
|
timeout: 0.5
|
|
}
|
|
}
|
|
jetstream: {
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 100Mb
|
|
max_file: 100Mb
|
|
}
|
|
server_name: B
|
|
cluster: {
|
|
name: clust1
|
|
listen: 127.0.0.1:50555
|
|
routes=[nats-route://127.0.0.1:50554]
|
|
no_advertise: true
|
|
}
|
|
`
|
|
|
|
tmplLA := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account = SYS
|
|
jetstream: {
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 50Mb
|
|
max_file: 50Mb
|
|
%s
|
|
}
|
|
server_name: LA
|
|
cluster: {
|
|
name: clustL
|
|
listen: 127.0.0.1:50556
|
|
routes=[nats-route://127.0.0.1:50557]
|
|
no_advertise: true
|
|
}
|
|
leafnodes:{
|
|
no_advertise: true
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},
|
|
{url:nats://s1:s1@127.0.0.1:%d, account: SYS}]
|
|
}
|
|
`
|
|
|
|
tmplLB := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account = SYS
|
|
jetstream: {
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 50Mb
|
|
max_file: 50Mb
|
|
%s
|
|
}
|
|
server_name: LB
|
|
cluster: {
|
|
name: clustL
|
|
listen: 127.0.0.1:50557
|
|
routes=[nats-route://127.0.0.1:50556]
|
|
no_advertise: true
|
|
}
|
|
leafnodes:{
|
|
no_advertise: true
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},
|
|
{url:nats://s1:s1@127.0.0.1:%d, account: SYS}]
|
|
}
|
|
`
|
|
|
|
for _, testCase := range []struct {
|
|
// which topology to pick
|
|
same bool
|
|
// If leaf server should be operational and form a Js cluster prior to joining.
|
|
// In this setup this would be an error as you give the wrong hint.
|
|
// But this should work itself out regardless
|
|
leafFunctionPreJoin bool
|
|
}{
|
|
{true, true},
|
|
{true, false},
|
|
{false, true},
|
|
{false, false}} {
|
|
t.Run(fmt.Sprintf("%t-%t", testCase.same, testCase.leafFunctionPreJoin), func(t *testing.T) {
|
|
sd1 := t.TempDir()
|
|
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, sd1)))
|
|
sA, _ := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
|
|
sd2 := t.TempDir()
|
|
confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, sd2)))
|
|
sB, _ := RunServerWithConfig(confB)
|
|
defer sB.Shutdown()
|
|
|
|
checkClusterFormed(t, sA, sB)
|
|
|
|
c := cluster{t: t, servers: []*Server{sA, sB}}
|
|
c.waitOnLeader()
|
|
|
|
// starting this will allow the second remote in tmplL to successfully connect.
|
|
port := sB.opts.LeafNode.Port
|
|
if testCase.same {
|
|
port = sA.opts.LeafNode.Port
|
|
}
|
|
p := &proxyAcceptDetectFailureLate{acceptPort: port}
|
|
defer p.close()
|
|
lPort := p.runEx(t, true)
|
|
|
|
hint := ""
|
|
if testCase.leafFunctionPreJoin {
|
|
hint = fmt.Sprintf("extension_hint: %s", strings.ToUpper(jsNoExtend))
|
|
}
|
|
|
|
sd3 := t.TempDir()
|
|
// deliberately pick server sA and proxy
|
|
confLA := createConfFile(t, []byte(fmt.Sprintf(tmplLA, sd3, hint, sA.opts.LeafNode.Port, lPort)))
|
|
sLA, _ := RunServerWithConfig(confLA)
|
|
defer sLA.Shutdown()
|
|
|
|
sd4 := t.TempDir()
|
|
// deliberately pick server sA and proxy
|
|
confLB := createConfFile(t, []byte(fmt.Sprintf(tmplLB, sd4, hint, sA.opts.LeafNode.Port, lPort)))
|
|
sLB, _ := RunServerWithConfig(confLB)
|
|
defer sLB.Shutdown()
|
|
|
|
checkClusterFormed(t, sLA, sLB)
|
|
|
|
strmCfg := func(name, placementCluster string) *nats.StreamConfig {
|
|
if placementCluster == "" {
|
|
return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}}
|
|
}
|
|
return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name},
|
|
Placement: &nats.Placement{Cluster: placementCluster}}
|
|
}
|
|
// Only after the system account is fully connected can streams be placed anywhere.
|
|
testJSFunctions := func(pass bool) {
|
|
ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sA.opts.Port))
|
|
defer ncA.Close()
|
|
jsA, err := ncA.JetStream()
|
|
require_NoError(t, err)
|
|
_, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA1-%t", pass), ""))
|
|
require_NoError(t, err)
|
|
_, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA2-%t", pass), "clust1"))
|
|
require_NoError(t, err)
|
|
_, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA3-%t", pass), "clustL"))
|
|
if pass {
|
|
require_NoError(t, err)
|
|
} else {
|
|
require_Error(t, err)
|
|
require_Contains(t, err.Error(), "no suitable peers for placement")
|
|
}
|
|
ncL := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port))
|
|
defer ncL.Close()
|
|
jsL, err := ncL.JetStream()
|
|
require_NoError(t, err)
|
|
_, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL1-%t", pass), ""))
|
|
require_NoError(t, err)
|
|
_, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL2-%t", pass), "clustL"))
|
|
require_NoError(t, err)
|
|
_, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL3-%t", pass), "clust1"))
|
|
if pass {
|
|
require_NoError(t, err)
|
|
} else {
|
|
require_Error(t, err)
|
|
require_Contains(t, err.Error(), "no suitable peers for placement")
|
|
}
|
|
}
|
|
clusterLnCnt := func(expected int) error {
|
|
cnt := 0
|
|
for _, s := range c.servers {
|
|
cnt += s.NumLeafNodes()
|
|
}
|
|
if cnt == expected {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("not enought leaf node connections, got %d needed %d", cnt, expected)
|
|
}
|
|
|
|
// Even though there are two remotes defined in tmplL, only one will be able to connect.
|
|
checkFor(t, 10*time.Second, time.Second/4, func() error { return clusterLnCnt(2) })
|
|
checkLeafNodeConnectedCount(t, sLA, 1)
|
|
checkLeafNodeConnectedCount(t, sLB, 1)
|
|
c.waitOnPeerCount(2)
|
|
|
|
if testCase.leafFunctionPreJoin {
|
|
cl := cluster{t: t, servers: []*Server{sLA, sLB}}
|
|
cl.waitOnLeader()
|
|
cl.waitOnPeerCount(2)
|
|
testJSFunctions(false)
|
|
} else {
|
|
// In cases where the leaf nodes have to wait for the system account to connect,
|
|
// JetStream should not be operational during that time
|
|
ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port))
|
|
defer ncA.Close()
|
|
jsA, err := ncA.JetStream()
|
|
require_NoError(t, err)
|
|
_, err = jsA.AddStream(strmCfg("fail-false", ""))
|
|
require_Error(t, err)
|
|
}
|
|
// Starting the proxy will connect the system accounts.
|
|
// After they are connected the clusters are merged.
|
|
// Once this happened, all streams in test can be placed anywhere in the cluster.
|
|
// Before that only the cluster the client is connected to can be used for placement
|
|
p.start()
|
|
|
|
// Even though there are two remotes defined in tmplL, only one will be able to connect.
|
|
checkFor(t, 10*time.Second, time.Second/4, func() error { return clusterLnCnt(4) })
|
|
checkLeafNodeConnectedCount(t, sLA, 2)
|
|
checkLeafNodeConnectedCount(t, sLB, 2)
|
|
|
|
// The leader will reside in the main cluster only
|
|
c.waitOnPeerCount(4)
|
|
testJSFunctions(true)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestJetStreamLeafNodeClusterMixedModeExtensionWithSystemAccount(t *testing.T) {
|
|
/* Topology used in this test:
|
|
CLUSTER(A <-> B <-> C (NO JS))
|
|
^
|
|
|
|
|
LA
|
|
*/
|
|
|
|
// once every server is up, we expect these peers to be part of the JetStream meta cluster
|
|
expectedJetStreamPeers := map[string]struct{}{
|
|
"A": {},
|
|
"B": {},
|
|
"LA": {},
|
|
}
|
|
|
|
tmplA := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
no_advertise: true
|
|
authorization: {
|
|
timeout: 0.5
|
|
}
|
|
}
|
|
jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb }
|
|
server_name: A
|
|
cluster: {
|
|
name: clust1
|
|
listen: 127.0.0.1:50554
|
|
routes=[nats-route://127.0.0.1:50555,nats-route://127.0.0.1:50556]
|
|
no_advertise: true
|
|
}
|
|
`
|
|
|
|
tmplB := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
no_advertise: true
|
|
authorization: {
|
|
timeout: 0.5
|
|
}
|
|
}
|
|
jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb }
|
|
server_name: B
|
|
cluster: {
|
|
name: clust1
|
|
listen: 127.0.0.1:50555
|
|
routes=[nats-route://127.0.0.1:50554,nats-route://127.0.0.1:50556]
|
|
no_advertise: true
|
|
}
|
|
`
|
|
|
|
tmplC := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
leafnodes: {
|
|
listen: 127.0.0.1:-1
|
|
no_advertise: true
|
|
authorization: {
|
|
timeout: 0.5
|
|
}
|
|
}
|
|
jetstream: {
|
|
enabled: false
|
|
%s
|
|
}
|
|
server_name: C
|
|
cluster: {
|
|
name: clust1
|
|
listen: 127.0.0.1:50556
|
|
routes=[nats-route://127.0.0.1:50554,nats-route://127.0.0.1:50555]
|
|
no_advertise: true
|
|
}
|
|
`
|
|
|
|
tmplLA := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account = SYS
|
|
# the extension hint is to simplify this test. without it present we would need a cluster of size 2
|
|
jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend }
|
|
server_name: LA
|
|
leafnodes:{
|
|
no_advertise: true
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},
|
|
{url:nats://s1:s1@127.0.0.1:%d, account: SYS}]
|
|
}
|
|
# add the cluster here so we can test placement
|
|
cluster: { name: clustL }
|
|
`
|
|
for _, withDomain := range []bool{true, false} {
|
|
t.Run(fmt.Sprintf("with-domain:%t", withDomain), func(t *testing.T) {
|
|
var jsDisabledDomainString string
|
|
var jsEnabledDomainString string
|
|
if withDomain {
|
|
jsEnabledDomainString = `domain: "domain", `
|
|
jsDisabledDomainString = `domain: "domain"`
|
|
} else {
|
|
// in case no domain name is set, fall back to the extension hint.
|
|
// since JS is disabled, the value of this does not clash with other uses.
|
|
jsDisabledDomainString = "extension_hint: will_extend"
|
|
}
|
|
|
|
sd1 := t.TempDir()
|
|
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, jsEnabledDomainString, sd1)))
|
|
sA, _ := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
|
|
sd2 := t.TempDir()
|
|
confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, jsEnabledDomainString, sd2)))
|
|
sB, _ := RunServerWithConfig(confB)
|
|
defer sB.Shutdown()
|
|
|
|
confC := createConfFile(t, []byte(fmt.Sprintf(tmplC, jsDisabledDomainString)))
|
|
sC, _ := RunServerWithConfig(confC)
|
|
defer sC.Shutdown()
|
|
|
|
checkClusterFormed(t, sA, sB, sC)
|
|
c := cluster{t: t, servers: []*Server{sA, sB, sC}}
|
|
c.waitOnPeerCount(2)
|
|
|
|
sd3 := t.TempDir()
|
|
// deliberately pick server sC (no JS) to connect to
|
|
confLA := createConfFile(t, []byte(fmt.Sprintf(tmplLA, jsEnabledDomainString, sd3, sC.opts.LeafNode.Port, sC.opts.LeafNode.Port)))
|
|
sLA, _ := RunServerWithConfig(confLA)
|
|
defer sLA.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sC, 2)
|
|
checkLeafNodeConnectedCount(t, sLA, 2)
|
|
c.waitOnPeerCount(3)
|
|
peers := c.leader().JetStreamClusterPeers()
|
|
for _, peer := range peers {
|
|
if _, ok := expectedJetStreamPeers[peer]; !ok {
|
|
t.Fatalf("Found unexpected peer %q", peer)
|
|
}
|
|
}
|
|
|
|
// helper to create stream config with uniqe name and subject
|
|
cnt := 0
|
|
strmCfg := func(placementCluster string) *nats.StreamConfig {
|
|
name := fmt.Sprintf("s-%d", cnt)
|
|
cnt++
|
|
if placementCluster == "" {
|
|
return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}}
|
|
}
|
|
return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name},
|
|
Placement: &nats.Placement{Cluster: placementCluster}}
|
|
}
|
|
|
|
test := func(port int, expectedDefPlacement string) {
|
|
ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", port))
|
|
defer ncA.Close()
|
|
jsA, err := ncA.JetStream()
|
|
require_NoError(t, err)
|
|
si, err := jsA.AddStream(strmCfg(""))
|
|
require_NoError(t, err)
|
|
require_Contains(t, si.Cluster.Name, expectedDefPlacement)
|
|
si, err = jsA.AddStream(strmCfg("clust1"))
|
|
require_NoError(t, err)
|
|
require_Contains(t, si.Cluster.Name, "clust1")
|
|
si, err = jsA.AddStream(strmCfg("clustL"))
|
|
require_NoError(t, err)
|
|
require_Contains(t, si.Cluster.Name, "clustL")
|
|
}
|
|
|
|
test(sA.opts.Port, "clust1")
|
|
test(sB.opts.Port, "clust1")
|
|
test(sC.opts.Port, "clust1")
|
|
test(sLA.opts.Port, "clustL")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestJetStreamLeafNodeCredsDenies(t *testing.T) {
|
|
tmplL := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account = SYS
|
|
jetstream: {
|
|
domain: "cluster"
|
|
store_dir: '%s'
|
|
max_mem: 50Mb
|
|
max_file: 50Mb
|
|
}
|
|
leafnodes:{
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: '%s' },
|
|
{url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: '%s', deny_imports: foo, deny_exports: bar}]
|
|
}
|
|
`
|
|
akp, err := nkeys.CreateAccount()
|
|
require_NoError(t, err)
|
|
creds := createUserWithLimit(t, akp, time.Time{}, func(pl *jwt.UserPermissionLimits) {
|
|
pl.Pub.Deny.Add(jsAllAPI)
|
|
pl.Sub.Deny.Add(jsAllAPI)
|
|
})
|
|
|
|
sd := t.TempDir()
|
|
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, sd, creds, creds)))
|
|
opts := LoadConfig(confL)
|
|
sL, err := NewServer(opts)
|
|
require_NoError(t, err)
|
|
|
|
l := captureNoticeLogger{}
|
|
sL.SetLogger(&l, false, false)
|
|
|
|
go sL.Start()
|
|
defer sL.Shutdown()
|
|
|
|
// wait till the notices got printed
|
|
UNTIL_READY:
|
|
for {
|
|
<-time.After(50 * time.Millisecond)
|
|
l.Lock()
|
|
for _, n := range l.notices {
|
|
if strings.Contains(n, "Server is ready") {
|
|
l.Unlock()
|
|
break UNTIL_READY
|
|
}
|
|
}
|
|
l.Unlock()
|
|
}
|
|
|
|
l.Lock()
|
|
cnt := 0
|
|
for _, n := range l.notices {
|
|
if strings.Contains(n, "LeafNode Remote for Account A uses credentials file") ||
|
|
strings.Contains(n, "LeafNode Remote for System Account uses") ||
|
|
strings.Contains(n, "Remote for System Account uses restricted export permissions") ||
|
|
strings.Contains(n, "Remote for System Account uses restricted import permissions") {
|
|
cnt++
|
|
}
|
|
}
|
|
l.Unlock()
|
|
require_True(t, cnt == 4)
|
|
}
|
|
|
|
func TestJetStreamLeafNodeDefaultDomainCfg(t *testing.T) {
|
|
tmplHub := `
|
|
listen: 127.0.0.1:%d
|
|
accounts :{
|
|
A:{ jetstream: %s, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
jetstream : %s
|
|
server_name: HUB
|
|
leafnodes: {
|
|
listen: 127.0.0.1:%d
|
|
}
|
|
%s
|
|
`
|
|
|
|
tmplL := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: LEAF
|
|
leafnodes: {
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},%s]
|
|
}
|
|
%s
|
|
`
|
|
|
|
test := func(domain string, sysShared bool) {
|
|
confHub := createConfFile(t, []byte(fmt.Sprintf(tmplHub, -1, "disabled", "disabled", -1, "")))
|
|
sHub, _ := RunServerWithConfig(confHub)
|
|
defer sHub.Shutdown()
|
|
|
|
noDomainFix := ""
|
|
if domain == _EMPTY_ {
|
|
noDomainFix = `default_js_domain:{A:""}`
|
|
}
|
|
|
|
sys := ""
|
|
if sysShared {
|
|
sys = fmt.Sprintf(`{url:nats://s1:s1@127.0.0.1:%d, account: SYS}`, sHub.opts.LeafNode.Port)
|
|
}
|
|
|
|
sdLeaf := t.TempDir()
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, domain, sdLeaf, sHub.opts.LeafNode.Port, sys, noDomainFix)))
|
|
sLeaf, _ := RunServerWithConfig(confL)
|
|
defer sLeaf.Shutdown()
|
|
|
|
lnCnt := 1
|
|
if sysShared {
|
|
lnCnt++
|
|
}
|
|
|
|
checkLeafNodeConnectedCount(t, sHub, lnCnt)
|
|
checkLeafNodeConnectedCount(t, sLeaf, lnCnt)
|
|
|
|
ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sHub.opts.Port))
|
|
defer ncA.Close()
|
|
jsA, err := ncA.JetStream()
|
|
require_NoError(t, err)
|
|
|
|
_, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}})
|
|
require_True(t, err == nats.ErrNoResponders)
|
|
|
|
// Add in default domain and restart server
|
|
require_NoError(t, os.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub,
|
|
sHub.opts.Port,
|
|
"disabled",
|
|
"disabled",
|
|
sHub.opts.LeafNode.Port,
|
|
fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664))
|
|
|
|
sHub.Shutdown()
|
|
sHub.WaitForShutdown()
|
|
checkLeafNodeConnectedCount(t, sLeaf, 0)
|
|
sHubUpd1, _ := RunServerWithConfig(confHub)
|
|
defer sHubUpd1.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sHubUpd1, lnCnt)
|
|
checkLeafNodeConnectedCount(t, sLeaf, lnCnt)
|
|
|
|
_, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}})
|
|
require_NoError(t, err)
|
|
|
|
// Enable jetstream in hub.
|
|
sdHub := t.TempDir()
|
|
jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub)
|
|
require_NoError(t, os.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub,
|
|
sHubUpd1.opts.Port,
|
|
"disabled",
|
|
jsEnabled,
|
|
sHubUpd1.opts.LeafNode.Port,
|
|
fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664))
|
|
|
|
sHubUpd1.Shutdown()
|
|
sHubUpd1.WaitForShutdown()
|
|
checkLeafNodeConnectedCount(t, sLeaf, 0)
|
|
sHubUpd2, _ := RunServerWithConfig(confHub)
|
|
defer sHubUpd2.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sHubUpd2, lnCnt)
|
|
checkLeafNodeConnectedCount(t, sLeaf, lnCnt)
|
|
|
|
_, err = jsA.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}})
|
|
require_NoError(t, err)
|
|
|
|
// Enable jetstream in account A of hub
|
|
// This is a mis config, as you can't have it both ways, local jetstream but default to another one
|
|
require_NoError(t, os.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub,
|
|
sHubUpd2.opts.Port,
|
|
"enabled",
|
|
jsEnabled,
|
|
sHubUpd2.opts.LeafNode.Port,
|
|
fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664))
|
|
|
|
if domain != _EMPTY_ {
|
|
// in case no domain name exists there are no additional guard rails, hence no error
|
|
// It is the users responsibility to get this edge case right
|
|
sHubUpd2.Shutdown()
|
|
sHubUpd2.WaitForShutdown()
|
|
checkLeafNodeConnectedCount(t, sLeaf, 0)
|
|
sHubUpd3, err := NewServer(LoadConfig(confHub))
|
|
sHubUpd3.Shutdown()
|
|
|
|
require_Error(t, err)
|
|
require_Contains(t, err.Error(), `default_js_domain contains account name "A" with enabled JetStream`)
|
|
}
|
|
}
|
|
|
|
t.Run("with-domain-sys", func(t *testing.T) {
|
|
test("domain", true)
|
|
})
|
|
t.Run("with-domain-nosys", func(t *testing.T) {
|
|
test("domain", false)
|
|
})
|
|
t.Run("no-domain", func(t *testing.T) {
|
|
test("", true)
|
|
})
|
|
t.Run("no-domain", func(t *testing.T) {
|
|
test("", false)
|
|
})
|
|
}
|
|
|
|
func TestJetStreamLeafNodeDefaultDomainJwtExplicit(t *testing.T) {
|
|
tmplHub := `
|
|
listen: 127.0.0.1:%d
|
|
operator: %s
|
|
system_account: %s
|
|
resolver: MEM
|
|
resolver_preload: {
|
|
%s:%s
|
|
%s:%s
|
|
}
|
|
jetstream : disabled
|
|
server_name: HUB
|
|
leafnodes: {
|
|
listen: 127.0.0.1:%d
|
|
}
|
|
%s
|
|
`
|
|
|
|
tmplL := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
|
|
SYS:{ users:[ {user:s1,password:s1}]},
|
|
}
|
|
system_account: SYS
|
|
jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: LEAF
|
|
leafnodes: {
|
|
remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: '%s'},
|
|
{url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}]
|
|
}
|
|
%s
|
|
`
|
|
|
|
test := func(domain string) {
|
|
noDomainFix := ""
|
|
if domain == _EMPTY_ {
|
|
noDomainFix = `default_js_domain:{A:""}`
|
|
}
|
|
|
|
sysKp, syspub := createKey(t)
|
|
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
|
|
sysCreds := newUser(t, sysKp)
|
|
|
|
aKp, aPub := createKey(t)
|
|
aClaim := jwt.NewAccountClaims(aPub)
|
|
aJwt := encodeClaim(t, aClaim, aPub)
|
|
aCreds := newUser(t, aKp)
|
|
|
|
confHub := createConfFile(t, []byte(fmt.Sprintf(tmplHub, -1, ojwt, syspub, syspub, sysJwt, aPub, aJwt, -1, "")))
|
|
sHub, _ := RunServerWithConfig(confHub)
|
|
defer sHub.Shutdown()
|
|
|
|
sdLeaf := t.TempDir()
|
|
confL := createConfFile(t, []byte(fmt.Sprintf(tmplL,
|
|
domain,
|
|
sdLeaf,
|
|
sHub.opts.LeafNode.Port,
|
|
aCreds,
|
|
sHub.opts.LeafNode.Port,
|
|
sysCreds,
|
|
noDomainFix)))
|
|
sLeaf, _ := RunServerWithConfig(confL)
|
|
defer sLeaf.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sHub, 2)
|
|
checkLeafNodeConnectedCount(t, sLeaf, 2)
|
|
|
|
ncA := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", sHub.opts.Port), createUserCreds(t, nil, aKp))
|
|
defer ncA.Close()
|
|
jsA, err := ncA.JetStream()
|
|
require_NoError(t, err)
|
|
|
|
_, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}})
|
|
require_True(t, err == nats.ErrNoResponders)
|
|
|
|
// Add in default domain and restart server
|
|
require_NoError(t, os.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub,
|
|
sHub.opts.Port, ojwt, syspub, syspub, sysJwt, aPub, aJwt, sHub.opts.LeafNode.Port,
|
|
fmt.Sprintf(`default_js_domain: {%s:"%s"}`, aPub, domain))), 0664))
|
|
|
|
sHub.Shutdown()
|
|
sHub.WaitForShutdown()
|
|
checkLeafNodeConnectedCount(t, sLeaf, 0)
|
|
sHubUpd1, _ := RunServerWithConfig(confHub)
|
|
defer sHubUpd1.Shutdown()
|
|
|
|
checkLeafNodeConnectedCount(t, sHubUpd1, 2)
|
|
checkLeafNodeConnectedCount(t, sLeaf, 2)
|
|
|
|
_, err = jsA.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}})
|
|
require_NoError(t, err)
|
|
}
|
|
t.Run("with-domain", func(t *testing.T) {
|
|
test("domain")
|
|
})
|
|
t.Run("no-domain", func(t *testing.T) {
|
|
test("")
|
|
})
|
|
}
|
|
|
|
func TestJetStreamLeafNodeDefaultDomainClusterBothEnds(t *testing.T) {
|
|
// test to ensure that default domain functions when both ends of the leaf node connection are clusters
|
|
tmplHub1 := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enabled, users:[ {user:a1,password:a1}]},
|
|
B:{ jetstream: enabled, users:[ {user:b1,password:b1}]}
|
|
}
|
|
jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: HUB1
|
|
cluster: {
|
|
name: HUB
|
|
listen: 127.0.0.1:50554
|
|
routes=[nats-route://127.0.0.1:50555]
|
|
}
|
|
leafnodes: {
|
|
listen:127.0.0.1:-1
|
|
}
|
|
`
|
|
|
|
tmplHub2 := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enabled, users:[ {user:a1,password:a1}]},
|
|
B:{ jetstream: enabled, users:[ {user:b1,password:b1}]}
|
|
}
|
|
jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: HUB2
|
|
cluster: {
|
|
name: HUB
|
|
listen: 127.0.0.1:50555
|
|
routes=[nats-route://127.0.0.1:50554]
|
|
}
|
|
leafnodes: {
|
|
listen:127.0.0.1:-1
|
|
}
|
|
`
|
|
|
|
tmplL1 := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enabled, users:[ {user:a1,password:a1}]},
|
|
B:{ jetstream: disabled, users:[ {user:b1,password:b1}]}
|
|
}
|
|
jetstream: { domain: "DLEAF", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: LEAF1
|
|
cluster: {
|
|
name: LEAF
|
|
listen: 127.0.0.1:50556
|
|
routes=[nats-route://127.0.0.1:50557]
|
|
}
|
|
leafnodes: {
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},{url:nats://b1:b1@127.0.0.1:%d, account: B}]
|
|
}
|
|
default_js_domain: {B:"DHUB"}
|
|
`
|
|
|
|
tmplL2 := `
|
|
listen: 127.0.0.1:-1
|
|
accounts :{
|
|
A:{ jetstream: enabled, users:[ {user:a1,password:a1}]},
|
|
B:{ jetstream: disabled, users:[ {user:b1,password:b1}]}
|
|
}
|
|
jetstream: { domain: "DLEAF", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }
|
|
server_name: LEAF2
|
|
cluster: {
|
|
name: LEAF
|
|
listen: 127.0.0.1:50557
|
|
routes=[nats-route://127.0.0.1:50556]
|
|
}
|
|
leafnodes: {
|
|
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},{url:nats://b1:b1@127.0.0.1:%d, account: B}]
|
|
}
|
|
default_js_domain: {B:"DHUB"}
|
|
`
|
|
|
|
sd1 := t.TempDir()
|
|
confHub1 := createConfFile(t, []byte(fmt.Sprintf(tmplHub1, sd1)))
|
|
sHub1, _ := RunServerWithConfig(confHub1)
|
|
defer sHub1.Shutdown()
|
|
|
|
sd2 := t.TempDir()
|
|
confHub2 := createConfFile(t, []byte(fmt.Sprintf(tmplHub2, sd2)))
|
|
sHub2, _ := RunServerWithConfig(confHub2)
|
|
defer sHub2.Shutdown()
|
|
|
|
checkClusterFormed(t, sHub1, sHub2)
|
|
c1 := cluster{t: t, servers: []*Server{sHub1, sHub2}}
|
|
c1.waitOnPeerCount(2)
|
|
|
|
sd3 := t.TempDir()
|
|
confLeaf1 := createConfFile(t, []byte(fmt.Sprintf(tmplL1, sd3, sHub1.getOpts().LeafNode.Port, sHub1.getOpts().LeafNode.Port)))
|
|
sLeaf1, _ := RunServerWithConfig(confLeaf1)
|
|
defer sLeaf1.Shutdown()
|
|
|
|
confLeaf2 := createConfFile(t, []byte(fmt.Sprintf(tmplL2, sd3, sHub1.getOpts().LeafNode.Port, sHub1.getOpts().LeafNode.Port)))
|
|
sLeaf2, _ := RunServerWithConfig(confLeaf2)
|
|
defer sLeaf2.Shutdown()
|
|
|
|
checkClusterFormed(t, sLeaf1, sLeaf2)
|
|
c2 := cluster{t: t, servers: []*Server{sLeaf1, sLeaf2}}
|
|
c2.waitOnPeerCount(2)
|
|
|
|
checkLeafNodeConnectedCount(t, sHub1, 4)
|
|
checkLeafNodeConnectedCount(t, sLeaf1, 2)
|
|
checkLeafNodeConnectedCount(t, sLeaf2, 2)
|
|
|
|
ncB := natsConnect(t, fmt.Sprintf("nats://b1:b1@127.0.0.1:%d", sLeaf1.getOpts().Port))
|
|
defer ncB.Close()
|
|
jsB1, err := ncB.JetStream()
|
|
require_NoError(t, err)
|
|
si, err := jsB1.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}})
|
|
require_NoError(t, err)
|
|
require_Equal(t, si.Cluster.Name, "HUB")
|
|
|
|
jsB2, err := ncB.JetStream(nats.Domain("DHUB"))
|
|
require_NoError(t, err)
|
|
si, err = jsB2.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}})
|
|
require_NoError(t, err)
|
|
require_Equal(t, si.Cluster.Name, "HUB")
|
|
}
|
|
|
|
func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) {
|
|
accounts := `
|
|
accounts {
|
|
SYS: {
|
|
users: [{user: admin, password: admin}]
|
|
}
|
|
LEAF_USER: {
|
|
users: [{user: leaf_user, password: leaf_user}]
|
|
imports: [
|
|
{service: {account: LEAF_INGRESS, subject: "foo"}}
|
|
{service: {account: LEAF_INGRESS, subject: "_INBOX.>"}}
|
|
{service: {account: LEAF_INGRESS, subject: "$JS.leaf.API.>"}, to: "JS.leaf_ingress@leaf.API.>" }
|
|
]
|
|
jetstream: enabled
|
|
}
|
|
LEAF_INGRESS: {
|
|
users: [{user: leaf_ingress, password: leaf_ingress}]
|
|
exports: [
|
|
{service: "foo", accounts: [LEAF_USER]}
|
|
{service: "_INBOX.>", accounts: [LEAF_USER]}
|
|
{service: "$JS.leaf.API.>", response_type: "stream", accounts: [LEAF_USER]}
|
|
]
|
|
imports: [
|
|
]
|
|
jetstream: enabled
|
|
}
|
|
}
|
|
system_account: SYS
|
|
`
|
|
|
|
hconf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
%s
|
|
listen: "127.0.0.1:-1"
|
|
leafnodes {
|
|
listen: "127.0.0.1:-1"
|
|
}
|
|
`, accounts)))
|
|
defer os.Remove(hconf)
|
|
s, o := RunServerWithConfig(hconf)
|
|
defer s.Shutdown()
|
|
|
|
lconf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
%s
|
|
server_name: leaf-server
|
|
jetstream {
|
|
store_dir: '%s'
|
|
domain=leaf
|
|
}
|
|
|
|
listen: "127.0.0.1:-1"
|
|
leafnodes {
|
|
remotes = [
|
|
{
|
|
urls: ["nats-leaf://leaf_ingress:leaf_ingress@127.0.0.1:%v"]
|
|
account: "LEAF_INGRESS"
|
|
}
|
|
]
|
|
}
|
|
`, accounts, t.TempDir(), o.LeafNode.Port)))
|
|
defer os.Remove(lconf)
|
|
sl, so := RunServerWithConfig(lconf)
|
|
defer sl.Shutdown()
|
|
|
|
checkLeafNodeConnected(t, sl)
|
|
|
|
nc := natsConnect(t, fmt.Sprintf("nats://leaf_user:leaf_user@127.0.0.1:%v", so.Port))
|
|
defer nc.Close()
|
|
|
|
js, _ := nc.JetStream(nats.APIPrefix("JS.leaf_ingress@leaf.API."))
|
|
|
|
_, err := js.AddStream(&nats.StreamConfig{
|
|
Name: "TEST",
|
|
Subjects: []string{"foo"},
|
|
Storage: nats.FileStorage,
|
|
})
|
|
require_NoError(t, err)
|
|
|
|
_, err = js.Publish("foo", []byte("msg"))
|
|
require_NoError(t, err)
|
|
}
|