Reworked RAFT's leader change channel

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-12-27 10:42:36 -07:00
parent fc7a4047a5
commit 48fd559bfc

View File

@@ -194,8 +194,8 @@ type raft struct {
apply *ipQueue // of *CommittedEntry
reqs *ipQueue // of *voteRequest
votes *ipQueue // of *voteResponse
leadc chan bool
stepdown *ipQueue // of string
leadc chan bool
quit chan struct{}
}
@@ -378,8 +378,8 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
entry: newIPQueue(), // of *appendEntry
resp: newIPQueue(), // of *appendEntryResponse
apply: newIPQueue(), // of *CommittedEntry
leadc: make(chan bool, 8),
stepdown: newIPQueue(), // of string
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
}
@@ -3285,14 +3285,19 @@ func (n *raft) quorumNeeded() int {
// Lock should be held.
func (n *raft) updateLeadChange(isLeader bool) {
select {
case n.leadc <- isLeader:
case <-n.leadc:
// We had an old value not consumed.
// We don't care about values that have not been consumed (transitory states),
// so we dequeue any state that is pending and push the new one.
for {
select {
case n.leadc <- isLeader:
return
default:
n.error("Failed to post lead change to %v for %q", isLeader, n.group)
select {
case <-n.leadc:
default:
// May have been consumed by the "reader" go routine, so go back
// to the top of the loop and try to send again.
}
}
}
}