Merge pull request #3713 from nats-io/lazarus-mem-leader

fix and test for clustered mem store asset no-quorum if leader restarted
Derek Collison (committed by GitHub), 2022-12-14 16:56:33 -08:00
2 changed files with 83 additions and 9 deletions
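For context, the asset in question is a replicated memory-storage stream, like the one the new test below creates. As a minimal client-side sketch, this is how such a stream is created with the standard nats.go JetStream API (the connection URL is a placeholder, not part of this commit):

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to any node of the cluster (placeholder URL).
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// An R3 memory-storage stream: data lives only in RAM on each replica,
	// so a restarted leader comes back empty and must catch up via Raft.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Storage:  nats.MemoryStorage,
		Subjects: []string{"foo.*"},
		Replicas: 3,
	})
	if err != nil {
		log.Fatal(err)
	}
}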


@@ -2286,3 +2286,86 @@ func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
		return nil
	})
}

func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
	// Test that if the leader server for an R3 clustered mem store asset is
	// restarted, the asset remains stable with final quorum.
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	ml := c.leader()
	nc, jsc := jsClientConnect(t, ml)
	defer nc.Close()

	_, err := jsc.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Storage:  nats.MemoryStorage,
		Subjects: []string{"foo.*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Load up messages.
	toSend := 10000
	for i := 1; i <= toSend; i++ {
		msg := []byte("Hello World")
		if _, err = jsc.Publish("foo.a", msg); err != nil {
			t.Fatalf("unexpected publish error: %v", err)
		}
	}

	osi, err := jsc.StreamInfo("foo")
	require_NoError(t, err)
	// Make sure all 10,000 messages were stored.
	require_True(t, osi.State.Msgs == uint64(toSend))

	// Shutdown the stream leader server.
	rs := c.serverByName(osi.Cluster.Leader)
	rs.Shutdown()

	// Make sure that we have a meta leader (there can always be a re-election).
	c.waitOnLeader()

	// Should still have quorum and a new leader.
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		osi, err = jsc.StreamInfo("foo")
		if err != nil {
			return fmt.Errorf("expected healthy stream asset, got %s", err.Error())
		}
		if osi.Cluster.Leader == "" {
			return fmt.Errorf("expected healthy stream asset with new leader")
		}
		if osi.State.Msgs != uint64(toSend) {
			return fmt.Errorf("expected healthy stream asset %d messages, got %d messages", toSend, osi.State.Msgs)
		}
		return nil
	})

	// Now restart the old leader peer (which still has its old stream state).
	oldrs := rs
	rs, _ = RunServerWithConfig(rs.getOpts().ConfigFile)
	defer rs.Shutdown()

	// Replace the old server with the restarted one in the cluster's server list.
	for i := 0; i < len(c.servers); i++ {
		if c.servers[i] == oldrs {
			c.servers[i] = rs
		}
	}

	// Wait for the cluster to be formed.
	checkClusterFormed(t, c.servers...)

	// Make sure that we have a leader (there can always be a re-election).
	c.waitOnLeader()

	// Make sure we can still get stream info after the restart.
	osi, err = jsc.StreamInfo("foo")
	if err != nil {
		t.Fatalf("expected stream asset info return, got %s", err.Error())
	}
	// When the asset leader came back, did we re-form with quorum?
	if osi.Cluster.Leader == "" {
		t.Fatalf("expected a current leader after old leader restarted")
	}
}
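The retry loop in the test above leans on the test suite's checkFor helper. A minimal sketch of the polling semantics it presumably implements (retry the condition every interval until it returns nil, failing the test at the deadline); the body below is an assumption, not the actual nats-server helper:

// Assumed semantics of a checkFor-style polling helper (sketch only, not
// the actual nats-server test helper).
func checkForSketch(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
	t.Helper()
	deadline := time.Now().Add(totalWait)
	var err error
	for time.Now().Before(deadline) {
		if err = f(); err == nil {
			return // condition satisfied
		}
		time.Sleep(sleepDur)
	}
	t.Fatalf("timed out waiting for condition: %v", err)
}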


@@ -951,15 +951,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
		return errCatchupsRunning
	}

	// We don't store snapshots for memory based WALs.
	// This matches loadSnapshot logic now.
	// But we should compact.
	if _, ok := n.wal.(*memStore); ok {
		n.wal.Compact(n.applied)
		n.Unlock()
		return nil
	}

	var state StreamState
	n.wal.FastState(&state)
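Per the hunk header (15 lines reduced to 6) and the commit stats (9 deletions), the memStore guard shown above is what this commit removes: memory-backed WALs now fall through to the normal snapshot path instead of returning early with only a compaction, which is presumably what lets a restarted mem store leader recover enough state to rejoin with quorum. The guard itself uses Go's comma-ok type assertion; a standalone, hypothetical sketch of that pattern (stand-in WAL types, not the server's own):

package main

import "fmt"

// Hypothetical stand-ins for the server's WAL implementations.
type WAL interface {
	Compact(index uint64)
}

type memStore struct{}

func (m *memStore) Compact(index uint64) { fmt.Println("mem: compact to", index) }

type fileStore struct{}

func (f *fileStore) Compact(index uint64) { fmt.Println("file: compact to", index) }

// install mimics the shape of the removed guard: a comma-ok type assertion
// that special-cases memory-backed WALs.
func install(w WAL, applied uint64) {
	if _, ok := w.(*memStore); ok {
		// Pre-fix behavior: compact only and skip installing a snapshot.
		w.Compact(applied)
		return
	}
	// Post-fix, every WAL type proceeds to the snapshot path.
	fmt.Println("install snapshot, then compact")
}

func main() {
	install(&memStore{}, 42)
	install(&fileStore{}, 42)
}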