From 73ba2d0b2fe5806d36ca75aaa779b25b2f8182d1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Feb 2021 20:47:31 -0800 Subject: [PATCH] File writes to term and vote and peerstate were in the direct route path and could cause delays. This moves the actual writes to a separate goroutine and also allows multiple writes to be compressed into one write under load. We only want the latest. Signed-off-by: Derek Collison --- server/raft.go | 121 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 98 insertions(+), 23 deletions(-) diff --git a/server/raft.go b/server/raft.go index dd5ecd34..dd76ac71 100644 --- a/server/raft.go +++ b/server/raft.go @@ -69,6 +69,7 @@ type WAL interface { LoadMsg(index uint64) (subj string, hdr, msg []byte, ts int64, err error) RemoveMsg(index uint64) (bool, error) Compact(index uint64) (uint64, error) + Purge() (uint64, error) Truncate(seq uint64) error State() StreamState Stop() error @@ -121,6 +122,7 @@ type raft struct { sd string id string wal WAL + werr error state RaftState hh hash.Hash64 snapfile string @@ -153,6 +155,12 @@ type raft struct { aesub *subscription + // For holding term and vote and peerstate to be written. + wtv []byte + wps []byte + wtvch chan struct{} + wpsch chan struct{} + // For when we need to catch up as a follower. catchup *catchupState @@ -194,7 +202,7 @@ type lps struct { const ( minElectionTimeout = 1500 * time.Millisecond maxElectionTimeout = 3 * minElectionTimeout - minCampaignTimeout = 50 * time.Millisecond + minCampaignTimeout = 100 * time.Millisecond maxCampaignTimeout = 4 * minCampaignTimeout hbInterval = 250 * time.Millisecond lostQuorumInterval = hbInterval * 3 @@ -272,7 +280,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer // Check the store directory. If we have a memory based WAL we need to make sure the directory is setup. 
if stat, err := os.Stat(cfg.Store); os.IsNotExist(err) { - if err := os.MkdirAll(cfg.Store, 0755); err != nil { + if err := os.MkdirAll(cfg.Store, 0750); err != nil { return fmt.Errorf("raft: could not create storage directory - %v", err) } } else if stat == nil || !stat.IsDir() { @@ -302,7 +310,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { hash := s.sys.shash s.mu.Unlock() - if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0755); err != nil { + if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0750); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) } @@ -330,6 +338,8 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { c: s.createInternalSystemClient(), sendq: sendq, quit: make(chan struct{}), + wtvch: make(chan struct{}), + wpsch: make(chan struct{}), reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), propc: make(chan *Entry, 256), @@ -411,6 +421,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { s.registerRaftNode(n.group, n) s.startGoRoutine(n.run) + s.startGoRoutine(n.fileWriter) return n, nil } @@ -732,7 +743,7 @@ func (n *raft) InstallSnapshot(data []byte) error { sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) sfile := path.Join(snapDir, sn) - if err := ioutil.WriteFile(sfile, n.encodeSnapshot(snap), 0644); err != nil { + if err := ioutil.WriteFile(sfile, n.encodeSnapshot(snap), 0640); err != nil { n.Unlock() return err } @@ -1826,7 +1837,7 @@ func (n *raft) applyCommit(index uint64) error { n.qn = n.csz/2 + 1 n.peers[newPeer] = &lps{time.Now().UnixNano(), 0} } - writePeerState(n.sd, &peerState{n.peerNames(), n.csz}) + n.writePeerState(&peerState{n.peerNames(), n.csz}) case EntryRemovePeer: oldPeer := string(e.Data) n.debug("Removing peer %q", oldPeer) @@ -1839,7 +1850,7 @@ func (n *raft) applyCommit(index uint64) error { n.qn = n.csz/2 + 1 delete(n.peers, oldPeer) } - writePeerState(n.sd, 
&peerState{n.peerNames(), n.csz}) + n.writePeerState(&peerState{n.peerNames(), n.csz}) // We pass these up as well. committed = append(committed, e) } @@ -2046,6 +2057,15 @@ func (n *raft) attemptStepDown(newLeader string) { func (n *raft) truncateWal(ae *appendEntry) { n.debug("Truncating and repairing WAL") + + // Special case if already at 0. + if ae.pindex == 0 { + n.pindex = ae.pindex + n.pterm = ae.pterm + n.wal.Purge() + return + } + tindex := ae.pindex - 1 n.wal.Truncate(tindex) n.pindex = tindex @@ -2155,7 +2175,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ae.term > n.term { n.term = ae.term n.vote = noVote - n.writeTermVote() + if isNew { + n.writeTermVote() + } if n.state != Follower { n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.state, ae.leader) n.attemptStepDown(ae.leader) @@ -2166,10 +2188,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.debug("AppendEntry updating leader to %q", ae.leader) n.leader = ae.leader n.writeTermVote() - if isNew { - n.resetElectionTimeout() - n.updateLeadChange(false) - } + n.resetElectionTimeout() + n.updateLeadChange(false) } if ae.pterm != n.pterm || ae.pindex != n.pindex { @@ -2329,7 +2349,7 @@ func (n *raft) processPeerState(ps *peerState) { } } n.debug("Update peers from leader to %+v", n.peers) - writePeerState(n.sd, ps) + n.writePeerState(ps) } // handleAppendEntryResponse processes responses to append entries. @@ -2524,13 +2544,27 @@ func (n *raft) decodeVoteRequest(msg []byte, reply string) *voteRequest { const peerStateFile = "peers.idx" -// Writes out our peer state. +// Lock should be held. +func (n *raft) writePeerState(ps *peerState) { + pse := encodePeerState(ps) + if bytes.Equal(n.wps, pse) { + return + } + // Stamp latest and kick writer. + n.wps = pse + select { + case n.wpsch <- struct{}{}: + default: + } +} + +// Writes out our peer state outside of a specific raft context. 
func writePeerState(sd string, ps *peerState) error { psf := path.Join(sd, peerStateFile) if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } - if err := ioutil.WriteFile(psf, encodePeerState(ps), 0644); err != nil { + if err := ioutil.WriteFile(psf, encodePeerState(ps), 0640); err != nil { return err } return nil @@ -2563,22 +2597,63 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) { return term, voted, nil } +func (n *raft) setWriteErr(err error) { + n.Lock() + defer n.Unlock() + n.werr = err +} + +func (n *raft) fileWriter() { + s := n.s + defer s.grWG.Done() + + n.RLock() + tvf := path.Join(n.sd, termVoteFile) + psf := path.Join(n.sd, peerStateFile) + n.RUnlock() + + for s.isRunning() { + select { + case <-n.quit: + return + case <-n.wtvch: + var buf [termVoteLen]byte + n.RLock() + copy(buf[0:], n.wtv) + n.RUnlock() + if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil { + n.setWriteErr(err) + } + case <-n.wpsch: + n.RLock() + buf := append(n.wps[:0:0], n.wps...) + n.RUnlock() + if err := ioutil.WriteFile(psf, buf, 0640); err != nil { + n.setWriteErr(err) + } + } + } +} + // writeTermVote will record the largest term and who we voted for to stable storage. // Lock should be held. -func (n *raft) writeTermVote() error { - tvf := path.Join(n.sd, termVoteFile) - if _, err := os.Stat(tvf); err != nil && !os.IsNotExist(err) { - return err - } +func (n *raft) writeTermVote() { var buf [termVoteLen]byte var le = binary.LittleEndian le.PutUint64(buf[0:], n.term) - // FIXME(dlc) - NoVote copy(buf[8:], n.vote) - if err := ioutil.WriteFile(tvf, buf[:8+len(n.vote)], 0644); err != nil { - return err + b := buf[:8+len(n.vote)] + + // If same as what we have we can ignore. + if bytes.Equal(n.wtv, b) { + return + } + // Stamp latest and kick writer. + n.wtv = b + select { + case n.wtvch <- struct{}{}: + default: } - return nil } // voteResponse is a response to a vote request.