[FIXED] LeafNode: duplicate queue messages in complex routing setup

Suppose a cluster of 2 servers, let's call them leaf1 and leaf2.
These servers are routed and have a leaf connection to another
server, let's call it srv1.
They share the same cluster name.

If a queue subscriber runs on srv1, another queue subscriber with the
same subject/group name runs on leaf1, and a requestor runs on
leaf2, then the request should reach only one of the 2 queue subs.

The defect was that sometimes both queue subs would receive the
message.

The added test checks that only one reply is ever received and
that the local "leaf" cluster is preferred.

Resolves #1722

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-11-18 11:11:15 -07:00
parent bfd388e8b4
commit 55b0f8d855
3 changed files with 84 additions and 0 deletions

View File

@@ -1802,3 +1802,84 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) {
checkLeafNodeConnected(t, s)
checkLeafNodeConnected(t, sl)
}
// TestLeafNodeNoDuplicateWithinCluster checks that a request published from
// one member of a routed leaf cluster is answered exactly once when the same
// queue group has a member on the hub server and a member inside the leaf
// cluster, and that the answer comes from the member in the leaf cluster.
func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
	// Hub server accepting leafnode connections on a random port.
	// Per the original note, DefaultOptions sets the cluster name to "abc".
	hubOpts := DefaultOptions()
	hubOpts.LeafNode.Host = "127.0.0.1"
	hubOpts.LeafNode.Port = -1
	srv1 := RunServer(hubOpts)
	defer srv1.Shutdown()

	// The leafnode port is read back from the options after the hub is running.
	hubLeafURL, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", hubOpts.LeafNode.Port))
	if err != nil {
		t.Fatalf("Error parsing url: %v", err)
	}
	// Both leaf servers use the same remote definition pointing at the hub.
	hubRemotes := []*RemoteLeafOpts{{URLs: []*url.URL{hubLeafURL}}}

	// First member of the leaf cluster.
	leaf1Opts := DefaultOptions()
	leaf1Opts.LeafNode.Remotes = hubRemotes
	leaf1 := RunServer(leaf1Opts)
	defer leaf1.Shutdown()

	// Second member of the leaf cluster, routed to the first one.
	leaf2Opts := DefaultOptions()
	leaf2Opts.LeafNode.Remotes = hubRemotes
	leaf2Opts.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", leaf1Opts.Cluster.Port))
	leaf2 := RunServer(leaf2Opts)
	defer leaf2.Shutdown()

	checkClusterFormed(t, leaf1, leaf2)
	checkLeafNodeConnectedCount(t, srv1, 2)
	checkLeafNodeConnected(t, leaf1)
	checkLeafNodeConnected(t, leaf2)

	// replyWith builds a handler that answers every message with a fixed
	// payload, so the test can tell which queue member served the request.
	replyWith := func(payload string) func(*nats.Msg) {
		return func(m *nats.Msg) {
			m.Respond([]byte(payload))
		}
	}

	// waitForInterest checks that the three servers all report interest
	// on the given subject in the global account.
	waitForInterest := func(subject string) {
		checkSubInterest(t, srv1, globalAccountName, subject, time.Second)
		checkSubInterest(t, leaf1, globalAccountName, subject, time.Second)
		checkSubInterest(t, leaf2, globalAccountName, subject, time.Second)
	}

	// One queue member on the hub...
	ncSrv1 := natsConnect(t, srv1.ClientURL())
	defer ncSrv1.Close()
	natsQueueSub(t, ncSrv1, "foo", "queue", replyWith("from srv1"))

	// ...and one on leaf1, same subject and group.
	ncLeaf1 := natsConnect(t, leaf1.ClientURL())
	defer ncLeaf1.Close()
	natsQueueSub(t, ncLeaf1, "foo", "queue", replyWith("from leaf1"))

	// The requestor lives on leaf2.
	ncLeaf2 := natsConnect(t, leaf2.ClientURL())
	defer ncLeaf2.Close()

	// "foo" interest must be known everywhere before any request is sent.
	waitForInterest("foo")

	// To be able to observe a duplicate, the reply subscription itself has
	// to be propagated to every server before the requests go out.
	sub := natsSubSync(t, ncLeaf2, "reply_subj")
	natsFlush(t, ncLeaf2)
	waitForInterest("reply_subj")

	for attempt := 0; attempt < 5; attempt++ {
		natsPubReq(t, ncLeaf2, "foo", sub.Subject, []byte("req"))

		// A reply must arrive...
		first := natsNexMsg(t, sub, time.Second)

		// ...and nothing else may follow it. The error from NextMsg is
		// discarded on purpose: only a non-nil message counts as a failure.
		extra, _ := sub.NextMsg(100 * time.Millisecond)
		if extra != nil {
			t.Fatalf("Received duplicate reply, first was %q, followed by %q",
				first.Data, extra.Data)
		}

		// The queue member inside the leaf cluster should have been chosen.
		if string(first.Data) != "from leaf1" {
			t.Fatalf("Expected reply from leaf1, got %q", first.Data)
		}
	}
}