[ADDED] Filter Healthz results based on stream and consumer names, add 'details` param

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2023-04-24 23:53:47 +02:00
parent 65e8db731c
commit 27dc50eb8f
2 changed files with 1222 additions and 44 deletions

View File

@@ -2717,9 +2717,13 @@ type JSzOptions struct {
// HealthzOptions are options passed to Healthz
type HealthzOptions struct {
// Deprecated: Use JSEnabledOnly instead
JSEnabled bool `json:"js-enabled,omitempty"`
JSEnabledOnly bool `json:"js-enabled-only,omitempty"`
JSServerOnly bool `json:"js-server-only,omitempty"`
JSEnabled bool `json:"js-enabled,omitempty"`
JSEnabledOnly bool `json:"js-enabled-only,omitempty"`
JSServerOnly bool `json:"js-server-only,omitempty"`
Account string `json:"account,omitempty"`
Stream string `json:"stream,omitempty"`
Consumer string `json:"consumer,omitempty"`
Details bool `json:"details,omitempty"`
}
// ProfilezOptions are options passed to Profilez
@@ -3078,8 +3082,72 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
}
type HealthStatus struct {
Status string `json:"status"`
Error string `json:"error,omitempty"`
Status string `json:"status"`
StatusCode int `json:"status_code,omitempty"`
Error string `json:"error,omitempty"`
Errors []HealthzError `json:"errors,omitempty"`
}
type HealthzError struct {
Type HealthZErrorType `json:"type"`
Account string `json:"account,omitempty"`
Stream string `json:"stream,omitempty"`
Consumer string `json:"consumer,omitempty"`
Error string `json:"error,omitempty"`
}
type HealthZErrorType int
const (
HealthzErrorConn HealthZErrorType = iota
HealthzErrorBadRequest
HealthzErrorJetStream
HealthzErrorAccount
HealthzErrorStream
HealthzErrorConsumer
)
func (t HealthZErrorType) String() string {
switch t {
case HealthzErrorConn:
return "CONNECTION"
case HealthzErrorBadRequest:
return "BAD_REQUEST"
case HealthzErrorJetStream:
return "JETSTREAM"
case HealthzErrorAccount:
return "ACCOUNT"
case HealthzErrorStream:
return "STREAM"
case HealthzErrorConsumer:
return "CONSUMER"
default:
return "unknown"
}
}
func (t HealthZErrorType) MarshalJSON() ([]byte, error) {
return json.Marshal(t.String())
}
func (t *HealthZErrorType) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("CONNECTION"):
*t = HealthzErrorConn
case jsonString("BAD_REQUEST"):
*t = HealthzErrorBadRequest
case jsonString("JETSTREAM"):
*t = HealthzErrorJetStream
case jsonString("ACCOUNT"):
*t = HealthzErrorAccount
case jsonString("STREAM"):
*t = HealthzErrorStream
case jsonString("CONSUMER"):
*t = HealthzErrorConsumer
default:
return fmt.Errorf("unknown healthz error type %q", data)
}
return nil
}
// https://tools.ietf.org/id/draft-inadarei-api-health-check-05.html
@@ -3104,10 +3172,19 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
return
}
includeDetails, err := decodeBool(w, r, "details")
if err != nil {
return
}
hs := s.healthz(&HealthzOptions{
JSEnabled: jsEnabled,
JSEnabledOnly: jsEnabledOnly,
JSServerOnly: jsServerOnly,
Account: r.URL.Query().Get("account"),
Stream: r.URL.Query().Get("stream"),
Consumer: r.URL.Query().Get("consumer"),
Details: includeDetails,
})
if hs.Error != _EMPTY_ {
s.Warnf("Healthcheck failed: %q", hs.Error)
@@ -3129,10 +3206,65 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
if opts == nil {
opts = &HealthzOptions{}
}
details := opts.Details
defer func() {
// for response with details enabled, ses status to either "error" or "ok"
if details {
if len(health.Errors) != 0 {
health.Status = "error"
} else {
health.Status = "ok"
}
}
// if no specific status code was set, set it based on the presence of errors
if health.StatusCode == 0 {
if health.Error != "" || len(health.Errors) != 0 {
health.StatusCode = http.StatusInternalServerError
} else {
health.StatusCode = http.StatusOK
}
}
}()
if opts.Account == "" && opts.Stream != "" {
health.StatusCode = http.StatusBadRequest
if !details {
health.Status = "error"
health.Error = fmt.Sprintf("%q must not be empty when checking stream health", "account")
} else {
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorBadRequest,
Error: fmt.Sprintf("%q must not be empty when checking stream health", "account"),
})
}
return health
}
if opts.Stream == "" && opts.Consumer != "" {
health.StatusCode = http.StatusBadRequest
if !details {
health.Status = "error"
health.Error = fmt.Sprintf("%q must not be empty when checking consumer health", "stream")
} else {
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorBadRequest,
Error: fmt.Sprintf("%q must not be empty when checking consumer health", "stream"),
})
}
return health
}
if err := s.readyForConnections(time.Millisecond); err != nil {
health.StatusCode = http.StatusInternalServerError
health.Status = "error"
health.Error = err.Error()
if !details {
health.Error = err.Error()
} else {
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorConn,
Error: err.Error(),
})
}
return health
}
@@ -3145,10 +3277,18 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// Access the Jetstream state to perform additional checks.
js := s.getJetStream()
const na = "unavailable"
if !js.isEnabled() {
health.Status = "unavailable"
health.Error = NewJSNotEnabledError().Error()
health.StatusCode = http.StatusServiceUnavailable
health.Status = na
if !details {
health.Error = NewJSNotEnabledError().Error()
} else {
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorJetStream,
Error: NewJSNotEnabledError().Error(),
})
}
return health
}
// Only check if JS is enabled, skip meta and asset check.
@@ -3161,30 +3301,124 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
cc := js.cluster
js.mu.RUnlock()
const na = "unavailable"
// Currently single server we make sure the streams were recovered.
if cc == nil {
sdir := js.config.StoreDir
// Whip through account folders and pull each stream name.
fis, _ := os.ReadDir(sdir)
var accFound, streamFound, consumerFound bool
for _, fi := range fis {
if fi.Name() == snapStagingDir {
continue
}
if opts.Account != "" {
if fi.Name() != opts.Account {
continue
}
accFound = true
}
acc, err := s.LookupAccount(fi.Name())
if err != nil {
health.Status = na
health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name())
return health
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name())
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorAccount,
Account: fi.Name(),
Error: fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()),
})
continue
}
sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams"))
for _, sfi := range sfis {
if opts.Stream != "" {
if sfi.Name() != opts.Stream {
continue
}
streamFound = true
}
stream := sfi.Name()
if _, err := acc.lookupStream(stream); err != nil {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream)
return health
s, err := acc.lookupStream(stream)
if err != nil {
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorStream,
Account: acc.Name,
Stream: stream,
Error: fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream),
})
continue
}
if streamFound {
// if consumer option is passed, verify that the consumer exists on stream
if opts.Consumer != "" {
for _, cons := range s.consumers {
if cons.name == opts.Consumer {
consumerFound = true
break
}
}
}
break
}
}
if accFound {
break
}
}
if opts.Account != "" && !accFound {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorAccount,
Account: opts.Account,
Error: fmt.Sprintf("JetStream account %q not found", opts.Account),
},
}
}
return health
}
if opts.Stream != "" && !streamFound {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorStream,
Account: opts.Account,
Stream: opts.Stream,
Error: fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
},
}
}
return health
}
if opts.Consumer != "" && !consumerFound {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorConsumer,
Account: opts.Account,
Stream: opts.Stream,
Consumer: opts.Consumer,
Error: fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
},
}
}
}
@@ -3199,14 +3433,32 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// If no meta leader.
if meta == nil || meta.GroupLeader() == _EMPTY_ {
health.Status = na
health.Error = "JetStream has not established contact with a meta leader"
if !details {
health.Status = na
health.Error = "JetStream has not established contact with a meta leader"
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorJetStream,
Error: "JetStream has not established contact with a meta leader",
},
}
}
return health
}
// If we are not current with the meta leader.
if !meta.Healthy() {
health.Status = na
health.Error = "JetStream is not current with the meta leader"
if !details {
health.Status = na
health.Error = "JetStream is not current with the meta leader"
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorJetStream,
Error: "JetStream is not current with the meta leader",
},
}
}
return health
}
@@ -3220,25 +3472,123 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
ourID := meta.ID()
// Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
var streams map[string]map[string]*streamAssignment
js.mu.RLock()
streams := make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
if opts.Account == "" {
streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
for stream, sa := range asa {
// If we are a member and we are not being restored, select for check.
if sa.Group.isMember(ourID) && sa.Restore == nil {
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
// Use original here. Not a copy.
csa.consumers[consumer] = ca
}
}
nasa[stream] = csa
}
}
streams[acc] = nasa
}
} else {
streams = make(map[string]map[string]*streamAssignment, 1)
asa, ok := cc.streams[opts.Account]
if !ok {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorAccount,
Account: opts.Account,
Error: fmt.Sprintf("JetStream account %q not found", opts.Account),
},
}
}
js.mu.RUnlock()
return health
}
nasa := make(map[string]*streamAssignment)
for stream, sa := range asa {
// If we are a member and we are not being restored, select for check.
if sa.Group.isMember(ourID) && sa.Restore == nil {
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
// Use original here. Not a copy.
csa.consumers[consumer] = ca
if opts.Stream != "" {
sa, ok := asa[opts.Stream]
if !ok || !sa.Group.isMember(ourID) {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorStream,
Account: opts.Account,
Stream: opts.Stream,
Error: fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
},
}
}
nasa[stream] = csa
js.mu.RUnlock()
return health
}
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
var consumerFound bool
for consumer, ca := range sa.consumers {
if opts.Consumer != "" {
if consumer != opts.Consumer || !ca.Group.isMember(ourID) {
continue
}
consumerFound = true
}
// If we are a member and we are not being restored, select for check.
if sa.Group.isMember(ourID) && sa.Restore == nil {
csa.consumers[consumer] = ca
}
if consumerFound {
break
}
}
if opts.Consumer != "" && !consumerFound {
health.StatusCode = http.StatusNotFound
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorConsumer,
Account: opts.Account,
Stream: opts.Stream,
Consumer: opts.Consumer,
Error: fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
},
}
}
js.mu.RUnlock()
return health
}
nasa[opts.Stream] = csa
} else {
for stream, sa := range asa {
// If we are a member and we are not being restored, select for check.
if sa.Group.isMember(ourID) && sa.Restore == nil {
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
csa.consumers[consumer] = ca
}
}
nasa[stream] = csa
}
}
}
streams[acc] = nasa
streams[opts.Account] = nasa
}
js.mu.RUnlock()
@@ -3246,25 +3596,51 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
for accName, asa := range streams {
acc, err := s.LookupAccount(accName)
if err != nil && len(asa) > 0 {
health.Status = na
health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
return health
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorAccount,
Account: accName,
Error: fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err),
})
continue
}
for stream, sa := range asa {
// Make sure we can look up
if !js.isStreamHealthy(acc, sa) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
return health
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorStream,
Account: accName,
Stream: stream,
Error: fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream),
})
continue
}
mset, _ := acc.lookupStream(stream)
// Now check consumers.
for consumer, ca := range sa.consumers {
if !js.isConsumerHealthy(mset, consumer, ca) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorConsumer,
Account: accName,
Stream: stream,
Consumer: consumer,
Error: fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer),
})
}
}
}