Fix RAFT WAL repair.

When we stored a message in the raft layer in a wrong position (state corrupt), we would panic, leaving the message there.
On restart we would truncate the WAL and try to repair, but we truncated to the wrong index of the bad entry.

This change also includes additional changes to truncateWAL and also reduces the conditional for panic on storeMsg.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-20 18:41:37 -07:00
parent e69c6164a9
commit 12bb46032c
3 changed files with 115 additions and 38 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.5.1-beta.4"
VERSION = "2.5.1-beta.5"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -3242,3 +3242,75 @@ func TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact(t *testing.T) {
}
}
}
func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
numMsgs := 1000
for i := 0; i < numMsgs; i++ {
js.PublishAsync("foo", []byte("WAL"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
for _, m := range fetchMsgs(t, sub, 100, 5*time.Second) {
m.Ack()
}
nc.Flush()
// Grab the consumer leader.
cl := c.consumerLeader("$G", "TEST", "dlc")
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
o := mset.lookupConsumer("dlc")
if o == nil {
t.Fatalf("Error looking up consumer %q", "dlc")
}
// Grab underlying raft node and the WAL (filestore) and we will attempt to "corrupt" it.
node := o.raftNode().(*raft)
fs := node.wal.(*fileStore)
fcfg, cfg := fs.fcfg, fs.cfg.StreamConfig
c.stopAll()
// Manipulate directly with cluster down.
fs, err = newFileStore(fcfg, cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
state := fs.State()
_, _, msg, _, err := fs.LoadMsg(state.LastSeq)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ae := node.decodeAppendEntry(msg, nil, _EMPTY_)
// Let's put a non-contigous AppendEntry into the system.
// node.storeToWAL(ae) could panic so need to do by hand.
ae.pindex += 10
if _, _, err := fs.StoreMsg(_EMPTY_, nil, ae.encode()); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fs.Stop()
c.restartAllSamePorts()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "dlc")
}

View File

@@ -235,20 +235,22 @@ type RaftConfig struct {
}
var (
errProposalFailed = errors.New("raft: proposal failed")
errNotLeader = errors.New("raft: not leader")
errAlreadyLeader = errors.New("raft: already leader")
errNilCfg = errors.New("raft: no config given")
errCorruptPeers = errors.New("raft: corrupt peer state")
errStepdownFailed = errors.New("raft: stepdown failed")
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
errNodeClosed = errors.New("raft: node is closed")
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
errNoSnapAvailable = errors.New("raft: no snapshot available")
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
errTooManyPrefs = errors.New("raft: stepdown requires at most one preferred new leader")
errStepdownNoPeer = errors.New("raft: stepdown failed, could not match new leader")
errProposalFailed = errors.New("raft: proposal failed")
errNotLeader = errors.New("raft: not leader")
errAlreadyLeader = errors.New("raft: already leader")
errNilCfg = errors.New("raft: no config given")
errCorruptPeers = errors.New("raft: corrupt peer state")
errStepdownFailed = errors.New("raft: stepdown failed")
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
errEntryStoreFailed = errors.New("raft: could not storeentry to WAL")
errNodeClosed = errors.New("raft: node is closed")
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
errNoSnapAvailable = errors.New("raft: no snapshot available")
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
errTooManyPrefs = errors.New("raft: stepdown requires at most one preferred new leader")
errStepdownNoPeer = errors.New("raft: stepdown failed, could not match new leader")
errNoPeerState = errors.New("raft: no peerstate")
)
// This will bootstrap a raftNode by writing its config into the store directory.
@@ -335,7 +337,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
return nil, err
}
if ps == nil {
return nil, errors.New("raft: no peerstate")
return nil, errNoPeerState
}
n := &raft{
@@ -418,12 +420,17 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
for index := state.FirstSeq; index <= state.LastSeq; index++ {
ae, err := n.loadEntry(index)
if err != nil {
n.warn("Could not load %d from WAL [%+v] with error: %v", index, state, err)
continue
n.warn("Could not load %d from WAL [%+v]: %v", index, state, err)
if err := n.wal.Truncate(index); err != nil {
n.setWriteErrLocked(err)
}
break
}
if ae.pindex != index-1 {
n.warn("Corrupt WAL, truncating and fixing")
n.truncateWal(ae)
n.warn("Corrupt WAL, will truncate")
if err := n.wal.Truncate(index); err != nil {
n.setWriteErrLocked(err)
}
break
}
n.processAppendEntry(ae, nil)
@@ -2406,15 +2413,13 @@ func (n *raft) attemptStepDown(newLeader string) {
}
}
func (n *raft) truncateWal(ae *appendEntry) {
// Truncate our WAL and reset.
func (n *raft) truncateWAL(pterm, pindex uint64) {
n.debug("Truncating and repairing WAL")
if err := n.wal.Truncate(ae.pindex); err != nil {
n.pterm, n.pindex = pterm, pindex
if err := n.wal.Truncate(pindex); err != nil {
n.setWriteErrLocked(err)
return
}
n.pindex = ae.pindex
n.pterm = ae.term
}
// Lock should be held
@@ -2548,7 +2553,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if eae, err := n.loadEntry(ae.pindex); err == nil && eae != nil {
// If terms mismatched, delete that entry and all others past it.
if ae.pterm > eae.pterm {
n.truncateWal(ae)
n.truncateWAL(ae.pterm, ae.pindex)
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
} else {
ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, true, _EMPTY_}
@@ -2567,11 +2572,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if catchingUp {
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
n.truncateWal(ae)
n.cancelCatchup()
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.pterm = ae.pterm
n.truncateWAL(ae.pterm, ae.pindex)
n.cancelCatchup()
n.Unlock()
return
}
@@ -2628,11 +2632,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}
// Heartbeat or do we have entries.
isHeartbeat := len(ae.entries) == 0
// Save to our WAL if we have entries.
if !isHeartbeat {
if len(ae.entries) > 0 {
// Only store if an original which will have sub != nil
if sub != nil {
if err := n.storeToWAL(ae); err != nil {
@@ -2771,9 +2772,14 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
}
// Sanity checking for now.
if ae.pindex != seq-1 {
n.Unlock()
panic(fmt.Sprintf("[%s-%s] Placed an entry at the wrong index, ae is %+v, seq is %d, n.pindex is %d\n\n", n.s, n.group, ae, seq, n.pindex))
if index := ae.pindex + 1; index != seq {
// We are missing store state from our state.
if index > seq {
panic(fmt.Sprintf("[%s | %s] Wrong index, ae is %+v, seq is %d, n.pindex is %d\n\n", n.s, n.group, ae, seq, n.pindex))
}
// Truncate back to last known.
n.truncateWAL(n.pterm, n.pindex)
return errEntryStoreFailed
}
n.pterm = ae.term
@@ -2791,7 +2797,6 @@ func (n *raft) sendAppendEntry(entries []*Entry) {
// If we have entries store this in our wal.
if len(entries) > 0 {
if err := n.storeToWAL(ae); err != nil {
n.setWriteErrLocked(err)
return
}
// We count ourselves.