mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
This addresses the following race: - client connection creates a subscription on a reply subject - client connection sends a request - server sends the subscription to the inbound gateway - server sends the message to the outbound gateway (those may be to different servers) - receiving server sends to the sub interested in the request subject - app sends reply - its server then checks for interest on the reply's subject. In interestOnly mode, there is a possibility that this server has not received the interest on the reply subject yet and would then drop the reply. This PR detects the above scenario and will prefix the reply subject to identify the origin cluster if it is detected that the last subscription from the sending connection was created less than a second ago. Once the destination has this prefix, the destination cluster will always send back that message to the origin cluster even if there is no registered interest. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
559 lines
17 KiB
Go
559 lines
17 KiB
Go
// Copyright 2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// +build !race
|
|
|
|
package server
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/go-nats"
|
|
)
|
|
|
|
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
|
|
// The test name should be prefixed with TestNoRace so we can run only
|
|
// those tests: go test -run=TestNoRace ...
|
|
|
|
func TestNoRaceAvoidSlowConsumerBigMessages(t *testing.T) {
|
|
opts := DefaultOptions() // Use defaults to make sure they avoid pending slow consumer.
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
data := make([]byte, 1024*1024) // 1MB payload
|
|
rand.Read(data)
|
|
|
|
expected := int32(500)
|
|
received := int32(0)
|
|
|
|
done := make(chan bool)
|
|
|
|
// Create Subscription.
|
|
nc1.Subscribe("slow.consumer", func(m *nats.Msg) {
|
|
// Just eat it so that we are not measuring
|
|
// code time, just delivery.
|
|
atomic.AddInt32(&received, 1)
|
|
if received >= expected {
|
|
done <- true
|
|
}
|
|
})
|
|
|
|
// Create Error handler
|
|
nc1.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
|
|
t.Fatalf("Received an error on the subscription's connection: %v\n", err)
|
|
})
|
|
|
|
nc1.Flush()
|
|
|
|
for i := 0; i < int(expected); i++ {
|
|
nc2.Publish("slow.consumer", data)
|
|
}
|
|
nc2.Flush()
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
case <-time.After(10 * time.Second):
|
|
r := atomic.LoadInt32(&received)
|
|
if s.NumSlowConsumers() > 0 {
|
|
t.Fatalf("Did not receive all large messages due to slow consumer status: %d of %d", r, expected)
|
|
}
|
|
t.Fatalf("Failed to receive all large messages: %d of %d\n", r, expected)
|
|
}
|
|
}
|
|
|
|
func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) {
|
|
optsA, _ := ProcessConfigFile("./configs/seed.conf")
|
|
optsA.NoSigs, optsA.NoLog = true, true
|
|
srvA := RunServer(optsA)
|
|
defer srvA.Shutdown()
|
|
|
|
srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
|
|
optsB := nextServerOpts(optsA)
|
|
optsB.Routes = RoutesFromStr(srvARouteURL)
|
|
|
|
srvB := RunServer(optsB)
|
|
defer srvB.Shutdown()
|
|
|
|
// Wait for these 2 to connect to each other
|
|
checkClusterFormed(t, srvA, srvB)
|
|
|
|
// Have a client connection to each server
|
|
ncA, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncA.Close()
|
|
|
|
ncB, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncB.Close()
|
|
|
|
rbar := int32(0)
|
|
barCb := func(m *nats.Msg) {
|
|
atomic.AddInt32(&rbar, 1)
|
|
}
|
|
rbaz := int32(0)
|
|
bazCb := func(m *nats.Msg) {
|
|
atomic.AddInt32(&rbaz, 1)
|
|
}
|
|
|
|
// Create 125 queue subs with auto-unsubscribe to each server for
|
|
// group bar and group baz. So 250 total per queue group.
|
|
cons := []*nats.Conn{ncA, ncB}
|
|
for _, c := range cons {
|
|
for i := 0; i < 125; i++ {
|
|
qsub, err := c.QueueSubscribe("foo", "bar", barCb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := qsub.AutoUnsubscribe(1); err != nil {
|
|
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
|
}
|
|
qsub, err = c.QueueSubscribe("foo", "baz", bazCb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := qsub.AutoUnsubscribe(1); err != nil {
|
|
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
|
}
|
|
}
|
|
c.Subscribe("TEST.COMPLETE", func(m *nats.Msg) {})
|
|
}
|
|
|
|
// We coelasce now so for each server we will have all local (250) plus
|
|
// two from the remote side for each queue group. We also create one more
|
|
// and will wait til each server has 254 subscriptions, that will make sure
|
|
// that we have everything setup.
|
|
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
|
subsA := srvA.NumSubscriptions()
|
|
subsB := srvB.NumSubscriptions()
|
|
if subsA != 254 || subsB != 254 {
|
|
return fmt.Errorf("Not all subs processed yet: %d and %d", subsA, subsB)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
expected := int32(250)
|
|
// Now send messages from each server
|
|
for i := int32(0); i < expected; i++ {
|
|
c := cons[i%2]
|
|
c.Publish("foo", []byte("Don't Drop Me!"))
|
|
}
|
|
for _, c := range cons {
|
|
c.Flush()
|
|
}
|
|
|
|
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
|
nbar := atomic.LoadInt32(&rbar)
|
|
nbaz := atomic.LoadInt32(&rbaz)
|
|
if nbar == expected && nbaz == expected {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
|
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
|
})
|
|
}
|
|
|
|
func TestNoRaceClosedSlowConsumerWriteDeadline(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.WriteDeadline = 10 * time.Millisecond // Make very small to trip.
|
|
opts.MaxPending = 500 * 1024 * 1024 // Set high so it will not trip here.
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer c.Close()
|
|
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
|
t.Fatalf("Error sending protocols to server: %v", err)
|
|
}
|
|
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
|
// for our subscribed client.
|
|
c.(*net.TCPConn).SetReadBuffer(128)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
sender, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer sender.Close()
|
|
|
|
payload := make([]byte, 1024*1024)
|
|
for i := 0; i < 100; i++ {
|
|
if err := sender.Publish("foo", payload); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
}
|
|
|
|
// Flush sender connection to ensure that all data has been sent.
|
|
if err := sender.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
|
|
// At this point server should have closed connection c.
|
|
checkClosedConns(t, s, 1, 2*time.Second)
|
|
conns := s.closedClients()
|
|
if lc := len(conns); lc != 1 {
|
|
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
|
}
|
|
checkReason(t, conns[0].Reason, SlowConsumerWriteDeadline)
|
|
}
|
|
|
|
func TestNoRaceClosedSlowConsumerPendingBytes(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
|
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer c.Close()
|
|
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
|
t.Fatalf("Error sending protocols to server: %v", err)
|
|
}
|
|
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
|
// for our subscribed client.
|
|
c.(*net.TCPConn).SetReadBuffer(128)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
sender, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer sender.Close()
|
|
|
|
payload := make([]byte, 1024*1024)
|
|
for i := 0; i < 100; i++ {
|
|
if err := sender.Publish("foo", payload); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
}
|
|
|
|
// Flush sender connection to ensure that all data has been sent.
|
|
if err := sender.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
|
|
// At this point server should have closed connection c.
|
|
checkClosedConns(t, s, 1, 2*time.Second)
|
|
conns := s.closedClients()
|
|
if lc := len(conns); lc != 1 {
|
|
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
|
}
|
|
checkReason(t, conns[0].Reason, SlowConsumerPendingBytes)
|
|
}
|
|
|
|
func TestNoRaceSlowConsumerPendingBytes(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
|
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer c.Close()
|
|
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
|
t.Fatalf("Error sending protocols to server: %v", err)
|
|
}
|
|
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
|
// for our subscribed client.
|
|
c.(*net.TCPConn).SetReadBuffer(128)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
sender, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer sender.Close()
|
|
|
|
payload := make([]byte, 1024*1024)
|
|
for i := 0; i < 100; i++ {
|
|
if err := sender.Publish("foo", payload); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
}
|
|
|
|
// Flush sender connection to ensure that all data has been sent.
|
|
if err := sender.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
|
|
// At this point server should have closed connection c.
|
|
|
|
// On certain platforms, it may take more than one call before
|
|
// getting the error.
|
|
for i := 0; i < 100; i++ {
|
|
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
|
|
// ok
|
|
return
|
|
}
|
|
}
|
|
t.Fatal("Connection should have been closed")
|
|
}
|
|
|
|
func TestNoRaceGatewayNoMissingReplies(t *testing.T) {
|
|
// This test will have following setup:
|
|
//
|
|
// responder1 requestor
|
|
// | |
|
|
// v v
|
|
// [A1]<-------gw------------[B1]
|
|
// | \ |
|
|
// | \______gw__________ | route
|
|
// | _\| |
|
|
// [ ]--------gw----------->[ ]
|
|
// [A2]<-------gw------------[B2]
|
|
// [ ] [ ]
|
|
// ^
|
|
// |
|
|
// responder2
|
|
//
|
|
// There is a possible race that when the requestor creates
|
|
// a subscription on the reply subject, the subject interest
|
|
// being sent from the inbound gateway, and B1 having none,
|
|
// the SUB first goes to B2 before being sent to A1 from
|
|
// B2's inbound GW. But the request can go from B1 to A1
|
|
// right away and the responder1 connecting to A1 may send
|
|
// back the reply before the interest on the reply makes it
|
|
// to A1 (from B2).
|
|
// This test will also verify that if the responder is instead
|
|
// connected to A2, the reply is properly received by requestor
|
|
// on B1.
|
|
|
|
// For this test we want to be in interestOnly mode, so
|
|
// make it happen quickly
|
|
gatewayMaxRUnsubBeforeSwitch = 1
|
|
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
|
|
|
|
// Start with setting up A2 and B2.
|
|
ob2 := testDefaultOptionsForGateway("B")
|
|
sb2 := runGatewayServer(ob2)
|
|
defer sb2.Shutdown()
|
|
|
|
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
|
|
sa2 := runGatewayServer(oa2)
|
|
defer sa2.Shutdown()
|
|
|
|
waitForOutboundGateways(t, sa2, 1, time.Second)
|
|
waitForInboundGateways(t, sa2, 1, time.Second)
|
|
waitForOutboundGateways(t, sb2, 1, time.Second)
|
|
waitForInboundGateways(t, sb2, 1, time.Second)
|
|
|
|
// Now start A1 which will connect to B2
|
|
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
|
|
oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port))
|
|
sa1 := runGatewayServer(oa1)
|
|
defer sa1.Shutdown()
|
|
|
|
waitForOutboundGateways(t, sa1, 1, time.Second)
|
|
waitForInboundGateways(t, sb2, 2, time.Second)
|
|
|
|
checkClusterFormed(t, sa1, sa2)
|
|
|
|
// Finally, start B1 that will connect to A1.
|
|
ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1)
|
|
ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port))
|
|
sb1 := runGatewayServer(ob1)
|
|
defer sb1.Shutdown()
|
|
|
|
// Check that we have the outbound gateway from B1 to A1
|
|
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
|
|
c := sb1.getOutboundGatewayConnection("A")
|
|
if c == nil {
|
|
return fmt.Errorf("Outbound connection to A not created yet")
|
|
}
|
|
c.mu.Lock()
|
|
name := c.opts.Name
|
|
nc := c.nc
|
|
c.mu.Unlock()
|
|
if name != sa1.ID() {
|
|
// Force a disconnect
|
|
nc.Close()
|
|
return fmt.Errorf("Was unable to have B1 connect to A1")
|
|
}
|
|
return nil
|
|
})
|
|
|
|
waitForInboundGateways(t, sa1, 1, time.Second)
|
|
checkClusterFormed(t, sb1, sb2)
|
|
|
|
// Slow down GWs connections
|
|
// testSlowDownGatewayConnections(t, sa1, sa2, sb1, sb2)
|
|
|
|
// For this test, since we are using qsubs on A and B, and we
|
|
// want to make sure that it is received only on B, make the
|
|
// recent sub expiration high (especially when running on
|
|
// Travis with GOGC=10
|
|
setRecentSubExpiration := func(s *Server) {
|
|
s.mu.Lock()
|
|
s.gateway.pasi.Lock()
|
|
s.gateway.recSubExp = 10 * time.Second
|
|
s.gateway.pasi.Unlock()
|
|
s.mu.Unlock()
|
|
}
|
|
setRecentSubExpiration(sb1)
|
|
setRecentSubExpiration(sb2)
|
|
|
|
a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port)
|
|
a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port)
|
|
b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port)
|
|
b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port)
|
|
|
|
ncb1 := natsConnect(t, b1URL)
|
|
defer ncb1.Close()
|
|
|
|
ncb2 := natsConnect(t, b2URL)
|
|
defer ncb2.Close()
|
|
|
|
natsSubSync(t, ncb1, "just.a.sub")
|
|
natsSubSync(t, ncb2, "just.a.sub")
|
|
checkExpectedSubs(t, 2, sb1, sb2)
|
|
|
|
// For this test, we want A to be checking B's interest in order
|
|
// to send messages (which would cause replies to be dropped if
|
|
// there is no interest registered on A). So from A servers,
|
|
// send to various subjects and cause B's to switch to interestOnly
|
|
// mode.
|
|
nca1 := natsConnect(t, a1URL)
|
|
defer nca1.Close()
|
|
for i := 0; i < 10; i++ {
|
|
natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello"))
|
|
}
|
|
nca2 := natsConnect(t, a2URL)
|
|
defer nca2.Close()
|
|
for i := 0; i < 10; i++ {
|
|
natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello"))
|
|
}
|
|
|
|
checkSwitchedMode := func(t *testing.T, s *Server) {
|
|
t.Helper()
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
var switchedMode bool
|
|
c := s.getOutboundGatewayConnection("B")
|
|
ei, _ := c.gw.outsim.Load(globalAccountName)
|
|
if ei != nil {
|
|
e := ei.(*outsie)
|
|
e.RLock()
|
|
switchedMode = e.ni == nil && e.mode == modeInterestOnly
|
|
e.RUnlock()
|
|
}
|
|
if !switchedMode {
|
|
return fmt.Errorf("Still not switched mode")
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
checkSwitchedMode(t, sa1)
|
|
checkSwitchedMode(t, sa2)
|
|
|
|
// Setup a subscriber on _INBOX.> on each of A's servers.
|
|
total := 1000
|
|
expected := int32(total)
|
|
rcvOnA := int32(0)
|
|
qrcvOnA := int32(0)
|
|
natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&rcvOnA, 1)
|
|
})
|
|
natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&qrcvOnA, 1)
|
|
})
|
|
checkExpectedSubs(t, 2, sa1, sa2)
|
|
|
|
// Ok.. so now we will run the actual test where we
|
|
// create a responder on A1 and make sure that every
|
|
// single request from B1 gets the reply. Will repeat
|
|
// test with responder connected to A2.
|
|
sendReqs := func(t *testing.T, subConn *nats.Conn) {
|
|
t.Helper()
|
|
responder := natsSub(t, subConn, "foo", func(m *nats.Msg) {
|
|
nca1.Publish(m.Reply, []byte("reply"))
|
|
})
|
|
natsFlush(t, subConn)
|
|
checkExpectedSubs(t, 3, sa1, sa2)
|
|
|
|
// We are not going to use Request() because this sets
|
|
// a wildcard subscription on an INBOX and less likely
|
|
// to produce the race. Instead we will explicitly set
|
|
// the subscription on the reply subject and create one
|
|
// per request.
|
|
for i := 0; i < total/2; i++ {
|
|
reply := fmt.Sprintf("myreply.%d", i)
|
|
replySub := natsQueueSubSync(t, ncb1, reply, "bar")
|
|
natsFlush(t, ncb1)
|
|
|
|
// Let's make sure we have interest on B2.
|
|
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
|
|
checkFor(t, time.Second, time.Millisecond, func() error {
|
|
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
|
|
return fmt.Errorf("B still not registered interest on %s", reply)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
natsPubReq(t, ncb1, "foo", reply, []byte("request"))
|
|
if _, err := replySub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Did not receive reply: %v", err)
|
|
}
|
|
natsUnsub(t, replySub)
|
|
}
|
|
|
|
responder.Unsubscribe()
|
|
natsFlush(t, subConn)
|
|
checkExpectedSubs(t, 2, sa1, sa2)
|
|
}
|
|
sendReqs(t, nca1)
|
|
sendReqs(t, nca2)
|
|
|
|
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
|
if n := atomic.LoadInt32(&rcvOnA); n != expected {
|
|
return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// We should not have received a single message on the queue sub
|
|
// on cluster A because messages will have been delivered to
|
|
// the member on cluster B.
|
|
if n := atomic.LoadInt32(&qrcvOnA); n != 0 {
|
|
t.Fatalf("Queue sub on A should not have received message, got %v", n)
|
|
}
|
|
}
|