diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a8cfdfc2..34c29e4b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1721,6 +1721,7 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { } oldLeader = c.restartServer(oldLeader) + c.waitOnStreamLeader("$G", "TEST") c.waitOnStreamCurrent(oldLeader, "$G", "TEST") // Re-request. @@ -3530,7 +3531,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { return fmt.Errorf("Could not fetch stream info: %v", err) @@ -3717,6 +3718,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { sl := c.streamLeader("$G", "TEST") c.removeJetStream(sl) + c.waitOnLeader() c.waitOnStreamLeader("$G", "TEST") // Faster timeout since we loop below checking for condition. @@ -3726,7 +3728,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { } // Check the stream info is eventually correct. - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { return fmt.Errorf("Could not fetch stream info: %v", err) @@ -3744,7 +3746,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { // Now do consumer. c.waitOnConsumerLeader("$G", "TEST", cname) - checkFor(t, 5*time.Second, 50*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 50*time.Millisecond, func() error { ci, err := js.ConsumerInfo("TEST", cname) if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) @@ -4351,7 +4353,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_MIRROR_TEST") if err != nil { t.Fatalf("Could not retrieve stream info") @@ -4393,7 +4395,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { t.Fatalf("Did not receive correct response: %+v", scResp.Error) } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_SOURCE_TEST") if err != nil { t.Fatalf("Could not retrieve stream info") @@ -4971,7 +4973,7 @@ func (c *cluster) waitOnPeerCount(n int) { c.t.Helper() c.waitOnLeader() leader := c.leader() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { peers := leader.JetStreamClusterPeers() if len(peers) == n { @@ -4984,7 +4986,7 @@ func (c *cluster) waitOnPeerCount(n int) { func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) { c.t.Helper() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { time.Sleep(100 * time.Millisecond) @@ -5053,7 +5055,7 @@ func (c *cluster) waitOnStreamCurrent(s *Server, account, stream string) { func (c *cluster) waitOnServerCurrent(s *Server) { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { if s.JetStreamIsCurrent() { time.Sleep(100 * time.Millisecond) @@ -5112,7 +5114,7 @@ func (c *cluster) expectNoLeader() { func (c *cluster) waitOnLeader() { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader := c.leader(); leader != nil { time.Sleep(100 * time.Millisecond) diff --git a/server/norace_test.go b/server/norace_test.go index c35caa81..8950cc1e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1373,7 +1373,7 @@ func TestNoRaceJetStreamClusterLargeStreamInlineCatchup(t *testing.T) { c.waitOnStreamCurrent(sr, "$G", "TEST") // Ask other servers to stepdown as leader so that sr becomes the leader. - checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnStreamLeader("$G", "TEST") if sl := c.streamLeader("$G", "TEST"); sl != sr { sl.JetStreamStepdownStream("$G", "TEST") diff --git a/server/raft.go b/server/raft.go index e7ae18f3..1e2f614f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2051,6 +2051,7 @@ func (n *raft) runAsCandidate() { // We vote for ourselves. votes := 1 + won := false for { elect := n.electTimer() @@ -2064,17 +2065,39 @@ func (n *raft) runAsCandidate() { case <-n.quit: return case <-elect.C: - n.switchToCandidate() + if won { + // we are here if we won the election but some server did not respond + n.switchToLeader() + } else { + n.switchToCandidate() + } return case vresp := <-n.votes: - n.trackPeer(vresp.peer) if vresp.granted && n.term >= vresp.term { + // only track peers that would be our followers + n.trackPeer(vresp.peer) votes++ if n.wonElection(votes) { - // Become LEADER if we have won. - n.switchToLeader() - return + // TODO If this server was also leader in n.term-1, then we could skip the timer as well. + // This would be ok as we'd be guaranteed to have the latest history. + if len(n.peers) == votes { + // Become LEADER if we have won and gotten a quorum with everyone + n.switchToLeader() + return + } else { + // Not everyone is in this quorum, yet? + // Wait for the remaining responses and become leader once everyone did. + // Or Wait until after the election timeout and become leader then. + // In case another server responds with vresp.granted==false and vresp.term > n.term, + // we will start all over again. + won = true + } } + } else if vresp.term > n.term { + // if we observe a bigger term, we should start over again or risk forming a quorum fully knowing + // someone with a better term exists. This is even the right thing to do if won == true. + n.switchToCandidate() + return } case vreq := <-n.reqs: n.processVoteRequest(vreq)