From d74dba2df9785fea23a7d87d90deb360bfa11be4 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 22 Dec 2021 14:37:24 -0700 Subject: [PATCH] Replaced RAFT's append entry response channel Signed-off-by: Ivan Kozlovic --- server/consumer.go | 4 +- server/raft.go | 105 +++++++++++++++++++-------------------------- server/stream.go | 2 + 3 files changed, 48 insertions(+), 63 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2afd135d..e4f2ed44 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1421,7 +1421,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { sz += len(proposal.data) if sz > maxBatch { node.ProposeDirect(entries) - sz, entries = 0, entries[:0] + // We need to re-craete `entries` because there is a reference + // to it in the node's pae map. + sz, entries = 0, nil } } if len(entries) > 0 { diff --git a/server/raft.go b/server/raft.go index c16b28f1..3dd9eedf 100644 --- a/server/raft.go +++ b/server/raft.go @@ -188,9 +188,9 @@ type raft struct { hcommit uint64 // Channels - propc chan *Entry + propc *ipQueue // of *Entry entryc *ipQueue // of *appendEntry - respc chan *appendEntryResponse + respc *ipQueue // of *appendEntryResponse applyc *ipQueue // of *CommittedEntry quit chan struct{} reqs chan *voteRequest @@ -243,7 +243,6 @@ type RaftConfig struct { } var ( - errProposalFailed = errors.New("raft: proposal failed") errNotLeader = errors.New("raft: not leader") errAlreadyLeader = errors.New("raft: already leader") errNilCfg = errors.New("raft: no config given") @@ -376,9 +375,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { wpsch: make(chan struct{}, 1), reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), - propc: make(chan *Entry, 8192), + propc: newIPQueue(), // of *Entry entryc: newIPQueue(), // of *appendEntry - respc: make(chan *appendEntryResponse, 32768), + respc: newIPQueue(), // of *appendEntryResponse applyc: newIPQueue(), // of *CommittedEntry leadc: make(chan bool, 8), stepdown: make(chan string, 8), @@ -608,9 +607,7 @@ func (n *raft) Propose(data []byte) error { propc := n.propc n.RUnlock() - // For entering and exiting the system, proposals and apply - // we will block. - propc <- &Entry{EntryNormal, data} + propc.push(&Entry{EntryNormal, data}) return nil } @@ -662,11 +659,7 @@ func (n *raft) ProposeAddPeer(peer string) error { propc := n.propc n.RUnlock() - select { - case propc <- &Entry{EntryAddPeer, []byte(peer)}: - default: - return errProposalFailed - } + propc.push(&Entry{EntryAddPeer, []byte(peer)}) return nil } @@ -684,11 +677,7 @@ func (n *raft) ProposeRemovePeer(peer string) error { } if isLeader { - select { - case propc <- &Entry{EntryRemovePeer, []byte(peer)}: - default: - return errProposalFailed - } + propc.push(&Entry{EntryRemovePeer, []byte(peer)}) return nil } @@ -1526,10 +1515,11 @@ func (n *raft) runAsFollower() { n.switchToCandidate() return } - case <-n.respc: - // Ignore case <-n.votes: n.debug("Ignoring old vote response, we have stepped down") + case <-n.respc.ch: + // Ignore + n.respc.popOne() case vreq := <-n.reqs: n.processVoteRequest(vreq) case newLeader := <-n.stepdown: @@ -1749,11 +1739,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ return } - select { - case propc <- &Entry{EntryRemovePeer, []byte(peer)}: - default: - n.warn("Failed to place peer removal proposal onto propose channel") - } + propc.push(&Entry{EntryRemovePeer, []byte(peer)}) } // Called when a peer has forwarded a proposal. @@ -1774,11 +1760,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } - select { - case propc <- &Entry{EntryNormal, msg}: - default: - n.warn("Failed to place forwarded proposal onto propose channel") - } + propc.push(&Entry{EntryNormal, msg}) } func (n *raft) runAsLeader() { @@ -1822,24 +1804,32 @@ func (n *raft) runAsLeader() { return case <-n.quit: return - case ar := <-n.respc: - n.processAppendEntryResponse(ar) - case b := <-n.propc: - entries := []*Entry{b} - if b.Type == EntryNormal { - const maxBatch = 256 * 1024 - gather: - for sz := 0; sz < maxBatch && len(entries) < math.MaxUint16; { - select { - case e := <-n.propc: - entries = append(entries, e) - sz += len(e.Data) + 1 - default: - break gather - } - } + case <-n.respc.ch: + ars := n.respc.pop() + for _, ari := range ars { + ar := ari.(*appendEntryResponse) + n.processAppendEntryResponse(ar) } - n.sendAppendEntry(entries) + n.respc.recycle(&ars) + case <-n.propc.ch: + const maxBatch = 256 * 1024 + var entries []*Entry + + es := n.propc.pop() + sz := 0 + for i, bi := range es { + b := bi.(*Entry) + entries = append(entries, b) + sz += len(b.Data) + 1 + if i != len(es)-1 && sz < maxBatch && len(entries) < math.MaxUint16 { + continue + } + n.sendAppendEntry(entries) + // We need to re-craete `entries` because there is a reference + // to it in the node's pae map. + entries = nil + } + n.propc.recycle(&es) case <-hb.C: if n.notActive() { n.sendHeartbeat() @@ -2229,9 +2219,6 @@ func (n *raft) applyCommit(index uint64) error { if fpae { delete(n.pae, index) } - // For entering and exiting the system, proposals and apply we - // will block. - // TODO: Not the case with moving to queue, is that ok? n.applyc.push(&CommittedEntry{index, committed}) } else { // If we processed inline update our applied index. @@ -2276,7 +2263,7 @@ func (n *raft) trackResponse(ar *appendEntryResponse) { break } } - sendHB = len(n.propc) == 0 + sendHB = n.propc.len() == 0 } } @@ -2351,8 +2338,9 @@ func (n *raft) runAsCandidate() { select { case <-n.entryc.ch: n.processAppendEntries() - case <-n.respc: + case <-n.respc.ch: // Ignore + n.respc.popOne() case <-n.s.quitCh: n.shutdown(false) return @@ -2805,12 +2793,7 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply - - select { - case n.respc <- ar: - default: - n.warn("AppendEntryResponse failed to be placed on internal channel") - } + n.respc.push(ar) } func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { @@ -3327,10 +3310,8 @@ func (n *raft) switchState(state RaftState) { if n.state == Leader && state != Leader { n.updateLeadChange(false) - // Drain our responses channel. - for len(n.respc) > 0 { - <-n.respc - } + // Drain the response queue. + n.respc.drain() } else if state == Leader && n.state != Leader { if len(n.pae) > 0 { n.pae = make(map[uint64]*appendEntry) diff --git a/server/stream.go b/server/stream.go index 2da43e0d..f477ebc0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1552,6 +1552,8 @@ func (mset *stream) skipMsgs(start, end uint64) { // So a single message does not get too big. if len(entries) > 10_000 { node.ProposeDirect(entries) + // We need to re-craete `entries` because there is a reference + // to it in the node's pae map. entries = entries[:0] } } else {