diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index bb6048c6..77ed700d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -119,6 +119,7 @@ type consumerAssignment struct { type streamPurge struct { Client *ClientInfo `json:"client,omitempty"` Stream string `json:"stream"` + LastSeq uint64 `json:"last_seq"` Subject string `json:"subject"` Reply string `json:"reply"` } @@ -1342,7 +1343,9 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco continue } if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { - js.srv.Debugf("Got error processing JetStream msg: %v", err) + if err != errLastSeqMismatch || !isRecovering { + js.srv.Debugf("Got error processing JetStream msg: %v", err) + } } case deleteMsgOp: md, err := decodeMsgDelete(buf[1:]) @@ -1357,8 +1360,8 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco } else { removed, err = mset.EraseMsg(md.Seq) } - if err != nil { - s.Warnf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err) + if err != nil && !isRecovering { + s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err) } js.mu.RLock() @@ -1383,6 +1386,11 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco if err != nil { panic(err.Error()) } + // Ignore if we are recovering and we have already processed. + if isRecovering && mset.State().LastSeq > sp.LastSeq { + continue + } + s := js.server() purged, err := mset.Purge() if err != nil { @@ -2774,7 +2782,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, mset *Stream, str } if n := sa.Group.node; n != nil { - sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci} + sp := &streamPurge{Stream: stream, LastSeq: mset.State().LastSeq, Subject: subject, Reply: reply, Client: ci} n.Propose(encodeStreamPurge(sp)) } else if mset != nil { var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} diff --git a/server/raft.go b/server/raft.go index 2e8d1bd6..d6054443 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2003,7 +2003,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { // Sanity checking for now. if ae.pindex != seq-1 { - panic(fmt.Sprintf("[%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, ae, seq)) + panic(fmt.Sprintf("[%s-%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, n.group, ae, seq)) } n.pterm = ae.term diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 6662bd8e..1f8fc7c5 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -3169,6 +3169,53 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { }) } +func TestJetStreamClusterPurgeReplayAfterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "P3F", 3) + defer c.shutdown() + + // Client based API + s := c.randomNonLeader() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sendBatch := func(n int) { + t.Helper() + // Send a batch to a given subject. + for i := 0; i < n; i++ { + if _, err := js.Publish("TEST", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + } + + sendBatch(10) + if err := js.PurgeStream("TEST"); err != nil { + t.Fatalf("Unexpected purge error: %v", err) + } + sendBatch(10) + + c.stopAll() + c.restartAll() + + c.waitOnStreamLeader("$G", "TEST") + + s = c.randomServer() + nc, js = jsClientConnect(t, s) + defer nc.Close() + + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 10 { + t.Fatalf("Expected 10 msgs after restart, got %d", si.State.Msgs) + } +} + func TestJetStreamClusterSuperClusterBasics(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 3) defer sc.shutdown()