From 68104d7cf321fff64346145b652647aeec10f70d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 9 Feb 2022 16:23:59 -0800 Subject: [PATCH] During a filestore snapshot we generate the fss files but were not cleaning them up if the block was deleted before a server restart. https://gist.github.com/nekufa/010185dfb59261f222a0042d3a7d2a1c Signed-off-by: Derek Collison --- server/filestore.go | 16 +++++++++++++- server/norace_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index 055a8dea..04382869 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -30,6 +30,7 @@ import ( "net" "os" "path" + "path/filepath" "runtime" "sort" "sync" @@ -187,6 +188,8 @@ const ( indexScan = "%d.idx" // used to load per subject meta information. fssScan = "%d.fss" + // to look for orphans + fssScanAll = "*.fss" // used to store our block encryption key. keyScan = "%d.key" // This is where we keep state on consumers. @@ -890,7 +893,6 @@ func (fs *fileStore) recoverMsgs() error { fs.psmc[subj] += ss.Msgs } } - } else { return err } @@ -909,6 +911,14 @@ func (fs *fileStore) recoverMsgs() error { return err } + // We had a bug that would leave fss files around during a snapshot. + // Clean them up here if we see them. + if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 { + for _, fn := range fms { + os.Remove(fn) + } + } + // Limits checks and enforcement. 
fs.enforceMsgLimit() fs.enforceBytesLimit() @@ -4326,6 +4336,10 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { os.Remove(mb.mfn) mb.mfn = _EMPTY_ } + if mb.sfn != _EMPTY_ { + os.Remove(mb.sfn) + mb.sfn = _EMPTY_ + } } } diff --git a/server/norace_test.go b/server/norace_test.go index 968f582c..80164b76 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -29,6 +29,8 @@ import ( "net" "net/http" "net/url" + "path" + "path/filepath" "runtime" "runtime/debug" "strconv" @@ -4275,3 +4277,50 @@ func TestNoRaceJetStreamSparseConsumers(t *testing.T) { }) } } + +func TestNoRaceFileStoreSubjectInfoWithSnapshotCleanup(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, StreamConfig{Name: "TEST", Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + n, msg := 10_000, []byte(strings.Repeat("Z", 1024)) + for i := 0; i < n; i++ { + _, _, err := fs.StoreMsg(fmt.Sprintf("X.%d", i), nil, msg) + require_NoError(t, err) + } + + // Snapshot causes us to write out per subject info, fss files. + // We want to make sure they get cleaned up. + sr, err := fs.Snapshot(5*time.Second, false, false) + require_NoError(t, err) + var buf [4 * 1024 * 1024]byte + for { + if _, err = sr.Reader.Read(buf[:]); err == io.EOF { + break + } + require_NoError(t, err) + } + + var seqs []uint64 + for i := 1; i <= n; i++ { + seqs = append(seqs, uint64(i)) + } + // Randomly delete msgs, make sure we clean up as we empty the message blocks. + rand.Shuffle(len(seqs), func(i, j int) { seqs[i], seqs[j] = seqs[j], seqs[i] }) + + for _, seq := range seqs { + _, err := fs.RemoveMsg(seq) + require_NoError(t, err) + } + + // We will have cleaned up the main .blk and .idx files, sans the lmb, but we should not have any *.fss files. 
+ fms, err := filepath.Glob(path.Join(storeDir, msgDir, fssScanAll)) + require_NoError(t, err) + + if len(fms) > 0 { + t.Fatalf("Expected to find no fss files, found %d", len(fms)) + } +}