Moved back to channel handling of append entry to avoid inline processing with disk IO in route path.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-28 18:34:24 -08:00
parent 0a6958b98c
commit 74b416afa1

View File

@@ -173,6 +173,7 @@ type raft struct {
// Channels
propc chan *Entry
entryc chan *appendEntry
applyc chan *CommittedEntry
sendq chan *pubMsg
quit chan struct{}
@@ -338,8 +339,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
wpsch: make(chan struct{}, 1),
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
propc: make(chan *Entry, 4096),
applyc: make(chan *CommittedEntry, 4096),
propc: make(chan *Entry, 8192),
entryc: make(chan *appendEntry, 8192),
applyc: make(chan *CommittedEntry, 8192),
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
}
@@ -1308,6 +1310,8 @@ func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
select {
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
case <-n.s.quitCh:
return
case <-n.quit:
@@ -1339,6 +1343,7 @@ type appendEntry struct {
entries []*Entry
// internal use only.
reply string
sub *subscription
buf []byte
}
@@ -1412,7 +1417,7 @@ func (ae *appendEntry) encode() []byte {
}
// This can not be used post the wire level callback since we do not copy.
func (n *raft) decodeAppendEntry(msg []byte, reply string) *appendEntry {
func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) *appendEntry {
if len(msg) < appendEntryBaseLen {
return nil
}
@@ -1435,6 +1440,7 @@ func (n *raft) decodeAppendEntry(msg []byte, reply string) *appendEntry {
ri += int(le)
}
ae.reply = reply
ae.sub = sub
ae.buf = msg
return ae
}
@@ -1596,6 +1602,8 @@ func (n *raft) runAsLeader() {
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
}
}
}
@@ -1816,7 +1824,7 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
if err != nil {
return nil, err
}
return n.decodeAppendEntry(msg, _EMPTY_), nil
return n.decodeAppendEntry(msg, nil, _EMPTY_), nil
}
// applyCommit will update our commit index and apply the entry to the apply chan.
@@ -2027,6 +2035,8 @@ func (n *raft) runAsCandidate() {
}
case vreq := <-n.reqs:
n.processVoteRequest(vreq)
case ae := <-n.entryc:
n.processAppendEntry(ae, ae.sub)
case newLeader := <-n.stepdown:
n.switchToFollower(newLeader)
return
@@ -2034,18 +2044,15 @@ func (n *raft) runAsCandidate() {
}
}
// handleAppendEntry handles an append entry from the wire. We can't rely on msg being available
// past this callback so will do a bunch of processing here to avoid copies, channels etc.
// handleAppendEntry handles an append entry from the wire.
func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply string, msg []byte) {
ae := n.decodeAppendEntry(msg, reply)
if ae == nil {
return
}
start := time.Now()
n.processAppendEntry(ae, sub)
d := time.Since(start)
if d > 20*time.Millisecond {
n.debug("INLINE PAE TOOK TOO LONG %v\n\n", d)
msg = append(msg[:0:0], msg...)
if ae := n.decodeAppendEntry(msg, sub, reply); ae != nil {
select {
case n.entryc <- ae:
default:
n.warn("AppendEntry failed to be placed on internal channel")
}
}
}
@@ -2435,7 +2442,7 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject,
}
func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil}
return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil, nil}
}
// Store our append entry to our WAL.