Additional stabilizations for split votes, general fixes

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-20 11:27:25 -08:00
parent 7c0b6faf2c
commit 4c965323f2
2 changed files with 31 additions and 3 deletions

View File

@@ -1551,6 +1551,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, stream, subj
err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, stream)
s.Warnf(err.Error())
doneCh <- err
return
}
}
})

View File

@@ -263,7 +263,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
propc: make(chan *Entry, 256),
applyc: make(chan *CommittedEntry, 256),
leadc: make(chan bool, 4),
stepdown: make(chan string),
stepdown: make(chan string, 4),
}
n.c.registerWithAccount(sacc)
@@ -663,6 +663,7 @@ func randCampaignTimeout() time.Duration {
// Campaign will have our node start a leadership vote.
// Lock should be held.
func (n *raft) campaign() error {
n.debug("Starting campaign")
if n.state == Leader {
return errAlreadyLeader
}
@@ -864,12 +865,16 @@ func (n *raft) notice(format string, args ...interface{}) {
n.s.Noticef(nf, args...)
}
func (n *raft) runAsFollower() {
// electTimer returns the timer currently stored in n.elect.
// The field is read while holding the read lock, so callers must not
// already hold the write lock when calling this.
func (n *raft) electTimer() *time.Timer {
	n.RLock()
	defer n.RUnlock()
	return n.elect
}
func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
select {
case <-n.s.quitCh:
return
@@ -1354,7 +1359,6 @@ func (n *raft) trackPeer(peer string) error {
func (n *raft) runAsCandidate() {
n.Lock()
elect := n.elect
// Drain old responses.
for len(n.votes) > 0 {
<-n.votes
@@ -1368,6 +1372,7 @@ func (n *raft) runAsCandidate() {
votes := 1
for {
elect := n.electTimer()
select {
case <-n.s.quitCh:
return
@@ -1532,6 +1537,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.leader = ae.leader
n.vote = noVote
n.writeTermVote()
if isNew {
n.resetElectionTimeout()
}
}
// TODO(dlc) - Do both catchup and delete new behaviors from spec.
@@ -1878,6 +1886,7 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string,
select {
case n.votes <- vr:
default:
// FIXME(dlc)
n.error("Failed to place vote response on chan for %q", n.group)
}
}
@@ -1888,6 +1897,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.RUnlock()
n.debug("Received a voteRequest %+v", vr)
defer n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply)
if err := n.trackPeer(vr.candidate); err != nil {
n.sendReply(vr.reply, vresp.encode())
@@ -1903,11 +1913,26 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
return nil
}
// If this is a higher term go ahead and stepdown.
if vresp.term > n.term {
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
stepdown := n.stepdown
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.Unlock()
stepdown <- noLeader
n.sendReply(vr.reply, vresp.encode())
return nil
}
// Only way we get to yes is through here.
if vr.lastIndex >= n.pindex && n.vote == noVote || n.vote == vr.candidate {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
n.resetElectionTimeout()
}
n.Unlock()
@@ -1941,6 +1966,8 @@ func (n *raft) requestVote() {
subj, reply := n.vsubj, n.vreply
n.Unlock()
n.debug("Sending out voteRequest %+v", vr)
// Now send it out.
n.sendRPC(subj, reply, vr.encode())
}