Under heavy load retreiving the append entry from the WAL

while trying to also send new append entries was causing contention.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-06 07:50:11 -08:00
parent 6d9cb21648
commit 63b620972d

View File

@@ -130,6 +130,7 @@ type raft struct {
qn int
peers map[string]*lps
acks map[uint64]map[string]struct{}
pae map[uint64]*appendEntry
elect *time.Timer
active time.Time
llqrt time.Time
@@ -332,6 +333,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
hash: hash,
peers: make(map[string]*lps),
acks: make(map[uint64]map[string]struct{}),
pae: make(map[uint64]*appendEntry),
s: s,
c: s.createInternalSystemClient(),
sq: sq,
@@ -1858,22 +1860,13 @@ func (n *raft) applyCommit(index uint64) error {
delete(n.acks, index)
}
// FIXME(dlc) - Can keep this in memory if this too slow.
ae, err := n.loadEntry(index)
if err != nil {
state := n.wal.State()
if index < state.FirstSeq {
return nil
}
if err != ErrStoreClosed && err != ErrStoreEOF {
if err == errBadMsg {
n.setWriteErrLocked(err)
}
n.warn("Got an error loading %d index: %v", index, err)
}
ae := n.pae[index]
if ae == nil {
n.warn("Could not load append entry %d", index)
n.commit = original
return errEntryLoadFailed
}
delete(n.pae, index)
ae.buf = nil
var committed []*Entry
@@ -2362,6 +2355,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Unlock()
return
}
// Save in memory for faster processing during applyCommit.
n.pae[n.pindex] = ae
if len(n.pae) > paeWarnThreshold {
n.warn("%d append entries pending", len(n.pae))
}
} else {
// This is a replay on startup so just take the appendEntry version.
n.pterm = ae.term
@@ -2506,6 +2505,8 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
return nil
}
const paeWarnThreshold = 1024
func (n *raft) sendAppendEntry(entries []*Entry) {
n.Lock()
defer n.Unlock()
@@ -2520,6 +2521,12 @@ func (n *raft) sendAppendEntry(entries []*Entry) {
// We count ourselves.
n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}}
n.active = time.Now()
// Save in memory for faster processing during applyCommit.
n.pae[n.pindex] = ae
if len(n.pae) > paeWarnThreshold {
n.warn("%d append entries pending", len(n.pae))
}
}
n.sendRPC(n.asubj, n.areply, ae.buf)
}