// Copyright 2020-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"time"
)

// RaftNode is the public interface for a member of a NATS consensus group.
// It covers proposing and applying entries, log compaction/snapshots,
// leadership queries and transitions, peer management, and the channels
// used to observe committed entries, leadership changes and shutdown.
type RaftNode interface {
	Propose(entry []byte) error
	PausePropose()
	ResumePropose()
	Snapshot(snap []byte) error
	Applied(index uint64)
	Compact(index uint64) error
	State() RaftState
	Size() (entries, bytes uint64)
	Leader() bool
	Current() bool
	GroupLeader() string
	StepDown() error
	Campaign() error
	ID() string
	Group() string
	Peers() []*Peer
	ProposeAddPeer(peer string) error
	ProposeRemovePeer(peer string) error
	ApplyC() <-chan *CommittedEntry
	PauseApply()
	ResumeApply()
	LeadChangeC() <-chan bool
	QuitC() <-chan struct{}
	Stop()
	Delete()
}

// WAL is the interface for the write ahead log backing a raft node.
// Indices are 1-based sequence numbers as reported by StreamState.
type WAL interface {
	StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error)
	LoadMsg(index uint64) (subj string, hdr, msg []byte, ts int64, err error)
	RemoveMsg(index uint64) (bool, error)
	Compact(index uint64) (uint64, error)
	State() StreamState
	Stop() error
	Delete() error
}

// Peer is the leader's view of a group member: whether it is considered
// current, when we last heard from it, and its last replicated index.
type Peer struct {
	ID      string
	Current bool
	Last    time.Time
	Index   uint64
}

// RaftState is the state of a node in the consensus group.
type RaftState uint8

// Allowable states for a NATS Consensus Group.
const (
	Follower RaftState = iota
	Leader
	Candidate
	Observer
	Closed
)

// String returns a human readable name for a RaftState.
func (state RaftState) String() string {
	switch state {
	case Follower:
		return "FOLLOWER"
	case Candidate:
		return "CANDIDATE"
	case Leader:
		return "LEADER"
	case Observer:
		return "OBSERVER"
	case Closed:
		return "CLOSED"
	}
	return "UNKNOWN"
}

// raft is the internal state for one member of a consensus group.
// All mutable fields are protected by the embedded RWMutex unless noted.
type raft struct {
	sync.RWMutex
	group string // Group name.
	sd    string // Store directory.
	id    string // Our node id (prefix of the server's system hash).
	wal   WAL    // Backing write ahead log.
	state RaftState
	csz   int // Cluster size.
	qn    int // Quorum number (csz/2 + 1).
	peers map[string]*lps
	acks  map[uint64]map[string]struct{} // index -> set of peers that acked.
	elect *time.Timer
	term  uint64
	pterm  uint64 // Previous entry's term.
	pindex uint64 // Previous entry's index.
	commit  uint64
	applied uint64
	leader string
	vote   string
	hash   string
	s      *Server
	c      *client
	dflag  bool // Debug logging enabled.
	// Subjects for votes, updates, replays.
	vsubj  string
	vreply string
	asubj  string
	areply string
	// For when we need to catch up as a follower.
	catchup *catchupState
	// For leader or server catching up a follower.
	progress map[string]chan uint64
	// For when we have paused our applyC.
	paused  bool
	hcommit uint64
	// Channels
	propc    chan *Entry
	pausec   chan struct{}
	applyc   chan *CommittedEntry
	sendq    chan *pubMsg
	quit     chan struct{}
	reqs     chan *voteRequest
	votes    chan *voteResponse
	resp     chan *appendEntryResponse
	leadc    chan bool
	stepdown chan string
}

// catchupState structure that holds our subscription, and catchup term and index
// as well as starting term and index and how many updates we have seen.
type catchupState struct {
	sub    *subscription
	cterm  uint64
	cindex uint64
	pterm  uint64
	pindex uint64
	hbs    int // Heartbeats seen without progress; used to detect stalls.
}

// lps holds peer state of last time and last index replicated.
type lps struct {
	ts int64  // Last time we heard from this peer (UnixNano).
	li uint64 // Last index replicated to this peer.
}

const (
	minElectionTimeout = 350 * time.Millisecond
	maxElectionTimeout = 3 * minElectionTimeout
	minCampaignTimeout = 5 * time.Millisecond
	maxCampaignTimeout = 5 * minCampaignTimeout
	hbInterval         = 200 * time.Millisecond
)

// RaftConfig is the configuration used to bootstrap or start a raft node.
type RaftConfig struct {
	Name  string // Group name.
	Store string // Store directory for peer/term/vote state.
	Log   WAL    // Backing write ahead log.
}

var (
	errProposalFailed  = errors.New("raft: proposal failed")
	errProposalsPaused = errors.New("raft: proposals paused")
	errNotLeader       = errors.New("raft: not leader")
	errAlreadyLeader   = errors.New("raft: already leader")
	errNotCurrent      = errors.New("raft: not current")
	errNilCfg          = errors.New("raft: no config given")
	errUnknownPeer     = errors.New("raft: unknown peer")
	errCorruptPeers    = errors.New("raft: corrupt peer state")
	errStepdownFailed  = errors.New("raft: stepdown failed")
	errPeersNotCurrent = errors.New("raft: all peers are not current")
)

// This will bootstrap a raftNode by writing its config into the store directory.
func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeersKnown bool) error {
	if cfg == nil {
		return errNilCfg
	}
	// Check validity of peers if presented.
	for _, p := range knownPeers {
		if len(p) != idLen {
			return fmt.Errorf("raft: illegal peer: %q", p)
		}
	}
	expected := len(knownPeers)
	// We need to adjust this if all peers are not known.
	if !allPeersKnown {
		// Require at least two peers, and at least as many as our configured routes.
		if expected < 2 {
			expected = 2
		}
		if ncr := s.configuredRoutes(); expected < ncr {
			expected = ncr
		}
	}
	return writePeerState(cfg.Store, &peerState{knownPeers, expected})
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { if cfg == nil { return nil, errNilCfg } s.mu.Lock() if s.sys == nil || s.sys.sendq == nil { s.mu.Unlock() return nil, ErrNoSysAccount } sendq := s.sys.sendq sacc := s.sys.account hash := s.sys.shash s.mu.Unlock() ps, err := readPeerState(cfg.Store) if err != nil { return nil, err } if ps == nil || ps.clusterSize < 2 { return nil, errors.New("raft: cluster too small") } n := &raft{ id: hash[:idLen], group: cfg.Name, sd: cfg.Store, wal: cfg.Log, state: Follower, csz: ps.clusterSize, qn: ps.clusterSize/2 + 1, hash: hash, peers: make(map[string]*lps), acks: make(map[uint64]map[string]struct{}), s: s, c: s.createInternalSystemClient(), sendq: sendq, quit: make(chan struct{}), reqs: make(chan *voteRequest, 4), votes: make(chan *voteResponse, 8), resp: make(chan *appendEntryResponse, 256), propc: make(chan *Entry, 256), applyc: make(chan *CommittedEntry, 256), leadc: make(chan bool, 4), stepdown: make(chan string, 4), } n.c.registerWithAccount(sacc) if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true } if term, vote, err := n.readTermVote(); err != nil && term > 0 { n.term = term n.vote = vote } if state := n.wal.State(); state.Msgs > 0 { // TODO(dlc) - Recover our state here. if first, err := n.loadFirstEntry(); err == nil { n.pterm, n.pindex = first.pterm, first.pindex if first.commit > 0 { n.commit = first.commit - 1 } } // Replay the log. // Since doing this in place we need to make sure we have enough room on the applyc. needed := state.Msgs + 1 // 1 is for nil to mark end of replay. if uint64(cap(n.applyc)) < needed { n.applyc = make(chan *CommittedEntry, needed) } for index := state.FirstSeq; index <= state.LastSeq; index++ { ae, err := n.loadEntry(index) if err != nil { panic("err loading entry from WAL") } n.processAppendEntry(ae, nil) } } // Send nil entry to signal the upper layers we are done doing replay/restore. n.applyc <- nil // Setup our internal subscriptions. 
if err := n.createInternalSubs(); err != nil { n.shutdown(true) return nil, err } // Make sure to track ourselves. n.trackPeer(n.id) // Track known peers for _, peer := range ps.knownPeers { // Set these to 0 to start. if peer != n.id { n.peers[peer] = &lps{0, 0} } } n.notice("Started") s.registerRaftNode(n.group, n) s.startGoRoutine(n.run) return n, nil } // Maps node names back to server names. func (s *Server) serverNameForNode(node string) string { s.mu.Lock() sn := s.nodeToName[node] s.mu.Unlock() return sn } // Server will track all raft nodes. func (s *Server) registerRaftNode(group string, n RaftNode) { s.rnMu.Lock() defer s.rnMu.Unlock() if s.raftNodes == nil { s.raftNodes = make(map[string]RaftNode) } s.raftNodes[group] = n } func (s *Server) unregisterRaftNode(group string) { s.rnMu.Lock() defer s.rnMu.Unlock() if s.raftNodes != nil { delete(s.raftNodes, group) } } func (s *Server) lookupRaftNode(group string) RaftNode { s.rnMu.RLock() defer s.rnMu.RUnlock() var n RaftNode if s.raftNodes != nil { n = s.raftNodes[group] } return n } func (s *Server) shutdownRaftNodes() { if s == nil { return } var nodes []RaftNode s.rnMu.RLock() for _, n := range s.raftNodes { nodes = append(nodes, n) } s.rnMu.RUnlock() for _, node := range nodes { if node.Leader() { node.StepDown() } node.Stop() } } // Formal API // Propose will propose a new entry to the group. // This should only be called on the leader. func (n *raft) Propose(data []byte) error { n.RLock() if n.state != Leader { n.RUnlock() return errNotLeader } propc, paused, quit := n.propc, n.pausec, n.quit n.RUnlock() if paused != nil { select { case <-paused: case <-quit: return errProposalFailed case <-time.After(400 * time.Millisecond): return errProposalsPaused } } select { case propc <- &Entry{EntryNormal, data}: default: return errProposalFailed } return nil } // PausePropose will pause new proposals. 
func (n *raft) PausePropose() { n.Lock() if n.pausec == nil { n.pausec = make(chan struct{}) } n.Unlock() } // ResumePropose will resum new proposals. func (n *raft) ResumePropose() { n.Lock() paused := n.pausec n.pausec = nil n.Unlock() if paused != nil { close(paused) } } // ProposeAddPeer is called to add a peer to the group. func (n *raft) ProposeAddPeer(peer string) error { n.RLock() if n.state != Leader { n.RUnlock() return errNotLeader } propc := n.propc n.RUnlock() select { case propc <- &Entry{EntryAddPeer, []byte(peer)}: default: return errProposalFailed } return nil } // ProposeRemovePeer is called to remove a peer from the group. func (n *raft) ProposeRemovePeer(peer string) error { return errors.New("no impl") } // PauseApply will allow us to pause processing of append entries onto our // external apply chan. func (n *raft) PauseApply() { n.Lock() defer n.Unlock() n.paused = true n.hcommit = n.commit } func (n *raft) ResumeApply() { n.Lock() defer n.Unlock() // Run catchup.. if n.hcommit > n.commit { for index := n.commit + 1; index <= n.hcommit; index++ { n.applyCommit(index) } } n.paused = false n.hcommit = 0 } // Compact will compact our WAL. If this node is a leader we will want // all our peers to be at least to the same index. Non-leaders just compact // directly. This is for when we know we have our state on stable storage. // E.g JS Consumers. func (n *raft) Compact(index uint64) error { n.Lock() defer n.Unlock() // If we are not the leader compact at will. if n.state != Leader { _, err := n.wal.Compact(index) return err } // We are the leader so we need to make sure all peers are at least up to this index. for peer, ps := range n.peers { if peer != n.id && ps.li < index { return errPeersNotCurrent } } return nil } // Applied is to be called when the FSM has applied the committed entries. 
// Applied is to be called when the FSM has applied the committed entries.
func (n *raft) Applied(index uint64) {
	n.Lock()
	defer n.Unlock()
	// FIXME(dlc) - Check spec on error conditions, storage
	n.applied = index
	// FIXME(dlc) - Can be more efficient here.
	if ae, err := n.loadEntry(index); ae != nil && err == nil {
		// Check to see if we have a snapshot here.
		// Snapshots will be by themselves but we range anyway.
		for _, e := range ae.entries {
			if e.Type == EntrySnapshot {
				n.debug("Found snapshot entry: compacting log to index %d", index)
				n.wal.Compact(index)
			}
		}
	}
}

// Snapshot is used to snapshot the fsm. This can only be called from a leader.
// For now these are assumed to be small and will be placed into the log itself.
// TODO(dlc) - For meta and consumers this is straightforward, and for streams sans the messages this is as well.
func (n *raft) Snapshot(snap []byte) error {
	n.Lock()
	defer n.Unlock()
	n.debug("Snapshot called with %d bytes, applied is %d", len(snap), n.applied)
	if n.state != Leader {
		return errNotLeader
	}
	if !n.isCurrent() {
		return errNotCurrent
	}
	select {
	case n.propc <- &Entry{EntrySnapshot, snap}:
	default:
		return errProposalFailed
	}
	return nil
}

// Leader returns if we are the leader for our group.
func (n *raft) Leader() bool {
	if n == nil {
		return false
	}
	n.RLock()
	isLeader := n.state == Leader
	n.RUnlock()
	return isLeader
}

// isCurrent determines if we are current: commit caught up to applied and,
// if not leader, recently in contact with the leader.
// Lock should be held.
func (n *raft) isCurrent() bool {
	// First check if we match commit and applied.
	if n.commit != n.applied {
		return false
	}
	// Make sure we are the leader or we know we have heard from the leader recently.
	if n.state == Leader {
		return true
	}
	// Check here on catchup status.
	if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
		n.cancelCatchup()
	}
	// Check to see that we have heard from the current leader lately.
	if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
		const okInterval = int64(hbInterval) * 2
		ts := time.Now().UnixNano()
		if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
			return true
		}
	}
	return false
}

// Current returns if we are the leader for our group or an up to date follower.
func (n *raft) Current() bool {
	if n == nil {
		return false
	}
	n.RLock()
	defer n.RUnlock()
	return n.isCurrent()
}

// GroupLeader returns the current leader of the group.
func (n *raft) GroupLeader() string {
	if n == nil {
		return noLeader
	}
	n.RLock()
	defer n.RUnlock()
	return n.leader
}

// StepDown will have a leader stepdown and optionally do a leader transfer.
func (n *raft) StepDown() error {
	n.Lock()
	if n.state != Leader {
		n.Unlock()
		return errNotLeader
	}
	n.debug("Being asked to stepdown")
	// See if we have up to date followers.
	nowts := time.Now().UnixNano()
	maybeLeader := noLeader
	for peer, ps := range n.peers {
		// If not us and alive and caughtup.
		if peer != n.id && (nowts-ps.ts) < int64(hbInterval*2) {
			if n.s.getRouteByHash([]byte(peer)) != nil {
				n.debug("Looking at %q which is %v behind", peer, time.Duration(nowts-ps.ts))
				maybeLeader = peer
				break
			}
		}
	}
	stepdown := n.stepdown
	n.Unlock()
	if maybeLeader != noLeader {
		// Hand off leadership to the selected peer via a transfer entry.
		n.debug("Stepping down, selected %q for new leader", maybeLeader)
		n.sendAppendEntry([]*Entry{&Entry{EntryLeaderTransfer, []byte(maybeLeader)}})
	} else {
		// Force us to stepdown here.
		select {
		case stepdown <- noLeader:
		default:
			return errStepdownFailed
		}
	}
	return nil
}

// Campaign will have our node start a leadership vote.
func (n *raft) Campaign() error {
	n.Lock()
	defer n.Unlock()
	return n.campaign()
}

// randCampaignTimeout returns a randomized delay before campaigning.
func randCampaignTimeout() time.Duration {
	delta := rand.Int63n(int64(maxCampaignTimeout - minCampaignTimeout))
	return (minCampaignTimeout + time.Duration(delta))
}

// Campaign will have our node start a leadership vote.
// Lock should be held.
func (n *raft) campaign() error {
	n.debug("Starting campaign")
	if n.state == Leader {
		return errAlreadyLeader
	}
	// Shorten our election timer so we campaign quickly.
	if n.state == Follower {
		n.resetElect(randCampaignTimeout())
	}
	return nil
}

// State return the current state for this node.
func (n *raft) State() RaftState {
	n.RLock()
	state := n.state
	n.RUnlock()
	return state
}

// Size returns number of entries and total bytes for our WAL.
func (n *raft) Size() (uint64, uint64) {
	n.RLock()
	state := n.wal.State()
	n.RUnlock()
	return state.Msgs, state.Bytes
}

// ID returns our node id.
func (n *raft) ID() string {
	n.RLock()
	defer n.RUnlock()
	return n.id
}

// Group returns our group name.
func (n *raft) Group() string {
	n.RLock()
	defer n.RUnlock()
	return n.group
}

// Peers returns the leader's view of the group members, or nil if we are
// not the leader.
func (n *raft) Peers() []*Peer {
	n.RLock()
	defer n.RUnlock()
	if n.state != Leader {
		return nil
	}
	var peers []*Peer
	for id, ps := range n.peers {
		p := &Peer{ID: id, Current: id == n.leader || ps.li >= n.applied, Last: time.Unix(0, ps.ts)}
		peers = append(peers, p)
	}
	return peers
}

// Stop shuts the node down but keeps its state on disk.
func (n *raft) Stop() {
	n.shutdown(false)
}

// Delete shuts the node down and removes its state.
func (n *raft) Delete() {
	n.shutdown(true)
}

func (n *raft) ApplyC() <-chan *CommittedEntry { return n.applyc }
func (n *raft) LeadChangeC() <-chan bool       { return n.leadc }
func (n *raft) QuitC() <-chan struct{}         { return n.quit }

// shutdown closes the node, optionally deleting all persisted state.
func (n *raft) shutdown(shouldDelete bool) {
	n.Lock()
	if n.state == Closed {
		n.Unlock()
		return
	}
	close(n.quit)
	n.c.closeConnection(InternalClient)
	n.state = Closed
	s, g, wal := n.s, n.group, n.wal
	// Delete our peer state and vote state.
	if shouldDelete {
		os.Remove(path.Join(n.sd, peerStateFile))
		os.Remove(path.Join(n.sd, termVoteFile))
	}
	n.Unlock()
	s.unregisterRaftNode(g)
	if shouldDelete {
		n.notice("Deleted")
	} else {
		n.notice("Shutdown")
	}
	if wal != nil {
		if shouldDelete {
			wal.Delete()
		} else {
			wal.Stop()
		}
	}
}

// newInbox generates a random reply subject for this node.
func (n *raft) newInbox(cn string) string {
	var b [replySuffixLen]byte
	rn := rand.Int63()
	for i, l := 0, rn; i < len(b); i++ {
		b[i] = digits[l%base]
		l /= base
	}
	return fmt.Sprintf(raftReplySubj, b[:])
}

// Subject templates for votes, append entries and replies.
const (
	raftVoteSubj   = "$NRG.V.%s.%s"
	raftAppendSubj = "$NRG.E.%s.%s"
	raftReplySubj  = "$NRG.R.%s"
)

// createInternalSubs sets up the system account subscriptions for votes
// and append entries along with their reply inboxes.
func (n *raft) createInternalSubs() error {
	cn := n.s.ClusterName()
	n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, cn, n.group), n.newInbox(cn)
	n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, cn, n.group), n.newInbox(cn)
	// Votes
	if _, err := n.s.sysSubscribe(n.vreply, n.handleVoteResponse); err != nil {
		return err
	}
	if _, err := n.s.sysSubscribe(n.vsubj, n.handleVoteRequest); err != nil {
		return err
	}
	// AppendEntry
	if _, err := n.s.sysSubscribe(n.areply, n.handleAppendEntryResponse); err != nil {
		return err
	}
	if _, err := n.s.sysSubscribe(n.asubj, n.handleAppendEntry); err != nil {
		return err
	}
	// TODO(dlc) change events.
	return nil
}

// randElectionTimeout returns a randomized election timeout.
func randElectionTimeout() time.Duration {
	delta := rand.Int63n(int64(maxElectionTimeout - minElectionTimeout))
	return (minElectionTimeout + time.Duration(delta))
}

// Lock should be held.
func (n *raft) resetElectionTimeout() {
	n.resetElect(randElectionTimeout())
}

// resetElect (re)arms the election timer, draining a fired timer first.
// Lock should be held.
func (n *raft) resetElect(et time.Duration) {
	if n.elect == nil {
		n.elect = time.NewTimer(et)
	} else {
		if !n.elect.Stop() && len(n.elect.C) > 0 {
			<-n.elect.C
		}
		n.elect.Reset(et)
	}
}

// run is the main state machine loop for this node.
func (n *raft) run() {
	s := n.s
	defer s.grWG.Done()
	n.Lock()
	n.resetElectionTimeout()
	n.Unlock()
	for s.isRunning() {
		switch n.State() {
		case Follower:
			n.runAsFollower()
		case Candidate:
			n.runAsCandidate()
		case Leader:
			n.runAsLeader()
		case Observer:
			// TODO(dlc) - fix.
			n.runAsFollower()
		case Closed:
			return
		}
	}
}

// debug logs a debug message tagged with our id and group, when enabled.
func (n *raft) debug(format string, args ...interface{}) {
	if n.dflag {
		nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
		n.s.Debugf(nf, args...)
	}
}

// error logs an error message tagged with our id and group.
func (n *raft) error(format string, args ...interface{}) {
	nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
	n.s.Errorf(nf, args...)
}

// notice logs a notice message tagged with our id and group.
func (n *raft) notice(format string, args ...interface{}) {
	nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
	n.s.Noticef(nf, args...)
}

// electTimer returns the election timer under the read lock.
func (n *raft) electTimer() *time.Timer {
	n.RLock()
	elect := n.elect
	n.RUnlock()
	return elect
}

// runAsFollower processes events until we become a candidate or shut down.
func (n *raft) runAsFollower() {
	for {
		elect := n.electTimer()
		select {
		case <-n.s.quitCh:
			return
		case <-n.quit:
			return
		case <-elect.C:
			n.switchToCandidate()
			return
		case vreq := <-n.reqs:
			n.processVoteRequest(vreq)
		case newLeader := <-n.stepdown:
			n.switchToFollower(newLeader)
			return
		}
	}
}

// CommittedEntry is handed back to the user to apply a commit to their FSM.
type CommittedEntry struct {
	Index   uint64
	Entries []*Entry
}

// appendEntry is the wire form of a replicated log entry batch.
type appendEntry struct {
	leader  string
	term    uint64
	commit  uint64
	pterm   uint64
	pindex  uint64
	entries []*Entry
	// internal use only.
	reply string
	buf   []byte
}

// EntryType tags the kind of an individual Entry.
type EntryType uint8

const (
	EntryNormal EntryType = iota
	EntrySnapshot
	EntryPeerState
	EntryAddPeer
	EntryRemovePeer
	EntryLeaderTransfer
)

// String returns a human readable name for an EntryType.
func (t EntryType) String() string {
	switch t {
	case EntryNormal:
		return "Normal"
	case EntrySnapshot:
		return "Snapshot"
	case EntryPeerState:
		return "PeerState"
	case EntryAddPeer:
		return "AddPeer"
	case EntryRemovePeer:
		return "RemovePeer"
	case EntryLeaderTransfer:
		return "LeaderTransfer"
	}
	return fmt.Sprintf("Unknown [%d]", uint8(t))
}

// Entry is a single typed payload within an appendEntry.
type Entry struct {
	Type EntryType
	Data []byte
}

func (ae *appendEntry) String() string {
	return fmt.Sprintf("&{leader:%s term:%d commit:%d pterm:%d pindex:%d entries: %d}",
		ae.leader, ae.term, ae.commit, ae.pterm, ae.pindex, len(ae.entries))
}

// Fixed header: leader id + 4 uint64s + uint16 entry count.
const appendEntryBaseLen = idLen + 4*8 + 2

// encode serializes an appendEntry into the wire format.
func (ae *appendEntry) encode() []byte {
	var elen int
	for _, e := range ae.entries {
		elen += len(e.Data) + 1 + 4 // 1 is type, 4 is for size.
	}
	var le = binary.LittleEndian
	buf := make([]byte, appendEntryBaseLen+elen)
	copy(buf[:idLen], ae.leader)
	le.PutUint64(buf[8:], ae.term)
	le.PutUint64(buf[16:], ae.commit)
	le.PutUint64(buf[24:], ae.pterm)
	le.PutUint64(buf[32:], ae.pindex)
	le.PutUint16(buf[40:], uint16(len(ae.entries)))
	wi := 42
	for _, e := range ae.entries {
		le.PutUint32(buf[wi:], uint32(len(e.Data)+1))
		wi += 4
		buf[wi] = byte(e.Type)
		wi++
		copy(buf[wi:], e.Data)
		wi += len(e.Data)
	}
	return buf[:wi]
}

// This can not be used post the wire level callback since we do not copy.
func (n *raft) decodeAppendEntry(msg []byte, reply string) *appendEntry {
	if len(msg) < appendEntryBaseLen {
		return nil
	}
	var le = binary.LittleEndian
	ae := &appendEntry{
		leader: string(msg[:idLen]),
		term:   le.Uint64(msg[8:]),
		commit: le.Uint64(msg[16:]),
		pterm:  le.Uint64(msg[24:]),
		pindex: le.Uint64(msg[32:]),
	}
	// Decode Entries.
	ne, ri := int(le.Uint16(msg[40:])), 42
	for i := 0; i < ne; i++ {
		// NOTE: this `le` (entry length) shadows the LittleEndian `le` above.
		le := int(le.Uint32(msg[ri:]))
		ri += 4
		etype := EntryType(msg[ri])
		ae.entries = append(ae.entries, &Entry{etype, msg[ri+1 : ri+le]})
		ri += int(le)
	}
	ae.reply = reply
	ae.buf = msg
	return ae
}

// appendEntryResponse is our response to a received appendEntry.
type appendEntryResponse struct {
	term    uint64
	index   uint64
	peer    string
	success bool
	// internal
	reply string
}

// We want to make sure this does not change from system changing length of syshash.
const idLen = 8
const appendEntryResponseLen = 24 + 1

// encode serializes an appendEntryResponse into the wire format.
func (ar *appendEntryResponse) encode() []byte {
	var buf [appendEntryResponseLen]byte
	var le = binary.LittleEndian
	le.PutUint64(buf[0:], ar.term)
	le.PutUint64(buf[8:], ar.index)
	copy(buf[16:], ar.peer)
	if ar.success {
		buf[24] = 1
	} else {
		buf[24] = 0
	}
	return buf[:appendEntryResponseLen]
}

// decodeAppendEntryResponse parses a wire response, returning nil on bad length.
func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
	if len(msg) != appendEntryResponseLen {
		return nil
	}
	var le = binary.LittleEndian
	ar := &appendEntryResponse{
		term:  le.Uint64(msg[0:]),
		index: le.Uint64(msg[8:]),
		peer:  string(msg[16 : 16+idLen]),
	}
	ar.success = msg[24] == 1
	return ar
}

// runAsLeader services proposals, heartbeats, votes and responses while leader.
func (n *raft) runAsLeader() {
	n.sendPeerState()
	hb := time.NewTicker(hbInterval)
	defer hb.Stop()
	for {
		select {
		case <-n.s.quitCh:
			return
		case <-n.quit:
			return
		case b := <-n.propc:
			entries := []*Entry{b}
			// Batch up any additional normal proposals that are queued.
			if b.Type == EntryNormal {
				const maxBatch = 256 * 1024
			gather:
				for sz := 0; sz < maxBatch; {
					select {
					case e := <-n.propc:
						entries = append(entries, e)
						sz += len(e.Data) + 1
					default:
						break gather
					}
				}
			}
			n.sendAppendEntry(entries)
		case <-hb.C:
			n.sendHeartbeat()
		case vresp := <-n.votes:
			if vresp.term > n.term {
				n.switchToFollower(noLeader)
				return
			}
			n.trackPeer(vresp.peer)
		case vreq := <-n.reqs:
			n.processVoteRequest(vreq)
		case newLeader := <-n.stepdown:
			n.switchToFollower(newLeader)
			return
		case ar := <-n.resp:
			n.trackPeer(ar.peer)
			if ar.success {
				n.trackResponse(ar)
			} else if ar.reply != _EMPTY_ {
				n.catchupFollower(ar)
			}
		}
	}
}

// Lock should be held.
func (n *raft) loadFirstEntry() (ae *appendEntry, err error) {
	return n.loadEntry(n.wal.State().FirstSeq)
}

// runCatchup streams log entries to a lagging follower, throttled by the
// index updates it acks back on indexUpdatesC.
func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
	n.RLock()
	s, reply := n.s, n.areply
	n.RUnlock()
	defer s.grWG.Done()
	defer func() {
		n.Lock()
		delete(n.progress, peer)
		if len(n.progress) == 0 {
			n.progress = nil
		}
		// Check if this is a new peer and if so go ahead and propose adding them.
		_, ok := n.peers[peer]
		n.Unlock()
		if !ok {
			n.debug("Catchup done for %q, will add into peers", peer)
			n.ProposeAddPeer(peer)
		}
	}()
	n.debug("Running catchup for %q", peer)
	const maxOutstanding = 48 * 1024 * 1024 // 48MB for now.
	next, total, om := uint64(0), 0, make(map[uint64]int)
	// sendNext streams entries until maxOutstanding bytes are in flight.
	sendNext := func() {
		for total <= maxOutstanding {
			next++
			ae, err := n.loadEntry(next)
			if err != nil {
				if err != ErrStoreEOF {
					n.debug("Got an error loading %d index: %v", next, err)
				}
				return
			}
			// Update our tracking total.
			om[next] = len(ae.buf)
			total += len(ae.buf)
			n.sendRPC(subj, reply, ae.buf)
		}
	}
	const activityInterval = 2 * time.Second
	timeout := time.NewTimer(activityInterval)
	defer timeout.Stop()
	// Run as long as we are leader and still not caught up.
	for n.Leader() {
		select {
		case <-n.s.quitCh:
			return
		case <-n.quit:
			return
		case <-timeout.C:
			n.debug("Catching up for %q stalled", peer)
			return
		case index := <-indexUpdatesC:
			// Update our activity timer.
			timeout.Reset(activityInterval)
			// Update outstanding total.
			total -= om[index]
			delete(om, index)
			n.RLock()
			finished := index >= n.pindex
			n.RUnlock()
			// Check if we are done.
			if finished {
				n.debug("Finished catching up")
				return
			}
			// Still have more catching up to do.
			if next < index {
				n.debug("Adjusting next to %d from %d", index, next)
				next = index
			}
			sendNext()
		}
	}
}

// catchupFollower starts a catchup goroutine for a follower that responded
// with a mismatch, unless one is already running for it.
func (n *raft) catchupFollower(ar *appendEntryResponse) {
	n.debug("Being asked to catch up follower: %q", ar.peer)
	n.Lock()
	if n.progress == nil {
		n.progress = make(map[string]chan uint64)
	}
	if _, ok := n.progress[ar.peer]; ok {
		n.debug("Existing entry for catching up %q\n", ar.peer)
		n.Unlock()
		return
	}
	ae, err := n.loadEntry(ar.index + 1)
	if err != nil {
		ae, err = n.loadFirstEntry()
	}
	if err != nil || ae == nil {
		n.debug("Could not find a starting entry for us: %v", err)
		n.Unlock()
		return
	}
	if ae.pindex != ar.index || ae.pterm != ar.term {
		n.debug("Our first entry does not match")
	}
	// Create a chan for delivering updates from responses.
	indexUpdates := make(chan uint64, 1024)
	// Seed with the starting index so the catchup loop begins sending.
	indexUpdates <- ae.pindex
	n.progress[ar.peer] = indexUpdates
	n.Unlock()
	n.s.startGoRoutine(func() { n.runCatchup(ar.peer, ar.reply, indexUpdates) })
}

// loadEntry reads and decodes an appendEntry from our WAL at index.
func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
	_, _, msg, _, err := n.wal.LoadMsg(index)
	if err != nil {
		return nil, err
	}
	return n.decodeAppendEntry(msg, _EMPTY_), nil
}

// applyCommit will update our commit index and apply the entry to the apply chan.
// lock should be held.
func (n *raft) applyCommit(index uint64) {
	original := n.commit
	n.commit = index
	if n.state == Leader {
		delete(n.acks, index)
	}
	// FIXME(dlc) - Can keep this in memory if this is too slow.
	ae, err := n.loadEntry(index)
	if err != nil {
		n.debug("Got an error loading %d index: %v", index, err)
		// Roll back the commit since we could not load the entry.
		n.commit = original
		return
	}
	ae.buf = nil
	var committed []*Entry
	for _, e := range ae.entries {
		switch e.Type {
		case EntryNormal:
			committed = append(committed, e)
		case EntrySnapshot:
			committed = append(committed, e)
		case EntryPeerState:
			if ps, err := decodePeerState(e.Data); err == nil {
				n.processPeerState(ps)
			}
		case EntryAddPeer:
			newPeer := string(e.Data)
			n.debug("Added peer %q", newPeer)
			if _, ok := n.peers[newPeer]; !ok {
				// We are not tracking this one automatically so we need to bump cluster size.
				n.debug("Expanding our clustersize: %d -> %d", n.csz, n.csz+1)
				n.csz++
				n.qn = n.csz/2 + 1
				n.peers[newPeer] = &lps{time.Now().UnixNano(), 0}
			}
			writePeerState(n.sd, &peerState{n.peerNames(), n.csz})
		}
	}
	// Pass to the upper layers if we have normal entries.
	if len(committed) > 0 {
		select {
		case n.applyc <- &CommittedEntry{index, committed}:
		default:
			n.debug("Failed to place committed entry onto our apply channel")
			// Roll back so we retry this commit later.
			n.commit = original
		}
	} else {
		// If we processed inline update our applied index.
		n.applied = index
	}
}

// Used to track a success response and apply entries.
func (n *raft) trackResponse(ar *appendEntryResponse) {
	n.Lock()
	// Update peer's last index.
	if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li {
		ps.li = ar.index
	}
	// If we are tracking this peer as a catchup follower, update that here.
	if indexUpdateC := n.progress[ar.peer]; indexUpdateC != nil {
		select {
		case indexUpdateC <- ar.index:
		default:
			// Channel full; release the lock and do a blocking send.
			n.debug("Failed to place tracking response for catchup, will try again")
			n.Unlock()
			indexUpdateC <- ar.index
			n.Lock()
		}
	}
	// Ignore items already committed.
	if ar.index <= n.commit {
		n.Unlock()
		return
	}
	// See if we have items to apply.
	var sendHB bool
	if results := n.acks[ar.index]; results != nil {
		results[ar.peer] = struct{}{}
		if nr := len(results); nr >= n.qn {
			// We have a quorum.
			n.applyCommit(ar.index)
			// Send a heartbeat to propagate the commit if nothing is queued.
			sendHB = len(n.propc) == 0
		}
	}
	n.Unlock()
	if sendHB {
		n.sendHeartbeat()
	}
}

// Track interactions with this peer.
func (n *raft) trackPeer(peer string) error {
	n.Lock()
	var needPeerUpdate bool
	if n.state == Leader {
		if _, ok := n.peers[peer]; !ok {
			// This is someone new, if we have registered all of the peers already
			// this is an error.
			if len(n.peers) >= n.csz {
				n.Unlock()
				n.debug("Leader detected a new peer! %q", peer)
				return errUnknownPeer
			}
			needPeerUpdate = true
		}
	}
	if ps := n.peers[peer]; ps != nil {
		ps.ts = time.Now().UnixNano()
	} else {
		n.peers[peer] = &lps{time.Now().UnixNano(), 0}
	}
	n.Unlock()
	if needPeerUpdate {
		n.sendPeerState()
	}
	return nil
}

// runAsCandidate requests votes and tallies responses until we win,
// time out, or step down.
func (n *raft) runAsCandidate() {
	n.Lock()
	// Drain old responses.
	for len(n.votes) > 0 {
		<-n.votes
	}
	n.Unlock()
	// Send out our request for votes.
	n.requestVote()
	// We vote for ourselves.
	votes := 1
	for {
		elect := n.electTimer()
		select {
		case <-n.s.quitCh:
			return
		case <-n.quit:
			return
		case <-elect.C:
			n.switchToCandidate()
			return
		case vresp := <-n.votes:
			n.trackPeer(vresp.peer)
			if vresp.granted && n.term >= vresp.term {
				votes++
				if n.wonElection(votes) {
					// Become LEADER if we have won.
					n.switchToLeader()
					return
				}
			}
		case vreq := <-n.reqs:
			n.processVoteRequest(vreq)
		case newLeader := <-n.stepdown:
			n.switchToFollower(newLeader)
			return
		}
	}
}

// handleAppendEntry handles an append entry from the wire. We can't rely on msg being available
// past this callback so will do a bunch of processing here to avoid copies, channels etc.
func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply string, msg []byte) {
	ae := n.decodeAppendEntry(msg, reply)
	if ae == nil {
		return
	}
	n.processAppendEntry(ae, sub)
}

// Lock should be held.
func (n *raft) cancelCatchup() {
	n.debug("Canceling catchup subscription since we are now up to date")
	n.s.sysUnsubscribe(n.catchup.sub)
	n.catchup = nil
}

// catchupStalled will try to determine if we are stalled. This is called
// on a new entry from our leader.
// Lock should be held.
func (n *raft) catchupStalled() bool {
	if n.catchup == nil {
		return false
	}
	const maxHBs = 3
	// Stalled means pindex has not moved across several calls.
	if n.catchup.pindex == n.pindex {
		n.catchup.hbs++
	} else {
		n.catchup.pindex = n.pindex
		n.catchup.hbs = 0
	}
	return n.catchup.hbs >= maxHBs
}

// createCatchup sets up catchup state and a dedicated inbox subscription,
// returning the inbox for the leader to stream entries to.
// Lock should be held.
func (n *raft) createCatchup(ae *appendEntry) string {
	// Cleanup any old ones.
	if n.catchup != nil {
		n.s.sysUnsubscribe(n.catchup.sub)
	}
	// Snapshot term and index.
	n.catchup = &catchupState{
		cterm:  ae.pterm,
		cindex: ae.pindex,
		pterm:  n.pterm,
		pindex: n.pindex,
	}
	inbox := n.newInbox(n.s.ClusterName())
	sub, _ := n.s.sysSubscribe(inbox, n.handleAppendEntry)
	n.catchup.sub = sub
	return inbox
}

// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
	n.Lock()
	// Just return if closed.
	if n.state == Closed {
		n.Unlock()
		return
	}
	// If we received an append entry as a candidate we should convert to a follower.
	if n.state == Candidate {
		n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
		n.term = ae.term
		n.Unlock()
		n.stepdown <- ae.leader
		return
	}
	// Catching up state.
	catchingUp := n.catchup != nil
	// Is this a new entry or a replay on startup?
	isNew := sub != nil && (!catchingUp || sub != n.catchup.sub)
	if isNew {
		n.resetElectionTimeout()
		// Track leader directly
		if ae.leader != noLeader {
			if ps := n.peers[ae.leader]; ps != nil {
				ps.ts = time.Now().UnixNano()
			} else {
				n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0}
			}
		}
	}
	// Ignore old terms.
	if isNew && ae.term < n.term {
		n.Unlock()
		n.debug("AppendEntry ignoring old term")
		return
	}
	// Check state if we are catching up.
	if catchingUp && isNew {
		if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
			// If we are here we are good, so if we have a catchup pending we can cancel.
			n.cancelCatchup()
			catchingUp = false
		} else {
			var ar *appendEntryResponse
			var inbox string
			// Check to see if we are stalled. If so recreate our catchup state and resend response.
			if n.catchupStalled() {
				n.debug("Catchup may be stalled, will request again")
				inbox = n.createCatchup(ae)
				ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
			}
			// Ignore new while catching up or replaying.
			n.Unlock()
			if ar != nil {
				n.sendRPC(ae.reply, inbox, ar.encode())
			}
			return
		}
	}
	// If this term is greater than ours.
	if ae.term > n.term {
		n.term = ae.term
		n.vote = noVote
		n.writeTermVote()
		if n.state != Follower {
			n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.state, ae.leader)
			n.Unlock()
			n.stepdown <- ae.leader
			return
		}
	}
	if n.leader != ae.leader && n.state == Follower {
		n.debug("AppendEntry updating leader to %q", ae.leader)
		n.leader = ae.leader
		n.vote = noVote
		n.writeTermVote()
		if isNew {
			n.resetElectionTimeout()
		}
	}
	// TODO(dlc) - Do both catchup and delete new behaviors from spec.
	if ae.pterm != n.pterm || ae.pindex != n.pindex {
		// Check if we are catching up and this is a snapshot, if so reset our wal's index.
		// Snapshots will always be by themselves.
		if catchingUp && len(ae.entries) > 0 && ae.entries[0].Type == EntrySnapshot {
			n.debug("Should reset index for wal to %d", ae.pindex+1)
			n.wal.Compact(ae.pindex + 1)
			n.pindex = ae.pindex
			n.commit = ae.pindex
		} else {
			n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
			// Reset our term.
			n.term = n.pterm
			// Setup our state for catching up.
			inbox := n.createCatchup(ae)
			ar := appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
			n.Unlock()
			n.sendRPC(ae.reply, inbox, ar.encode())
			return
		}
	}
	// Save to our WAL if we have entries.
	if len(ae.entries) > 0 {
		// Only store if an original which will have sub != nil
		if sub != nil {
			if err := n.storeToWAL(ae); err != nil {
				n.debug("Error storing to WAL: %v", err)
				if err == ErrStoreClosed {
					n.Unlock()
					return
				}
			}
		} else {
			// This is a replay on startup so just take the appendEntry version.
			n.pterm = ae.term
			n.pindex = ae.pindex + 1
		}
		// Check to see if we have any related entries to process here.
		for _, e := range ae.entries {
			switch e.Type {
			case EntryLeaderTransfer:
				if isNew {
					maybeLeader := string(e.Data)
					if maybeLeader == n.id {
						n.campaign()
					}
					// These will not have commits follow them. We also know this will be by itself.
					n.commit = ae.pindex + 1
				}
			case EntryAddPeer:
				if newPeer := string(e.Data); len(newPeer) == idLen {
					// Track directly
					if ps := n.peers[newPeer]; ps != nil {
						ps.ts = time.Now().UnixNano()
					} else {
						n.peers[newPeer] = &lps{time.Now().UnixNano(), 0}
					}
				}
			}
		}
	}
	// Apply anything we need here.
	if ae.commit > n.commit {
		if n.paused {
			n.hcommit = ae.commit
			n.debug("Paused, not applying %d", ae.commit)
		} else {
			for index := n.commit + 1; index <= ae.commit; index++ {
				n.applyCommit(index)
			}
		}
	}
	ar := appendEntryResponse{n.pterm, n.pindex, n.id, true, _EMPTY_}
	n.Unlock()
	// Success. Send our response.
	n.sendRPC(ae.reply, _EMPTY_, ar.encode())
}

// Lock should be held.
func (n *raft) processPeerState(ps *peerState) {
	// Update our version of peers to that of the leader.
	n.csz = ps.clusterSize
	n.peers = make(map[string]*lps)
	for _, peer := range ps.knownPeers {
		n.peers[peer] = &lps{0, 0}
	}
	n.debug("Update peers from leader to %+v", n.peers)
	writePeerState(n.sd, ps)
}

// handleAppendEntryResponse just places the decoded response on the appropriate channel.
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject, reply string, msg []byte) { aer := n.decodeAppendEntryResponse(msg) if reply != _EMPTY_ { aer.reply = reply } select { case n.resp <- aer: default: n.error("Failed to place add entry response on chan for %q", n.group) } } func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil} } // lock should be held. func (n *raft) storeToWAL(ae *appendEntry) error { if ae.buf == nil { panic("nil buffer for appendEntry!") } seq, _, err := n.wal.StoreMsg(_EMPTY_, nil, ae.buf) if err != nil { return err } n.pterm = ae.term n.pindex = seq return nil } func (n *raft) sendAppendEntry(entries []*Entry) { n.Lock() defer n.Unlock() ae := n.buildAppendEntry(entries) ae.buf = ae.encode() // If we have entries store this in our wal. if len(entries) > 0 { if err := n.storeToWAL(ae); err != nil { panic("Error storing!\n") } // We count ourselves. n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}} } n.sendRPC(n.asubj, n.areply, ae.buf) } type peerState struct { knownPeers []string clusterSize int } func encodePeerState(ps *peerState) []byte { var le = binary.LittleEndian buf := make([]byte, 4+4+(8*len(ps.knownPeers))) le.PutUint32(buf[0:], uint32(ps.clusterSize)) le.PutUint32(buf[4:], uint32(len(ps.knownPeers))) wi := 8 for _, peer := range ps.knownPeers { copy(buf[wi:], peer) wi += idLen } return buf } func decodePeerState(buf []byte) (*peerState, error) { if len(buf) < 8 { return nil, errCorruptPeers } var le = binary.LittleEndian ps := &peerState{clusterSize: int(le.Uint32(buf[0:]))} expectedPeers := int(le.Uint32(buf[4:])) buf = buf[8:] for i, ri, n := 0, 0, expectedPeers; i < n && ri < len(buf); i++ { ps.knownPeers = append(ps.knownPeers, string(buf[ri:ri+idLen])) ri += idLen } if len(ps.knownPeers) != expectedPeers { return nil, errCorruptPeers } return ps, nil } // Lock should be held. 
func (n *raft) peerNames() []string { var peers []string for peer := range n.peers { peers = append(peers, peer) } return peers } func (n *raft) currentPeerState() *peerState { n.RLock() ps := &peerState{n.peerNames(), n.csz} n.RUnlock() return ps } // sendPeerState will send our current peer state to the cluster. func (n *raft) sendPeerState() { n.sendAppendEntry([]*Entry{&Entry{EntryPeerState, encodePeerState(n.currentPeerState())}}) } func (n *raft) sendHeartbeat() { n.sendAppendEntry(nil) } type voteRequest struct { term uint64 lastTerm uint64 lastIndex uint64 candidate string // internal only. reply string } const voteRequestLen = 24 + idLen func (vr *voteRequest) encode() []byte { var buf [voteRequestLen]byte var le = binary.LittleEndian le.PutUint64(buf[0:], vr.term) le.PutUint64(buf[8:], vr.lastTerm) le.PutUint64(buf[16:], vr.lastIndex) copy(buf[24:24+idLen], vr.candidate) return buf[:voteRequestLen] } func (n *raft) decodeVoteRequest(msg []byte, reply string) *voteRequest { if len(msg) != voteRequestLen { return nil } // Need to copy for now b/c of candidate. msg = append(msg[:0:0], msg...) var le = binary.LittleEndian return &voteRequest{ term: le.Uint64(msg[0:]), lastTerm: le.Uint64(msg[8:]), lastIndex: le.Uint64(msg[16:]), candidate: string(msg[24 : 24+idLen]), reply: reply, } } const peerStateFile = "peers.idx" // Writes out our peer state. func writePeerState(sd string, ps *peerState) error { psf := path.Join(sd, peerStateFile) if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } if err := ioutil.WriteFile(psf, encodePeerState(ps), 0644); err != nil { return err } return nil } func readPeerState(sd string) (ps *peerState, err error) { buf, err := ioutil.ReadFile(path.Join(sd, peerStateFile)) if err != nil { return nil, err } return decodePeerState(buf) } const termVoteFile = "tav.idx" const termVoteLen = idLen + 8 // readTermVote will read the largest term and who we voted from to stable storage. // Lock should be held. 
func (n *raft) readTermVote() (term uint64, voted string, err error) { buf, err := ioutil.ReadFile(path.Join(n.sd, termVoteFile)) if err != nil { return 0, noVote, err } if len(buf) < termVoteLen { return 0, noVote, nil } var le = binary.LittleEndian term = le.Uint64(buf[0:]) voted = string(buf[8:]) return term, voted, nil } // writeTermVote will record the largest term and who we voted for to stable storage. // Lock should be held. func (n *raft) writeTermVote() error { tvf := path.Join(n.sd, termVoteFile) if _, err := os.Stat(tvf); err != nil && !os.IsNotExist(err) { return err } var buf [termVoteLen]byte var le = binary.LittleEndian le.PutUint64(buf[0:], n.term) // FIXME(dlc) - NoVote copy(buf[8:], n.vote) if err := ioutil.WriteFile(tvf, buf[:8+len(n.vote)], 0644); err != nil { return err } return nil } // voteResponse is a response to a vote request. type voteResponse struct { term uint64 peer string granted bool } const voteResponseLen = 8 + 8 + 1 func (vr *voteResponse) encode() []byte { var buf [voteResponseLen]byte var le = binary.LittleEndian le.PutUint64(buf[0:], vr.term) copy(buf[8:], vr.peer) if vr.granted { buf[16] = 1 } else { buf[16] = 0 } return buf[:voteResponseLen] } func (n *raft) decodeVoteResponse(msg []byte) *voteResponse { if len(msg) != voteResponseLen { return nil } var le = binary.LittleEndian vr := &voteResponse{term: le.Uint64(msg[0:]), peer: string(msg[8:16])} vr.granted = msg[16] == 1 return vr } func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string, msg []byte) { vr := n.decodeVoteResponse(msg) n.debug("Received a voteResponse %+v", vr) if vr == nil { n.error("Received malformed vote response for %q", n.group) return } select { case n.votes <- vr: default: // FIXME(dlc) n.error("Failed to place vote response on chan for %q", n.group) } } func (n *raft) processVoteRequest(vr *voteRequest) error { n.RLock() vresp := voteResponse{n.term, n.id, false} n.RUnlock() n.debug("Received a voteRequest %+v", vr) defer 
n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply) if err := n.trackPeer(vr.candidate); err != nil { n.sendReply(vr.reply, vresp.encode()) return err } n.Lock() // Ignore if we are newer. if vr.term < n.term { n.Unlock() n.sendReply(vr.reply, vresp.encode()) return nil } // If this is a higher term go ahead and stepdown. if vresp.term > n.term { n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term) stepdown := n.stepdown n.term = vresp.term n.vote = noVote n.writeTermVote() n.Unlock() stepdown <- noLeader n.sendReply(vr.reply, vresp.encode()) return nil } // Only way we get to yes is through here. if vr.lastIndex >= n.pindex && n.vote == noVote || n.vote == vr.candidate { vresp.granted = true n.vote = vr.candidate n.writeTermVote() n.resetElectionTimeout() } n.Unlock() n.sendReply(vr.reply, vresp.encode()) return nil } func (n *raft) handleVoteRequest(sub *subscription, c *client, subject, reply string, msg []byte) { vr := n.decodeVoteRequest(msg, reply) if vr == nil { n.error("Received malformed vote request for %q", n.group) return } select { case n.reqs <- vr: default: n.error("Failed to place vote request on chan for %q", n.group) } } func (n *raft) requestVote() { n.Lock() if n.state != Candidate { n.Unlock() panic("raft requestVote not from candidate") } n.vote = n.id n.writeTermVote() vr := voteRequest{n.term, n.pterm, n.pindex, n.id, _EMPTY_} subj, reply := n.vsubj, n.vreply n.Unlock() n.debug("Sending out voteRequest %+v", vr) // Now send it out. n.sendRPC(subj, reply, vr.encode()) } func (n *raft) sendRPC(subject, reply string, msg []byte) { n.sendq <- &pubMsg{nil, subject, reply, nil, msg, false} } func (n *raft) sendReply(subject string, msg []byte) { n.sendq <- &pubMsg{nil, subject, _EMPTY_, nil, msg, false} } func (n *raft) wonElection(votes int) bool { return votes >= n.quorumNeeded() } // Return the quorum size for a given cluster config. 
// quorumNeeded returns the quorum size (n.qn) under the read lock.
func (n *raft) quorumNeeded() int {
	n.RLock()
	qn := n.qn
	n.RUnlock()
	return qn
}

// updateLeadChange posts a leadership change to leadc, replacing any
// stale unconsumed value so the listener always sees the latest state.
// Lock should be held.
func (n *raft) updateLeadChange(isLeader bool) {
	select {
	case n.leadc <- isLeader:
	case <-n.leadc: // We had an old value not consumed.
		select {
		case n.leadc <- isLeader:
		default:
			n.error("Failed to post lead change to %v for %q", isLeader, n.group)
		}
	}
}

// switchState transitions us to the given state, notifying leadc on any
// leadership change, clearing our vote and persisting term/vote.
// No-op once Closed. Lock should be held.
func (n *raft) switchState(state RaftState) {
	if n.state == Closed {
		return
	}
	// Reset the election timer.
	n.resetElectionTimeout()
	if n.state == Leader && state != Leader {
		n.updateLeadChange(false)
	} else if state == Leader && n.state != Leader {
		n.updateLeadChange(true)
	}
	n.state = state
	n.vote = noVote
	// NOTE(review): writeTermVote error is ignored here — confirm best-effort is intended.
	n.writeTermVote()
}

const noLeader = _EMPTY_
const noVote = _EMPTY_

// switchToFollower transitions to Follower under the given leader.
func (n *raft) switchToFollower(leader string) {
	n.notice("Switching to follower")
	n.Lock()
	defer n.Unlock()
	n.leader = leader
	n.switchState(Follower)
}

// switchToCandidate bumps the term, clears the leader and transitions to Candidate.
func (n *raft) switchToCandidate() {
	n.notice("Switching to candidate")
	n.Lock()
	defer n.Unlock()
	// Increment the term.
	n.term++
	// Clear current Leader.
	n.leader = noLeader
	// NOTE(review): switchState also resets the election timer, so this
	// call is redundant (harmless).
	n.resetElectionTimeout()
	n.switchState(Candidate)
}

// switchToLeader transitions to Leader with ourselves as leader.
func (n *raft) switchToLeader() {
	n.notice("Switching to leader")
	n.Lock()
	defer n.Unlock()
	n.leader = n.id
	n.switchState(Leader)
}