If a stream's meta was not properly written but the file existed, we could re-add the stream, but a subsequent restart would lose it again.

Also added a check in healthz for single-server systems to make sure all stream directories resulted in recovered streams.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-12-29 20:08:56 -08:00
parent 71b90673ee
commit 713f632fa7
3 changed files with 109 additions and 7 deletions

View File

@@ -23,6 +23,8 @@ import (
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
@@ -3044,8 +3046,30 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
cc := js.cluster
// Currently single server mode this is a no-op.
const na = "unavailable"
// Currently single server we make sure the streams were recovered.
if cc == nil || cc.meta == nil {
sdir := js.config.StoreDir
// Whip through account folders and pull each stream name.
fis, _ := os.ReadDir(sdir)
for _, fi := range fis {
acc, err := s.LookupAccount(fi.Name())
if err != nil {
health.Status = na
health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name())
return health
}
sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams"))
for _, sfi := range sfis {
stream := sfi.Name()
if _, err := acc.lookupStream(stream); err != nil {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream)
return health
}
}
}
return health
}
@@ -3055,13 +3079,13 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// If no meta leader.
if meta.GroupLeader() == _EMPTY_ {
health.Status = "unavailable"
health.Status = na
health.Error = "JetStream has not established contact with a meta leader"
return health
}
// If we are not current with the meta leader.
if !meta.Current() {
health.Status = "unavailable"
health.Status = na
health.Error = "JetStream is not current with the meta leader"
return health
}
@@ -3078,7 +3102,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
if sa.Group.isMember(ourID) {
// Make sure we can look up
if !cc.isStreamCurrent(acc, stream) {
health.Status = "unavailable"
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
return health
}
@@ -3086,7 +3110,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
if !cc.isConsumerCurrent(acc, stream, consumer) {
health.Status = "unavailable"
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}