Add consumer check to healthz and allow to be called directly

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-03-16 20:52:31 -07:00
parent acfd456758
commit 287b567b1c

View File

@@ -2713,12 +2713,25 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
s.httpReqStats[HealthzPath]++
s.mu.Unlock()
hs := s.healthz()
if hs.Error != _EMPTY_ {
w.WriteHeader(http.StatusServiceUnavailable)
}
b, err := json.Marshal(hs)
if err != nil {
s.Errorf("Error marshaling response to /healthz request: %v", err)
}
ResponseHandler(w, r, b)
}
// Generate health status.
func (s *Server) healthz() *HealthStatus {
var health = &HealthStatus{Status: "ok"}
if err := s.readyForConnections(time.Millisecond); err != nil {
health.Status = "error"
health.Error = err.Error()
w.WriteHeader(http.StatusServiceUnavailable)
} else if js := s.getJetStream(); js != nil {
// Check JetStream status here.
js.mu.RLock()
@@ -2731,11 +2744,9 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
if node.GroupLeader() == _EMPTY_ {
health.Status = "unavailable"
health.Error = "JetStream has not established contact with a meta leader"
w.WriteHeader(http.StatusServiceUnavailable)
} else if !node.Current() {
health.Status = "unavailable"
health.Error = "JetStream is not current with the meta leader"
w.WriteHeader(http.StatusServiceUnavailable)
} else {
// If we are here we are current and have seen our meta leader.
// Now check assets.
@@ -2772,21 +2783,23 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
if !cc.isStreamCurrent(accName, sname) {
health.Status = "unavailable"
health.Error = fmt.Sprintf("JetStream stream %q for account %q is not current", sname, accName)
w.WriteHeader(http.StatusServiceUnavailable)
js.mu.RUnlock()
break Err
}
// Now do consumers.
for _, o := range stream.getConsumers() {
if node := o.raftNode(); node != nil && !node.Current() {
health.Status = "unavailable"
health.Error = fmt.Sprintf("JetStream consumer %q for stream %q and account %q is not current", o.String(), sname, accName)
js.mu.RUnlock()
break Err
}
}
}
js.mu.RUnlock()
}
}
}
}
b, err := json.Marshal(health)
if err != nil {
s.Errorf("Error marshaling response to /healthz request: %v", err)
}
ResponseHandler(w, r, b)
return health
}