mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Added Gateway test for service import with queue group
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -165,6 +165,15 @@ func natsQueueSub(t *testing.T, nc *nats.Conn, subj, queue string, cb nats.MsgHa
|
||||
return sub
|
||||
}
|
||||
|
||||
func natsQueueSubSync(t *testing.T, nc *nats.Conn, subj, queue string) *nats.Subscription {
|
||||
t.Helper()
|
||||
sub, err := nc.QueueSubscribeSync(subj, queue)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
func natsFlush(t *testing.T, nc *nats.Conn) {
|
||||
t.Helper()
|
||||
if err := nc.Flush(); err != nil {
|
||||
@@ -3172,6 +3181,183 @@ func TestGatewayServiceImport(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGatewayServiceImportWithQueue(t *testing.T) {
|
||||
oa := testDefaultOptionsForGateway("A")
|
||||
setAccountUserPassInOptions(oa, "$foo", "clientA", "password")
|
||||
setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password")
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
ob := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
|
||||
setAccountUserPassInOptions(ob, "$foo", "xxxxxxx", "password")
|
||||
setAccountUserPassInOptions(ob, "$bar", "clientB", "password")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Get accounts
|
||||
fooA, _ := sa.LookupAccount("$foo")
|
||||
barA, _ := sa.LookupAccount("$bar")
|
||||
fooB, _ := sb.LookupAccount("$foo")
|
||||
barB, _ := sb.LookupAccount("$bar")
|
||||
|
||||
// Add in the service export for the requests. Make it public.
|
||||
fooA.AddServiceExport("test.request", nil)
|
||||
fooB.AddServiceExport("test.request", nil)
|
||||
|
||||
// Add import abilities to server B's bar account from foo.
|
||||
if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding service import: %v", err)
|
||||
}
|
||||
// Same on A.
|
||||
if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding service import: %v", err)
|
||||
}
|
||||
|
||||
// clientA will be connected to srvA and be the service endpoint and responder.
|
||||
aURL := fmt.Sprintf("nats://clientA:password@127.0.0.1:%d", oa.Port)
|
||||
clientA := natsConnect(t, aURL)
|
||||
defer clientA.Close()
|
||||
|
||||
subA := natsQueueSubSync(t, clientA, "test.request", "queue")
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Now setup client B on srvB who will do a sub from account $bar
|
||||
// that should map account $foo's foo subject.
|
||||
bURL := fmt.Sprintf("nats://clientB:password@127.0.0.1:%d", ob.Port)
|
||||
clientB := natsConnect(t, bURL)
|
||||
defer clientB.Close()
|
||||
|
||||
subB := natsQueueSubSync(t, clientB, "reply", "queue2")
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Wait for queue interest on test.request from A to be registered
|
||||
// on server B.
|
||||
checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second)
|
||||
|
||||
for i := 0; i < 1; i++ {
|
||||
// Send the request from clientB on foo.request,
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Expect the request on A
|
||||
msg, err := subA.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("subA failed to get request: %v", err)
|
||||
}
|
||||
if msg.Subject != "test.request" || string(msg.Data) != "hi" {
|
||||
t.Fatalf("Unexpected message: %v", msg)
|
||||
}
|
||||
if msg.Reply == "reply" {
|
||||
t.Fatalf("Expected randomized reply, but got original")
|
||||
}
|
||||
|
||||
// Send reply
|
||||
natsPub(t, clientA, msg.Reply, []byte("ok"))
|
||||
natsFlush(t, clientA)
|
||||
|
||||
msg, err = subB.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("subB failed to get reply: %v", err)
|
||||
}
|
||||
if msg.Subject != "reply" || string(msg.Data) != "ok" {
|
||||
t.Fatalf("Unexpected message: %v", msg)
|
||||
}
|
||||
|
||||
expected := int64(i + 3)
|
||||
vz, _ := sa.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
|
||||
if i == 0 {
|
||||
expected = 3
|
||||
} else {
|
||||
expected = 4
|
||||
}
|
||||
vz, _ = sb.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
t.Fatalf("Expected %d outMsgs for B, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 1 {
|
||||
return fmt.Errorf("Expected one sub to be left on fooA, but got %d", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Speed up exiration
|
||||
fooA.SetAutoExpireTTL(10 * time.Millisecond)
|
||||
|
||||
// Send 100 requests from clientB on foo.request,
|
||||
for i := 0; i < 100; i++ {
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
}
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Consume the requests, but don't reply to them...
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := subA.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("subA did not receive request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// These reply subjects will be dangling off of $foo account on serverA.
|
||||
// Remove our service endpoint and wait for the dangling replies to go to zero.
|
||||
natsUnsub(t, subA)
|
||||
natsFlush(t, clientA)
|
||||
|
||||
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 0 {
|
||||
return fmt.Errorf("Number of subs is %d, should be zero", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Repeat similar test but without the small TTL and verify
|
||||
// that if B is shutdown, the dangling subs for replies are
|
||||
// cleared from the account sublist.
|
||||
fooA.SetAutoExpireTTL(10 * time.Second)
|
||||
|
||||
subA = natsQueueSubSync(t, clientA, "test.request", "queue")
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Send 100 requests from clientB on foo.request,
|
||||
for i := 0; i < 100; i++ {
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
}
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Consume the requests, but don't reply to them...
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := subA.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("subA did not receive request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown B
|
||||
clientB.Close()
|
||||
sb.Shutdown()
|
||||
|
||||
// Close our last sub
|
||||
natsUnsub(t, subA)
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Verify that they are gone before the 10 sec TTL
|
||||
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 0 {
|
||||
return fmt.Errorf("Number of subs is %d, should be zero", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
func TestGatewayPermissions(t *testing.T) {
|
||||
bo := testDefaultOptionsForGateway("B")
|
||||
|
||||
Reference in New Issue
Block a user