From 333e2fc2f14a82c08a87938348af9d83ce39ff65 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 7 Jul 2022 13:42:41 -0700 Subject: [PATCH] Fix for stalled catchup in endless cycle on EOF trying to retrieve catchup msg. A customer experienced an endless failure to have a stream catchup. The current leader was being asked for a message from a snapshot that was larger than what we had, resulting in EOF which silently failed. We now detect this and signal end of catchup and redo the bad snapshot if possible. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 21 ++++++++++++ server/jetstream_cluster_test.go | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9315c37a..7c412799 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6275,6 +6275,9 @@ RETRY: } msgsQ.recycle(&mrecs) case <-notActive.C: + if mrecs := msgsQ.pop(); len(mrecs) > 0 { + msgsQ.recycle(&mrecs) + } s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name()) goto RETRY case <-s.quitCh: @@ -6645,6 +6648,24 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { sm, err := mset.store.LoadMsg(seq, &smv) // if this is not a deleted msg, bail out. if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg { + if err == ErrStoreEOF { + var state StreamState + mset.store.FastState(&state) + if seq > state.LastSeq { + // The snapshot has a larger last sequence than we have. This could be due to a truncation + // when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow, + // but tested a ton of those with no success. + s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger then current state: %+v", + mset.account(), mset.name(), seq, state) + // Try our best to redo our invalidated snapshot as well. 
+ if n := mset.raftNode(); n != nil { + n.InstallSnapshot(mset.stateSnapshot()) + } + // Signal EOF + s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) + return false + } + } s.Warnf("Error loading message for catchup '%s > %s': %v", mset.account(), mset.name(), err) return false } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 13242115..1373e317 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11541,3 +11541,59 @@ func TestJetStreamClusterLeafNodeSPOFMigrateLeaders(t *testing.T) { return err }) } + +func TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Shutdown a replica + rs := c.randomNonStreamLeader("$G", "TEST") + rs.Shutdown() + if s == rs { + nc.Close() + s = c.randomServer() + nc, js = jsClientConnect(t, s) + defer nc.Close() + } + + msg, toSend := []byte("OK"), 100 + for i := 0; i < toSend; i++ { + _, err := js.PublishAsync("foo", msg) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(2 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + sl := c.streamLeader("$G", "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + // Force snapshot + require_NoError(t, mset.raftNode().InstallSnapshot(mset.stateSnapshot())) + + // Now truncate the store on purpose. + err = mset.store.Truncate(50) + require_NoError(t, err) + + // Restart Server. + rs = c.restartServer(rs) + + // Make sure we can become current. + // With bug we would fail here. + c.waitOnStreamCurrent(rs, "$G", "TEST") +}