From ffe50d8573f655855963e9e686338cddd41b4568 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 12 Jan 2022 20:37:00 -0700 Subject: [PATCH] [IMPROVED] JetStream clustering with lots of streams/consumers Some operations could cause the route to block due to lock being held during store operations. On macOS, having lots of streams/consumers and restarting the cluster would cause lots of concurrent IO that would cause lock to be held for too long, causing head-of-line blocking in processing of messages from a route. Signed-off-by: Ivan Kozlovic --- server/raft.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/server/raft.go b/server/raft.go index 65333221..fb7687f9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -485,21 +485,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { - n.RLock() js := n.js if !n.track || js == nil { - n.RUnlock() return false } - n.RUnlock() - - js.mu.RLock() - jsDisabled := js.disabled - js.mu.RUnlock() - if jsDisabled { - return false - } - return js.limitsExceeded(n.wtype) } @@ -1537,6 +1526,8 @@ func (n *raft) runAsFollower() { } case <-n.respc: // Ignore + case <-n.votes: + n.debug("Ignoring old vote response, we have stepped down") case vreq := <-n.reqs: n.processVoteRequest(vreq) case newLeader := <-n.stepdown: @@ -2827,10 +2818,6 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - // Ignore if not the leader. - if !n.Leader() { - return - } msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply @@ -3140,7 +3127,10 @@ func (n *raft) fileWriter() { n.RLock() copy(buf[0:], n.wtv) n.RUnlock() - if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil { + <-dios + err := ioutil.WriteFile(tvf, buf[:], 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing term and vote file for %q: %v", n.group, err) } @@ -3148,7 +3138,10 @@ func (n *raft) fileWriter() { n.RLock() buf := copyBytes(n.wps) n.RUnlock() - if err := ioutil.WriteFile(psf, buf, 0640); err != nil { + <-dios + err := ioutil.WriteFile(psf, buf, 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing peer state file for %q: %v", n.group, err) } @@ -3216,10 +3209,6 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r n.error("Received malformed vote response for %q", n.group) return } - if state := n.State(); state != Candidate && state != Leader { - n.debug("Ignoring old vote response, we have stepped down") - return - } select { case n.votes <- vr: