mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -2049,22 +2049,27 @@ func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
// Allow sync consumers to connect.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if _, err = js.Publish("foo", []byte("ok")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
|
||||
return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
|
||||
return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
|
||||
fmt.Printf("si.State is %+v\n", si.State)
|
||||
return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
|
||||
t.Fatalf("Expected no msgs, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
|
||||
@@ -2102,23 +2107,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
// Allow sync consumers to connect.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if _, err = js.Publish("foo", []byte("ok")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
// This one will be 0 since no other interest exists.
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
|
||||
t.Fatalf("Expected no msgs, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
|
||||
}
|
||||
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
|
||||
// This one will be 0 since no other interest exists.
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
|
||||
return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
|
||||
return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
|
||||
return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now create other interest on WQ22.
|
||||
sub, err := js.SubscribeSync("foo")
|
||||
@@ -2126,23 +2134,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
// Allow consumer state to propagate.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if _, err = js.Publish("foo", []byte("ok")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
// This one should be 1 now since we will hold for the other subscriber.
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 {
|
||||
t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 {
|
||||
t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs)
|
||||
}
|
||||
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
|
||||
// This one will be 0 since no other interest exists.
|
||||
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 {
|
||||
return fmt.Errorf("Expected 1 msg for %q, got %d", "WQ22", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 {
|
||||
return fmt.Errorf("Expected 2 msgs for %q, got %d", "M", si.State.Msgs)
|
||||
}
|
||||
if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 {
|
||||
return fmt.Errorf("Expected 2 msgs for %q, got %d", "S", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterInterestRetentionWithFilteredConsumers(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user