Purge operations would be replayed on restart regardless if they had already been processed.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-04 06:33:41 -08:00
parent d71c845a92
commit c49e3247bb
3 changed files with 60 additions and 5 deletions

View File

@@ -119,6 +119,7 @@ type consumerAssignment struct {
type streamPurge struct {
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
LastSeq uint64 `json:"last_seq"`
Subject string `json:"subject"`
Reply string `json:"reply"`
}
@@ -1342,7 +1343,9 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
continue
}
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
js.srv.Debugf("Got error processing JetStream msg: %v", err)
if err != errLastSeqMismatch || !isRecovering {
js.srv.Debugf("Got error processing JetStream msg: %v", err)
}
}
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
@@ -1357,8 +1360,8 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
} else {
removed, err = mset.EraseMsg(md.Seq)
}
if err != nil {
s.Warnf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err)
if err != nil && !isRecovering {
s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err)
}
js.mu.RLock()
@@ -1383,6 +1386,11 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
if err != nil {
panic(err.Error())
}
// Ignore if we are recovering and we have already processed.
if isRecovering && mset.State().LastSeq > sp.LastSeq {
continue
}
s := js.server()
purged, err := mset.Purge()
if err != nil {
@@ -2774,7 +2782,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, mset *Stream, str
}
if n := sa.Group.node; n != nil {
sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci}
sp := &streamPurge{Stream: stream, LastSeq: mset.State().LastSeq, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeStreamPurge(sp))
} else if mset != nil {
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}

View File

@@ -2003,7 +2003,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
// Sanity checking for now.
if ae.pindex != seq-1 {
panic(fmt.Sprintf("[%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, ae, seq))
panic(fmt.Sprintf("[%s-%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, n.group, ae, seq))
}
n.pterm = ae.term

View File

@@ -3169,6 +3169,53 @@ func TestJetStreamClusterRemoveServer(t *testing.T) {
})
}
func TestJetStreamClusterPurgeReplayAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "P3F", 3)
defer c.shutdown()
// Client based API
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch := func(n int) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
sendBatch(10)
if err := js.PurgeStream("TEST"); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
sendBatch(10)
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
s = c.randomServer()
nc, js = jsClientConnect(t, s)
defer nc.Close()
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 10 {
t.Fatalf("Expected 10 msgs after restart, got %d", si.State.Msgs)
}
}
func TestJetStreamClusterSuperClusterBasics(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()