From 4291433a46df5fd5b8621606551e50fd08ba250f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 12 May 2022 13:20:16 -0700 Subject: [PATCH] General improvements to accounting for the filestore. This is in response to tracking issue #3114. Signed-off-by: Derek Collison --- server/filestore.go | 106 +++++++++++++++++++++++++++------------ server/filestore_test.go | 82 ++++++++++++++++++++++++++++++ server/memstore.go | 6 ++- server/memstore_test.go | 14 ++++++ server/store.go | 2 + 5 files changed, 177 insertions(+), 33 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 0bcaf2d7..808a792c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -856,7 +856,10 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { // This is an old erased message, or a new one that we can track. if seq == 0 || seq&ebit != 0 || seq < mb.first.seq { seq = seq &^ ebit - addToDmap(seq) + // Only add to dmap if past recorded first seq and non-zero. + if seq != 0 && seq >= mb.first.seq { + addToDmap(seq) + } index += rl mb.last.seq = seq mb.last.ts = ts @@ -1094,10 +1097,14 @@ func (fs *fileStore) expireMsgsOnRecover() { mb.dmap = nil } } + // Keep this update just in case since we are removing dmap entries. + mb.first.seq = seq continue } // Break on other errors. if err != nil || sm == nil { + // Keep this update just in case since we could have removed dmap entries. + mb.first.seq = seq break } @@ -1112,11 +1119,13 @@ func (fs *fileStore) expireMsgsOnRecover() { } // Delete the message here. 
- sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) - mb.bytes -= sz - bytes += sz - mb.msgs-- - purged++ + if mb.msgs > 0 { + sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + mb.bytes -= sz + bytes += sz + mb.msgs-- + purged++ + } // Update fss fs.removePerSubject(sm.subj) mb.removeSeqPerSubject(sm.subj, seq, &smv) @@ -1998,7 +2007,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error mb.mu.Lock() - // See if the sequence numbers is still relevant. + // See if the sequence number is still relevant. if seq < mb.first.seq { mb.mu.Unlock() fsUnlock() @@ -2170,7 +2179,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. // Write lock needs to be held. func (mb *msgBlock) compact() { - if mb.cacheNotLoaded() { + wasLoaded := mb.cacheAlreadyLoaded() + if !wasLoaded { if err := mb.loadMsgsWithLock(); err != nil { return } @@ -2252,11 +2262,12 @@ func (mb *msgBlock) compact() { // We will write to a new file and mv/rename it in case of failure. mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) - defer os.Remove(mfn) if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { + os.Remove(mfn) return } if err := os.Rename(mfn, mb.mfn); err != nil { + os.Remove(mfn) return } @@ -2265,7 +2276,11 @@ func (mb *msgBlock) compact() { mb.removeIndexFileLocked() mb.deleteDmap() mb.rebuildStateLocked() - mb.loadMsgsWithLock() + + // If we entered with the msgs loaded make sure to reload them. + if wasLoaded { + mb.loadMsgsWithLock() + } } // Nil out our dmap. 
@@ -2485,23 +2500,34 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { var purged, bytes uint64 mb.mu.Lock() + checkDmap := len(mb.dmap) > 0 + var smv StoreMsg + for seq := mb.last.seq; seq > sm.seq; seq-- { if checkDmap { if _, ok := mb.dmap[seq]; ok { // Delete and skip to next. delete(mb.dmap, seq) + if len(mb.dmap) == 0 { + mb.dmap = nil + checkDmap = false + } continue } } // We should have a valid msg to calculate removal stats. - _, rl, _, err := mb.slotInfo(int(seq - mb.cache.fseq)) - if err != nil { - mb.mu.Unlock() - return 0, 0, err + if m, err := mb.cacheLookup(seq, &smv); err == nil { + if mb.msgs > 0 { + rl := fileStoreMsgSize(m.subj, m.hdr, m.msg) + mb.msgs-- + mb.bytes -= rl + mb.rbytes -= rl + // For return accounting. + purged++ + bytes += uint64(rl) + } } - purged++ - bytes += uint64(rl) } // Truncate our msgs and close file. @@ -2517,11 +2543,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index) } - // Do local mb stat updates. - mb.msgs -= purged - mb.bytes -= bytes - mb.rbytes -= bytes - // Update our last msg. mb.last.seq = sm.seq mb.last.ts = sm.ts @@ -3603,6 +3624,12 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { if err != nil || fsm == nil { return nil, err } + + // Deleted messages that are decoded return a 0 for sequence. + if fsm.seq == 0 { + return nil, errDeletedMsg + } + if seq != fsm.seq { recycleMsgBlockBuf(mb.cache.buf) mb.cache.buf = nil @@ -4040,10 +4067,16 @@ func (mb *msgBlock) readIndexInfo() error { // Check if this is a short write index file. if bi < 0 || bi+checksumSize > len(buf) { - defer os.Remove(mb.ifn) + os.Remove(mb.ifn) return fmt.Errorf("short index file") } + // Check for consistency of accounting. If something is off bail and we will rebuild. 
+ if mb.msgs != (mb.last.seq-mb.first.seq+1)-dmapLen { + os.Remove(mb.ifn) + return fmt.Errorf("accounting inconsistent") + } + // Checksum copy(mb.lchk[0:], buf[bi:bi+checksumSize]) bi += checksumSize @@ -4151,6 +4184,10 @@ func compareFn(subject string) func(string, string) bool { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { + if sequence > 1 && keep > 0 { + return 0, ErrPurgeArgMismatch + } + if subject == _EMPTY_ || subject == fwcs { if keep == 0 && (sequence == 0 || sequence == 1) { return fs.Purge() @@ -4198,7 +4235,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint mb.loadMsgsWithLock() shouldExpire = true } - if sequence > 0 && sequence <= l { + if sequence > 1 && sequence <= l { l = sequence - 1 } @@ -4207,10 +4244,13 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) // Do fast in place remove. // Stats - fs.state.Msgs-- - fs.state.Bytes -= rl - mb.msgs-- - mb.bytes -= rl + if mb.msgs > 0 { + fs.state.Msgs-- + fs.state.Bytes -= rl + mb.msgs-- + mb.bytes -= rl + purged++ + } // FSS updates. 
fs.removePerSubject(sm.subj) mb.removeSeqPerSubject(sm.subj, seq, &smv) @@ -4231,7 +4271,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } mb.dmap[seq] = struct{}{} } - purged++ + if maxp > 0 && purged >= maxp { break } @@ -4391,10 +4431,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { } } else if sm != nil { sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) - smb.bytes -= sz - bytes += sz - smb.msgs-- - purged++ + if smb.msgs > 0 { + smb.bytes -= sz + bytes += sz + smb.msgs-- + purged++ + } // Update fss fs.removePerSubject(sm.subj) smb.removeSeqPerSubject(sm.subj, mseq, &smv) diff --git a/server/filestore_test.go b/server/filestore_test.go index d03eaa68..9249199c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3825,3 +3825,85 @@ func TestFileStoreRememberLastMsgTime(t *testing.T) { require_True(t, lt == fs.State().LastTime) require_True(t, seq == 6) } + +func (fs *fileStore) getFirstBlock() *msgBlock { + fs.mu.RLock() + defer fs.mu.RUnlock() + if len(fs.blks) == 0 { + return nil + } + return fs.blks[0] +} + +func TestFileStoreRebuildStateDmapAccountingBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, + StreamConfig{Name: "TEST", Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + for i := 0; i < 100; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + // Delete 2-40. 
+ for i := 2; i <= 40; i++ { + _, err := fs.RemoveMsg(uint64(i)) + require_NoError(t, err) + } + + mb := fs.getFirstBlock() + require_True(t, mb != nil) + + check := func() { + t.Helper() + mb.mu.RLock() + defer mb.mu.RUnlock() + dmapLen := uint64(len(mb.dmap)) + if mb.msgs != (mb.last.seq-mb.first.seq+1)-dmapLen { + t.Fatalf("Consistency check failed: %d != %d -> last %d first %d len(dmap) %d", + mb.msgs, (mb.last.seq-mb.first.seq+1)-dmapLen, mb.last.seq, mb.first.seq, dmapLen) + } + } + + check() + + mb.mu.Lock() + mb.compact() + mb.mu.Unlock() + + // Now delete first. + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + + mb.mu.Lock() + _, err = mb.rebuildStateLocked() + require_NoError(t, err) + mb.mu.Unlock() + + check() +} + +func TestFileStorePurgeExWithSubject(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "TEST", Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + for i := 0; i < 100; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // This should purge all. + fs.PurgeEx("foo", 1, 0) + require_True(t, fs.State().Msgs == 0) +} diff --git a/server/memstore.go b/server/memstore.go index d06cad4f..65493301 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -428,6 +428,10 @@ func (ms *memStore) expireMsgs() { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. 
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { + if sequence > 1 && keep > 0 { + return 0, ErrPurgeArgMismatch + } + if subject == _EMPTY_ || subject == fwcs { if keep == 0 && (sequence == 0 || sequence == 1) { return ms.Purge() @@ -455,7 +459,7 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6 ss.Msgs -= keep } last := ss.Last - if sequence > 0 { + if sequence > 1 { last = sequence - 1 } ms.mu.Lock() diff --git a/server/memstore_test.go b/server/memstore_test.go index 413d39a7..92fa00c5 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -391,3 +391,17 @@ func TestMemStoreStreamTruncate(t *testing.T) { t.Fatalf("Expected deleted to be %+v, got %+v\n", expected, state.Deleted) } } + +func TestMemStorePurgeExWithSubject(t *testing.T) { + ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage}) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + _, _, err = ms.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // This should purge all. + ms.PurgeEx("foo", 1, 0) + require_True(t, ms.State().Msgs == 0) +} diff --git a/server/store.go b/server/store.go index 10fc10a7..e6158a0c 100644 --- a/server/store.go +++ b/server/store.go @@ -61,6 +61,8 @@ var ( ErrInvalidSequence = errors.New("invalid sequence") // ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong. ErrSequenceMismatch = errors.New("expected sequence does not match store") + // ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0. + ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed") ) // StoreMsg is the stored message format for messages that are retained by the Store layer.