In some scenarios compact could count messages it should not and make mb.msgs go negative which meant max uint64.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-07 14:20:32 -07:00
parent 74a051e3a8
commit bfb482121d
2 changed files with 46 additions and 4 deletions

View File

@@ -2975,12 +2975,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.mu.Lock()
for mseq := smb.first.seq; mseq < seq; mseq++ {
if _, rl, _, err := smb.slotInfo(int(mseq - smb.cache.fseq)); err != nil {
smb.bytes -= uint64(rl)
if sm, _ := smb.cacheLookupWithLock(mseq); sm != nil && smb.msgs > 0 {
smb.bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
smb.msgs--
purged++
}
smb.msgs--
purged++
}
// Update first entry.
sm, _ := smb.cacheLookupWithLock(seq)
if sm != nil {

View File

@@ -845,6 +845,47 @@ func TestFileStoreCompactLastPlusOne(t *testing.T) {
}
}
func TestFileStoreCompactMsgCountBug(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
fs.StoreMsg(subj, nil, msg)
}
if state := fs.State(); state.Msgs != 10 {
t.Fatalf("Expected 10 msgs, got %d", state.Msgs)
}
// Now delete 2,3,4.
fs.EraseMsg(2)
fs.EraseMsg(3)
fs.EraseMsg(4)
// Also delete 7,8, and 9.
fs.RemoveMsg(7)
fs.RemoveMsg(8)
fs.RemoveMsg(9)
n, err := fs.Compact(6)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// 1 & 5
if n != 2 {
t.Fatalf("Expected to have deleted 2 msgs, got %d", n)
}
if state := fs.State(); state.Msgs != 2 {
t.Fatalf("Expected to have 2 remaining, got %d", state.Msgs)
}
}
func TestFileStoreCompactPerf(t *testing.T) {
t.SkipNow()