diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 061c3f5f..5d84853d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -770,7 +770,7 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b func (js *jetStream) monitorCluster() { s, n := js.server(), js.getMetaGroup() - qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyC() + qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ() defer s.grWG.Done() @@ -1466,7 +1466,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { return } - qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyC() + qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ() s.Debugf("Starting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) defer s.Debugf("Exiting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) @@ -2999,7 +2999,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return } - qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyC() + qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ() s.Debugf("Starting consumer monitor for '%s > %s > %s", o.acc.Name, ca.Stream, ca.Name) defer s.Debugf("Exiting consumer monitor for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name) diff --git a/server/raft.go b/server/raft.go index f7f4e353..9aecaaa7 100644 --- a/server/raft.go +++ b/server/raft.go @@ -61,7 +61,7 @@ type RaftNode interface { AdjustClusterSize(csz int) error AdjustBootClusterSize(csz int) error ClusterSize() int - ApplyC() *ipQueue // of *CommittedEntry + ApplyQ() *ipQueue // of *CommittedEntry PauseApply() ResumeApply() LeadChangeC() <-chan bool @@ -188,10 +188,10 @@ type raft struct { hcommit uint64 // Queues and Channels - propc *ipQueue // of *Entry - entryc *ipQueue // of *appendEntry - respc *ipQueue // of *appendEntryResponse - applyc *ipQueue // of *CommittedEntry + prop *ipQueue // of *Entry + entry *ipQueue // of *appendEntry + resp *ipQueue // of *appendEntryResponse + apply *ipQueue // of *CommittedEntry reqs *ipQueue // of *voteRequest votes *ipQueue // of *voteResponse leadc chan bool @@ -374,10 +374,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { wpsch: make(chan struct{}, 1), reqs: newIPQueue(), // of *voteRequest votes: newIPQueue(), // of *voteResponse - propc: newIPQueue(), // of *Entry - entryc: newIPQueue(), // of *appendEntry - respc: newIPQueue(), // of *appendEntryResponse - applyc: newIPQueue(), // of *CommittedEntry + prop: newIPQueue(), // of *Entry + entry: newIPQueue(), // of *appendEntry + resp: newIPQueue(), // of *appendEntryResponse + apply: newIPQueue(), // of *CommittedEntry leadc: make(chan bool, 8), stepdown: newIPQueue(), // of string observer: cfg.Observer, @@ -441,7 +441,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { } // Send nil entry to signal the upper layers we are done doing replay/restore. - n.applyc.push(nil) + n.apply.push(nil) // Setup our internal subscriptions. if err := n.createInternalSubs(); err != nil { @@ -603,10 +603,10 @@ func (n *raft) Propose(data []byte) error { n.RUnlock() return werr } - propc := n.propc + prop := n.prop n.RUnlock() - propc.push(&Entry{EntryNormal, data}) + prop.push(&Entry{EntryNormal, data}) return nil } @@ -655,17 +655,17 @@ func (n *raft) ProposeAddPeer(peer string) error { n.RUnlock() return werr } - propc := n.propc + prop := n.prop n.RUnlock() - propc.push(&Entry{EntryAddPeer, []byte(peer)}) + prop.push(&Entry{EntryAddPeer, []byte(peer)}) return nil } // ProposeRemovePeer is called to remove a peer from the group. func (n *raft) ProposeRemovePeer(peer string) error { n.RLock() - propc, subj := n.propc, n.rpsubj + prop, subj := n.prop, n.rpsubj isLeader := n.state == Leader werr := n.werr n.RUnlock() @@ -676,7 +676,7 @@ func (n *raft) ProposeRemovePeer(peer string) error { } if isLeader { - propc.push(&Entry{EntryRemovePeer, []byte(peer)}) + prop.push(&Entry{EntryRemovePeer, []byte(peer)}) return nil } @@ -1005,7 +1005,7 @@ func (n *raft) setupLastSnapshot() { n.pterm = snap.lastTerm n.commit = snap.lastIndex n.applied = snap.lastIndex - n.applyc.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) + n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } @@ -1274,7 +1274,7 @@ func (n *raft) Peers() []*Peer { return peers } -func (n *raft) ApplyC() *ipQueue { return n.applyc } // queue of *CommittedEntry +func (n *raft) ApplyQ() *ipQueue { return n.apply } // queue of *CommittedEntry func (n *raft) LeadChangeC() <-chan bool { return n.leadc } func (n *raft) QuitC() <-chan struct{} { return n.quit } @@ -1479,12 +1479,12 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) { // Invoked when being notified that there is something in the entryc's queue func (n *raft) processAppendEntries() { - aes := n.entryc.pop() + aes := n.entry.pop() for _, aei := range aes { ae := aei.(*appendEntry) n.processAppendEntry(ae, ae.sub) } - n.entryc.recycle(&aes) + n.entry.recycle(&aes) } func convertVoteRequest(i interface{}) *voteRequest { @@ -1505,7 +1505,7 @@ func (n *raft) runAsFollower() { for { elect := n.electTimer() select { - case <-n.entryc.ch: + case <-n.entry.ch: n.processAppendEntries() case <-n.s.quitCh: n.shutdown(false) @@ -1527,9 +1527,9 @@ func (n *raft) runAsFollower() { case <-n.votes.ch: n.debug("Ignoring old vote response, we have stepped down") n.votes.popOne() - case <-n.respc.ch: + case <-n.resp.ch: // Ignore - n.respc.popOne() + n.resp.popOne() case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). n.processVoteRequest(convertVoteRequest(n.reqs.popOne())) @@ -1743,7 +1743,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ peer := string(copyBytes(msg)) n.RLock() - propc, werr := n.propc, n.werr + prop, werr := n.prop, n.werr n.RUnlock() // Ignore if we have had a write error previous. @@ -1751,7 +1751,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ return } - propc.push(&Entry{EntryRemovePeer, []byte(peer)}) + prop.push(&Entry{EntryRemovePeer, []byte(peer)}) } // Called when a peer has forwarded a proposal. @@ -1764,7 +1764,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, msg = copyBytes(msg) n.RLock() - propc, werr := n.propc, n.werr + prop, werr := n.prop, n.werr n.RUnlock() // Ignore if we have had a write error previous. @@ -1772,7 +1772,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } - propc.push(&Entry{EntryNormal, msg}) + prop.push(&Entry{EntryNormal, msg}) } func (n *raft) runAsLeader() { @@ -1816,18 +1816,18 @@ func (n *raft) runAsLeader() { return case <-n.quit: return - case <-n.respc.ch: - ars := n.respc.pop() + case <-n.resp.ch: + ars := n.resp.pop() for _, ari := range ars { ar := ari.(*appendEntryResponse) n.processAppendEntryResponse(ar) } - n.respc.recycle(&ars) - case <-n.propc.ch: + n.resp.recycle(&ars) + case <-n.prop.ch: const maxBatch = 256 * 1024 var entries []*Entry - es := n.propc.pop() + es := n.prop.pop() sz := 0 for i, bi := range es { b := bi.(*Entry) @@ -1841,7 +1841,7 @@ func (n *raft) runAsLeader() { // to it in the node's pae map. entries = nil } - n.propc.recycle(&es) + n.prop.recycle(&es) case <-hb.C: if n.notActive() { n.sendHeartbeat() @@ -1868,7 +1868,7 @@ func (n *raft) runAsLeader() { newLeader := n.stepdown.popOne().(string) n.switchToFollower(newLeader) return - case <-n.entryc.ch: + case <-n.entry.ch: n.processAppendEntries() } } @@ -2226,7 +2226,7 @@ func (n *raft) applyCommit(index uint64) error { if fpae { delete(n.pae, index) } - n.applyc.push(&CommittedEntry{index, committed}) + n.apply.push(&CommittedEntry{index, committed}) } else { // If we processed inline update our applied index. n.applied = index @@ -2266,7 +2266,7 @@ func (n *raft) trackResponse(ar *appendEntryResponse) { break } } - sendHB = n.propc.len() == 0 + sendHB = n.prop.len() == 0 } } @@ -2337,11 +2337,11 @@ func (n *raft) runAsCandidate() { for { elect := n.electTimer() select { - case <-n.entryc.ch: + case <-n.entry.ch: n.processAppendEntries() - case <-n.respc.ch: + case <-n.resp.ch: // Ignore - n.respc.popOne() + n.resp.popOne() case <-n.s.quitCh: n.shutdown(false) return @@ -2415,7 +2415,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje msg = copyBytes(msg) if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil { - n.entryc.push(ae) + n.entry.push(ae) } else { n.warn("AppendEntry failed to be placed on internal channel: corrupt entry") } @@ -2665,7 +2665,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. - n.applyc.push(&CommittedEntry{n.commit, ae.entries[:1]}) + n.apply.push(&CommittedEntry{n.commit, ae.entries[:1]}) n.Unlock() return @@ -2790,7 +2790,7 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply - n.respc.push(ar) + n.resp.push(ar) } func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { @@ -3309,7 +3309,7 @@ func (n *raft) switchState(state RaftState) { if n.state == Leader && state != Leader { n.updateLeadChange(false) // Drain the response queue. - n.respc.drain() + n.resp.drain() } else if state == Leader && n.state != Leader { if len(n.pae) > 0 { n.pae = make(map[uint64]*appendEntry)