diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index d8af96c2..fb53caf1 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1958,3 +1958,179 @@ func TestJetStreamClusterReplacementPolicyAfterPeerRemoveNoPlace(t *testing.T) { } } } + +// https://github.com/nats-io/nats-server/issues/3191 +func TestJetStreamClusterLeafnodeDuplicateConsumerMessages(t *testing.T) { + // Cluster B + c := createJetStreamCluster(t, jsClusterTempl, "B", _EMPTY_, 2, 22020, false) + defer c.shutdown() + + // Cluster A + // Domain is "A' + lc := c.createLeafNodesWithStartPortAndDomain("A", 2, 22110, "A") + defer lc.shutdown() + + lc.waitOnClusterReady() + + // We want A-S-1 connected to B-S-1 and A-S-2 connected to B-S-2 + // So adjust if needed. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + for i, ls := range lc.servers { + ls.mu.RLock() + var remoteServer string + for _, rc := range ls.leafs { + rc.mu.Lock() + remoteServer = rc.leaf.remoteServer + rc.mu.Unlock() + break + } + ls.mu.RUnlock() + + wantedRemote := fmt.Sprintf("S-%d", i+1) + if remoteServer != wantedRemote { + ls.Shutdown() + lc.restartServer(ls) + return fmt.Errorf("Leafnode server %d not connected to %q", i+1, wantedRemote) + } + } + return nil + }) + + // Wait on ready again. + lc.waitOnClusterReady() + + // Create a stream and a durable pull consumer on cluster A. + lnc, ljs := jsClientConnect(t, lc.randomServer()) + defer lnc.Close() + + _, err := ljs.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + require_NoError(t, err) + + // Make sure stream leader is on S-1 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + si, err := ljs.StreamInfo("TEST") + require_NoError(t, err) + if si.Cluster.Leader == "A-S-1" { + return nil + } + _, err = lnc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second) + require_NoError(t, err) + return fmt.Errorf("Stream leader not placed on A-S-1") + }) + + _, err = ljs.StreamInfo("TEST") + require_NoError(t, err) + + _, err = ljs.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dlc", + Replicas: 2, + MaxDeliver: 1, + AckPolicy: nats.AckNonePolicy, + }) + require_NoError(t, err) + + // Make sure consumer leader is on S-2 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ci, err := ljs.ConsumerInfo("TEST", "dlc") + require_NoError(t, err) + if ci.Cluster.Leader == "A-S-2" { + return nil + } + _, err = lnc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second) + require_NoError(t, err) + return fmt.Errorf("Stream leader not placed on A-S-1") + }) + + _, err = ljs.ConsumerInfo("TEST", "dlc") + require_NoError(t, err) + + // Send 2 messages. + sendStreamMsg(t, lnc, "foo", "M-1") + sendStreamMsg(t, lnc, "foo", "M-2") + + // Now bind apps to cluster B servers and bind to pull consumer. + nc1, _ := jsClientConnect(t, c.servers[0]) + defer nc1.Close() + js1, err := nc1.JetStream(nats.Domain("A")) + require_NoError(t, err) + + sub1, err := js1.PullSubscribe("foo", "dlc", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub1.Unsubscribe() + + nc2, _ := jsClientConnect(t, c.servers[1]) + defer nc2.Close() + js2, err := nc2.JetStream(nats.Domain("A")) + require_NoError(t, err) + + sub2, err := js2.PullSubscribe("foo", "dlc", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub2.Unsubscribe() + + // Make sure we can properly get messages. + msgs, err := sub1.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + require_True(t, string(msgs[0].Data) == "M-1") + + msgs, err = sub2.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + require_True(t, string(msgs[0].Data) == "M-2") + + // Make sure delivered state makes it to other server to not accidentally send M-2 again + // and fail the test below. + time.Sleep(250 * time.Millisecond) + + // Now let's introduce and event, where A-S-2 will now reconnect after a restart to B-S-2 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ls := lc.servers[1] + wantedRemote := "S-1" + var remoteServer string + + ls.mu.RLock() + for _, rc := range ls.leafs { + rc.mu.Lock() + remoteServer = rc.leaf.remoteServer + rc.mu.Unlock() + break + } + ls.mu.RUnlock() + + if remoteServer != wantedRemote { + ls.Shutdown() + lc.restartServer(ls) + return fmt.Errorf("Leafnode server not connected to %q", wantedRemote) + } + return nil + }) + + // Wait on ready again. + lc.waitOnClusterReady() + lc.waitOnStreamLeader(globalAccountName, "TEST") + lc.waitOnConsumerLeader(globalAccountName, "TEST", "dlc") + + // Send 2 more messages. + sendStreamMsg(t, lnc, "foo", "M-3") + sendStreamMsg(t, lnc, "foo", "M-4") + + msgs, err = sub1.Fetch(2) + require_NoError(t, err) + require_True(t, len(msgs) == 2) + require_True(t, string(msgs[0].Data) == "M-3") + require_True(t, string(msgs[1].Data) == "M-4") + + // Send 2 more messages. + sendStreamMsg(t, lnc, "foo", "M-5") + sendStreamMsg(t, lnc, "foo", "M-6") + + msgs, err = sub2.Fetch(2) + require_NoError(t, err) + require_True(t, len(msgs) == 2) + require_True(t, string(msgs[0].Data) == "M-5") + require_True(t, string(msgs[1].Data) == "M-6") +}