Try to dump any cached state including fss on recovery to avoid memory bloat on restart.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-08-24 17:41:57 -07:00
parent 14a7bf89df
commit 7c1618f91c
2 changed files with 50 additions and 9 deletions

View File

@@ -750,6 +750,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) {
if mb.msgs > 0 && !mb.noTrack && fs.psim != nil {
fs.populateGlobalPerSubjectInfo(mb)
// Try to dump any state we needed on recovery.
mb.tryForceExpireCacheLocked()
}
fs.addMsgBlock(mb)
return mb, nil
@@ -762,6 +764,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
}
if mb.msgs > 0 && !mb.noTrack && fs.psim != nil {
fs.populateGlobalPerSubjectInfo(mb)
// Try to dump any state we needed on recovery.
mb.tryForceExpireCacheLocked()
}
// Rewrite this to make sure we are sync'd.
@@ -2978,16 +2982,12 @@ func (mb *msgBlock) expireCacheLocked() {
// Check if we can clear out our fss and idx unless under force expire.
// We used to hold onto the idx longer but removes need buf now so no point.
if mb.llts > 0 {
mb.writePerSubjectInfo()
mb.fss = nil
if mb.indexNeedsUpdateLocked() {
mb.writeIndexInfoLocked()
}
mb.clearCache()
} else {
mb.resetCacheExpireTimer(mb.cexp)
mb.writePerSubjectInfo()
mb.fss = nil
if mb.indexNeedsUpdateLocked() {
mb.writeIndexInfoLocked()
}
mb.clearCache()
}
func (fs *fileStore) startAgeChk() {

View File

@@ -4469,3 +4469,44 @@ func TestFileStoreNoFSSBugAfterRemoveFirst(t *testing.T) {
t.Fatalf("Expected no state for %q, but got %+v\n", "foo.bar.0", ss)
}
}
func TestFileStoreNoFSSAfterRecover(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
n, msg := 100, []byte("no fss for you!")
for i := 0; i < n; i++ {
_, _, err := fs.StoreMsg(_EMPTY_, nil, msg)
require_NoError(t, err)
}
state := fs.State()
require_True(t, state.Msgs == uint64(n))
fs.Stop()
fs, err = newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
// Make sure we did not load the block trying to generate fss.
fs.mu.RLock()
mb := fs.blks[0]
fs.mu.RUnlock()
mb.mu.Lock()
defer mb.mu.Unlock()
if mb.fss != nil {
t.Fatalf("Expected no fss post recover")
}
}