mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -5667,22 +5667,14 @@ func TestJetStreamClusterStreamInfoDeletedDetails(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "MMS", 5)
|
||||
c := createJetStreamClusterExplicit(t, "MMS", 9)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Origin
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
MaxAge: 100 * time.Millisecond,
|
||||
})
|
||||
|
||||
ts := c.streamLeader("$G", "TEST")
|
||||
|
||||
sendBatch := func(t *testing.T, n int) {
|
||||
sendBatch := func(n int) {
|
||||
t.Helper()
|
||||
// Send a batch to a given subject.
|
||||
for i := 0; i < n; i++ {
|
||||
@@ -5692,9 +5684,9 @@ func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
checkStream := func(t *testing.T, stream string, num uint64) {
|
||||
checkStream := func(stream string, num uint64) {
|
||||
t.Helper()
|
||||
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
|
||||
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -5706,54 +5698,55 @@ func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
checkMirror := func(t *testing.T, num uint64) { t.Helper(); checkStream(t, "M", num) }
|
||||
checkTest := func(t *testing.T, num uint64) { t.Helper(); checkStream(t, "TEST", num) }
|
||||
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
|
||||
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
|
||||
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
replicas int
|
||||
}{
|
||||
{"R-1", 1},
|
||||
{"R-2", 2},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
// Create mirror now.
|
||||
for ms := ts; ms == ts; {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Mirror: &nats.StreamSource{Name: "TEST"},
|
||||
Replicas: test.replicas,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
ms = c.streamLeader("$G", "M")
|
||||
if ms == ts {
|
||||
// Delete and retry.
|
||||
js.DeleteStream("M")
|
||||
} else {
|
||||
defer js.DeleteStream("M")
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch(t, 10)
|
||||
checkMirror(t, 10)
|
||||
|
||||
// Now shutdown the server with the mirror.
|
||||
ms := c.streamLeader("$G", "M")
|
||||
ms.Shutdown()
|
||||
|
||||
// Send more messages but let them expire.
|
||||
sendBatch(t, 10)
|
||||
checkTest(t, 0)
|
||||
|
||||
c.restartServer(ms)
|
||||
c.waitOnStreamLeader("$G", "M")
|
||||
|
||||
sendBatch(t, 10)
|
||||
checkMirror(t, 20)
|
||||
})
|
||||
// Origin
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
MaxAge: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ts := c.streamLeader("$G", "TEST")
|
||||
ml := c.leader()
|
||||
|
||||
// Create mirror now.
|
||||
for ms := ts; ms == ts || ms == ml; {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Mirror: &nats.StreamSource{Name: "TEST"},
|
||||
Replicas: 2,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
ms = c.streamLeader("$G", "M")
|
||||
if ts == ms || ms == ml {
|
||||
// Delete and retry.
|
||||
js.DeleteStream("M")
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(10)
|
||||
|
||||
// Now shutdown the server with the mirror.
|
||||
ms := c.streamLeader("$G", "M")
|
||||
ms.Shutdown()
|
||||
|
||||
// Send more messages but let them expire.
|
||||
sendBatch(10)
|
||||
checkTest(0)
|
||||
|
||||
c.restartServer(ms)
|
||||
c.checkClusterFormed()
|
||||
c.waitOnStreamLeader("$G", "M")
|
||||
|
||||
sendBatch(10)
|
||||
checkMirror(20)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
|
||||
|
||||
@@ -1366,6 +1366,7 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
// Now send off request to create/update our consumer. This will be all API based even in single server mode.
|
||||
// We calculate durable names apriori so we do not need to save them off.
|
||||
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
|
||||
@@ -1440,7 +1441,9 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
// When an upstream stream expires messages or in general has messages that we want
|
||||
// that are no longer available we need to adjust here.
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
|
||||
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
|
||||
for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ {
|
||||
if mset.node != nil {
|
||||
|
||||
Reference in New Issue
Block a user