diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index 8c77178b..0e45fe4f 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -230,3 +230,88 @@ func TestServerRestartAndQueueSubs(t *testing.T) { // Now send another 10 messages, from each client.. sendAndCheckMsgs(10) } + +// This will test request semantics across a route +func TestRequestsAcrossRoutes(t *testing.T) { + _, _, optsA, optsB := runServers(t) + + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc1, err := nats.Connect(urlA) + if err != nil { + t.Fatalf("Failed to create connection for nc1: %v\n", err) + } + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Failed to create connection for nc2: %v\n", err) + } + ec2, _ := nats.NewEncodedConn(nc2, nats.JSON_ENCODER) + + response := []byte("I will help you") + + // Connect responder to srvA + nc1.Subscribe("foo-req", func(m *nats.Msg) { + nc1.Publish(m.Reply, response) + }) + // Make sure the route and the subscription are propogated. + nc1.Flush() + + var resp string + + for i := 0; i < 100; i++ { + if err := ec2.Request("foo-req", i, &resp, 100*time.Millisecond); err != nil { + t.Fatalf("Received an error on Request test [%d]: %s", i, err) + } + } +} + +// This will test request semantics across a route to queues +func TestRequestsAcrossRoutesToQueues(t *testing.T) { + _, _, optsA, optsB := runServers(t) + + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc1, err := nats.Connect(urlA) + if err != nil { + t.Fatalf("Failed to create connection for nc1: %v\n", err) + } + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Failed to create connection for nc2: %v\n", err) + } + ec1, _ := nats.NewEncodedConn(nc1, nats.JSON_ENCODER) + ec2, _ := nats.NewEncodedConn(nc2, nats.JSON_ENCODER) + + response := []byte("I will help you") + + // Connect one responder to srvA + nc1.QueueSubscribe("foo-req", "booboo", func(m *nats.Msg) { + nc1.Publish(m.Reply, response) + }) + // Make sure the route and the subscription are propogated. + nc1.Flush() + + // Connect the other responder to srvB + nc2.QueueSubscribe("foo-req", "booboo", func(m *nats.Msg) { + nc2.Publish(m.Reply, response) + }) + + var resp string + + for i := 0; i < 100; i++ { + if err := ec2.Request("foo-req", i, &resp, 100*time.Millisecond); err != nil { + t.Fatalf("Received an error on Request test [%d]: %s", i, err) + } + } + + for i := 0; i < 100; i++ { + if err := ec1.Request("foo-req", i, &resp, 100*time.Millisecond); err != nil { + t.Fatalf("Received an error on Request test [%d]: %s", i, err) + } + } + +}