Make sure that we do not become a candidate/leader too soon or if we are not caughtup.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-03-19 16:48:17 -07:00
committed by Ivan Kozlovic
parent 7fd5f4dc24
commit d7e1e5ae61
2 changed files with 63 additions and 21 deletions

View File

@@ -1442,10 +1442,9 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
rg.node = n
// See if we are preferred and should start campaign immediately.
if n.ID() == rg.Preferred {
if n.ID() == rg.Preferred && n.Term() == 0 {
n.Campaign()
}
return nil
}
@@ -1602,6 +1601,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
} else if n.GroupLeader() != noLeader {
js.setStreamAssignmentRecovering(sa)
}
js.processStreamLeaderChange(mset, isLeader)
case <-t.C:
doSnapshot()
@@ -1807,7 +1807,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// We can skip if we know this is less than what we already have.
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.account(), mset.name(), lseq, last)
continue
}
@@ -1831,7 +1832,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if isClusterResetErr(err) || isOutOfSpaceErr(err) {
return err
}
s.Debugf("Apply stream entries error processing message: %v", err)
s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v",
mset.account(), mset.name(), err)
}
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
@@ -1858,7 +1860,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
if err != nil && !isRecovering {
s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v",
s.Debugf("JetStream cluster failed to delete stream msg %d from '%s > %s': %v",
md.Seq, md.Stream, md.Client.serviceAccount(), err)
}
@@ -5220,6 +5222,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error {
mset.clfs = snap.Failed
mset.store.FastState(&state)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()
@@ -5240,7 +5243,9 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error {
}
// Pause the apply channel for our raft group while we catch up.
n.PauseApply()
if err := n.PauseApply(); err != nil {
return err
}
defer n.ResumeApply()
// Set our catchup state.
@@ -5308,7 +5313,8 @@ RETRY:
})
if err != nil {
s.Errorf("Could not subscribe to stream catchup: %v", err)
return err
err = nil
goto RETRY
}
b, _ := json.Marshal(sreq)
@@ -5368,8 +5374,11 @@ RETRY:
case <-qch:
return nil
case isLeader := <-lch:
js.processStreamLeaderChange(mset, isLeader)
return nil
if isLeader {
n.StepDown()
notActive.Reset(activityInterval)
goto RETRY
}
}
}
}
@@ -5534,7 +5543,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
s := mset.srv
defer s.grWG.Done()
const maxOutBytes = int64(1 * 1024 * 1024) // 1MB for now.
const maxOutBytes = int64(4 * 1024 * 1024) // 4MB for now.
const maxOutMsgs = int32(16384)
outb := int64(0)
outm := int32(0)

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2021 The NATS Authors
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -48,6 +48,7 @@ type RaftNode interface {
Leader() bool
Quorum() bool
Current() bool
Term() uint64
GroupLeader() string
HadPreviousLeader() bool
StepDown(preferred ...string) error
@@ -61,7 +62,7 @@ type RaftNode interface {
AdjustBootClusterSize(csz int) error
ClusterSize() int
ApplyQ() *ipQueue // of *CommittedEntry
PauseApply()
PauseApply() error
ResumeApply()
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
@@ -183,8 +184,9 @@ type raft struct {
progress map[string]*ipQueue // of uint64
// For when we have paused our applyC.
paused bool
hcommit uint64
paused bool
hcommit uint64
pobserver bool
// Queues and Channels
prop *ipQueue // of *Entry
@@ -225,7 +227,7 @@ const (
minElectionTimeoutDefault = 2 * time.Second
maxElectionTimeoutDefault = 5 * time.Second
minCampaignTimeoutDefault = 100 * time.Millisecond
maxCampaignTimeoutDefault = 4 * minCampaignTimeoutDefault
maxCampaignTimeoutDefault = 8 * minCampaignTimeoutDefault
hbIntervalDefault = 500 * time.Millisecond
lostQuorumIntervalDefault = hbIntervalDefault * 5
)
@@ -758,20 +760,38 @@ func (n *raft) AdjustClusterSize(csz int) error {
// PauseApply will allow us to pause processing of append entries onto our
// external apply chan.
func (n *raft) PauseApply() {
func (n *raft) PauseApply() error {
n.Lock()
defer n.Unlock()
n.debug("Pausing apply channel")
if n.state == Leader {
return errAlreadyLeader
}
// If we are currently a candidate make sure we step down.
if n.state == Candidate {
n.stepdown.push(noLeader)
}
n.debug("Pausing our apply channel")
n.paused = true
n.hcommit = n.commit
// Also prevent us from trying to become a leader while paused and catching up.
n.pobserver, n.observer = n.observer, true
n.resetElect(48 * time.Hour)
return nil
}
func (n *raft) ResumeApply() {
n.Lock()
defer n.Unlock()
n.debug("Resuming apply channel")
if !n.paused {
return
}
n.debug("Resuming our apply channel")
n.observer, n.pobserver = n.pobserver, false
n.paused = false
// Run catchup..
if n.hcommit > n.commit {
@@ -783,6 +803,7 @@ func (n *raft) ResumeApply() {
}
}
n.hcommit = 0
n.resetElectionTimeout()
}
// Compact will compact our WAL.
@@ -1233,6 +1254,7 @@ func (n *raft) campaign() error {
}
n.lxfer = true
n.resetElect(randCampaignTimeout())
return nil
}
@@ -1534,6 +1556,7 @@ func convertVoteResponse(i interface{}) *voteResponse {
func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
select {
case <-n.entry.ch:
n.processAppendEntries()
@@ -1839,6 +1862,9 @@ func (n *raft) runAsLeader() {
hb := time.NewTicker(hbInterval)
defer hb.Stop()
lq := time.NewTicker(hbInterval * 2)
defer lq.Stop()
for {
select {
case <-n.s.quitCh:
@@ -1876,6 +1902,7 @@ func (n *raft) runAsLeader() {
if n.notActive() {
n.sendHeartbeat()
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
return
@@ -1886,7 +1913,7 @@ func (n *raft) runAsLeader() {
if vresp == nil {
continue
}
if vresp.term > n.currentTerm() {
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
return
}
@@ -1949,7 +1976,7 @@ func (n *raft) notActive() bool {
}
// Return our current term.
func (n *raft) currentTerm() uint64 {
func (n *raft) Term() uint64 {
n.RLock()
defer n.RUnlock()
return n.term
@@ -2744,7 +2771,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
case EntryLeaderTransfer:
if isNew {
maybeLeader := string(e.Data)
if maybeLeader == n.id {
if maybeLeader == n.id && !n.observer && !n.paused {
n.campaign()
}
}
@@ -3369,6 +3396,7 @@ func (n *raft) switchToFollower(leader string) {
return
}
n.debug("Switching to follower")
n.lxfer = false
n.updateLeader(leader)
n.switchState(Follower)
@@ -3380,6 +3408,11 @@ func (n *raft) switchToCandidate() {
if n.state == Closed {
return
}
// If we are catching up or are in observer mode we can not switch.
if n.observer || n.paused {
return
}
if n.state != Candidate {
n.debug("Switching to candidate")
} else {