[FIXED] If a truncate for a raft WAL failed we could spin.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-25 19:07:27 -08:00
parent 65127d636e
commit 4fa0ea32c3
2 changed files with 23 additions and 13 deletions

View File

@@ -1008,7 +1008,7 @@ func (js *jetStream) monitorCluster() {
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else {
} else if err != errNoSnapAvailable {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
@@ -1887,7 +1887,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else {
} else if err != errNoSnapAvailable {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -4051,7 +4051,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else {
} else if err != errNoSnapAvailable {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}

View File

@@ -975,7 +975,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
// Remember our latest snapshot file.
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
@@ -1075,9 +1074,8 @@ func (n *raft) setupLastSnapshot() {
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}})
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
}
@@ -2737,20 +2735,31 @@ func (n *raft) createCatchup(ae *appendEntry) string {
func (n *raft) truncateWAL(term, index uint64) {
n.debug("Truncating and repairing WAL")
defer func() {
// Check to see if we invalidated any snapshots that might have held state
// from the entries we are truncating.
if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
}()
if err := n.wal.Truncate(index); err != nil {
n.setWriteErrLocked(err)
// If we get an invalid sequence, reset our wal all together.
if err == ErrInvalidSequence {
n.debug("Resetting WAL")
n.wal.Truncate(0)
index, n.pterm, n.pindex = 0, 0, 0
} else {
n.warn("Error truncating WAL: %v", err)
n.setWriteErrLocked(err)
}
return
}
// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index
// Check to see if we invalidated any snapshots that might have held state
// from the entries we are truncating.
if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
}
// Lock should be held
@@ -3140,6 +3149,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
} else {
// Truncate back to our last known.
n.truncateWAL(n.pterm, n.pindex)
n.cancelCatchup()
}
return errEntryStoreFailed
}