diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6ccd1700..fc4dc8fe 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1442,10 +1442,9 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor rg.node = n // See if we are preferred and should start campaign immediately. - if n.ID() == rg.Preferred { + if n.ID() == rg.Preferred && n.Term() == 0 { n.Campaign() } - return nil } @@ -1602,6 +1601,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { } else if n.GroupLeader() != noLeader { js.setStreamAssignmentRecovering(sa) } + js.processStreamLeaderChange(mset, isLeader) case <-t.C: doSnapshot() @@ -1807,7 +1807,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // We can skip if we know this is less than what we already have. if lseq < last { - s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last) + s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", + mset.account(), mset.name(), lseq, last) continue } @@ -1831,7 +1832,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if isClusterResetErr(err) || isOutOfSpaceErr(err) { return err } - s.Debugf("Apply stream entries error processing message: %v", err) + s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v", + mset.account(), mset.name(), err) } case deleteMsgOp: md, err := decodeMsgDelete(buf[1:]) @@ -1858,7 +1860,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } if err != nil && !isRecovering { - s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", + s.Debugf("JetStream cluster failed to delete stream msg %d from '%s > %s': %v", md.Seq, md.Stream, md.Client.serviceAccount(), err) } @@ -5220,6 +5222,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { mset.clfs = snap.Failed mset.store.FastState(&state) sreq := mset.calculateSyncRequest(&state, snap) + s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() @@ -5240,7 +5243,9 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { } // Pause the apply channel for our raft group while we catch up. - n.PauseApply() + if err := n.PauseApply(); err != nil { + return err + } defer n.ResumeApply() // Set our catchup state. @@ -5308,7 +5313,8 @@ RETRY: }) if err != nil { s.Errorf("Could not subscribe to stream catchup: %v", err) - return err + err = nil + goto RETRY } b, _ := json.Marshal(sreq) @@ -5368,8 +5374,11 @@ RETRY: case <-qch: return nil case isLeader := <-lch: - js.processStreamLeaderChange(mset, isLeader) - return nil + if isLeader { + n.StepDown() + notActive.Reset(activityInterval) + goto RETRY + } } } } @@ -5534,7 +5543,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { s := mset.srv defer s.grWG.Done() - const maxOutBytes = int64(1 * 1024 * 1024) // 1MB for now. + const maxOutBytes = int64(4 * 1024 * 1024) // 4MB for now. const maxOutMsgs = int32(16384) outb := int64(0) outm := int32(0) diff --git a/server/raft.go b/server/raft.go index 104d21e3..5ddcc335 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -48,6 +48,7 @@ type RaftNode interface { Leader() bool Quorum() bool Current() bool + Term() uint64 GroupLeader() string HadPreviousLeader() bool StepDown(preferred ...string) error @@ -61,7 +62,7 @@ type RaftNode interface { AdjustBootClusterSize(csz int) error ClusterSize() int ApplyQ() *ipQueue // of *CommittedEntry - PauseApply() + PauseApply() error ResumeApply() LeadChangeC() <-chan bool QuitC() <-chan struct{} @@ -183,8 +184,9 @@ type raft struct { progress map[string]*ipQueue // of uint64 // For when we have paused our applyC. - paused bool - hcommit uint64 + paused bool + hcommit uint64 + pobserver bool // Queues and Channels prop *ipQueue // of *Entry @@ -225,7 +227,7 @@ const ( minElectionTimeoutDefault = 2 * time.Second maxElectionTimeoutDefault = 5 * time.Second minCampaignTimeoutDefault = 100 * time.Millisecond - maxCampaignTimeoutDefault = 4 * minCampaignTimeoutDefault + maxCampaignTimeoutDefault = 8 * minCampaignTimeoutDefault hbIntervalDefault = 500 * time.Millisecond lostQuorumIntervalDefault = hbIntervalDefault * 5 ) @@ -758,20 +760,38 @@ func (n *raft) AdjustClusterSize(csz int) error { // PauseApply will allow us to pause processing of append entries onto our // external apply chan. -func (n *raft) PauseApply() { +func (n *raft) PauseApply() error { n.Lock() defer n.Unlock() - n.debug("Pausing apply channel") + if n.state == Leader { + return errAlreadyLeader + } + // If we are currently a candidate make sure we step down. + if n.state == Candidate { + n.stepdown.push(noLeader) + } + + n.debug("Pausing our apply channel") n.paused = true n.hcommit = n.commit + // Also prevent us from trying to become a leader while paused and catching up. + n.pobserver, n.observer = n.observer, true + n.resetElect(48 * time.Hour) + + return nil } func (n *raft) ResumeApply() { n.Lock() defer n.Unlock() - n.debug("Resuming apply channel") + if !n.paused { + return + } + + n.debug("Resuming our apply channel") + n.observer, n.pobserver = n.pobserver, false n.paused = false // Run catchup.. if n.hcommit > n.commit { @@ -783,6 +803,7 @@ func (n *raft) ResumeApply() { } } n.hcommit = 0 + n.resetElectionTimeout() } // Compact will compact our WAL. @@ -1233,6 +1254,7 @@ func (n *raft) campaign() error { } n.lxfer = true n.resetElect(randCampaignTimeout()) + return nil } @@ -1534,6 +1556,7 @@ func convertVoteResponse(i interface{}) *voteResponse { func (n *raft) runAsFollower() { for { elect := n.electTimer() + select { case <-n.entry.ch: n.processAppendEntries() @@ -1839,6 +1862,9 @@ func (n *raft) runAsLeader() { hb := time.NewTicker(hbInterval) defer hb.Stop() + lq := time.NewTicker(hbInterval * 2) + defer lq.Stop() + for { select { case <-n.s.quitCh: @@ -1876,6 +1902,7 @@ func (n *raft) runAsLeader() { if n.notActive() { n.sendHeartbeat() } + case <-lq.C: if n.lostQuorum() { n.switchToFollower(noLeader) return @@ -1886,7 +1913,7 @@ func (n *raft) runAsLeader() { if vresp == nil { continue } - if vresp.term > n.currentTerm() { + if vresp.term > n.Term() { n.switchToFollower(noLeader) return } @@ -1949,7 +1976,7 @@ func (n *raft) notActive() bool { } // Return our current term. -func (n *raft) currentTerm() uint64 { +func (n *raft) Term() uint64 { n.RLock() defer n.RUnlock() return n.term @@ -2744,7 +2771,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { case EntryLeaderTransfer: if isNew { maybeLeader := string(e.Data) - if maybeLeader == n.id { + if maybeLeader == n.id && !n.observer && !n.paused { n.campaign() } } @@ -3369,6 +3396,7 @@ func (n *raft) switchToFollower(leader string) { return } n.debug("Switching to follower") + n.lxfer = false n.updateLeader(leader) n.switchState(Follower) @@ -3380,6 +3408,11 @@ func (n *raft) switchToCandidate() { if n.state == Closed { return } + // If we are catching up or are in observer mode we can not switch. + if n.observer || n.paused { + return + } + if n.state != Candidate { n.debug("Switching to candidate") } else {