diff --git a/server/filestore.go b/server/filestore.go index 3d076cd5..96811d44 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2975,12 +2975,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { smb.mu.Lock() for mseq := smb.first.seq; mseq < seq; mseq++ { - if _, rl, _, err := smb.slotInfo(int(mseq - smb.cache.fseq)); err != nil { - smb.bytes -= uint64(rl) + if sm, _ := smb.cacheLookupWithLock(mseq); sm != nil && smb.msgs > 0 { + smb.bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + smb.msgs-- + purged++ } - smb.msgs-- - purged++ } + // Update first entry. sm, _ := smb.cacheLookupWithLock(seq) if sm != nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index b2bf6310..4c384908 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -845,6 +845,47 @@ func TestFileStoreCompactLastPlusOne(t *testing.T) { } } +func TestFileStoreCompactMsgCountBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 10; i++ { + fs.StoreMsg(subj, nil, msg) + } + if state := fs.State(); state.Msgs != 10 { + t.Fatalf("Expected 10 msgs, got %d", state.Msgs) + } + // Now delete 2,3,4. + fs.EraseMsg(2) + fs.EraseMsg(3) + fs.EraseMsg(4) + + // Also delete 7,8, and 9. + fs.RemoveMsg(7) + fs.RemoveMsg(8) + fs.RemoveMsg(9) + + n, err := fs.Compact(6) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // 1 & 5 + if n != 2 { + t.Fatalf("Expected to have deleted 2 msgs, got %d", n) + } + if state := fs.State(); state.Msgs != 2 { + t.Fatalf("Expected to have 2 remaining, got %d", state.Msgs) + } +} + func TestFileStoreCompactPerf(t *testing.T) { t.SkipNow()