mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
fix and test for clustered mem store asset no-quorum if leader restarted
This commit is contained in:
@@ -2286,3 +2286,86 @@ func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
|
||||
// Test if R3 clustered mem store asset leader server restarted, that asset remains stable with final quorum
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
ml := c.leader()
|
||||
nc, jsc := jsClientConnect(t, ml)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := jsc.AddStream(&nats.StreamConfig{
|
||||
Name: "foo",
|
||||
Storage: nats.MemoryStorage,
|
||||
Subjects: []string{"foo.*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// load up messages
|
||||
toSend := 10000
|
||||
for i := 1; i <= toSend; i++ {
|
||||
msg := []byte("Hello World")
|
||||
if _, err = jsc.Publish("foo.a", msg); err != nil {
|
||||
t.Fatalf("unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
osi, err := jsc.StreamInfo("foo")
|
||||
require_NoError(t, err)
|
||||
// make sure 10000 msgs
|
||||
require_True(t, osi.State.Msgs == uint64(toSend))
|
||||
|
||||
// Shutdown the stream leader server
|
||||
rs := c.serverByName(osi.Cluster.Leader)
|
||||
rs.Shutdown()
|
||||
|
||||
// Make sure that we have a META leader (there can always be a re-election)
|
||||
c.waitOnLeader()
|
||||
|
||||
// Should still have quorum and a new leader
|
||||
checkFor(t, time.Second, 200*time.Millisecond, func() error {
|
||||
osi, err = jsc.StreamInfo("foo")
|
||||
if err != nil {
|
||||
return fmt.Errorf("expected healthy stream asset, got %s", err.Error())
|
||||
}
|
||||
if osi.Cluster.Leader == "" {
|
||||
return fmt.Errorf("expected healthy stream asset with new leader")
|
||||
}
|
||||
if osi.State.Msgs != uint64(toSend) {
|
||||
return fmt.Errorf("expected healthy stream asset %d messages, got %d messages", toSend, osi.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now restart the old leader peer (old stream state)
|
||||
oldrs := rs
|
||||
rs, _ = RunServerWithConfig(rs.getOpts().ConfigFile)
|
||||
defer rs.Shutdown()
|
||||
|
||||
// Replaced old with new server
|
||||
for i := 0; i < len(c.servers); i++ {
|
||||
if c.servers[i] == oldrs {
|
||||
c.servers[i] = rs
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for cluster to be formed
|
||||
checkClusterFormed(t, c.servers...)
|
||||
|
||||
// Make sure that we have a leader (there can always be a re-election)
|
||||
c.waitOnLeader()
|
||||
|
||||
// Can we get stream info after return
|
||||
osi, err = jsc.StreamInfo("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("expected stream asset info return, got %s", err.Error())
|
||||
}
|
||||
|
||||
// When asset leader came back did we re-form with quorum
|
||||
if osi.Cluster.Leader == "" {
|
||||
t.Fatalf("expected a current leader after old leader restarted")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -951,15 +951,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
|
||||
return errCatchupsRunning
|
||||
}
|
||||
|
||||
// We don't store snapshots for memory based WALs.
|
||||
// This matches loadSnapshot logic now.
|
||||
// But we should compact.
|
||||
if _, ok := n.wal.(*memStore); ok {
|
||||
n.wal.Compact(n.applied)
|
||||
n.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
var state StreamState
|
||||
n.wal.FastState(&state)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user