From 1bb1a3cae16a3f22082535ad17eb0719a8121da4 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Wed, 28 Jun 2023 15:27:45 -0700
Subject: [PATCH] Do not health check streams that are actively being restored. Could leave them in a bad state.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster_3_test.go | 129 +++++++++++++++++++++++++++++
 server/monitor.go                  |   3 +-
 2 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 540cc152..0b6925fe 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -4398,3 +4398,132 @@ func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
 	// Make sure no other errors showed up
 	require_True(t, len(errCh) == 0)
 }
+
+// TestJetStreamClusterSnapshotAndRestoreWithHealthz snapshots a replicated (R3)
+// stream, deletes it, and then restores it from the snapshot while invoking
+// healthz on every server in the cluster during the restore. It verifies that
+// the restored stream holds every message and that it still does after a
+// leader stepdown request.
+func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	toSend, msg := 10_000, bytes.Repeat([]byte("Z"), 1024)
+	for i := 0; i < toSend; i++ {
+		_, err := js.PublishAsync("foo", msg)
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	sreq := &JSApiStreamSnapshotRequest{
+		DeliverSubject: nats.NewInbox(),
+		ChunkSize:      512,
+	}
+	req, _ := json.Marshal(sreq)
+	rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
+	require_NoError(t, err)
+
+	var resp JSApiStreamSnapshotResponse
+	require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
+	require_True(t, resp.Error == nil)
+
+	state := *resp.State
+	cfg := *resp.Config
+
+	var snapshot []byte
+	done := make(chan bool)
+
+	sub, err := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
+		// EOF
+		if len(m.Data) == 0 {
+			done <- true
+			return
+		}
+		// Could be writing to a file here too.
+		snapshot = append(snapshot, m.Data...)
+		// Flow ack
+		m.Respond(nil)
+	})
+	require_NoError(t, err)
+	defer sub.Unsubscribe()
+
+	// Wait to receive the snapshot.
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive our snapshot in time")
+	}
+
+	// Delete before we try to restore.
+	require_NoError(t, js.DeleteStream("TEST"))
+
+	// Run a health check against every server in the cluster.
+	checkHealth := func() {
+		for _, s := range c.servers {
+			s.healthz(nil)
+		}
+	}
+
+	var rresp JSApiStreamRestoreResponse
+	rreq := &JSApiStreamRestoreRequest{
+		Config: cfg,
+		State:  state,
+	}
+	req, _ = json.Marshal(rreq)
+
+	rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
+	require_NoError(t, err)
+
+	rresp.Error = nil
+	require_NoError(t, json.Unmarshal(rmsg.Data, &rresp))
+	require_True(t, rresp.Error == nil)
+
+	checkHealth()
+
+	// We will now chunk the snapshot responses (and EOF).
+	var chunk [1024]byte
+	for i, r := 0, bytes.NewReader(snapshot); ; {
+		n, err := r.Read(chunk[:])
+		if err != nil {
+			break
+		}
+		// Per-chunk replies and errors are not checked; failures surface in the checks below.
+		nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
+		i++
+		// Call healthz for all servers every 100 chunks while the restore is in flight.
+		if i%100 == 0 {
+			checkHealth()
+		}
+	}
+	rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
+	require_NoError(t, err)
+	rresp.Error = nil
+	require_NoError(t, json.Unmarshal(rmsg.Data, &rresp))
+	require_True(t, rresp.Error == nil)
+
+	si, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_True(t, si.State.Msgs == uint64(toSend))
+
+	// Make sure stepdown works, this would fail before the fix.
+	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
+	require_NoError(t, err)
+
+	si, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_True(t, si.State.Msgs == uint64(toSend))
+}
diff --git a/server/monitor.go b/server/monitor.go
index 8e0c11a6..654bb916 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -3147,7 +3147,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 		for acc, asa := range cc.streams {
 			nasa := make(map[string]*streamAssignment)
 			for stream, sa := range asa {
-				if sa.Group.isMember(ourID) {
+				// If we are a member and we are not being restored, select for check.
+				if sa.Group.isMember(ourID) && sa.Restore == nil {
 					csa := sa.copyGroup()
 					csa.consumers = make(map[string]*consumerAssignment)
 					for consumer, ca := range sa.consumers {