Merge pull request #3249 from nats-io/catchup_eof

Fix for stalled catchup in endless cycle on EOF
This commit is contained in:
Derek Collison
2022-07-07 17:54:07 -07:00
committed by GitHub
2 changed files with 77 additions and 0 deletions

View File

@@ -6287,6 +6287,9 @@ RETRY:
}
msgsQ.recycle(&mrecs)
case <-notActive.C:
if mrecs := msgsQ.pop(); len(mrecs) > 0 {
msgsQ.recycle(&mrecs)
}
s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name())
goto RETRY
case <-s.quitCh:
@@ -6657,6 +6660,24 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
sm, err := mset.store.LoadMsg(seq, &smv)
// if this is not a deleted msg, bail out.
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
if err == ErrStoreEOF {
var state StreamState
mset.store.FastState(&state)
if seq > state.LastSeq {
// The snapshot has a larger last sequence then we have. This could be due to a truncation
// when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow,
// but tested a ton of those with no success.
s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger then current state: %+v",
mset.account(), mset.name(), seq, state)
// Try our best to redo our invalidated snapshot as well.
if n := mset.raftNode(); n != nil {
n.InstallSnapshot(mset.stateSnapshot())
}
// Signal EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
}
s.Warnf("Error loading message for catchup '%s > %s': %v", mset.account(), mset.name(), err)
return false
}

View File

@@ -11541,3 +11541,59 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) {
return err
})
}
func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
// Shutdown a replica
rs := c.randomNonStreamLeader("$G", "TEST")
rs.Shutdown()
if s == rs {
nc.Close()
s = c.randomServer()
nc, js = jsClientConnect(t, s)
defer nc.Close()
}
msg, toSend := []byte("OK"), 100
for i := 0; i < toSend; i++ {
_, err := js.PublishAsync("foo", msg)
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(2 * time.Second):
t.Fatalf("Did not receive completion signal")
}
sl := c.streamLeader("$G", "TEST")
mset, err := sl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
// Force snapshot
require_NoError(t, mset.raftNode().InstallSnapshot(mset.stateSnapshot()))
// Now truncate the store on purpose.
err = mset.store.Truncate(50)
require_NoError(t, err)
// Restart Server.
rs = c.restartServer(rs)
// Make sure we can become current.
// With bug we would fail here.
c.waitOnStreamCurrent(rs, "$G", "TEST")
}