Merge pull request #1870 from nats-io/raft_inline_catchup

Fixed bug on raft inline catchup when apply channel was full.
This commit is contained in:
Derek Collison
2021-01-30 14:33:08 -07:00
committed by GitHub
4 changed files with 200 additions and 27 deletions

View File

@@ -207,6 +207,32 @@ func (s *Server) JetStreamSnapshotMeta() error {
return cc.meta.Snapshot(js.metaSnapshot())
}
// JetStreamStepdownStream asks the raft leader for the given stream in the
// given account to step down, allowing another peer to take over leadership.
// It is a no-op (returning nil) if this server does not hold the stream's
// raft leadership.
func (s *Server) JetStreamStepdownStream(account, stream string) error {
	js, cc := s.getJetStreamCluster()
	switch {
	case js == nil:
		return ErrJetStreamNotEnabled
	case cc == nil:
		return ErrJetStreamNotClustered
	}
	// Resolve the account first, then the stream within it.
	acc, err := s.LookupAccount(account)
	if err != nil {
		return err
	}
	mset, err := acc.LookupStream(stream)
	if err != nil {
		return err
	}
	// Only request a stepdown when we have a raft node and are its leader.
	if node := mset.raftNode(); node != nil && node.Leader() {
		node.StepDown()
	}
	return nil
}
func (s *Server) JetStreamSnapshotStream(account, stream string) error {
js, cc := s.getJetStreamCluster()
if js == nil {
@@ -1159,7 +1185,6 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
}()
}
}
case <-s.quitCh:
return
case <-qch:
@@ -1341,7 +1366,7 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
resp.Error = jsError(err)
s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(nil)}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
}
}
@@ -2050,7 +2075,9 @@ func (js *jetStream) monitorConsumer(o *Consumer, ca *consumerAssignment) {
js.processConsumerLeaderChange(o, ca, isLeader)
case <-t.C:
// TODO(dlc) - We should have this delayed a bit to not race the invariants.
n.Compact(last)
if last != 0 {
n.Compact(last)
}
}
}
}
@@ -3389,7 +3416,12 @@ func (s *Server) clusterInfo(n RaftNode) *ClusterInfo {
id, peers := n.ID(), n.Peers()
for _, rp := range peers {
if rp.ID != id {
pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: rp.Current, Active: now.Sub(rp.Last)}
lastSeen := now.Sub(rp.Last)
current := rp.Current
if current && lastSeen > lostQuorumInterval {
current = false
}
pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: current, Active: lastSeen}
ci.Replicas = append(ci.Replicas, pi)
}
}

View File

@@ -204,6 +204,8 @@ var (
errCorruptPeers = errors.New("raft: corrupt peer state")
errStepdownFailed = errors.New("raft: stepdown failed")
errPeersNotCurrent = errors.New("raft: all peers are not current")
errFailedToApply = errors.New("raft: could not place apply entry")
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
)
// This will bootstrap a raftNode by writing its config into the store directory.
@@ -272,7 +274,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
votes: make(chan *voteResponse, 8),
resp: make(chan *appendEntryResponse, 256),
propc: make(chan *Entry, 256),
applyc: make(chan *CommittedEntry, 256),
applyc: make(chan *CommittedEntry, 512),
leadc: make(chan bool, 4),
stepdown: make(chan string, 4),
}
@@ -292,7 +294,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
if first, err := n.loadFirstEntry(); err == nil {
n.pterm, n.pindex = first.pterm, first.pindex
if first.commit > 0 {
n.commit = first.commit - 1
n.commit = first.commit
}
}
// Replay the log.
@@ -443,7 +445,7 @@ func (n *raft) Propose(data []byte) error {
case <-paused:
case <-quit:
return errProposalFailed
case <-time.After(400 * time.Millisecond):
case <-time.After(422 * time.Millisecond):
return errProposalsPaused
}
}
@@ -534,10 +536,11 @@ func (n *raft) ResumeApply() {
// Run catchup..
if n.hcommit > n.commit {
for index := n.commit + 1; index <= n.hcommit; index++ {
n.applyCommit(index)
if err := n.applyCommit(index); err != nil {
break
}
}
}
n.paused = false
n.hcommit = 0
}
@@ -695,13 +698,12 @@ func (n *raft) StepDown() error {
if maybeLeader != noLeader {
n.debug("Stepping down, selected %q for new leader", maybeLeader)
n.sendAppendEntry([]*Entry{&Entry{EntryLeaderTransfer, []byte(maybeLeader)}})
} else {
// Force us to stepdown here.
select {
case stepdown <- noLeader:
default:
return errStepdownFailed
}
}
// Force us to stepdown here.
select {
case stepdown <- noLeader:
default:
return errStepdownFailed
}
return nil
}
@@ -1177,6 +1179,7 @@ func (n *raft) runAsLeader() {
n.switchToFollower(noLeader)
return
}
case vresp := <-n.votes:
if vresp.term > n.currentTerm() {
n.switchToFollower(noLeader)
@@ -1303,6 +1306,9 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
timeout := time.NewTimer(activityInterval)
defer timeout.Stop()
stepCheck := time.NewTicker(100 * time.Millisecond)
defer stepCheck.Stop()
// Run as long as we are leader and still not caught up.
for n.Leader() {
select {
@@ -1310,6 +1316,11 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
return
case <-n.quit:
return
case <-stepCheck.C:
if !n.Leader() {
n.debug("Catching up canceled, no longer leader")
return
}
case <-timeout.C:
n.debug("Catching up for %q stalled", peer)
return
@@ -1379,10 +1390,10 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
// applyCommit will update our commit index and apply the entry to the apply chan.
// lock should be held.
func (n *raft) applyCommit(index uint64) {
func (n *raft) applyCommit(index uint64) error {
if index <= n.commit {
n.debug("Ignoring apply commit for %d, already processed", index)
return
return nil
}
original := n.commit
n.commit = index
@@ -1396,7 +1407,7 @@ func (n *raft) applyCommit(index uint64) {
if err != nil {
n.debug("Got an error loading %d index: %v", index, err)
n.commit = original
return
return errEntryLoadFailed
}
ae.buf = nil
@@ -1429,13 +1440,15 @@ func (n *raft) applyCommit(index uint64) {
select {
case n.applyc <- &CommittedEntry{index, committed}:
default:
n.debug("Failed to place committed entry onto our apply channel")
n.error("Failed to place committed entry onto our apply channel")
n.commit = original
return errFailedToApply
}
} else {
// If we processed inline update our applied index.
n.applied = index
}
return nil
}
// Used to track a success response and apply entries.
@@ -1473,7 +1486,9 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
if nr := len(results); nr >= n.qn {
// We have a quorum.
for index := n.commit + 1; index <= ar.index; index++ {
n.applyCommit(index)
if err := n.applyCommit(index); err != nil {
break
}
}
sendHB = len(n.propc) == 0
}
@@ -1594,7 +1609,7 @@ func (n *raft) catchupStalled() bool {
// Lock should be held.
func (n *raft) createCatchup(ae *appendEntry) string {
// Cleanup any old ones.
if n.catchup != nil {
if n.catchup != nil && n.catchup.sub != nil {
n.s.sysUnsubscribe(n.catchup.sub)
}
// Snapshot term and index.
@@ -1755,8 +1770,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if maybeLeader == n.id {
n.campaign()
}
// This will not have commits follow. We also know this will be by itself.
n.commit = ae.pindex + 1
}
case EntryAddPeer:
if newPeer := string(e.Data); len(newPeer) == idLen {
@@ -1782,7 +1795,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("Paused, not applying %d", ae.commit)
} else {
for index := n.commit + 1; index <= ae.commit; index++ {
n.applyCommit(index)
if err := n.applyCommit(index); err != nil {
break
}
}
}
}
@@ -1833,6 +1848,11 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
return err
}
// Sanity checking for now.
if ae.pindex != seq-1 {
panic(fmt.Sprintf("[%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, ae, seq))
}
n.pterm = ae.term
n.pindex = seq
return nil
@@ -2091,11 +2111,13 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
// If this is a higher term go ahead and stepdown.
if vr.term > n.term {
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
n.term = vr.term
n.vote = noVote
n.writeTermVote()
n.attemptStepDown(noLeader)
if n.state == Candidate {
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
n.attemptStepDown(noLeader)
}
}
// Only way we get to yes is through here.

View File

@@ -2615,10 +2615,12 @@ func TestJetStreamRestartAdvisories(t *testing.T) {
func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "ND", 2)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
@@ -2662,6 +2664,51 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
}
}
// TestJetStreamClusterNoDupePeerSelection creates a number of R3 streams and
// consumers and verifies peer placement never puts a replica on the same
// server as the raft leader.
func TestJetStreamClusterNoDupePeerSelection(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "NDP", 3)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Shared check: no replica may carry the same name as the leader.
	checkNoDupe := func(ci *nats.ClusterInfo) {
		for _, peer := range ci.Replicas {
			if peer.Name == ci.Leader {
				t.Fatalf("Found replica that is same as leader, meaning 2 nodes placed on same server")
			}
		}
	}

	// Create 10 streams. Make sure none of them have a replica
	// that is the same as the leader.
	for n := 1; n <= 10; n++ {
		si, err := js.AddStream(&nats.StreamConfig{
			Name:     fmt.Sprintf("TEST-%d", n),
			Replicas: 3,
		})
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		if si.Cluster == nil || si.Cluster.Leader == "" || len(si.Cluster.Replicas) != 2 {
			t.Fatalf("Unexpected cluster state for stream info: %+v\n", si.Cluster)
		}
		// Make sure that the replicas are not same as the leader.
		checkNoDupe(si.Cluster)

		// Now do a consumer and check same thing.
		sub, err := js.SubscribeSync(si.Config.Name)
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		ci, err := sub.ConsumerInfo()
		if err != nil {
			t.Fatalf("Unexpected error getting consumer info: %v", err)
		}
		checkNoDupe(ci.Cluster)
	}
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)

View File

@@ -646,6 +646,78 @@ func TestNoRaceJetStreamWorkQueueLoadBalance(t *testing.T) {
}
}
// TestJetStreamClusterLargeStreamInlineCatchup shuts a replica down, fills the
// stream, kills the leader, restarts the replica, forces it to become leader,
// and then verifies it caught up with all stored messages.
func TestJetStreamClusterLargeStreamInlineCatchup(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "LSS", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Take one replica offline so it has to catch up later.
	sr := c.randomNonStreamLeader("$G", "TEST")
	sr.Shutdown()

	// In case sr was meta leader.
	c.waitOnLeader()

	msg, toSend := []byte("Hello JS Clustering"), 5000

	// Now fill up stream.
	for i := 0; i < toSend; i++ {
		if _, err = js.Publish("foo", msg); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}
	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Check active state as well, shows that the owner answered.
	if si.State.Msgs != uint64(toSend) {
		t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
	}

	// Kill our current leader to make just 2.
	c.streamLeader("$G", "TEST").Shutdown()

	// Now restart the shutdown peer and wait for it to be current.
	sr = c.restartServer(sr)
	c.waitOnStreamCurrent(sr, "$G", "TEST")

	// Ask other servers to stepdown as leader so that sr becomes the leader.
	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
		c.waitOnStreamLeader("$G", "TEST")
		if sl := c.streamLeader("$G", "TEST"); sl != sr {
			sl.JetStreamStepdownStream("$G", "TEST")
			return fmt.Errorf("Server %s is not leader yet", sr)
		}
		return nil
	})

	// Check that we have all of our messages stored.
	// Wait for a bit for upper layers to process.
	// Fix: fetch StreamInfo inside the retry closure; the original fetched it
	// once before the loop, so each retry re-checked the same stale snapshot
	// and the polling never observed catch-up progress.
	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			return err
		}
		if si.State.Msgs != uint64(toSend) {
			return fmt.Errorf("Expected %d msgs, got %d", toSend, si.State.Msgs)
		}
		return nil
	})
}
func TestNoRaceLeafNodeSmapUpdate(t *testing.T) {
s, opts := runLeafServer()
defer s.Shutdown()