Merge pull request #1994 from nats-io/qrm-frm

Fixed a quorum formation issue that caused truncation
This commit is contained in:
Derek Collison
2021-03-12 05:51:02 -06:00
committed by GitHub
3 changed files with 40 additions and 15 deletions

View File

@@ -1721,6 +1721,7 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
}
oldLeader = c.restartServer(oldLeader)
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamCurrent(oldLeader, "$G", "TEST")
// Re-request.
@@ -3530,7 +3531,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
@@ -3717,6 +3718,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) {
sl := c.streamLeader("$G", "TEST")
c.removeJetStream(sl)
c.waitOnLeader()
c.waitOnStreamLeader("$G", "TEST")
// Faster timeout since we loop below checking for condition.
@@ -3726,7 +3728,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) {
}
// Check the stream info is eventually correct.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
@@ -3744,7 +3746,7 @@ func TestJetStreamClusterRemoveServer(t *testing.T) {
// Now do consumer.
c.waitOnConsumerLeader("$G", "TEST", cname)
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 50*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", cname)
if err != nil {
return fmt.Errorf("Could not fetch consumer info: %v", err)
@@ -4351,7 +4353,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MY_MIRROR_TEST")
if err != nil {
t.Fatalf("Could not retrieve stream info")
@@ -4393,7 +4395,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MY_SOURCE_TEST")
if err != nil {
t.Fatalf("Could not retrieve stream info")
@@ -4971,7 +4973,7 @@ func (c *cluster) waitOnPeerCount(n int) {
c.t.Helper()
c.waitOnLeader()
leader := c.leader()
expires := time.Now().Add(10 * time.Second)
expires := time.Now().Add(20 * time.Second)
for time.Now().Before(expires) {
peers := leader.JetStreamClusterPeers()
if len(peers) == n {
@@ -4984,7 +4986,7 @@ func (c *cluster) waitOnPeerCount(n int) {
func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) {
c.t.Helper()
expires := time.Now().Add(10 * time.Second)
expires := time.Now().Add(20 * time.Second)
for time.Now().Before(expires) {
if leader := c.consumerLeader(account, stream, consumer); leader != nil {
time.Sleep(100 * time.Millisecond)
@@ -5053,7 +5055,7 @@ func (c *cluster) waitOnStreamCurrent(s *Server, account, stream string) {
func (c *cluster) waitOnServerCurrent(s *Server) {
c.t.Helper()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(20 * time.Second)
for time.Now().Before(expires) {
if s.JetStreamIsCurrent() {
time.Sleep(100 * time.Millisecond)
@@ -5112,7 +5114,7 @@ func (c *cluster) expectNoLeader() {
func (c *cluster) waitOnLeader() {
c.t.Helper()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(40 * time.Second)
for time.Now().Before(expires) {
if leader := c.leader(); leader != nil {
time.Sleep(100 * time.Millisecond)

View File

@@ -1373,7 +1373,7 @@ func TestNoRaceJetStreamClusterLargeStreamInlineCatchup(t *testing.T) {
c.waitOnStreamCurrent(sr, "$G", "TEST")
// Ask other servers to stepdown as leader so that sr becomes the leader.
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader("$G", "TEST")
if sl := c.streamLeader("$G", "TEST"); sl != sr {
sl.JetStreamStepdownStream("$G", "TEST")

View File

@@ -2051,6 +2051,7 @@ func (n *raft) runAsCandidate() {
// We vote for ourselves.
votes := 1
won := false
for {
elect := n.electTimer()
@@ -2064,17 +2065,39 @@ func (n *raft) runAsCandidate() {
case <-n.quit:
return
case <-elect.C:
n.switchToCandidate()
if won {
// We get here if we won the election but not every server responded before the election timer fired.
n.switchToLeader()
} else {
n.switchToCandidate()
}
return
case vresp := <-n.votes:
n.trackPeer(vresp.peer)
if vresp.granted && n.term >= vresp.term {
// only track peers that would be our followers
n.trackPeer(vresp.peer)
votes++
if n.wonElection(votes) {
// Become LEADER if we have won.
n.switchToLeader()
return
// TODO If this server was also leader in n.term-1, then we could skip the timer as well.
// This would be ok as we'd be guaranteed to have the latest history.
if len(n.peers) == votes {
// Become LEADER if we have won and gotten a quorum with everyone
n.switchToLeader()
return
} else {
// Not everyone has granted us their vote yet.
// Wait for the remaining responses and become leader once everyone has granted their vote,
// or wait until the election timeout fires and become leader then.
// If another server responds with vresp.granted == false and vresp.term > n.term,
// we will start all over again.
won = true
}
}
} else if vresp.term > n.term {
// If we observe a higher term, we should start over again; otherwise we risk forming a quorum
// while knowing that someone with a higher term exists. This is the right thing to do even if won == true.
n.switchToCandidate()
return
}
case vreq := <-n.reqs:
n.processVoteRequest(vreq)