From 47c87eb71c65acddd004ff63bab7c6c18d0af123 Mon Sep 17 00:00:00 2001
From: Todd Beets
Date: Wed, 14 Dec 2022 14:34:27 -0800
Subject: [PATCH] fix and test for clustered mem store asset no-quorum if leader restarted

InstallSnapshot() returned early for memory-based WALs, compacting the WAL
but never storing a snapshot. This could leave a clustered memory store
asset without quorum after its leader was restarted. Remove the special
case so snapshots are installed for memory-based WALs as well, and add a
test that restarts the leader of an R3 memory store asset and verifies the
group re-forms with quorum.
---
 server/jetstream_cluster_3_test.go | 83 ++++++++++++++++++++++++++++++
 server/raft.go                     |  9 ----
 2 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index c03bdee3..11be3931 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -2286,3 +2286,86 @@ func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
 		return nil
 	})
 }
+
+func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
+	// Test that an R3 clustered memory store asset remains stable and keeps quorum after its leader server is restarted.
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	ml := c.leader()
+	nc, jsc := jsClientConnect(t, ml)
+	defer nc.Close()
+
+	_, err := jsc.AddStream(&nats.StreamConfig{
+		Name:     "foo",
+		Storage:  nats.MemoryStorage,
+		Subjects: []string{"foo.*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Load up messages.
+	toSend := 10000
+	for i := 1; i <= toSend; i++ {
+		msg := []byte("Hello World")
+		if _, err = jsc.Publish("foo.a", msg); err != nil {
+			t.Fatalf("unexpected publish error: %v", err)
+		}
+	}
+
+	osi, err := jsc.StreamInfo("foo")
+	require_NoError(t, err)
+	// Make sure all 10000 messages made it to the stream.
+	require_True(t, osi.State.Msgs == uint64(toSend))
+
+	// Shut down the stream leader server.
+	rs := c.serverByName(osi.Cluster.Leader)
+	rs.Shutdown()
+
+	// Make sure that we have a META leader (there can always be a re-election).
+	c.waitOnLeader()
+
+	// Should still have quorum and a new leader.
+	checkFor(t, time.Second, 200*time.Millisecond, func() error {
+		osi, err = jsc.StreamInfo("foo")
+		if err != nil {
+			return fmt.Errorf("expected healthy stream asset, got %s", err.Error())
+		}
+		if osi.Cluster.Leader == "" {
+			return fmt.Errorf("expected healthy stream asset with new leader")
+		}
+		if osi.State.Msgs != uint64(toSend) {
+			return fmt.Errorf("expected healthy stream asset with %d messages, got %d messages", toSend, osi.State.Msgs)
+		}
+		return nil
+	})
+
+	// Now restart the old leader peer (with its old stream state).
+	oldrs := rs
+	rs, _ = RunServerWithConfig(rs.getOpts().ConfigFile)
+	defer rs.Shutdown()
+
+	// Replace the old server with the restarted one in the cluster's server list.
+	for i := 0; i < len(c.servers); i++ {
+		if c.servers[i] == oldrs {
+			c.servers[i] = rs
+		}
+	}
+
+	// Wait for the cluster to re-form.
+	checkClusterFormed(t, c.servers...)
+
+	// Make sure that we have a leader (there can always be a re-election).
+	c.waitOnLeader()
+
+	// Make sure we can still get stream info after the old leader returned.
+	osi, err = jsc.StreamInfo("foo")
+	if err != nil {
+		t.Fatalf("expected stream asset info to be returned, got %s", err.Error())
+	}
+
+	// When the old asset leader came back, the group should have re-formed with quorum and a current leader.
+	if osi.Cluster.Leader == "" {
+		t.Fatalf("expected a current leader after old leader restarted")
+	}
+}
diff --git a/server/raft.go b/server/raft.go
index 1bcfd745..7f0ad3fb 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -951,15 +951,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
 		return errCatchupsRunning
 	}
 
-	// We don't store snapshots for memory based WALs.
-	// This matches loadSnapshot logic now.
-	// But we should compact.
-	if _, ok := n.wal.(*memStore); ok {
-		n.wal.Compact(n.applied)
-		n.Unlock()
-		return nil
-	}
-
 	var state StreamState
 	n.wal.FastState(&state)