Fixed raft bug in catchup logic with external snapshots

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-12 19:46:30 -08:00
parent 7facbc995b
commit 4759560e29
4 changed files with 24 additions and 12 deletions

1
.gitignore vendored
View File

@@ -42,6 +42,7 @@ _testmain.go
# bin # bin
nats-server nats-server
gnatsd gnatsd
check
# coverage # coverage
coverage.out coverage.out

View File

@@ -3725,7 +3725,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// This will survive going across routes, etc. // This will survive going across routes, etc.
if !si.response { if !si.response {
var ci *ClientInfo var ci *ClientInfo
if hadPrevSi { if hadPrevSi && c.pa.hdr >= 0 {
var cis ClientInfo var cis ClientInfo
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis ci = &cis

View File

@@ -1214,7 +1214,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
if !isLeader { if !isLeader {
// If over 32MB go ahead and compact if not the leader. // If over 32MB go ahead and compact if not the leader.
if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit { if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit {
n.Compact(ce.Index) doSnapshot()
} }
} }
} else { } else {
@@ -1370,6 +1370,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if lseq == 0 && mset.lastSeq() != 0 { if lseq == 0 && mset.lastSeq() != 0 {
continue continue
} }
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if err != errLastSeqMismatch || !isRecovering { if err != errLastSeqMismatch || !isRecovering {
js.srv.Debugf("Got error processing JetStream msg: %v", err) js.srv.Debugf("Got error processing JetStream msg: %v", err)
@@ -3694,7 +3695,6 @@ RETRY:
mset.mu.Lock() mset.mu.Lock()
state := mset.store.State() state := mset.store.State()
sreq = mset.calculateSyncRequest(&state, snap) sreq = mset.calculateSyncRequest(&state, snap)
mset.mu.Unlock() mset.mu.Unlock()
if sreq == nil { if sreq == nil {
return return
@@ -3768,9 +3768,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
if err != nil { if err != nil {
return 0, errors.New("bad catchup msg") return 0, errors.New("bad catchup msg")
} }
// Put into our store // Put into our store
// Messages to be skipped have no subject or timestamp. // Messages to be skipped have no subject or timestamp.
// TODO(dlc) - formalize witrh skipMsgOp // TODO(dlc) - formalize with skipMsgOp
if subj == _EMPTY_ && ts == 0 { if subj == _EMPTY_ && ts == 0 {
lseq := mset.store.SkipMsg() lseq := mset.store.SkipMsg()
if lseq != seq { if lseq != seq {

View File

@@ -578,6 +578,7 @@ func (n *raft) PauseApply() {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
n.debug("Pausing apply channel")
n.paused = true n.paused = true
n.hcommit = n.commit n.hcommit = n.commit
} }
@@ -586,15 +587,17 @@ func (n *raft) ResumeApply() {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
n.debug("Resuming apply channel")
n.paused = false
// Run catchup.. // Run catchup..
if n.hcommit > n.commit { if n.hcommit > n.commit {
n.debug("Resuming %d replays", n.hcommit+1-n.commit)
for index := n.commit + 1; index <= n.hcommit; index++ { for index := n.commit + 1; index <= n.hcommit; index++ {
if err := n.applyCommit(index); err != nil { if err := n.applyCommit(index); err != nil {
break break
} }
} }
} }
n.paused = false
n.hcommit = 0 n.hcommit = 0
} }
@@ -668,6 +671,8 @@ func (n *raft) SendSnapshot(data []byte) error {
// all of the log entries up to and including index. This should not be called with // all of the log entries up to and including index. This should not be called with
// entries that have been applied to the FSM but have not been applied to the raft state. // entries that have been applied to the FSM but have not been applied to the raft state.
func (n *raft) InstallSnapshot(data []byte) error { func (n *raft) InstallSnapshot(data []byte) error {
n.debug("Installing snapshot of %d bytes", len(data))
n.Lock() n.Lock()
if n.state == Closed { if n.state == Closed {
n.Unlock() n.Unlock()
@@ -1659,12 +1664,14 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
if start < state.FirstSeq { if start < state.FirstSeq {
n.debug("Need to send snapshot to follower") n.debug("Need to send snapshot to follower")
var err error if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil {
if _, err = n.sendSnapshotToFollower(ar.reply); err != nil { n.error("Error sending snapshot to follower [%s]: %v", ar.peer, err)
n.error("Error sending snapshot to followers: %v", err)
n.attemptStepDown(noLeader) n.attemptStepDown(noLeader)
n.Unlock() n.Unlock()
return return
} else {
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
start = lastIndex
} }
} }
@@ -1673,7 +1680,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
ae, err = n.loadFirstEntry() ae, err = n.loadFirstEntry()
} }
if err != nil || ae == nil { if err != nil || ae == nil {
n.debug("Could not find a starting entry for us: %v", err) n.debug("Could not find a starting entry: %v", err)
n.Unlock() n.Unlock()
return return
} }
@@ -1736,8 +1743,10 @@ func (n *raft) applyCommit(index uint64) error {
case EntrySnapshot: case EntrySnapshot:
committed = append(committed, e) committed = append(committed, e)
case EntryPeerState: case EntryPeerState:
if ps, err := decodePeerState(e.Data); err == nil { if n.state != Leader {
n.processPeerState(ps) if ps, err := decodePeerState(e.Data); err == nil {
n.processPeerState(ps)
}
} }
case EntryAddPeer: case EntryAddPeer:
newPeer := string(e.Data) newPeer := string(e.Data)
@@ -1916,6 +1925,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply st
// Lock should be held. // Lock should be held.
func (n *raft) cancelCatchup() { func (n *raft) cancelCatchup() {
n.debug("Canceling catchup subscription since we are now up to date") n.debug("Canceling catchup subscription since we are now up to date")
if n.catchup != nil && n.catchup.sub != nil { if n.catchup != nil && n.catchup.sub != nil {
n.s.sysUnsubscribe(n.catchup.sub) n.s.sysUnsubscribe(n.catchup.sub)
} }
@@ -2106,7 +2116,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
} }
n.pindex = ae.pindex n.pindex = ae.pindex
n.pterm = ae.pterm n.pterm = ae.pterm
n.commit = ae.commit n.commit = ae.pindex
n.wal.Compact(n.pindex + 1) n.wal.Compact(n.pindex + 1)
// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.