From 7c1618f91c2760075ba06fa5b877ecd05175c7cd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Aug 2022 17:41:57 -0700 Subject: [PATCH] Try to dump any cached state including fss on recovery to avoid memory bloat on restart. Signed-off-by: Derek Collison --- server/filestore.go | 18 +++++++++--------- server/filestore_test.go | 41 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 08811d42..d2bea361 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -750,6 +750,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) { if mb.msgs > 0 && !mb.noTrack && fs.psim != nil { fs.populateGlobalPerSubjectInfo(mb) + // Try to dump any state we needed on recovery. + mb.tryForceExpireCacheLocked() } fs.addMsgBlock(mb) return mb, nil @@ -762,6 +764,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e } if mb.msgs > 0 && !mb.noTrack && fs.psim != nil { fs.populateGlobalPerSubjectInfo(mb) + // Try to dump any state we needed on recovery. + mb.tryForceExpireCacheLocked() } // Rewrite this to make sure we are sync'd. @@ -2978,16 +2982,12 @@ func (mb *msgBlock) expireCacheLocked() { // Check if we can clear out our fss and idx unless under force expire. // We used to hold onto the idx longer but removes need buf now so no point. - if mb.llts > 0 { - mb.writePerSubjectInfo() - mb.fss = nil - if mb.indexNeedsUpdateLocked() { - mb.writeIndexInfoLocked() - } - mb.clearCache() - } else { - mb.resetCacheExpireTimer(mb.cexp) + mb.writePerSubjectInfo() + mb.fss = nil + if mb.indexNeedsUpdateLocked() { + mb.writeIndexInfoLocked() } + mb.clearCache() } func (fs *fileStore) startAgeChk() { diff --git a/server/filestore_test.go b/server/filestore_test.go index a30852b4..07a7eafd 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4469,3 +4469,44 @@ func TestFileStoreNoFSSBugAfterRemoveFirst(t *testing.T) { t.Fatalf("Expected no state for %q, but got %+v\n", "foo.bar.0", ss) } } + +func TestFileStoreNoFSSAfterRecover(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + n, msg := 100, []byte("no fss for you!") + for i := 0; i < n; i++ { + _, _, err := fs.StoreMsg(_EMPTY_, nil, msg) + require_NoError(t, err) + } + + state := fs.State() + require_True(t, state.Msgs == uint64(n)) + + fs.Stop() + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + // Make sure we did not load the block trying to generate fss. + fs.mu.RLock() + mb := fs.blks[0] + fs.mu.RUnlock() + + mb.mu.Lock() + defer mb.mu.Unlock() + + if mb.fss != nil { + t.Fatalf("Expected no fss post recover") + } +}