diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index ce70b5e8..2133244b 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -207,6 +207,32 @@ func (s *Server) JetStreamSnapshotMeta() error {
     return cc.meta.Snapshot(js.metaSnapshot())
 }
 
+func (s *Server) JetStreamStepdownStream(account, stream string) error {
+    js, cc := s.getJetStreamCluster()
+    if js == nil {
+        return ErrJetStreamNotEnabled
+    }
+    if cc == nil {
+        return ErrJetStreamNotClustered
+    }
+    // Grab account
+    acc, err := s.LookupAccount(account)
+    if err != nil {
+        return err
+    }
+    // Grab stream
+    mset, err := acc.LookupStream(stream)
+    if err != nil {
+        return err
+    }
+
+    if node := mset.raftNode(); node != nil && node.Leader() {
+        node.StepDown()
+    }
+
+    return nil
+}
+
 func (s *Server) JetStreamSnapshotStream(account, stream string) error {
     js, cc := s.getJetStreamCluster()
     if js == nil {
@@ -1159,7 +1185,6 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
                 }()
             }
         }
-
         case <-s.quitCh:
             return
         case <-qch:
             return
@@ -1341,7 +1366,7 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
         resp.Error = jsError(err)
         s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
     } else {
-        resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(nil)}
+        resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())}
         s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
     }
 }
@@ -2050,7 +2075,9 @@ func (js *jetStream) monitorConsumer(o *Consumer, ca *consumerAssignment) {
             js.processConsumerLeaderChange(o, ca, isLeader)
         case <-t.C:
             // TODO(dlc) - We should have this delayed a bit to not race the invariants.
-            n.Compact(last)
+            if last != 0 {
+                n.Compact(last)
+            }
         }
     }
 }
@@ -3389,7 +3416,12 @@ func (s *Server) clusterInfo(n RaftNode) *ClusterInfo {
     id, peers := n.ID(), n.Peers()
     for _, rp := range peers {
         if rp.ID != id {
-            pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: rp.Current, Active: now.Sub(rp.Last)}
+            lastSeen := now.Sub(rp.Last)
+            current := rp.Current
+            if current && lastSeen > lostQuorumInterval {
+                current = false
+            }
+            pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: current, Active: lastSeen}
             ci.Replicas = append(ci.Replicas, pi)
         }
     }
diff --git a/server/raft.go b/server/raft.go
index 5284ff31..89d6e785 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -204,6 +204,8 @@ var (
     errCorruptPeers    = errors.New("raft: corrupt peer state")
     errStepdownFailed  = errors.New("raft: stepdown failed")
     errPeersNotCurrent = errors.New("raft: all peers are not current")
+    errFailedToApply   = errors.New("raft: could not place apply entry")
+    errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
 )
 
 // This will bootstrap a raftNode by writing its config into the store directory.
@@ -272,7 +274,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
         votes:    make(chan *voteResponse, 8),
         resp:     make(chan *appendEntryResponse, 256),
         propc:    make(chan *Entry, 256),
-        applyc:   make(chan *CommittedEntry, 256),
+        applyc:   make(chan *CommittedEntry, 512),
         leadc:    make(chan bool, 4),
         stepdown: make(chan string, 4),
     }
@@ -292,7 +294,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
     if first, err := n.loadFirstEntry(); err == nil {
         n.pterm, n.pindex = first.pterm, first.pindex
         if first.commit > 0 {
-            n.commit = first.commit - 1
+            n.commit = first.commit
         }
     }
     // Replay the log.
@@ -443,7 +445,7 @@ func (n *raft) Propose(data []byte) error {
         case <-paused:
         case <-quit:
             return errProposalFailed
-        case <-time.After(400 * time.Millisecond):
+        case <-time.After(422 * time.Millisecond):
            return errProposalsPaused
         }
     }
@@ -534,10 +536,11 @@ func (n *raft) ResumeApply() {
     // Run catchup..
     if n.hcommit > n.commit {
         for index := n.commit + 1; index <= n.hcommit; index++ {
-            n.applyCommit(index)
+            if err := n.applyCommit(index); err != nil {
+                break
+            }
         }
     }
 
-    n.paused = false
     n.hcommit = 0
 }
@@ -695,13 +698,12 @@ func (n *raft) StepDown() error {
     if maybeLeader != noLeader {
         n.debug("Stepping down, selected %q for new leader", maybeLeader)
         n.sendAppendEntry([]*Entry{&Entry{EntryLeaderTransfer, []byte(maybeLeader)}})
-    } else {
-        // Force us to stepdown here.
-        select {
-        case stepdown <- noLeader:
-        default:
-            return errStepdownFailed
-        }
-    }
+    }
+    // Force us to stepdown here.
+    select {
+    case stepdown <- noLeader:
+    default:
+        return errStepdownFailed
+    }
     return nil
 }
@@ -1177,6 +1179,7 @@ func (n *raft) runAsLeader() {
                 n.switchToFollower(noLeader)
                 return
             }
+
         case vresp := <-n.votes:
             if vresp.term > n.currentTerm() {
                 n.switchToFollower(noLeader)
@@ -1303,6 +1306,9 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
     timeout := time.NewTimer(activityInterval)
     defer timeout.Stop()
 
+    stepCheck := time.NewTicker(100 * time.Millisecond)
+    defer stepCheck.Stop()
+
     // Run as long as we are leader and still not caught up.
     for n.Leader() {
         select {
@@ -1310,6 +1316,11 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
             return
         case <-n.quit:
             return
+        case <-stepCheck.C:
+            if !n.Leader() {
+                n.debug("Catching up canceled, no longer leader")
+                return
+            }
         case <-timeout.C:
             n.debug("Catching up for %q stalled", peer)
             return
@@ -1379,10 +1390,10 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
 
 // applyCommit will update our commit index and apply the entry to the apply chan.
 // lock should be held.
-func (n *raft) applyCommit(index uint64) {
+func (n *raft) applyCommit(index uint64) error {
     if index <= n.commit {
         n.debug("Ignoring apply commit for %d, already processed", index)
-        return
+        return nil
     }
     original := n.commit
     n.commit = index
@@ -1396,7 +1407,7 @@ func (n *raft) applyCommit(index uint64) {
     if err != nil {
         n.debug("Got an error loading %d index: %v", index, err)
         n.commit = original
-        return
+        return errEntryLoadFailed
     }
 
     ae.buf = nil
@@ -1429,13 +1440,15 @@ func (n *raft) applyCommit(index uint64) {
         select {
         case n.applyc <- &CommittedEntry{index, committed}:
         default:
-            n.debug("Failed to place committed entry onto our apply channel")
+            n.error("Failed to place committed entry onto our apply channel")
             n.commit = original
+            return errFailedToApply
         }
     } else {
         // If we processed inline update our applied index.
         n.applied = index
     }
+    return nil
 }
 
 // Used to track a success response and apply entries.
@@ -1473,7 +1486,9 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
     if nr := len(results); nr >= n.qn {
         // We have a quorum.
         for index := n.commit + 1; index <= ar.index; index++ {
-            n.applyCommit(index)
+            if err := n.applyCommit(index); err != nil {
+                break
+            }
         }
         sendHB = len(n.propc) == 0
     }
@@ -1594,7 +1609,7 @@ func (n *raft) catchupStalled() bool {
 // Lock should be held.
 func (n *raft) createCatchup(ae *appendEntry) string {
     // Cleanup any old ones.
-    if n.catchup != nil {
+    if n.catchup != nil && n.catchup.sub != nil {
         n.s.sysUnsubscribe(n.catchup.sub)
     }
     // Snapshot term and index.
@@ -1755,8 +1770,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
                 if maybeLeader == n.id {
                     n.campaign()
                 }
-                // This will not have commits follow. We also know this will be by itself.
-                n.commit = ae.pindex + 1
             }
         case EntryAddPeer:
             if newPeer := string(e.Data); len(newPeer) == idLen {
@@ -1782,7 +1795,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
             n.debug("Paused, not applying %d", ae.commit)
         } else {
             for index := n.commit + 1; index <= ae.commit; index++ {
-                n.applyCommit(index)
+                if err := n.applyCommit(index); err != nil {
+                    break
+                }
             }
         }
     }
@@ -1833,6 +1848,11 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
         return err
     }
+    // Sanity checking for now.
+    if ae.pindex != seq-1 {
+        panic(fmt.Sprintf("[%s] Placed an entry at the wrong index, ae is %+v, index is %d\n\n", n.s, ae, seq))
+    }
+
     n.pterm = ae.term
     n.pindex = seq
     return nil
 }
@@ -2091,11 +2111,13 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
 
     // If this is a higher term go ahead and stepdown.
     if vr.term > n.term {
-        n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
         n.term = vr.term
         n.vote = noVote
         n.writeTermVote()
-        n.attemptStepDown(noLeader)
+        if n.state == Candidate {
+            n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
+            n.attemptStepDown(noLeader)
+        }
     }
 
     // Only way we get to yes is through here.
diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go
index 4207e373..4e1c6117 100644
--- a/test/jetstream_cluster_test.go
+++ b/test/jetstream_cluster_test.go
@@ -2615,10 +2615,12 @@ func TestJetStreamRestartAdvisories(t *testing.T) {
 func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "ND", 2)
     defer c.shutdown()
+
     // Client based API
     s := c.randomServer()
     nc, js := jsClientConnect(t, s)
     defer nc.Close()
+
     _, err := js.AddStream(&nats.StreamConfig{
         Name:     "TEST",
         Subjects: []string{"foo"},
@@ -2662,6 +2664,51 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
     }
 }
 
+func TestJetStreamClusterNoDupePeerSelection(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "NDP", 3)
+    defer c.shutdown()
+
+    // Client based API
+    s := c.randomServer()
+    nc, js := jsClientConnect(t, s)
+    defer nc.Close()
+
+    // Create 10 streams. Make sure none of them have a replica
+    // that is the same as the leader.
+    for i := 1; i <= 10; i++ {
+        si, err := js.AddStream(&nats.StreamConfig{
+            Name:     fmt.Sprintf("TEST-%d", i),
+            Replicas: 3,
+        })
+        if err != nil {
+            t.Fatalf("Unexpected error: %v", err)
+        }
+        if si.Cluster == nil || si.Cluster.Leader == "" || len(si.Cluster.Replicas) != 2 {
+            t.Fatalf("Unexpected cluster state for stream info: %+v\n", si.Cluster)
+        }
+        // Make sure that the replicas are not same as the leader.
+        for _, pi := range si.Cluster.Replicas {
+            if pi.Name == si.Cluster.Leader {
+                t.Fatalf("Found replica that is same as leader, meaning 2 nodes placed on same server")
+            }
+        }
+        // Now do a consumer and check same thing.
+        sub, err := js.SubscribeSync(si.Config.Name)
+        if err != nil {
+            t.Fatalf("Unexpected error: %v", err)
+        }
+        ci, err := sub.ConsumerInfo()
+        if err != nil {
+            t.Fatalf("Unexpected error getting consumer info: %v", err)
+        }
+        for _, pi := range ci.Cluster.Replicas {
+            if pi.Name == ci.Cluster.Leader {
+                t.Fatalf("Found replica that is same as leader, meaning 2 nodes placed on same server")
+            }
+        }
+    }
+}
+
 func TestJetStreamClusterStreamPerf(t *testing.T) {
     // Comment out to run, holding place for now.
     skip(t)
diff --git a/test/norace_test.go b/test/norace_test.go
index 0ef346d0..fb43c7b5 100644
--- a/test/norace_test.go
+++ b/test/norace_test.go
@@ -646,6 +646,78 @@ func TestNoRaceJetStreamWorkQueueLoadBalance(t *testing.T) {
     }
 }
 
+func TestJetStreamClusterLargeStreamInlineCatchup(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "LSS", 3)
+    defer c.shutdown()
+
+    // Client based API
+    s := c.randomServer()
+    nc, js := jsClientConnect(t, s)
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+        Replicas: 3,
+    })
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+
+    sr := c.randomNonStreamLeader("$G", "TEST")
+    sr.Shutdown()
+
+    // In case sr was meta leader.
+    c.waitOnLeader()
+
+    msg, toSend := []byte("Hello JS Clustering"), 5000
+
+    // Now fill up stream.
+    for i := 0; i < toSend; i++ {
+        if _, err = js.Publish("foo", msg); err != nil {
+            t.Fatalf("Unexpected publish error: %v", err)
+        }
+    }
+    si, err := js.StreamInfo("TEST")
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    // Check active state as well, shows that the owner answered.
+    if si.State.Msgs != uint64(toSend) {
+        t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
+    }
+
+    // Kill our current leader to make just 2.
+    c.streamLeader("$G", "TEST").Shutdown()
+
+    // Now restart the shutdown peer and wait for it to be current.
+    sr = c.restartServer(sr)
+    c.waitOnStreamCurrent(sr, "$G", "TEST")
+
+    // Ask other servers to stepdown as leader so that sr becomes the leader.
+    checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+        c.waitOnStreamLeader("$G", "TEST")
+        if sl := c.streamLeader("$G", "TEST"); sl != sr {
+            sl.JetStreamStepdownStream("$G", "TEST")
+            return fmt.Errorf("Server %s is not leader yet", sr)
+        }
+        return nil
+    })
+
+    // Check that we have all of our messages stored.
+    // Wait for a bit for upper layers to process.
+    checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+        si, err = js.StreamInfo("TEST")
+        if err != nil {
+            return fmt.Errorf("Unexpected error: %v", err)
+        }
+        if si.State.Msgs != uint64(toSend) {
+            return fmt.Errorf("Expected %d msgs, got %d", toSend, si.State.Msgs)
+        }
+        return nil
+    })
+}
+
 func TestNoRaceLeafNodeSmapUpdate(t *testing.T) {
     s, opts := runLeafServer()
     defer s.Shutdown()