diff --git a/server/events_test.go b/server/events_test.go index 17b5a281..a4cb9555 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -17,11 +17,13 @@ import ( "bytes" "crypto/sha256" "encoding/json" + "errors" "fmt" "math/rand" "net/http" "net/http/httptest" "os" + "reflect" "strings" "sync" "sync/atomic" @@ -1976,6 +1978,806 @@ func TestServerEventsStatsZ(t *testing.T) { } } +func TestServerEventsHealthZSingleServer(t *testing.T) { + type healthzResp struct { + Healthz HealthStatus `json:"data"` + Server ServerInfo `json:"server"` + } + cfg := fmt.Sprintf(`listen: 127.0.0.1:-1 + + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + no_auth_user: one + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }`, t.TempDir()) + + serverHealthzReqSubj := "$SYS.REQ.SERVER.%s.HEALTHZ" + s, _ := RunServerWithConfig(createConfFile(t, []byte(cfg))) + defer s.Shutdown() + + ncs, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Error connecting to cluster: %v", err) + } + + defer ncs.Close() + ncAcc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncAcc.Close() + js, err := ncAcc.JetStream() + if err != nil { + t.Fatalf("Error creating JetStream context: %v", err) + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + _, err = js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "cons", + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + + subj := fmt.Sprintf(serverHealthzReqSubj, s.ID()) + + tests := []struct { + name string + req *HealthzEventOptions + expected HealthStatus + }{ + { + name: "no parameters", + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with js enabled only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + JSEnabledOnly: true, + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with server only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + JSServerOnly: true, + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name and stream", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name, stream and consumer", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "cons", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with stream only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Stream: "test", + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Error: `"account" must not be empty when checking stream health`, + }, + }, + { + name: "with stream only, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Details: true, + Stream: "test", + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Errors: []HealthzError{ + { + Type: HealthzErrorBadRequest, + Error: `"account" must not be empty when checking stream health`, + }, + }, + }, + }, + { + name: "with account and consumer", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Consumer: "cons", + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Error: `"stream" must not be empty when checking consumer health`, + }, + }, + { + name: "with account and consumer, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Consumer: "cons", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Errors: []HealthzError{ + { + Type: HealthzErrorBadRequest, + Error: `"stream" must not be empty when checking consumer health`, + }, + }, + }, + }, + { + name: "account not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream account "abc" not found`, + }, + }, + { + name: "account not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorAccount, + Account: "abc", + Error: `JetStream account "abc" not found`, + }, + }, + }, + }, + { + name: "stream not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream stream "abc" not found on account "ONE"`, + }, + }, + { + name: "stream not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorStream, + Account: "ONE", + Stream: "abc", + Error: `JetStream stream "abc" not found on account "ONE"`, + }, + }, + }, + }, + { + name: "consumer not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream consumer "abc" not found for stream "test" on account "ONE"`, + }, + }, + { + name: "consumer not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorConsumer, + Account: "ONE", + Stream: "test", + Consumer: "abc", + Error: `JetStream consumer "abc" not found for stream "test" on account "ONE"`, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var body []byte + var err error + if test.req != nil { + body, err = json.Marshal(test.req) + if err != nil { + t.Fatalf("Error marshaling request body: %v", err) + } + } + msg, err := ncs.Request(subj, body, 1*time.Second) + if err != nil { + t.Fatalf("Error trying to request healthz: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if !reflect.DeepEqual(health.Healthz, test.expected) { + t.Errorf("Invalid healthz status; want: %+v; got: %+v", test.expected, health.Healthz) + } + }) + } +} + +func TestServerEventsHealthZClustered(t *testing.T) { + type healthzResp struct { + Healthz HealthStatus `json:"data"` + Server ServerInfo `json:"server"` + } + serverHealthzReqSubj := "$SYS.REQ.SERVER.%s.HEALTHZ" + c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "JSC", 3) + defer c.shutdown() + + ncs, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Error connecting to cluster: %v", err) + } + + defer ncs.Close() + ncAcc, err := nats.Connect(c.randomServer().ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncAcc.Close() + js, err := ncAcc.JetStream() + if err != nil { + t.Fatalf("Error creating JetStream context: %v", err) + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"foo"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + _, err = js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "cons", + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + + subj := fmt.Sprintf(serverHealthzReqSubj, c.servers[0].ID()) + pingSubj := fmt.Sprintf(serverHealthzReqSubj, "PING") + + tests := []struct { + name string + req *HealthzEventOptions + expected HealthStatus + expectedError string + }{ + { + name: "no parameters", + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with js enabled only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + JSEnabledOnly: true, + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with server only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + JSServerOnly: true, + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name and stream", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with account name, stream and consumer", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "cons", + }, + }, + expected: HealthStatus{Status: "ok", StatusCode: 200}, + }, + { + name: "with stream only", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Stream: "test", + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Error: `"account" must not be empty when checking stream health`, + }, + expectedError: "Bad request:", + }, + { + name: "with stream only, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Details: true, + Stream: "test", + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 400, + Errors: []HealthzError{ + { + Type: HealthzErrorBadRequest, + Error: `"account" must not be empty when checking stream health`, + }, + }, + }, + }, + { + name: "account not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream account "abc" not found`, + }, + expectedError: `account "abc" not found`, + }, + { + name: "account not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorAccount, + Account: "abc", + Error: `JetStream account "abc" not found`, + }, + }, + }, + }, + { + name: "stream not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream stream "abc" not found on account "ONE"`, + }, + expectedError: `stream "abc" not found`, + }, + { + name: "stream not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorStream, + Account: "ONE", + Stream: "abc", + Error: `JetStream stream "abc" not found on account "ONE"`, + }, + }, + }, + }, + { + name: "consumer not found", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "abc", + }, + }, + expected: HealthStatus{ + Status: "unavailable", + StatusCode: 404, + Error: `JetStream consumer "abc" not found for stream "test" on account "ONE"`, + }, + expectedError: `consumer "abc" not found for stream "test"`, + }, + { + name: "consumer not found, detailed", + req: &HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + Consumer: "abc", + Details: true, + }, + }, + expected: HealthStatus{ + Status: "error", + StatusCode: 404, + Errors: []HealthzError{ + { + Type: HealthzErrorConsumer, + Account: "ONE", + Stream: "test", + Consumer: "abc", + Error: `JetStream consumer "abc" not found for stream "test" on account "ONE"`, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var body []byte + var err error + if test.req != nil { + body, err = json.Marshal(test.req) + if err != nil { + t.Fatalf("Error marshaling request body: %v", err) + } + } + msg, err := ncs.Request(subj, body, 1*time.Second) + if err != nil { + t.Fatalf("Error trying to request healthz: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if !reflect.DeepEqual(health.Healthz, test.expected) { + t.Errorf("Invalid healthz status; want: %+v; got: %+v", test.expected, health.Healthz) + } + + reply := ncs.NewRespInbox() + sub, err := ncs.SubscribeSync(reply) + if err != nil { + t.Fatalf("Error creating subscription: %v", err) + } + defer sub.Unsubscribe() + + // now PING all servers + if err := ncs.PublishRequest(pingSubj, reply, body); err != nil { + t.Fatalf("Publish error: %v", err) + } + for i := 0; i < 3; i++ { + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error fetching healthz PING response: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if !reflect.DeepEqual(health.Healthz, test.expected) { + t.Errorf("Invalid healthz status; want: %+v; got: %+v", test.expected, health.Healthz) + } + } + if _, err := sub.NextMsg(50 * time.Millisecond); !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected timeout error; got: %v", err) + } + }) + } +} + +func TestServerEventsHealthZClustered_NoReplicas(t *testing.T) { + type healthzResp struct { + Healthz HealthStatus `json:"data"` + Server ServerInfo `json:"server"` + } + serverHealthzReqSubj := "$SYS.REQ.SERVER.%s.HEALTHZ" + c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "JSC", 3) + defer c.shutdown() + + ncs, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Error connecting to cluster: %v", err) + } + + defer ncs.Close() + ncAcc, err := nats.Connect(c.randomServer().ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncAcc.Close() + js, err := ncAcc.JetStream() + if err != nil { + t.Fatalf("Error creating JetStream context: %v", err) + } + + pingSubj := fmt.Sprintf(serverHealthzReqSubj, "PING") + + t.Run("non-replicated stream", func(t *testing.T) { + _, err = js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + _, err = js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "cons", + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + body, err := json.Marshal(HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test", + }, + }) + if err != nil { + t.Fatalf("Error marshaling request body: %v", err) + } + + reply := ncs.NewRespInbox() + sub, err := ncs.SubscribeSync(reply) + if err != nil { + t.Fatalf("Error creating subscription: %v", err) + } + defer sub.Unsubscribe() + + // now PING all servers + if err := ncs.PublishRequest(pingSubj, reply, body); err != nil { + t.Fatalf("Publish error: %v", err) + } + var healthy int + for i := 0; i < 3; i++ { + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error fetching healthz PING response: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if health.Healthz.Status == "ok" { + healthy++ + continue + } + if !strings.Contains(health.Healthz.Error, `stream "test" not found`) { + t.Errorf("Expected error to contain: %q, got: %s", `stream "test" not found`, health.Healthz.Error) + } + } + if healthy != 1 { + t.Fatalf("Expected 1 healthy server; got: %d", healthy) + } + if _, err := sub.NextMsg(50 * time.Millisecond); !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected timeout error; got: %v", err) + } + }) + + t.Run("non-replicated consumer", func(t *testing.T) { + _, err = js.AddStream(&nats.StreamConfig{ + Name: "test-repl", + Subjects: []string{"bar"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + _, err = js.AddConsumer("test-repl", &nats.ConsumerConfig{ + Name: "cons-single", + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + body, err := json.Marshal(HealthzEventOptions{ + HealthzOptions: HealthzOptions{ + Account: "ONE", + Stream: "test-repl", + Consumer: "cons-single", + }, + }) + if err != nil { + t.Fatalf("Error marshaling request body: %v", err) + } + + reply := ncs.NewRespInbox() + sub, err := ncs.SubscribeSync(reply) + if err != nil { + t.Fatalf("Error creating subscription: %v", err) + } + defer sub.Unsubscribe() + + // now PING all servers + if err := ncs.PublishRequest(pingSubj, reply, body); err != nil { + t.Fatalf("Publish error: %v", err) + } + var healthy int + for i := 0; i < 3; i++ { + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error fetching healthz PING response: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if health.Healthz.Status == "ok" { + healthy++ + continue + } + if !strings.Contains(health.Healthz.Error, `consumer "cons-single" not found`) { + t.Errorf("Expected error to contain: %q, got: %s", `consumer "cons-single" not found`, health.Healthz.Error) + } + } + if healthy != 1 { + t.Fatalf("Expected 1 healthy server; got: %d", healthy) + } + if _, err := sub.NextMsg(50 * time.Millisecond); !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected timeout error; got: %v", err) + } + }) + +} + +func TestServerEventsHealthZJetStreamNotEnabled(t *testing.T) { + type healthzResp struct { + Healthz HealthStatus `json:"data"` + Server ServerInfo `json:"server"` + } + cfg := `listen: 127.0.0.1:-1 + + accounts { + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + serverHealthzReqSubj := "$SYS.REQ.SERVER.%s.HEALTHZ" + s, _ := RunServerWithConfig(createConfFile(t, []byte(cfg))) + defer s.Shutdown() + + ncs, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Error connecting to cluster: %v", err) + } + + defer ncs.Close() + + subj := fmt.Sprintf(serverHealthzReqSubj, s.ID()) + + msg, err := ncs.Request(subj, nil, 1*time.Second) + if err != nil { + t.Fatalf("Error trying to request healthz: %v", err) + } + var health healthzResp + if err := json.Unmarshal(msg.Data, &health); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if health.Healthz.Status != "ok" { + t.Errorf("Invalid healthz status; want: %q; got: %q", "ok", health.Healthz.Status) + } + if health.Healthz.Error != "" { + t.Errorf("HealthZ error: %s", health.Healthz.Error) + } +} + func TestServerEventsPingStatsZ(t *testing.T) { sa, _, sb, optsB, akp := runTrustedCluster(t) defer sa.Shutdown() diff --git a/server/monitor.go b/server/monitor.go index 6203cf7b..7721fd6d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2717,9 +2717,13 @@ type JSzOptions struct { // HealthzOptions are options passed to Healthz type HealthzOptions struct { // Deprecated: Use JSEnabledOnly instead - JSEnabled bool `json:"js-enabled,omitempty"` - JSEnabledOnly bool `json:"js-enabled-only,omitempty"` - JSServerOnly bool `json:"js-server-only,omitempty"` + JSEnabled bool `json:"js-enabled,omitempty"` + JSEnabledOnly bool `json:"js-enabled-only,omitempty"` + JSServerOnly bool `json:"js-server-only,omitempty"` + Account string `json:"account,omitempty"` + Stream string `json:"stream,omitempty"` + Consumer string `json:"consumer,omitempty"` + Details bool `json:"details,omitempty"` } // ProfilezOptions are options passed to Profilez @@ -3078,8 +3082,72 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { } type HealthStatus struct { - Status string `json:"status"` - Error string `json:"error,omitempty"` + Status string `json:"status"` + StatusCode int `json:"status_code,omitempty"` + Error string `json:"error,omitempty"` + Errors []HealthzError `json:"errors,omitempty"` +} + +type HealthzError struct { + Type HealthZErrorType `json:"type"` + Account string `json:"account,omitempty"` + Stream string `json:"stream,omitempty"` + Consumer string `json:"consumer,omitempty"` + Error string `json:"error,omitempty"` +} + +type HealthZErrorType int + +const ( + HealthzErrorConn HealthZErrorType = iota + HealthzErrorBadRequest + HealthzErrorJetStream + HealthzErrorAccount + HealthzErrorStream + HealthzErrorConsumer +) + +func (t HealthZErrorType) String() string { + switch t { + case HealthzErrorConn: + return "CONNECTION" + case HealthzErrorBadRequest: + return "BAD_REQUEST" + case HealthzErrorJetStream: + return "JETSTREAM" + case HealthzErrorAccount: + return "ACCOUNT" + case HealthzErrorStream: + return "STREAM" + case HealthzErrorConsumer: + return "CONSUMER" + default: + return "unknown" + } +} + +func (t HealthZErrorType) MarshalJSON() ([]byte, error) { + return json.Marshal(t.String()) +} + +func (t *HealthZErrorType) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("CONNECTION"): + *t = HealthzErrorConn + case jsonString("BAD_REQUEST"): + *t = HealthzErrorBadRequest + case jsonString("JETSTREAM"): + *t = HealthzErrorJetStream + case jsonString("ACCOUNT"): + *t = HealthzErrorAccount + case jsonString("STREAM"): + *t = HealthzErrorStream + case jsonString("CONSUMER"): + *t = HealthzErrorConsumer + default: + return fmt.Errorf("unknown healthz error type %q", data) + } + return nil } // https://tools.ietf.org/id/draft-inadarei-api-health-check-05.html @@ -3104,10 +3172,19 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { return } + includeDetails, err := decodeBool(w, r, "details") + if err != nil { + return + } + hs := s.healthz(&HealthzOptions{ JSEnabled: jsEnabled, JSEnabledOnly: jsEnabledOnly, JSServerOnly: jsServerOnly, + Account: r.URL.Query().Get("account"), + Stream: r.URL.Query().Get("stream"), + Consumer: r.URL.Query().Get("consumer"), + Details: includeDetails, }) if hs.Error != _EMPTY_ { s.Warnf("Healthcheck failed: %q", hs.Error) @@ -3129,10 +3206,65 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { if opts == nil { opts = &HealthzOptions{} } + details := opts.Details + defer func() { + // for response with details enabled, ses status to either "error" or "ok" + if details { + if len(health.Errors) != 0 { + health.Status = "error" + } else { + health.Status = "ok" + } + } + // if no specific status code was set, set it based on the presence of errors + if health.StatusCode == 0 { + if health.Error != "" || len(health.Errors) != 0 { + health.StatusCode = http.StatusInternalServerError + } else { + health.StatusCode = http.StatusOK + } + } + }() + + if opts.Account == "" && opts.Stream != "" { + health.StatusCode = http.StatusBadRequest + if !details { + health.Status = "error" + health.Error = fmt.Sprintf("%q must not be empty when checking stream health", "account") + } else { + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorBadRequest, + Error: fmt.Sprintf("%q must not be empty when checking stream health", "account"), + }) + } + return health + } + + if opts.Stream == "" && opts.Consumer != "" { + health.StatusCode = http.StatusBadRequest + if !details { + health.Status = "error" + health.Error = fmt.Sprintf("%q must not be empty when checking consumer health", "stream") + } else { + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorBadRequest, + Error: fmt.Sprintf("%q must not be empty when checking consumer health", "stream"), + }) + } + return health + } if err := s.readyForConnections(time.Millisecond); err != nil { + health.StatusCode = http.StatusInternalServerError health.Status = "error" - health.Error = err.Error() + if !details { + health.Error = err.Error() + } else { + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorConn, + Error: err.Error(), + }) + } return health } @@ -3145,10 +3277,18 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // Access the Jetstream state to perform additional checks. js := s.getJetStream() - + const na = "unavailable" if !js.isEnabled() { - health.Status = "unavailable" - health.Error = NewJSNotEnabledError().Error() + health.StatusCode = http.StatusServiceUnavailable + health.Status = na + if !details { + health.Error = NewJSNotEnabledError().Error() + } else { + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorJetStream, + Error: NewJSNotEnabledError().Error(), + }) + } return health } // Only check if JS is enabled, skip meta and asset check. @@ -3161,30 +3301,124 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { cc := js.cluster js.mu.RUnlock() - const na = "unavailable" - // Currently single server we make sure the streams were recovered. if cc == nil { sdir := js.config.StoreDir // Whip through account folders and pull each stream name. fis, _ := os.ReadDir(sdir) + var accFound, streamFound, consumerFound bool for _, fi := range fis { if fi.Name() == snapStagingDir { continue } + if opts.Account != "" { + if fi.Name() != opts.Account { + continue + } + accFound = true + } acc, err := s.LookupAccount(fi.Name()) if err != nil { - health.Status = na - health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()) - return health + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()) + return health + } + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorAccount, + Account: fi.Name(), + Error: fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()), + }) + continue } sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams")) for _, sfi := range sfis { + if opts.Stream != "" { + if sfi.Name() != opts.Stream { + continue + } + streamFound = true + } stream := sfi.Name() - if _, err := acc.lookupStream(stream); err != nil { - health.Status = na - health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream) - return health + s, err := acc.lookupStream(stream) + if err != nil { + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream) + return health + } + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorStream, + Account: acc.Name, + Stream: stream, + Error: fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream), + }) + continue + } + if streamFound { + // if consumer option is passed, verify that the consumer exists on stream + if opts.Consumer != "" { + for _, cons := range s.consumers { + if cons.name == opts.Consumer { + consumerFound = true + break + } + } + } + break + } + } + if accFound { + break + } + } + if opts.Account != "" && !accFound { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorAccount, + Account: opts.Account, + Error: fmt.Sprintf("JetStream account %q not found", opts.Account), + }, + } + } + return health + } + if opts.Stream != "" && !streamFound { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorStream, + Account: opts.Account, + Stream: opts.Stream, + Error: fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account), + }, + } + } + return health + } + if opts.Consumer != "" && !consumerFound { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorConsumer, + Account: opts.Account, + Stream: opts.Stream, + Consumer: opts.Consumer, + Error: fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account), + }, } } } @@ -3199,14 +3433,32 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // If no meta leader. if meta == nil || meta.GroupLeader() == _EMPTY_ { - health.Status = na - health.Error = "JetStream has not established contact with a meta leader" + if !details { + health.Status = na + health.Error = "JetStream has not established contact with a meta leader" + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorJetStream, + Error: "JetStream has not established contact with a meta leader", + }, + } + } return health } // If we are not current with the meta leader. if !meta.Healthy() { - health.Status = na - health.Error = "JetStream is not current with the meta leader" + if !details { + health.Status = na + health.Error = "JetStream is not current with the meta leader" + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorJetStream, + Error: "JetStream is not current with the meta leader", + }, + } + } return health } @@ -3220,25 +3472,123 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { ourID := meta.ID() // Copy the meta layer so we do not need to hold the js read lock for an extended period of time. + var streams map[string]map[string]*streamAssignment js.mu.RLock() - streams := make(map[string]map[string]*streamAssignment, len(cc.streams)) - for acc, asa := range cc.streams { + if opts.Account == "" { + streams = make(map[string]map[string]*streamAssignment, len(cc.streams)) + for acc, asa := range cc.streams { + nasa := make(map[string]*streamAssignment) + for stream, sa := range asa { + // If we are a member and we are not being restored, select for check. + if sa.Group.isMember(ourID) && sa.Restore == nil { + csa := sa.copyGroup() + csa.consumers = make(map[string]*consumerAssignment) + for consumer, ca := range sa.consumers { + if ca.Group.isMember(ourID) { + // Use original here. Not a copy. + csa.consumers[consumer] = ca + } + } + nasa[stream] = csa + } + } + streams[acc] = nasa + } + } else { + streams = make(map[string]map[string]*streamAssignment, 1) + asa, ok := cc.streams[opts.Account] + if !ok { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorAccount, + Account: opts.Account, + Error: fmt.Sprintf("JetStream account %q not found", opts.Account), + }, + } + } + js.mu.RUnlock() + return health + } nasa := make(map[string]*streamAssignment) - for stream, sa := range asa { - // If we are a member and we are not being restored, select for check. - if sa.Group.isMember(ourID) && sa.Restore == nil { - csa := sa.copyGroup() - csa.consumers = make(map[string]*consumerAssignment) - for consumer, ca := range sa.consumers { - if ca.Group.isMember(ourID) { - // Use original here. Not a copy. - csa.consumers[consumer] = ca + if opts.Stream != "" { + sa, ok := asa[opts.Stream] + if !ok || !sa.Group.isMember(ourID) { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorStream, + Account: opts.Account, + Stream: opts.Stream, + Error: fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account), + }, } } - nasa[stream] = csa + js.mu.RUnlock() + return health + } + csa := sa.copyGroup() + csa.consumers = make(map[string]*consumerAssignment) + var consumerFound bool + for consumer, ca := range sa.consumers { + if opts.Consumer != "" { + if consumer != opts.Consumer || !ca.Group.isMember(ourID) { + continue + } + consumerFound = true + } + // If we are a member and we are not being restored, select for check. + if sa.Group.isMember(ourID) && sa.Restore == nil { + csa.consumers[consumer] = ca + } + if consumerFound { + break + } + } + if opts.Consumer != "" && !consumerFound { + health.StatusCode = http.StatusNotFound + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account) + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorConsumer, + Account: opts.Account, + Stream: opts.Stream, + Consumer: opts.Consumer, + Error: fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account), + }, + } + } + js.mu.RUnlock() + return health + } + nasa[opts.Stream] = csa + } else { + for stream, sa := range asa { + // If we are a member and we are not being restored, select for check. + if sa.Group.isMember(ourID) && sa.Restore == nil { + csa := sa.copyGroup() + csa.consumers = make(map[string]*consumerAssignment) + for consumer, ca := range sa.consumers { + if ca.Group.isMember(ourID) { + csa.consumers[consumer] = ca + } + } + nasa[stream] = csa + } } } - streams[acc] = nasa + streams[opts.Account] = nasa } js.mu.RUnlock() @@ -3246,25 +3596,51 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { for accName, asa := range streams { acc, err := s.LookupAccount(accName) if err != nil && len(asa) > 0 { - health.Status = na - health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err) - return health + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err) + return health + } + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorAccount, + Account: accName, + Error: fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err), + }) + continue } for stream, sa := range asa { // Make sure we can look up if !js.isStreamHealthy(acc, sa) { - health.Status = na - health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream) - return health + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream) + return health + } + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorStream, + Account: accName, + Stream: stream, + Error: fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream), + }) + continue } mset, _ := acc.lookupStream(stream) // Now check consumers. for consumer, ca := range sa.consumers { if !js.isConsumerHealthy(mset, consumer, ca) { - health.Status = na - health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) - return health + if !details { + health.Status = na + health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) + return health + } + health.Errors = append(health.Errors, HealthzError{ + Type: HealthzErrorConsumer, + Account: accName, + Stream: stream, + Consumer: consumer, + Error: fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer), + }) } } }