mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -5666,89 +5666,6 @@ func TestJetStreamClusterStreamInfoDeletedDetails(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "MMS", 9)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
sendBatch := func(n int) {
|
||||
t.Helper()
|
||||
// Send a batch to a given subject.
|
||||
for i := 0; i < n; i++ {
|
||||
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkStream := func(stream string, num uint64) {
|
||||
t.Helper()
|
||||
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.State.Msgs != num {
|
||||
return fmt.Errorf("Expected %d msgs, got %d", num, si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
|
||||
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
|
||||
|
||||
// Origin
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
MaxAge: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ts := c.streamLeader("$G", "TEST")
|
||||
ml := c.leader()
|
||||
|
||||
// Create mirror now.
|
||||
for ms := ts; ms == ts || ms == ml; {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Mirror: &nats.StreamSource{Name: "TEST"},
|
||||
Replicas: 2,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
ms = c.streamLeader("$G", "M")
|
||||
if ts == ms || ms == ml {
|
||||
// Delete and retry.
|
||||
js.DeleteStream("M")
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(10)
|
||||
|
||||
// Now shutdown the server with the mirror.
|
||||
ms := c.streamLeader("$G", "M")
|
||||
ms.Shutdown()
|
||||
|
||||
// Send more messages but let them expire.
|
||||
sendBatch(10)
|
||||
checkTest(0)
|
||||
|
||||
c.restartServer(ms)
|
||||
c.checkClusterFormed()
|
||||
c.waitOnStreamLeader("$G", "M")
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(20)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "MSE", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
@@ -1756,6 +1756,89 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "MMS", 9)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
sendBatch := func(n int) {
|
||||
t.Helper()
|
||||
// Send a batch to a given subject.
|
||||
for i := 0; i < n; i++ {
|
||||
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkStream := func(stream string, num uint64) {
|
||||
t.Helper()
|
||||
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.State.Msgs != num {
|
||||
return fmt.Errorf("Expected %d msgs, got %d", num, si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
|
||||
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
|
||||
|
||||
// Origin
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
MaxAge: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ts := c.streamLeader("$G", "TEST")
|
||||
ml := c.leader()
|
||||
|
||||
// Create mirror now.
|
||||
for ms := ts; ms == ts || ms == ml; {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Mirror: &nats.StreamSource{Name: "TEST"},
|
||||
Replicas: 2,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
ms = c.streamLeader("$G", "M")
|
||||
if ts == ms || ms == ml {
|
||||
// Delete and retry.
|
||||
js.DeleteStream("M")
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(10)
|
||||
|
||||
// Now shutdown the server with the mirror.
|
||||
ms := c.streamLeader("$G", "M")
|
||||
ms.Shutdown()
|
||||
|
||||
// Send more messages but let them expire.
|
||||
sendBatch(10)
|
||||
checkTest(0)
|
||||
|
||||
c.restartServer(ms)
|
||||
c.checkClusterFormed()
|
||||
c.waitOnStreamLeader("$G", "M")
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(20)
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
|
||||
// Uncomment to run. Needs to be on a big machine.
|
||||
skip(t)
|
||||
|
||||
Reference in New Issue
Block a user