Merge pull request #3694 from nats-io/test-3191

Extensive test in support of issue #3191.
This commit is contained in:
Derek Collison
2022-12-07 13:00:15 -08:00
committed by GitHub

View File

@@ -1958,3 +1958,179 @@ func TestJetStreamClusterReplacementPolicyAfterPeerRemoveNoPlace(t *testing.T) {
}
}
}
// https://github.com/nats-io/nats-server/issues/3191
func TestJetStreamClusterLeafnodeDuplicateConsumerMessages(t *testing.T) {
// Cluster B
c := createJetStreamCluster(t, jsClusterTempl, "B", _EMPTY_, 2, 22020, false)
defer c.shutdown()
// Cluster A
// Domain is "A'
lc := c.createLeafNodesWithStartPortAndDomain("A", 2, 22110, "A")
defer lc.shutdown()
lc.waitOnClusterReady()
// We want A-S-1 connected to B-S-1 and A-S-2 connected to B-S-2
// So adjust if needed.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
for i, ls := range lc.servers {
ls.mu.RLock()
var remoteServer string
for _, rc := range ls.leafs {
rc.mu.Lock()
remoteServer = rc.leaf.remoteServer
rc.mu.Unlock()
break
}
ls.mu.RUnlock()
wantedRemote := fmt.Sprintf("S-%d", i+1)
if remoteServer != wantedRemote {
ls.Shutdown()
lc.restartServer(ls)
return fmt.Errorf("Leafnode server %d not connected to %q", i+1, wantedRemote)
}
}
return nil
})
// Wait on ready again.
lc.waitOnClusterReady()
// Create a stream and a durable pull consumer on cluster A.
lnc, ljs := jsClientConnect(t, lc.randomServer())
defer lnc.Close()
_, err := ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
require_NoError(t, err)
// Make sure stream leader is on S-1
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := ljs.StreamInfo("TEST")
require_NoError(t, err)
if si.Cluster.Leader == "A-S-1" {
return nil
}
_, err = lnc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
return fmt.Errorf("Stream leader not placed on A-S-1")
})
_, err = ljs.StreamInfo("TEST")
require_NoError(t, err)
_, err = ljs.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dlc",
Replicas: 2,
MaxDeliver: 1,
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)
// Make sure consumer leader is on S-2
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
ci, err := ljs.ConsumerInfo("TEST", "dlc")
require_NoError(t, err)
if ci.Cluster.Leader == "A-S-2" {
return nil
}
_, err = lnc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
require_NoError(t, err)
return fmt.Errorf("Stream leader not placed on A-S-1")
})
_, err = ljs.ConsumerInfo("TEST", "dlc")
require_NoError(t, err)
// Send 2 messages.
sendStreamMsg(t, lnc, "foo", "M-1")
sendStreamMsg(t, lnc, "foo", "M-2")
// Now bind apps to cluster B servers and bind to pull consumer.
nc1, _ := jsClientConnect(t, c.servers[0])
defer nc1.Close()
js1, err := nc1.JetStream(nats.Domain("A"))
require_NoError(t, err)
sub1, err := js1.PullSubscribe("foo", "dlc", nats.BindStream("TEST"))
require_NoError(t, err)
defer sub1.Unsubscribe()
nc2, _ := jsClientConnect(t, c.servers[1])
defer nc2.Close()
js2, err := nc2.JetStream(nats.Domain("A"))
require_NoError(t, err)
sub2, err := js2.PullSubscribe("foo", "dlc", nats.BindStream("TEST"))
require_NoError(t, err)
defer sub2.Unsubscribe()
// Make sure we can properly get messages.
msgs, err := sub1.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)
require_True(t, string(msgs[0].Data) == "M-1")
msgs, err = sub2.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)
require_True(t, string(msgs[0].Data) == "M-2")
// Make sure delivered state makes it to other server to not accidentally send M-2 again
// and fail the test below.
time.Sleep(250 * time.Millisecond)
// Now let's introduce and event, where A-S-2 will now reconnect after a restart to B-S-2
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
ls := lc.servers[1]
wantedRemote := "S-1"
var remoteServer string
ls.mu.RLock()
for _, rc := range ls.leafs {
rc.mu.Lock()
remoteServer = rc.leaf.remoteServer
rc.mu.Unlock()
break
}
ls.mu.RUnlock()
if remoteServer != wantedRemote {
ls.Shutdown()
lc.restartServer(ls)
return fmt.Errorf("Leafnode server not connected to %q", wantedRemote)
}
return nil
})
// Wait on ready again.
lc.waitOnClusterReady()
lc.waitOnStreamLeader(globalAccountName, "TEST")
lc.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")
// Send 2 more messages.
sendStreamMsg(t, lnc, "foo", "M-3")
sendStreamMsg(t, lnc, "foo", "M-4")
msgs, err = sub1.Fetch(2)
require_NoError(t, err)
require_True(t, len(msgs) == 2)
require_True(t, string(msgs[0].Data) == "M-3")
require_True(t, string(msgs[1].Data) == "M-4")
// Send 2 more messages.
sendStreamMsg(t, lnc, "foo", "M-5")
sendStreamMsg(t, lnc, "foo", "M-6")
msgs, err = sub2.Fetch(2)
require_NoError(t, err)
require_True(t, len(msgs) == 2)
require_True(t, string(msgs[0].Data) == "M-5")
require_True(t, string(msgs[1].Data) == "M-6")
}