Replaced RAFT's append entry channel

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-12-22 14:20:06 -07:00
parent ceb06d6a13
commit b5979294db

View File

@@ -189,7 +189,7 @@ type raft struct {
// Channels
propc chan *Entry
entryc chan *appendEntry
entryc *ipQueue // of *appendEntry
respc chan *appendEntryResponse
applyc *ipQueue // of *CommittedEntry
quit chan struct{}
@@ -377,7 +377,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
propc: make(chan *Entry, 8192),
entryc: make(chan *appendEntry, 32768),
entryc: newIPQueue(), // of *appendEntry
respc: make(chan *appendEntryResponse, 32768),
applyc: newIPQueue(), // of *CommittedEntry
leadc: make(chan bool, 8),
@@ -1493,12 +1493,22 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {
n.extSt = extSt
}
// Invoked when being notified that there is something in the entryc's queue
func (n *raft) processAppendEntries() {
aes := n.entryc.pop()
for _, aei := range aes {
ae := aei.(*appendEntry)
n.processAppendEntry(ae, ae.sub)
}
n.entryc.recycle(&aes)
}
func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
select {
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
case <-n.entryc.ch:
n.processAppendEntries()
case <-n.s.quitCh:
n.shutdown(false)
return
@@ -1849,8 +1859,8 @@ func (n *raft) runAsLeader() {
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
case <-n.entryc.ch:
n.processAppendEntries()
}
}
}
@@ -2339,8 +2349,8 @@ func (n *raft) runAsCandidate() {
for {
elect := n.electTimer()
select {
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
case <-n.entryc.ch:
n.processAppendEntries()
case <-n.respc:
// Ignore
case <-n.s.quitCh:
@@ -2409,11 +2419,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje
msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
select {
case n.entryc <- ae:
default:
n.warn("AppendEntry failed to be placed on internal channel")
}
n.entryc.push(ae)
} else {
n.warn("AppendEntry failed to be placed on internal channel: corrupt entry")
}