Replaced RAFT's append entry response channel

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-12-22 14:37:24 -07:00
parent b5979294db
commit d74dba2df9
3 changed files with 48 additions and 63 deletions

View File

@@ -1421,7 +1421,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
sz += len(proposal.data)
if sz > maxBatch {
node.ProposeDirect(entries)
sz, entries = 0, entries[:0]
// We need to re-craete `entries` because there is a reference
// to it in the node's pae map.
sz, entries = 0, nil
}
}
if len(entries) > 0 {

View File

@@ -188,9 +188,9 @@ type raft struct {
hcommit uint64
// Channels
propc chan *Entry
propc *ipQueue // of *Entry
entryc *ipQueue // of *appendEntry
respc chan *appendEntryResponse
respc *ipQueue // of *appendEntryResponse
applyc *ipQueue // of *CommittedEntry
quit chan struct{}
reqs chan *voteRequest
@@ -243,7 +243,6 @@ type RaftConfig struct {
}
var (
errProposalFailed = errors.New("raft: proposal failed")
errNotLeader = errors.New("raft: not leader")
errAlreadyLeader = errors.New("raft: already leader")
errNilCfg = errors.New("raft: no config given")
@@ -376,9 +375,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
wpsch: make(chan struct{}, 1),
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
propc: make(chan *Entry, 8192),
propc: newIPQueue(), // of *Entry
entryc: newIPQueue(), // of *appendEntry
respc: make(chan *appendEntryResponse, 32768),
respc: newIPQueue(), // of *appendEntryResponse
applyc: newIPQueue(), // of *CommittedEntry
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
@@ -608,9 +607,7 @@ func (n *raft) Propose(data []byte) error {
propc := n.propc
n.RUnlock()
// For entering and exiting the system, proposals and apply
// we will block.
propc <- &Entry{EntryNormal, data}
propc.push(&Entry{EntryNormal, data})
return nil
}
@@ -662,11 +659,7 @@ func (n *raft) ProposeAddPeer(peer string) error {
propc := n.propc
n.RUnlock()
select {
case propc <- &Entry{EntryAddPeer, []byte(peer)}:
default:
return errProposalFailed
}
propc.push(&Entry{EntryAddPeer, []byte(peer)})
return nil
}
@@ -684,11 +677,7 @@ func (n *raft) ProposeRemovePeer(peer string) error {
}
if isLeader {
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
default:
return errProposalFailed
}
propc.push(&Entry{EntryRemovePeer, []byte(peer)})
return nil
}
@@ -1526,10 +1515,11 @@ func (n *raft) runAsFollower() {
n.switchToCandidate()
return
}
case <-n.respc:
// Ignore
case <-n.votes:
n.debug("Ignoring old vote response, we have stepped down")
case <-n.respc.ch:
// Ignore
n.respc.popOne()
case vreq := <-n.reqs:
n.processVoteRequest(vreq)
case newLeader := <-n.stepdown:
@@ -1749,11 +1739,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _
return
}
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
default:
n.warn("Failed to place peer removal proposal onto propose channel")
}
propc.push(&Entry{EntryRemovePeer, []byte(peer)})
}
// Called when a peer has forwarded a proposal.
@@ -1774,11 +1760,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account,
return
}
select {
case propc <- &Entry{EntryNormal, msg}:
default:
n.warn("Failed to place forwarded proposal onto propose channel")
}
propc.push(&Entry{EntryNormal, msg})
}
func (n *raft) runAsLeader() {
@@ -1822,24 +1804,32 @@ func (n *raft) runAsLeader() {
return
case <-n.quit:
return
case ar := <-n.respc:
n.processAppendEntryResponse(ar)
case b := <-n.propc:
entries := []*Entry{b}
if b.Type == EntryNormal {
const maxBatch = 256 * 1024
gather:
for sz := 0; sz < maxBatch && len(entries) < math.MaxUint16; {
select {
case e := <-n.propc:
entries = append(entries, e)
sz += len(e.Data) + 1
default:
break gather
}
}
case <-n.respc.ch:
ars := n.respc.pop()
for _, ari := range ars {
ar := ari.(*appendEntryResponse)
n.processAppendEntryResponse(ar)
}
n.sendAppendEntry(entries)
n.respc.recycle(&ars)
case <-n.propc.ch:
const maxBatch = 256 * 1024
var entries []*Entry
es := n.propc.pop()
sz := 0
for i, bi := range es {
b := bi.(*Entry)
entries = append(entries, b)
sz += len(b.Data) + 1
if i != len(es)-1 && sz < maxBatch && len(entries) < math.MaxUint16 {
continue
}
n.sendAppendEntry(entries)
// We need to re-craete `entries` because there is a reference
// to it in the node's pae map.
entries = nil
}
n.propc.recycle(&es)
case <-hb.C:
if n.notActive() {
n.sendHeartbeat()
@@ -2229,9 +2219,6 @@ func (n *raft) applyCommit(index uint64) error {
if fpae {
delete(n.pae, index)
}
// For entering and exiting the system, proposals and apply we
// will block.
// TODO: Not the case with moving to queue, is that ok?
n.applyc.push(&CommittedEntry{index, committed})
} else {
// If we processed inline update our applied index.
@@ -2276,7 +2263,7 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
break
}
}
sendHB = len(n.propc) == 0
sendHB = n.propc.len() == 0
}
}
@@ -2351,8 +2338,9 @@ func (n *raft) runAsCandidate() {
select {
case <-n.entryc.ch:
n.processAppendEntries()
case <-n.respc:
case <-n.respc.ch:
// Ignore
n.respc.popOne()
case <-n.s.quitCh:
n.shutdown(false)
return
@@ -2805,12 +2793,7 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun
msg = copyBytes(msg)
ar := n.decodeAppendEntryResponse(msg)
ar.reply = reply
select {
case n.respc <- ar:
default:
n.warn("AppendEntryResponse failed to be placed on internal channel")
}
n.respc.push(ar)
}
func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
@@ -3327,10 +3310,8 @@ func (n *raft) switchState(state RaftState) {
if n.state == Leader && state != Leader {
n.updateLeadChange(false)
// Drain our responses channel.
for len(n.respc) > 0 {
<-n.respc
}
// Drain the response queue.
n.respc.drain()
} else if state == Leader && n.state != Leader {
if len(n.pae) > 0 {
n.pae = make(map[uint64]*appendEntry)

View File

@@ -1552,6 +1552,8 @@ func (mset *stream) skipMsgs(start, end uint64) {
// So a single message does not get too big.
if len(entries) > 10_000 {
node.ProposeDirect(entries)
// We need to re-craete `entries` because there is a reference
// to it in the node's pae map.
entries = entries[:0]
}
} else {