diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5fade2c4..87269a12 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2049,22 +2049,27 @@ func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // Allow sync consumers to connect. + time.Sleep(500 * time.Millisecond) if _, err = js.Publish("foo", []byte("ok")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } - time.Sleep(250 * time.Millisecond) + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 { + return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs) + } + if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs) + } + if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 { + fmt.Printf("si.State is %+v\n", si.State) + return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs) + } + return nil + }) - if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 { - t.Fatalf("Expected no msgs, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 { - t.Fatalf("Expected 1 msg, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 { - t.Fatalf("Expected 1 msg, got %d", si.State.Msgs) - } } func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) { @@ -2102,23 +2107,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // Allow sync consumers to connect. + time.Sleep(500 * time.Millisecond) if _, err = js.Publish("foo", []byte("ok")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } - time.Sleep(250 * time.Millisecond) - - // This one will be 0 since no other interest exists. - if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 { - t.Fatalf("Expected no msgs, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 { - t.Fatalf("Expected 1 msg, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 { - t.Fatalf("Expected 1 msg, got %d", si.State.Msgs) - } + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + // This one will be 0 since no other interest exists. + if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 { + return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs) + } + if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs) + } + if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs) + } + return nil + }) // Now create other interest on WQ22. sub, err := js.SubscribeSync("foo") @@ -2126,23 +2134,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + // Allow consumer state to propagate. + time.Sleep(500 * time.Millisecond) if _, err = js.Publish("foo", []byte("ok")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } - time.Sleep(250 * time.Millisecond) - - // This one should be 1 now since we will hold for the other subscriber. - if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 { - t.Fatalf("Expected 1 msg, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 { - t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs) - } - if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 { - t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs) - } + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + // This one will be 0 since no other interest exists. + if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for %q, got %d", "WQ22", si.State.Msgs) + } + if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 { + return fmt.Errorf("Expected 2 msgs for %q, got %d", "M", si.State.Msgs) + } + if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 { + return fmt.Errorf("Expected 2 msgs for %q, got %d", "S", si.State.Msgs) + } + return nil + }) } func TestJetStreamClusterInterestRetentionWithFilteredConsumers(t *testing.T) {