From f3f04e64426adcf4f70e91850bf58de3667f78da Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 15 Apr 2021 06:24:19 -0700 Subject: [PATCH] Fix flapper Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 109 +++++++++++++++---------------- server/stream.go | 3 + 2 files changed, 54 insertions(+), 58 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index ed176fdc..a0862af2 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5667,22 +5667,14 @@ func TestJetStreamClusterStreamInfoDeletedDetails(t *testing.T) { } func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { - c := createJetStreamClusterExplicit(t, "MMS", 5) + c := createJetStreamClusterExplicit(t, "MMS", 9) defer c.shutdown() // Client for API requests. nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - // Origin - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - MaxAge: 100 * time.Millisecond, - }) - - ts := c.streamLeader("$G", "TEST") - - sendBatch := func(t *testing.T, n int) { + sendBatch := func(n int) { t.Helper() // Send a batch to a given subject. for i := 0; i < n; i++ { @@ -5692,9 +5684,9 @@ func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { } } - checkStream := func(t *testing.T, stream string, num uint64) { + checkStream := func(stream string, num uint64) { t.Helper() - checkFor(t, 5*time.Second, 50*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { si, err := js.StreamInfo(stream) if err != nil { return err @@ -5706,54 +5698,55 @@ func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { }) } - checkMirror := func(t *testing.T, num uint64) { t.Helper(); checkStream(t, "M", num) } - checkTest := func(t *testing.T, num uint64) { t.Helper(); checkStream(t, "TEST", num) } + checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) } + checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) } - for _, test := range []struct { - name string - replicas int - }{ - {"R-1", 1}, - {"R-2", 2}, - } { - t.Run(test.name, func(t *testing.T) { - // Create mirror now. - for ms := ts; ms == ts; { - _, err = js.AddStream(&nats.StreamConfig{ - Name: "M", - Mirror: &nats.StreamSource{Name: "TEST"}, - Replicas: test.replicas, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - ms = c.streamLeader("$G", "M") - if ms == ts { - // Delete and retry. - js.DeleteStream("M") - } else { - defer js.DeleteStream("M") - } - } - - sendBatch(t, 10) - checkMirror(t, 10) - - // Now shutdown the server with the mirror. - ms := c.streamLeader("$G", "M") - ms.Shutdown() - - // Send more messages but let them expire. - sendBatch(t, 10) - checkTest(t, 0) - - c.restartServer(ms) - c.waitOnStreamLeader("$G", "M") - - sendBatch(t, 10) - checkMirror(t, 20) - }) + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + MaxAge: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } + + ts := c.streamLeader("$G", "TEST") + ml := c.leader() + + // Create mirror now. + for ms := ts; ms == ts || ms == ml; { + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "TEST"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ms = c.streamLeader("$G", "M") + if ts == ms || ms == ml { + // Delete and retry. + js.DeleteStream("M") + } + } + + sendBatch(10) + checkMirror(10) + + // Now shutdown the server with the mirror. + ms := c.streamLeader("$G", "M") + ms.Shutdown() + + // Send more messages but let them expire. + sendBatch(10) + checkTest(0) + + c.restartServer(ms) + c.checkClusterFormed() + c.waitOnStreamLeader("$G", "M") + + sendBatch(10) + checkMirror(20) } func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index 5465a80f..36ce823e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1366,6 +1366,7 @@ func (mset *stream) setupMirrorConsumer() error { // Now send off request to create/update our consumer. This will be all API based even in single server mode. // We calculate durable names apriori so we do not need to save them off. + var state StreamState mset.store.FastState(&state) @@ -1440,7 +1441,9 @@ func (mset *stream) setupMirrorConsumer() error { // When an upstream stream expires messages or in general has messages that we want // that are no longer available we need to adjust here. + var state StreamState mset.store.FastState(&state) + if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream { for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ { if mset.node != nil {