From 4fa0ea32c36e2ff790411bf03fa350737afff3b9 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 25 Feb 2023 19:07:27 -0800 Subject: [PATCH] [FIXED] If a truncate for a raft WAL failed we could spin. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 6 +++--- server/raft.go | 30 ++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ed2e6d5f..00f935e1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1008,7 +1008,7 @@ func (js *jetStream) monitorCluster() { if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } @@ -1887,7 +1887,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } @@ -4051,7 +4051,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) } } diff --git a/server/raft.go b/server/raft.go index eda9bd40..5b05798a 100644 --- a/server/raft.go +++ b/server/raft.go @@ -975,7 +975,6 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file. n.snapfile = sfile - if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) n.Unlock() @@ -1075,9 +1074,8 @@ func (n *raft) setupLastSnapshot() { n.pterm = snap.lastTerm n.commit = snap.lastIndex n.applied = snap.lastIndex - n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) - if _, err := n.wal.Compact(snap.lastIndex); err != nil { + if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } } @@ -2737,20 +2735,31 @@ func (n *raft) createCatchup(ae *appendEntry) string { func (n *raft) truncateWAL(term, index uint64) { n.debug("Truncating and repairing WAL") + defer func() { + // Check to see if we invalidated any snapshots that might have held state + // from the entries we are truncating. + if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { + os.Remove(n.snapfile) + n.snapfile = _EMPTY_ + } + }() + if err := n.wal.Truncate(index); err != nil { - n.setWriteErrLocked(err) + // If we get an invalid sequence, reset our wal all together. + if err == ErrInvalidSequence { + n.debug("Resetting WAL") + n.wal.Truncate(0) + index, n.pterm, n.pindex = 0, 0, 0 + } else { + n.warn("Error truncating WAL: %v", err) + n.setWriteErrLocked(err) + } return } // Set after we know we have truncated properly. n.pterm, n.pindex = term, index - // Check to see if we invalidated any snapshots that might have held state - // from the entries we are truncating. - if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { - os.Remove(n.snapfile) - n.snapfile = _EMPTY_ - } } // Lock should be held @@ -3140,6 +3149,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { } else { // Truncate back to our last known. n.truncateWAL(n.pterm, n.pindex) + n.cancelCatchup() } return errEntryStoreFailed }