From 4759560e297af3516a60e0b67eeb5ccfceffc483 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 12 Feb 2021 19:46:30 -0800 Subject: [PATCH] Fixed raft bug on catchup logic with external snapshots Signed-off-by: Derek Collison --- .gitignore | 1 + server/client.go | 2 +- server/jetstream_cluster.go | 7 ++++--- server/raft.go | 26 ++++++++++++++++++-------- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index f5876149..d4dc84d9 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ _testmain.go # bin nats-server gnatsd +check # coverage coverage.out diff --git a/server/client.go b/server/client.go index 0f2a49b7..50b16dd1 100644 --- a/server/client.go +++ b/server/client.go @@ -3725,7 +3725,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // This will survive going across routes, etc. if !si.response { var ci *ClientInfo - if hadPrevSi { + if hadPrevSi && c.pa.hdr >= 0 { var cis ClientInfo if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { ci = &cis diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3e0f8f5d..7c41ce95 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1214,7 +1214,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { if !isLeader { // If over 32MB go ahead and compact if not the leader. if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit { - n.Compact(ce.Index) + doSnapshot() } } } else { @@ -1370,6 +1370,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if lseq == 0 && mset.lastSeq() != 0 { continue } + if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { if err != errLastSeqMismatch || !isRecovering { js.srv.Debugf("Got error processing JetStream msg: %v", err) @@ -3694,7 +3695,6 @@ RETRY: mset.mu.Lock() state := mset.store.State() sreq = mset.calculateSyncRequest(&state, snap) - mset.mu.Unlock() if sreq == nil { return @@ -3768,9 +3768,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { if err != nil { return 0, errors.New("bad catchup msg") } + // Put into our store // Messages to be skipped have no subject or timestamp. - // TODO(dlc) - formalize witrh skipMsgOp + // TODO(dlc) - formalize with skipMsgOp if subj == _EMPTY_ && ts == 0 { lseq := mset.store.SkipMsg() if lseq != seq { diff --git a/server/raft.go b/server/raft.go index 6de5cfbb..774ef0d3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -578,6 +578,7 @@ func (n *raft) PauseApply() { n.Lock() defer n.Unlock() + n.debug("Pausing apply channel") n.paused = true n.hcommit = n.commit } @@ -586,15 +587,17 @@ func (n *raft) ResumeApply() { n.Lock() defer n.Unlock() + n.debug("Resuming apply channel") + n.paused = false // Run catchup.. if n.hcommit > n.commit { + n.debug("Resuming %d replays", n.hcommit+1-n.commit) for index := n.commit + 1; index <= n.hcommit; index++ { if err := n.applyCommit(index); err != nil { break } } } - n.paused = false n.hcommit = 0 } @@ -668,6 +671,8 @@ func (n *raft) SendSnapshot(data []byte) error { // all of the log entries up to and including index. This should not be called with // entries that have been applied to the FSM but have not been applied to the raft state. func (n *raft) InstallSnapshot(data []byte) error { + n.debug("Installing snapshot of %d bytes", len(data)) + n.Lock() if n.state == Closed { n.Unlock() @@ -1659,12 +1664,14 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { if start < state.FirstSeq { n.debug("Need to send snapshot to follower") - var err error - if _, err = n.sendSnapshotToFollower(ar.reply); err != nil { - n.error("Error sending snapshot to followers: %v", err) + if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil { + n.error("Error sending snapshot to follower [%s]: %v", ar.peer, err) n.attemptStepDown(noLeader) n.Unlock() return + } else { + n.debug("Snapshot sent, reset first entry to %d", lastIndex) + start = lastIndex } } @@ -1673,7 +1680,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { ae, err = n.loadFirstEntry() } if err != nil || ae == nil { - n.debug("Could not find a starting entry for us: %v", err) + n.debug("Could not find a starting entry: %v", err) n.Unlock() return } @@ -1736,8 +1743,10 @@ func (n *raft) applyCommit(index uint64) error { case EntrySnapshot: committed = append(committed, e) case EntryPeerState: - if ps, err := decodePeerState(e.Data); err == nil { - n.processPeerState(ps) + if n.state != Leader { + if ps, err := decodePeerState(e.Data); err == nil { + n.processPeerState(ps) + } } case EntryAddPeer: newPeer := string(e.Data) @@ -1916,6 +1925,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply st // Lock should be held. func (n *raft) cancelCatchup() { n.debug("Canceling catchup subscription since we are now up to date") + if n.catchup != nil && n.catchup.sub != nil { n.s.sysUnsubscribe(n.catchup.sub) } @@ -2106,7 +2116,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } n.pindex = ae.pindex n.pterm = ae.pterm - n.commit = ae.commit + n.commit = ae.pindex n.wal.Compact(n.pindex + 1) // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.