diff --git a/server/raft.go b/server/raft.go index 281b6a60..cbee334c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -130,6 +130,7 @@ type raft struct { qn int peers map[string]*lps acks map[uint64]map[string]struct{} + pae map[uint64]*appendEntry elect *time.Timer active time.Time llqrt time.Time @@ -332,6 +333,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { hash: hash, peers: make(map[string]*lps), acks: make(map[uint64]map[string]struct{}), + pae: make(map[uint64]*appendEntry), s: s, c: s.createInternalSystemClient(), sq: sq, @@ -1858,22 +1860,13 @@ func (n *raft) applyCommit(index uint64) error { delete(n.acks, index) } - // FIXME(dlc) - Can keep this in memory if this too slow. - ae, err := n.loadEntry(index) - if err != nil { - state := n.wal.State() - if index < state.FirstSeq { - return nil - } - if err != ErrStoreClosed && err != ErrStoreEOF { - if err == errBadMsg { - n.setWriteErrLocked(err) - } - n.warn("Got an error loading %d index: %v", index, err) - } + ae := n.pae[index] + if ae == nil { + n.warn("Could not load append entry %d", index) n.commit = original return errEntryLoadFailed } + delete(n.pae, index) ae.buf = nil var committed []*Entry @@ -2362,6 +2355,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.Unlock() return } + // Save in memory for faster processing during applyCommit. + n.pae[n.pindex] = ae + if len(n.pae) > paeWarnThreshold { + n.warn("%d append entries pending", len(n.pae)) + } + } else { // This is a replay on startup so just take the appendEntry version. n.pterm = ae.term @@ -2506,6 +2505,8 @@ func (n *raft) storeToWAL(ae *appendEntry) error { return nil } +const paeWarnThreshold = 1024 + func (n *raft) sendAppendEntry(entries []*Entry) { n.Lock() defer n.Unlock() @@ -2520,6 +2521,12 @@ func (n *raft) sendAppendEntry(entries []*Entry) { // We count ourselves. n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}} n.active = time.Now() + + // Save in memory for faster processing during applyCommit. + n.pae[n.pindex] = ae + if len(n.pae) > paeWarnThreshold { + n.warn("%d append entries pending", len(n.pae)) + } } n.sendRPC(n.asubj, n.areply, ae.buf) }