From b316cccfd1d9e17125842b95546af9f2033db743 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 11 Mar 2021 19:13:40 -0500 Subject: [PATCH 1/3] Fixed a quorum formation issue that caused truncation When a new leader is elected it has to give everyone a chance to reply, so that we can observe rejections with higher term. The maximum election timeout is 7.5 seconds. The new behavior of waiting for the election timeout caused unit tests to fail. Hence upping the timeout there as well. Signed-off-by: Matthias Hanel --- server/jetstream_cluster_test.go | 12 ++++++------ server/norace_test.go | 2 +- server/raft.go | 33 +++++++++++++++++++++++++++----- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a8cfdfc2..b00c961a 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3726,7 +3726,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { } // Check the stream info is eventually correct. - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { return fmt.Errorf("Could not fetch stream info: %v", err) @@ -3744,7 +3744,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { // Now do consumer. c.waitOnConsumerLeader("$G", "TEST", cname) - checkFor(t, 5*time.Second, 50*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 50*time.Millisecond, func() error { ci, err := js.ConsumerInfo("TEST", cname) if err != nil { return fmt.Errorf("Could not fetch consumer info: %v", err) @@ -4971,7 +4971,7 @@ func (c *cluster) waitOnPeerCount(n int) { c.t.Helper() c.waitOnLeader() leader := c.leader() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { peers := leader.JetStreamClusterPeers() if len(peers) == n { @@ -4984,7 +4984,7 @@ func (c *cluster) waitOnPeerCount(n int) { func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) { c.t.Helper() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { time.Sleep(100 * time.Millisecond) @@ -5053,7 +5053,7 @@ func (c *cluster) waitOnStreamCurrent(s *Server, account, stream string) { func (c *cluster) waitOnServerCurrent(s *Server) { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { if s.JetStreamIsCurrent() { time.Sleep(100 * time.Millisecond) @@ -5112,7 +5112,7 @@ func (c *cluster) expectNoLeader() { func (c *cluster) waitOnLeader() { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(20 * time.Second) for time.Now().Before(expires) { if leader := c.leader(); leader != nil { time.Sleep(100 * time.Millisecond) diff --git a/server/norace_test.go b/server/norace_test.go index c35caa81..8950cc1e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1373,7 +1373,7 @@ func TestNoRaceJetStreamClusterLargeStreamInlineCatchup(t *testing.T) { c.waitOnStreamCurrent(sr, "$G", "TEST") // Ask other servers to stepdown as leader so that sr becomes the leader. - checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnStreamLeader("$G", "TEST") if sl := c.streamLeader("$G", "TEST"); sl != sr { sl.JetStreamStepdownStream("$G", "TEST") diff --git a/server/raft.go b/server/raft.go index e7ae18f3..1e2f614f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2051,6 +2051,7 @@ func (n *raft) runAsCandidate() { // We vote for ourselves. votes := 1 + won := false for { elect := n.electTimer() @@ -2064,17 +2065,39 @@ func (n *raft) runAsCandidate() { case <-n.quit: return case <-elect.C: - n.switchToCandidate() + if won { + // we are here if we won the election but some server did not respond + n.switchToLeader() + } else { + n.switchToCandidate() + } return case vresp := <-n.votes: - n.trackPeer(vresp.peer) if vresp.granted && n.term >= vresp.term { + // only track peers that would be our followers + n.trackPeer(vresp.peer) votes++ if n.wonElection(votes) { - // Become LEADER if we have won. - n.switchToLeader() - return + // TODO If this server was also leader in n.term-1, then we could skip the timer as well. + // This would be ok as we'd be guaranteed to have the latest history. + if len(n.peers) == votes { + // Become LEADER if we have won and gotten a quorum with everyone + n.switchToLeader() + return + } else { + // Not everyone is in this quorum, yet? + // Wait for the remaining responses and become leader once everyone did. + // Or Wait until after the election timeout and become leader then. + // In case another server responds with vresp.granted==false and vresp.term > n.term, + // we will start all over again. + won = true + } } + } else if vresp.term > n.term { + // if we observe a bigger term, we should start over again or risk forming a quorum fully knowing + // someone with a better term exists. This is even the right thing to do if won == true. + n.switchToCandidate() + return } case vreq := <-n.reqs: n.processVoteRequest(vreq) From 6a0debbb71316dc62d785e600eb58ce6b2db8ddd Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Fri, 12 Mar 2021 00:52:58 -0500 Subject: [PATCH 2/3] more timeout changes Signed-off-by: Matthias Hanel --- server/jetstream_cluster_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b00c961a..535fb8e5 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3530,7 +3530,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { return fmt.Errorf("Could not fetch stream info: %v", err) @@ -4351,7 +4351,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_MIRROR_TEST") if err != nil { t.Fatalf("Could not retrieve stream info") @@ -4393,7 +4393,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { t.Fatalf("Did not receive correct response: %+v", scResp.Error) } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_SOURCE_TEST") if err != nil { t.Fatalf("Could not retrieve stream info") From efdb80cc48d302c0c8232ccece60bff485d1cc19 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Fri, 12 Mar 2021 01:51:28 -0500 Subject: [PATCH 3/3] More unit test fixes Signed-off-by: Matthias Hanel --- server/jetstream_cluster_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 535fb8e5..34c29e4b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1721,6 +1721,7 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { } oldLeader = c.restartServer(oldLeader) + c.waitOnStreamLeader("$G", "TEST") c.waitOnStreamCurrent(oldLeader, "$G", "TEST") // Re-request. @@ -3717,6 +3718,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) { sl := c.streamLeader("$G", "TEST") c.removeJetStream(sl) + c.waitOnLeader() c.waitOnStreamLeader("$G", "TEST") // Faster timeout since we loop below checking for condition. @@ -5112,7 +5114,7 @@ func (c *cluster) expectNoLeader() { func (c *cluster) waitOnLeader() { c.t.Helper() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader := c.leader(); leader != nil { time.Sleep(100 * time.Millisecond)