diff --git a/server/filestore.go b/server/filestore.go index 6925b294..baf39642 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -341,9 +341,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return nil, err } - // Write our meta data iff does not exist. + // Write our meta data if it does not exist or is zero'd out. meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile) - if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { + fi, err := os.Stat(meta) + if err != nil && os.IsNotExist(err) || fi != nil && fi.Size() == 0 { if err := fs.writeStreamMeta(); err != nil { return nil, err } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 74b645ec..71c519f9 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19136,3 +19136,83 @@ func TestJetStreamStreamUpdateSubjectsOverlapOthers(t *testing.T) { require_Error(t, err) require_Contains(t, err.Error(), "overlap") } + +// TestJetStreamMetaDataFailOnKernelFault checks that a stream whose meta files were zeroed +// out on disk is reported by healthz as not recovered, that adding the stream again recovers +// its messages, and that the stream is still restored after one more restart. +func TestJetStreamMetaDataFailOnKernelFault(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + js.Publish("foo", []byte("OK")) + } + + sd := s.JetStreamConfig().StoreDir + sdir := filepath.Join(sd, "$G", "streams", "TEST") + s.Shutdown() + + // Emulate the kernel not flushing the meta information out to disk, + // so we will zero out both meta.inf and meta.sum. + err = os.WriteFile(filepath.Join(sdir, JetStreamMetaFile), nil, defaultFilePerms) + require_NoError(t, err) + + err = os.WriteFile(filepath.Join(sdir, JetStreamMetaFileSum), nil, defaultFilePerms) + require_NoError(t, err) + + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // The stream will not have been recovered, so an error is expected. 
+ _, err = js.StreamInfo("TEST") + require_Error(t, err) + + // Make sure we are signaled here from healthz + hs := s.healthz(nil) + const expected = "JetStream stream '$G > TEST' could not be recovered" + if hs.Status != "unavailable" || hs.Error == _EMPTY_ { + t.Fatalf("Expected healthz to return an error") + } else if hs.Error != expected { + t.Fatalf("Expected healthz error %q got %q", expected, hs.Error) + } + + // If we add it back, this should recover the msgs. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + // Make sure we recovered. + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.Msgs == 10) + + // Now if we restart the server, meta should be correct, + // and the stream should be restored. + s.Shutdown() + + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // Make sure we recovered the stream correctly after re-adding. + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.Msgs == 10) +} diff --git a/server/monitor.go b/server/monitor.go index 427b2581..62beb564 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -23,6 +23,8 @@ import ( "net" "net/http" "net/url" + "os" + "path/filepath" "runtime" "sort" "strconv" @@ -3044,8 +3046,38 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { cc := js.cluster - // Currently single server mode this is a no-op. + const na = "unavailable" + + // In single server mode we make sure the streams on disk were recovered. if cc == nil || cc.meta == nil { + sdir := js.config.StoreDir + // Whip through account folders and pull each stream name. 
+ fis, _ := os.ReadDir(sdir) + for _, fi := range fis { + // A regular file can not be an account folder, so a stray file must not fail the check. + if fi.Type().IsRegular() { + continue + } + acc, err := s.LookupAccount(fi.Name()) + if err != nil { + health.Status = na + health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()) + return health + } + sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams")) + for _, sfi := range sfis { + // Same for the streams folder, each stream lives in its own folder. + if sfi.Type().IsRegular() { + continue + } + stream := sfi.Name() + if _, err := acc.lookupStream(stream); err != nil { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream) + return health + } + } + } return health } @@ -3055,13 +3087,13 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // If no meta leader. if meta.GroupLeader() == _EMPTY_ { - health.Status = "unavailable" + health.Status = na health.Error = "JetStream has not established contact with a meta leader" return health } // If we are not current with the meta leader. if !meta.Current() { - health.Status = "unavailable" + health.Status = na health.Error = "JetStream is not current with the meta leader" return health } @@ -3078,7 +3110,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { if sa.Group.isMember(ourID) { // Make sure we can look up if !cc.isStreamCurrent(acc, stream) { - health.Status = "unavailable" + health.Status = na health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream) return health } @@ -3086,7 +3118,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { for consumer, ca := range sa.consumers { if ca.Group.isMember(ourID) { if !cc.isConsumerCurrent(acc, stream, consumer) { - health.Status = "unavailable" + health.Status = na health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) return health }