Split vote improvements

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-18 18:29:18 -08:00
parent d58e68051a
commit 048011d7f1

View File

@@ -400,7 +400,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
}
}
n.notice("Started")
n.debug("Started")
n.Lock()
n.resetElectionTimeout()
@@ -478,7 +478,7 @@ func (s *Server) transferRaftLeaders() bool {
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Noticef("Transferring any raft leaders")
s.Debugf("Transferring any raft leaders")
}
for _, n := range s.raftNodes {
nodes = append(nodes, n)
@@ -1066,9 +1066,9 @@ func (n *raft) shutdown(shouldDelete bool) {
s.unregisterRaftNode(g)
if shouldDelete {
n.notice("Deleted")
n.debug("Deleted")
} else {
n.notice("Shutdown")
n.debug("Shutdown")
}
if wal != nil {
if shouldDelete {
@@ -1479,7 +1479,6 @@ func (n *raft) runAsLeader() {
n.switchToFollower(noLeader)
return
}
case vresp := <-n.votes:
if vresp.term > n.currentTerm() {
n.switchToFollower(noLeader)
@@ -1997,6 +1996,24 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}
// Are we receiving from another leader.
if n.state == Leader {
if ae.term > n.term {
n.term = ae.term
n.vote = noVote
n.writeTermVote()
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.attemptStepDown(ae.leader)
} else {
// Let them know we are the leader.
ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_}
n.Unlock()
n.debug("AppendEntry ignoring old term from another leader")
n.sendRPC(ae.reply, _EMPTY_, ar.encode())
return
}
}
// If we received an append entry as a candidate we should convert to a follower.
if n.state == Candidate {
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
@@ -2026,8 +2043,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Ignore old terms.
if isNew && ae.term < n.term {
ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_}
n.Unlock()
n.debug("AppendEntry ignoring old term")
n.sendRPC(ae.reply, _EMPTY_, ar.encode())
return
}
@@ -2090,7 +2109,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if eae, err := n.loadEntry(ae.pindex); err == nil && eae != nil {
// If terms mismatched, delete that entry and all others past it.
if ae.pterm > eae.pterm {
n.wal.Truncate(ae.pindex - 1)
n.wal.Truncate(ae.pindex)
n.pindex = ae.pindex
n.pterm = ae.pterm
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
@@ -2252,8 +2271,18 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject,
n.trackPeer(ar.peer)
if ar.success {
n.trackResponse(ar)
} else if ar.reply != _EMPTY_ {
n.catchupFollower(ar)
} else {
// False here, check to make sure they do not have a higher term.
if ar.term > n.term {
n.term = ar.term
n.vote = noVote
n.writeTermVote()
n.Lock()
n.attemptStepDown(noLeader)
n.Unlock()
} else if ar.reply != _EMPTY_ {
n.catchupFollower(ar)
}
}
}
@@ -2516,6 +2545,11 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string,
n.error("Received malformed vote response for %q", n.group)
return
}
if state := n.State(); state != Candidate && state != Leader {
n.debug("Ignoring old vote response, we have stepped down")
return
}
select {
case n.votes <- vr:
default:
@@ -2525,21 +2559,18 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string,
}
func (n *raft) processVoteRequest(vr *voteRequest) error {
n.RLock()
vresp := voteResponse{n.term, n.id, false}
n.RUnlock()
n.debug("Received a voteRequest %+v", vr)
defer n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply)
if err := n.trackPeer(vr.candidate); err != nil {
n.sendReply(vr.reply, vresp.encode())
return err
}
n.Lock()
n.resetElectionTimeout()
vresp := &voteResponse{n.term, n.id, false}
defer n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply)
// Ignore if we are newer.
if vr.term < n.term {
n.Unlock()
@@ -2552,14 +2583,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.term = vr.term
n.vote = noVote
n.writeTermVote()
if n.state == Candidate {
if n.state != Follower {
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
n.attemptStepDown(noLeader)
}
}
// Only way we get to yes is through here.
if vr.lastIndex >= n.pindex && n.vote == noVote || n.vote == vr.candidate {
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
@@ -2588,7 +2620,7 @@ func (n *raft) requestVote() {
n.Lock()
if n.state != Candidate {
n.Unlock()
panic("raft requestVote not from candidate")
return
}
n.vote = n.id
n.writeTermVote()
@@ -2666,7 +2698,7 @@ func (n *raft) switchToFollower(leader string) {
if n.state == Closed {
return
}
n.notice("Switching to follower")
n.debug("Switching to follower")
n.leader = leader
n.switchState(Follower)
}
@@ -2678,7 +2710,7 @@ func (n *raft) switchToCandidate() {
return
}
if n.state != Candidate {
n.notice("Switching to candidate")
n.debug("Switching to candidate")
} else if n.lostQuorumLocked() {
// We signal to the upper layers such that can alert on quorum lost.
n.updateLeadChange(false)
@@ -2696,7 +2728,7 @@ func (n *raft) switchToLeader() {
if n.state == Closed {
return
}
n.notice("Switching to leader")
n.debug("Switching to leader")
n.leader = n.id
n.switchState(Leader)
}