mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2780 from nats-io/raft_updates
[IMPROVED] JetStream clustering with lots of streams/consumers
This commit is contained in:
@@ -485,21 +485,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
|
||||
// outOfResources checks to see if we are out of resources.
|
||||
func (n *raft) outOfResources() bool {
|
||||
n.RLock()
|
||||
js := n.js
|
||||
if !n.track || js == nil {
|
||||
n.RUnlock()
|
||||
return false
|
||||
}
|
||||
n.RUnlock()
|
||||
|
||||
js.mu.RLock()
|
||||
jsDisabled := js.disabled
|
||||
js.mu.RUnlock()
|
||||
if jsDisabled {
|
||||
return false
|
||||
}
|
||||
|
||||
return js.limitsExceeded(n.wtype)
|
||||
}
|
||||
|
||||
@@ -1537,6 +1526,8 @@ func (n *raft) runAsFollower() {
|
||||
}
|
||||
case <-n.respc:
|
||||
// Ignore
|
||||
case <-n.votes:
|
||||
n.debug("Ignoring old vote response, we have stepped down")
|
||||
case vreq := <-n.reqs:
|
||||
n.processVoteRequest(vreq)
|
||||
case newLeader := <-n.stepdown:
|
||||
@@ -2827,10 +2818,6 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
|
||||
|
||||
// handleAppendEntryResponse processes responses to append entries.
|
||||
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
|
||||
// Ignore if not the leader.
|
||||
if !n.Leader() {
|
||||
return
|
||||
}
|
||||
msg = copyBytes(msg)
|
||||
ar := n.decodeAppendEntryResponse(msg)
|
||||
ar.reply = reply
|
||||
@@ -3140,7 +3127,10 @@ func (n *raft) fileWriter() {
|
||||
n.RLock()
|
||||
copy(buf[0:], n.wtv)
|
||||
n.RUnlock()
|
||||
if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil {
|
||||
<-dios
|
||||
err := ioutil.WriteFile(tvf, buf[:], 0640)
|
||||
dios <- struct{}{}
|
||||
if err != nil {
|
||||
n.setWriteErr(err)
|
||||
n.warn("Error writing term and vote file for %q: %v", n.group, err)
|
||||
}
|
||||
@@ -3148,7 +3138,10 @@ func (n *raft) fileWriter() {
|
||||
n.RLock()
|
||||
buf := copyBytes(n.wps)
|
||||
n.RUnlock()
|
||||
if err := ioutil.WriteFile(psf, buf, 0640); err != nil {
|
||||
<-dios
|
||||
err := ioutil.WriteFile(psf, buf, 0640)
|
||||
dios <- struct{}{}
|
||||
if err != nil {
|
||||
n.setWriteErr(err)
|
||||
n.warn("Error writing peer state file for %q: %v", n.group, err)
|
||||
}
|
||||
@@ -3216,10 +3209,6 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r
|
||||
n.error("Received malformed vote response for %q", n.group)
|
||||
return
|
||||
}
|
||||
if state := n.State(); state != Candidate && state != Leader {
|
||||
n.debug("Ignoring old vote response, we have stepped down")
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case n.votes <- vr:
|
||||
|
||||
Reference in New Issue
Block a user