mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -4398,3 +4398,124 @@ func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
|
||||
// Make sure no other errors showed up
|
||||
require_True(t, len(errCh) == 0)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
toSend, msg := 1000, bytes.Repeat([]byte("Z"), 1024)
|
||||
for i := 0; i < toSend; i++ {
|
||||
_, err := js.PublishAsync("foo", msg)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
sreq := &JSApiStreamSnapshotRequest{
|
||||
DeliverSubject: nats.NewInbox(),
|
||||
ChunkSize: 512,
|
||||
}
|
||||
req, _ := json.Marshal(sreq)
|
||||
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
var resp JSApiStreamSnapshotResponse
|
||||
json.Unmarshal(rmsg.Data, &resp)
|
||||
require_True(t, resp.Error == nil)
|
||||
|
||||
state := *resp.State
|
||||
cfg := *resp.Config
|
||||
|
||||
var snapshot []byte
|
||||
done := make(chan bool)
|
||||
|
||||
sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
|
||||
// EOF
|
||||
if len(m.Data) == 0 {
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
// Could be writing to a file here too.
|
||||
snapshot = append(snapshot, m.Data...)
|
||||
// Flow ack
|
||||
m.Respond(nil)
|
||||
})
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Wait to receive the snapshot.
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive our snapshot in time")
|
||||
}
|
||||
|
||||
// Delete before we try to restore.
|
||||
require_NoError(t, js.DeleteStream("TEST"))
|
||||
|
||||
checkHealth := func() {
|
||||
for _, s := range c.servers {
|
||||
s.healthz(nil)
|
||||
}
|
||||
}
|
||||
|
||||
var rresp JSApiStreamRestoreResponse
|
||||
rreq := &JSApiStreamRestoreRequest{
|
||||
Config: cfg,
|
||||
State: state,
|
||||
}
|
||||
req, _ = json.Marshal(rreq)
|
||||
|
||||
rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
rresp.Error = nil
|
||||
json.Unmarshal(rmsg.Data, &rresp)
|
||||
require_True(t, resp.Error == nil)
|
||||
|
||||
checkHealth()
|
||||
|
||||
// We will now chunk the snapshot responses (and EOF).
|
||||
var chunk [1024]byte
|
||||
for i, r := 0, bytes.NewReader(snapshot); ; {
|
||||
n, err := r.Read(chunk[:])
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
|
||||
i++
|
||||
// We will call healthz for all servers half way through the restore.
|
||||
if i%100 == 0 {
|
||||
checkHealth()
|
||||
}
|
||||
}
|
||||
rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
rresp.Error = nil
|
||||
json.Unmarshal(rmsg.Data, &rresp)
|
||||
require_True(t, resp.Error == nil)
|
||||
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.State.Msgs == uint64(toSend))
|
||||
|
||||
// Make sure stepdown works, this would fail before the fix.
|
||||
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
si, err = js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.State.Msgs == uint64(toSend))
|
||||
}
|
||||
|
||||
@@ -3167,7 +3167,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
|
||||
for acc, asa := range cc.streams {
|
||||
nasa := make(map[string]*streamAssignment)
|
||||
for stream, sa := range asa {
|
||||
if sa.Group.isMember(ourID) {
|
||||
// If we are a member and we are not being restored, select for check.
|
||||
if sa.Group.isMember(ourID) && sa.Restore == nil {
|
||||
csa := sa.copyGroup()
|
||||
csa.consumers = make(map[string]*consumerAssignment)
|
||||
for consumer, ca := range sa.consumers {
|
||||
|
||||
Reference in New Issue
Block a user