Replaced RAFT's vote request and response channels

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-12-23 18:05:51 -07:00
parent d74dba2df9
commit 2ad95f7e52

View File

@@ -193,8 +193,8 @@ type raft struct {
respc *ipQueue // of *appendEntryResponse
applyc *ipQueue // of *CommittedEntry
quit chan struct{}
reqs chan *voteRequest
votes chan *voteResponse
reqs *ipQueue // of *voteRequest
votes *ipQueue // of *voteResponse
leadc chan bool
stepdown chan string
}
@@ -373,8 +373,8 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
quit: make(chan struct{}),
wtvch: make(chan struct{}, 1),
wpsch: make(chan struct{}, 1),
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
reqs: newIPQueue(), // of *voteRequest
votes: newIPQueue(), // of *voteResponse
propc: newIPQueue(), // of *Entry
entryc: newIPQueue(), // of *appendEntry
respc: newIPQueue(), // of *appendEntryResponse
@@ -1492,6 +1492,20 @@ func (n *raft) processAppendEntries() {
n.entryc.recycle(&aes)
}
func convertVoteRequest(i interface{}) *voteRequest {
if i == nil {
return nil
}
return i.(*voteRequest)
}
func convertVoteResponse(i interface{}) *voteResponse {
if i == nil {
return nil
}
return i.(*voteResponse)
}
func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
@@ -1515,13 +1529,15 @@ func (n *raft) runAsFollower() {
n.switchToCandidate()
return
}
case <-n.votes:
case <-n.votes.ch:
n.debug("Ignoring old vote response, we have stepped down")
n.votes.popOne()
case <-n.respc.ch:
// Ignore
n.respc.popOne()
case vreq := <-n.reqs:
n.processVoteRequest(vreq)
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
n.processVoteRequest(convertVoteRequest(n.reqs.popOne()))
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
@@ -1838,14 +1854,20 @@ func (n *raft) runAsLeader() {
n.switchToFollower(noLeader)
return
}
case vresp := <-n.votes:
case <-n.votes.ch:
// Because of drain() it is possible that we get nil from popOne().
vresp := convertVoteResponse(n.votes.popOne())
if vresp == nil {
continue
}
if vresp.term > n.currentTerm() {
n.switchToFollower(noLeader)
return
}
n.trackPeer(vresp.peer)
case vreq := <-n.reqs:
n.processVoteRequest(vreq)
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
n.processVoteRequest(convertVoteRequest(n.reqs.popOne()))
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
@@ -2321,9 +2343,7 @@ func (n *raft) numActivePeers() int {
func (n *raft) runAsCandidate() {
n.Lock()
// Drain old responses.
for len(n.votes) > 0 {
<-n.votes
}
n.votes.drain()
n.Unlock()
// Send out our request for votes.
@@ -2354,7 +2374,12 @@ func (n *raft) runAsCandidate() {
n.switchToCandidate()
}
return
case vresp := <-n.votes:
case <-n.votes.ch:
// Because of drain() it is possible that we get nil from popOne().
vresp := convertVoteResponse(n.votes.popOne())
if vresp == nil {
continue
}
n.RLock()
nterm, lxfer := n.term, n.lxfer
n.RUnlock()
@@ -2389,8 +2414,9 @@ func (n *raft) runAsCandidate() {
n.attemptStepDown(noLeader)
return
}
case vreq := <-n.reqs:
n.processVoteRequest(vreq)
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
n.processVoteRequest(convertVoteRequest(n.reqs.popOne()))
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
@@ -3177,15 +3203,20 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r
return
}
select {
case n.votes <- vr:
default:
// FIXME(dlc)
n.error("Failed to place vote response on chan for %q", n.group)
if state := n.State(); state != Candidate && state != Leader {
n.debug("Ignoring old vote response, we have stepped down")
return
}
n.votes.push(vr)
}
func (n *raft) processVoteRequest(vr *voteRequest) error {
// To simplify calling code, we can possibly pass `nil` to this function.
// If that is the case, does not consider it an error.
if vr == nil {
return nil
}
n.debug("Received a voteRequest %+v", vr)
if err := n.trackPeer(vr.candidate); err != nil {
@@ -3236,11 +3267,7 @@ func (n *raft) handleVoteRequest(sub *subscription, c *client, _ *Account, subje
n.error("Received malformed vote request for %q", n.group)
return
}
select {
case n.reqs <- vr:
default:
n.error("Failed to place vote request on chan for %q", n.group)
}
n.reqs.push(vr)
}
func (n *raft) requestVote() {