File writes for the term, vote, and peer state were performed in the direct route path and could cause delays.

This moves the actual writes to a separate goroutine and also allows multiple writes to
be coalesced into one write under load; we only need the latest state.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-24 20:47:31 -08:00
parent 2d24571e17
commit 73ba2d0b2f

View File

@@ -69,6 +69,7 @@ type WAL interface {
LoadMsg(index uint64) (subj string, hdr, msg []byte, ts int64, err error)
RemoveMsg(index uint64) (bool, error)
Compact(index uint64) (uint64, error)
Purge() (uint64, error)
Truncate(seq uint64) error
State() StreamState
Stop() error
@@ -121,6 +122,7 @@ type raft struct {
sd string
id string
wal WAL
werr error
state RaftState
hh hash.Hash64
snapfile string
@@ -153,6 +155,12 @@ type raft struct {
aesub *subscription
// For holding term and vote and peerstate to be written.
wtv []byte
wps []byte
wtvch chan struct{}
wpsch chan struct{}
// For when we need to catch up as a follower.
catchup *catchupState
@@ -194,7 +202,7 @@ type lps struct {
const (
minElectionTimeout = 1500 * time.Millisecond
maxElectionTimeout = 3 * minElectionTimeout
minCampaignTimeout = 50 * time.Millisecond
minCampaignTimeout = 100 * time.Millisecond
maxCampaignTimeout = 4 * minCampaignTimeout
hbInterval = 250 * time.Millisecond
lostQuorumInterval = hbInterval * 3
@@ -272,7 +280,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
// Check the store directory. If we have a memory based WAL we need to make sure the directory is setup.
if stat, err := os.Stat(cfg.Store); os.IsNotExist(err) {
if err := os.MkdirAll(cfg.Store, 0755); err != nil {
if err := os.MkdirAll(cfg.Store, 0750); err != nil {
return fmt.Errorf("raft: could not create storage directory - %v", err)
}
} else if stat == nil || !stat.IsDir() {
@@ -302,7 +310,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
hash := s.sys.shash
s.mu.Unlock()
if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0755); err != nil {
if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0750); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}
@@ -330,6 +338,8 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
c: s.createInternalSystemClient(),
sendq: sendq,
quit: make(chan struct{}),
wtvch: make(chan struct{}),
wpsch: make(chan struct{}),
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
propc: make(chan *Entry, 256),
@@ -411,6 +421,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
s.registerRaftNode(n.group, n)
s.startGoRoutine(n.run)
s.startGoRoutine(n.fileWriter)
return n, nil
}
@@ -732,7 +743,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex)
sfile := path.Join(snapDir, sn)
if err := ioutil.WriteFile(sfile, n.encodeSnapshot(snap), 0644); err != nil {
if err := ioutil.WriteFile(sfile, n.encodeSnapshot(snap), 0640); err != nil {
n.Unlock()
return err
}
@@ -1826,7 +1837,7 @@ func (n *raft) applyCommit(index uint64) error {
n.qn = n.csz/2 + 1
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0}
}
writePeerState(n.sd, &peerState{n.peerNames(), n.csz})
n.writePeerState(&peerState{n.peerNames(), n.csz})
case EntryRemovePeer:
oldPeer := string(e.Data)
n.debug("Removing peer %q", oldPeer)
@@ -1839,7 +1850,7 @@ func (n *raft) applyCommit(index uint64) error {
n.qn = n.csz/2 + 1
delete(n.peers, oldPeer)
}
writePeerState(n.sd, &peerState{n.peerNames(), n.csz})
n.writePeerState(&peerState{n.peerNames(), n.csz})
// We pass these up as well.
committed = append(committed, e)
}
@@ -2046,6 +2057,15 @@ func (n *raft) attemptStepDown(newLeader string) {
func (n *raft) truncateWal(ae *appendEntry) {
n.debug("Truncating and repairing WAL")
// Special case if already at 0.
if ae.pindex == 0 {
n.pindex = ae.pindex
n.pterm = ae.pterm
n.wal.Purge()
return
}
tindex := ae.pindex - 1
n.wal.Truncate(tindex)
n.pindex = tindex
@@ -2155,7 +2175,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ae.term > n.term {
n.term = ae.term
n.vote = noVote
n.writeTermVote()
if isNew {
n.writeTermVote()
}
if n.state != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.state, ae.leader)
n.attemptStepDown(ae.leader)
@@ -2166,10 +2188,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry updating leader to %q", ae.leader)
n.leader = ae.leader
n.writeTermVote()
if isNew {
n.resetElectionTimeout()
n.updateLeadChange(false)
}
n.resetElectionTimeout()
n.updateLeadChange(false)
}
if ae.pterm != n.pterm || ae.pindex != n.pindex {
@@ -2329,7 +2349,7 @@ func (n *raft) processPeerState(ps *peerState) {
}
}
n.debug("Update peers from leader to %+v", n.peers)
writePeerState(n.sd, ps)
n.writePeerState(ps)
}
// handleAppendEntryResponse processes responses to append entries.
@@ -2524,13 +2544,27 @@ func (n *raft) decodeVoteRequest(msg []byte, reply string) *voteRequest {
const peerStateFile = "peers.idx"
// writePeerState stamps the latest encoded peer state and signals the
// background file writer to persist it. If the encoding is unchanged from
// the last stamp, no work is scheduled.
// Lock should be held.
func (n *raft) writePeerState(ps *peerState) {
	encoded := encodePeerState(ps)
	// Nothing to do if this matches what is already pending/written.
	if bytes.Equal(encoded, n.wps) {
		return
	}
	// Record the newest encoding, then give the writer a non-blocking kick.
	// Under load, several updates collapse into a single write of the latest.
	n.wps = encoded
	select {
	case n.wpsch <- struct{}{}:
	default:
	}
}
// Writes out our peer state outside of a specific raft context.
func writePeerState(sd string, ps *peerState) error {
psf := path.Join(sd, peerStateFile)
if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) {
return err
}
if err := ioutil.WriteFile(psf, encodePeerState(ps), 0644); err != nil {
if err := ioutil.WriteFile(psf, encodePeerState(ps), 0640); err != nil {
return err
}
return nil
@@ -2563,22 +2597,63 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) {
return term, voted, nil
}
// setWriteErr records an error from the background file writer under the
// node's lock so it can be inspected later.
func (n *raft) setWriteErr(err error) {
	n.Lock()
	n.werr = err
	n.Unlock()
}
// fileWriter is a long-lived goroutine that performs the actual disk writes
// for term/vote and peer state, keeping file I/O out of the hot path.
// Callers stamp the latest value and kick a channel non-blockingly, so a
// burst of updates results in a single write of the newest state.
func (n *raft) fileWriter() {
	s := n.s
	defer s.grWG.Done()

	// Resolve the target file paths once up front.
	n.RLock()
	termVotePath := path.Join(n.sd, termVoteFile)
	peerStatePath := path.Join(n.sd, peerStateFile)
	n.RUnlock()

	for s.isRunning() {
		select {
		case <-n.quit:
			return
		case <-n.wtvch:
			// Snapshot the pending term+vote into a fixed-size buffer
			// while holding the read lock, then write without it.
			var scratch [termVoteLen]byte
			n.RLock()
			copy(scratch[0:], n.wtv)
			n.RUnlock()
			if err := ioutil.WriteFile(termVotePath, scratch[:], 0640); err != nil {
				n.setWriteErr(err)
			}
		case <-n.wpsch:
			// Copy the pending peer state so a concurrent stamp cannot
			// swap the slice out from under the write.
			n.RLock()
			pending := append(n.wps[:0:0], n.wps...)
			n.RUnlock()
			if err := ioutil.WriteFile(peerStatePath, pending, 0640); err != nil {
				n.setWriteErr(err)
			}
		}
	}
}
// writeTermVote will record the largest term and who we voted for to stable storage.
// Lock should be held.
func (n *raft) writeTermVote() error {
tvf := path.Join(n.sd, termVoteFile)
if _, err := os.Stat(tvf); err != nil && !os.IsNotExist(err) {
return err
}
func (n *raft) writeTermVote() {
var buf [termVoteLen]byte
var le = binary.LittleEndian
le.PutUint64(buf[0:], n.term)
// FIXME(dlc) - NoVote
copy(buf[8:], n.vote)
if err := ioutil.WriteFile(tvf, buf[:8+len(n.vote)], 0644); err != nil {
return err
b := buf[:8+len(n.vote)]
// If same as what we have we can ignore.
if bytes.Equal(n.wtv, b) {
return
}
// Stamp latest and kick writer.
n.wtv = b
select {
case n.wtvch <- struct{}{}:
default:
}
return nil
}
// voteResponse is a response to a vote request.