mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Add LeafNode import/export test with routes
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -22,9 +22,11 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/gnatsd/logger"
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
"github.com/nats-io/go-nats"
|
||||
"github.com/nats-io/jwt"
|
||||
@@ -1039,6 +1041,7 @@ func TestLeafNodeExportsImports(t *testing.T) {
|
||||
s, opts, conf := runLeafNodeOperatorServer(t)
|
||||
defer os.Remove(conf)
|
||||
defer s.Shutdown()
|
||||
s.SetLogger(logger.NewTestLogger("[S ] ", true), true, true)
|
||||
|
||||
// Setup the two accounts for this server.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
@@ -1093,6 +1096,7 @@ func TestLeafNodeExportsImports(t *testing.T) {
|
||||
sl, lopts, lnconf := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer sl.Shutdown()
|
||||
sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true)
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
|
||||
@@ -1153,6 +1157,175 @@ func TestLeafNodeExportsImports(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeadNodeExportImportComplexSetup(t *testing.T) {
|
||||
content := `
|
||||
port: -1
|
||||
operator = "./configs/nkeys/op.jwt"
|
||||
resolver = MEMORY
|
||||
cluster {
|
||||
port: -1
|
||||
}
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(content))
|
||||
defer os.Remove(conf)
|
||||
s1, s1Opts := RunServerWithConfig(conf)
|
||||
defer s1.Shutdown()
|
||||
s1.SetLogger(logger.NewTestLogger("[S1] ", true), true, true)
|
||||
|
||||
content = fmt.Sprintf(`
|
||||
port: -1
|
||||
operator = "./configs/nkeys/op.jwt"
|
||||
resolver = MEMORY
|
||||
cluster {
|
||||
port: -1
|
||||
routes: ["nats://%s:%d"]
|
||||
}
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
`, s1Opts.Cluster.Host, s1Opts.Cluster.Port)
|
||||
conf = createConfFile(t, []byte(content))
|
||||
s2, s2Opts := RunServerWithConfig(conf)
|
||||
defer s2.Shutdown()
|
||||
s2.SetLogger(logger.NewTestLogger("[S2] ", true), true, true)
|
||||
|
||||
// Setup the two accounts for this server.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create second account with exports
|
||||
acc2, akp2 := createAccount(t, s1)
|
||||
akp2Pub, _ := akp2.PublicKey()
|
||||
akp2AC := jwt.NewAccountClaims(akp2Pub)
|
||||
streamExport := &jwt.Export{Subject: "foo.stream", Type: jwt.Stream}
|
||||
serviceExport := &jwt.Export{Subject: "req.echo", Type: jwt.Service}
|
||||
akp2AC.Exports.Add(streamExport, serviceExport)
|
||||
akp2ACJWT, err := akp2AC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
if err := s1.AccountResolver().Store(akp2Pub, akp2ACJWT); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
s1.UpdateAccountClaims(acc2, akp2AC)
|
||||
|
||||
// Now create the first account and add on the imports. This will be what is used in the leafnode.
|
||||
acc1, akp1 := createAccount(t, s1)
|
||||
akp1Pub, _ := akp1.PublicKey()
|
||||
akp1AC := jwt.NewAccountClaims(akp1Pub)
|
||||
streamImport := &jwt.Import{Account: akp2Pub, Subject: "foo.stream", To: "import", Type: jwt.Stream}
|
||||
serviceImport := &jwt.Import{Account: akp2Pub, Subject: "import.request", To: "req.echo", Type: jwt.Service}
|
||||
akp1AC.Imports.Add(streamImport, serviceImport)
|
||||
akp1ACJWT, err := akp1AC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
if err := s1.AccountResolver().Store(akp1Pub, akp1ACJWT); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
s1.UpdateAccountClaims(acc1, akp1AC)
|
||||
|
||||
if err := s2.AccountResolver().Store(akp2Pub, akp2ACJWT); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
// Just make sure that account object registered in S2 is not acc2
|
||||
if a, err := s2.LookupAccount(acc2.Name); err != nil || a == acc2 {
|
||||
t.Fatalf("Lookup account error: %v - accounts are same: %v", err, a == acc2)
|
||||
}
|
||||
|
||||
if err := s2.AccountResolver().Store(akp1Pub, akp1ACJWT); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
// Just make sure that account object registered in S2 is not acc1
|
||||
if a, err := s2.LookupAccount(acc1.Name); err != nil || a == acc1 {
|
||||
t.Fatalf("Lookup account error: %v - accounts are same: %v", err, a == acc1)
|
||||
}
|
||||
|
||||
// Create the user will we use to connect the leafnode.
|
||||
kp1, _ := nkeys.CreateUser()
|
||||
pub1, _ := kp1.PublicKey()
|
||||
nuc1 := jwt.NewUserClaims(pub1)
|
||||
ujwt1, err := nuc1.Encode(akp1)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
// Create the leaf node server using the first account.
|
||||
seed, _ := kp1.Seed()
|
||||
mycreds := genCredsFile(t, ujwt1, seed)
|
||||
defer os.Remove(mycreds)
|
||||
|
||||
sl, lopts, lnconf := runSolicitWithCredentials(t, s1Opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer sl.Shutdown()
|
||||
sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true)
|
||||
|
||||
checkLeafNodeConnected(t, s1)
|
||||
|
||||
// Url to server s2
|
||||
s2URL := fmt.Sprintf("nats://%s:%d", s2Opts.Host, s2Opts.Port)
|
||||
|
||||
// Imported
|
||||
nc1, err := nats.Connect(s2URL, createUserCreds(t, s2, akp1))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
// Exported
|
||||
nc2, err := nats.Connect(s2URL, createUserCreds(t, s2, akp2))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
// Leaf node connection.
|
||||
lurl := fmt.Sprintf("nats://%s:%d", lopts.Host, lopts.Port)
|
||||
ncl, err := nats.Connect(lurl)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncl.Close()
|
||||
|
||||
// So everything should be setup here. So let's test streams first.
|
||||
lsub, _ := ncl.SubscribeSync("import.foo.stream")
|
||||
|
||||
// Wait for the sub to propagate to s2.
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
if acc1.RoutedSubs() == 0 {
|
||||
return fmt.Errorf("Still no routed subscription")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Pub to other account with export on original subject.
|
||||
nc2.Publish("foo.stream", nil)
|
||||
|
||||
if _, err = lsub.NextMsg(1 * time.Second); err != nil {
|
||||
t.Fatalf("Did not receive stream message: %s", err)
|
||||
}
|
||||
|
||||
// Services
|
||||
// Create listener on nc2 (which connects to s2)
|
||||
gotIt := int32(0)
|
||||
nc2.Subscribe("req.echo", func(msg *nats.Msg) {
|
||||
atomic.AddInt32(&gotIt, 1)
|
||||
nc2.Publish(msg.Reply, []byte("WORKED"))
|
||||
})
|
||||
nc2.Flush()
|
||||
|
||||
// Now send the request on the leaf node client.
|
||||
if _, err := ncl.Request("import.request", []byte("fingers crossed"), 500*time.Millisecond); err != nil {
|
||||
if atomic.LoadInt32(&gotIt) == 0 {
|
||||
t.Fatalf("Request was not received")
|
||||
}
|
||||
t.Fatalf("Did not receive response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeInfoURLs(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
|
||||
Reference in New Issue
Block a user