Small raft improvements.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-05-02 16:44:27 -07:00
parent 9ef71893db
commit ae73f7be55

View File

@@ -1770,12 +1770,18 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {
// Invoked when being notified that there is something in the entryc's queue
func (n *raft) processAppendEntries() {
ok := !n.outOfResources()
if !ok {
n.debug("AppendEntry not processing inbound, no resources")
canProcess := true
if n.isClosed() {
n.debug("AppendEntry not processing inbound, closed")
canProcess = false
}
if n.outOfResources() {
n.debug("AppendEntry not processing inbound, no resources")
canProcess = false
}
// Always pop the entries, but check if we can process them.
aes := n.entry.pop()
if ok {
if canProcess {
for _, ae := range aes {
n.processAppendEntry(ae, ae.sub)
}
@@ -3565,6 +3571,13 @@ func (n *raft) setWriteErrLocked(err error) {
}
}
// Helper to check if we are closed when we do not hold a lock already.
func (n *raft) isClosed() bool {
n.RLock()
defer n.RUnlock()
return n.state == Closed
}
// Capture our write error if any and hold.
func (n *raft) setWriteErr(err error) {
n.Lock()
@@ -3581,12 +3594,6 @@ func (n *raft) fileWriter() {
psf := filepath.Join(n.sd, peerStateFile)
n.RUnlock()
isClosed := func() bool {
n.RLock()
defer n.RUnlock()
return n.state == Closed
}
for s.isRunning() {
select {
case <-n.quit:
@@ -3599,7 +3606,7 @@ func (n *raft) fileWriter() {
<-dios
err := os.WriteFile(tvf, buf[:], 0640)
dios <- struct{}{}
if err != nil && !isClosed() {
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
@@ -3610,7 +3617,7 @@ func (n *raft) fileWriter() {
<-dios
err := os.WriteFile(psf, buf, 0640)
dios <- struct{}{}
if err != nil && !isClosed() {
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing peer state file for %q: %v", n.group, err)
}