From 8d194e8bf9810dc2510abef12a7cb7266a057830 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 30 Aug 2023 15:34:15 +0100 Subject: [PATCH] Tweak `TestJetStreamClusterMetaSnapshotsMultiChange` and `TestJetStreamClusterStreamUpdateSyncBug` Signed-off-by: Neil Twigg --- server/jetstream_cluster_1_test.go | 7 +++++++ server/jetstream_cluster_2_test.go | 1 + 2 files changed, 8 insertions(+) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index a38da17c..1439818c 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -817,18 +817,22 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) { if _, err := js.AddStream(&nats.StreamConfig{Name: "S1"}); err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnStreamLeader(globalAccountName, "S1") _, err := js.AddConsumer("S1", &nats.ConsumerConfig{Durable: "S1C1", AckPolicy: nats.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnConsumerLeader(globalAccountName, "S1", "S1C1") if _, err = js.AddStream(&nats.StreamConfig{Name: "S2"}); err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnStreamLeader(globalAccountName, "S2") _, err = js.AddConsumer("S2", &nats.ConsumerConfig{Durable: "S2C1", AckPolicy: nats.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnConsumerLeader(globalAccountName, "S2", "S2C1") // Add in a new server to the group. This way we know we can delete the original streams and consumers. rs := c.addInNewServer() @@ -853,10 +857,12 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) { if _, err = js.AddStream(&nats.StreamConfig{Name: "S3"}); err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnStreamLeader(globalAccountName, "S3") _, err = js.AddConsumer("S3", &nats.ConsumerConfig{Durable: "S3C1", AckPolicy: nats.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnConsumerLeader(globalAccountName, "S3", "S3C1") // Delete stream S2 resp, _ := nc.Request(fmt.Sprintf(JSApiStreamDeleteT, "S2"), nil, time.Second) var dResp JSApiStreamDeleteResponse @@ -880,6 +886,7 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + c.waitOnConsumerLeader(globalAccountName, "S1", "S1C2") cl := c.leader() cl.JetStreamSnapshotMeta() diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 31ac47c5..879703fb 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -3317,6 +3317,7 @@ func TestJetStreamClusterStreamUpdateSyncBug(t *testing.T) { c.waitOnAllCurrent() nsl = c.restartServer(nsl) + c.waitOnStreamLeader("$G", "TEST") c.waitOnStreamCurrent(nsl, "$G", "TEST") mset, _ = nsl.GlobalAccount().lookupStream("TEST")