diff --git a/server/norace_test.go b/server/norace_test.go index 51005773..1ab99064 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8138,3 +8138,63 @@ func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) { t.Fatalf("Check ack floor taking too long!") } } + +func TestNoRaceReplicatedMirrorWithLargeStartingSequenceOverLeafnode(t *testing.T) { + // Cluster B + tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: B, store_dir:", 1) + c := createJetStreamCluster(t, tmpl, "B", _EMPTY_, 3, 22020, true) + defer c.shutdown() + + // Cluster A + // Domain is "A" + lc := c.createLeafNodesWithStartPortAndDomain("A", 3, 22110, "A") + defer lc.shutdown() + + lc.waitOnClusterReady() + + // Create a stream on B (HUB/CLOUD) and set its starting sequence very high. + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 1_000_000_000}) + require_NoError(t, err) + + // Send in a small amount of messages. + for i := 0; i < 1000; i++ { + sendStreamMsg(t, nc, "foo", "Hello") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.FirstSeq == 1_000_000_000) + + // Now try to create a replicated mirror on the leaf cluster. + lnc, ljs := jsClientConnect(t, lc.randomServer()) + defer lnc.Close() + + _, err = ljs.AddStream(&nats.StreamConfig{ + Name: "TEST", + Mirror: &nats.StreamSource{ + Name: "TEST", + Domain: "B", + }, + }) + require_NoError(t, err) + + // Make sure we sync quickly. 
+ checkFor(t, time.Second, 200*time.Millisecond, func() error { + si, err = ljs.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs == 1000 && si.State.FirstSeq == 1_000_000_000 { + return nil + } + return fmt.Errorf("Mirror state not correct: %+v", si.State) + }) +} diff --git a/server/stream.go b/server/stream.go index af1110f1..81c13e65 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1671,7 +1671,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err mset.mu.RUnlock() return 0, errors.New("sealed stream") } - store := mset.store + store, mlseq := mset.store, mset.lseq mset.mu.RUnlock() if preq != nil { @@ -1683,11 +1683,17 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err return purged, err } - // Purge consumers. + // Grab our stream state. var state StreamState store.FastState(&state) fseq, lseq := state.FirstSeq, state.LastSeq + // Check if our last has moved past what our original last sequence was, if so reset. + if lseq > mlseq { + mset.setLastSeq(lseq) + } + + // Purge consumers. // Check for filtered purge. if preq != nil && preq.Subject != _EMPTY_ { ss := store.FilteredState(state.FirstSeq, preq.Subject) @@ -2399,7 +2405,14 @@ func (mset *stream) setupMirrorConsumer() error { // Check if we need to skip messages. if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream { - mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream) + // Check to see if delivered is past our last and we have no msgs. This will help the + // case when mirroring a stream that has a very high starting sequence number. + if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq { + mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0) + mset.lseq = ccr.ConsumerInfo.Delivered.Stream + } else { + mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream) + } } // Capture consumer name.