General improvements to accounting for the filestore. This in response to tracking issue #3114.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-05-12 13:20:16 -07:00
parent 2cb2a8ebbc
commit 4291433a46
5 changed files with 177 additions and 33 deletions

View File

@@ -856,7 +856,10 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
// This is an old erased message, or a new one that we can track.
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
seq = seq &^ ebit
addToDmap(seq)
// Only add to dmap if past recorded first seq and non-zero.
if seq != 0 && seq >= mb.first.seq {
addToDmap(seq)
}
index += rl
mb.last.seq = seq
mb.last.ts = ts
@@ -1094,10 +1097,14 @@ func (fs *fileStore) expireMsgsOnRecover() {
mb.dmap = nil
}
}
// Keep this update just in case since we are removing dmap entries.
mb.first.seq = seq
continue
}
// Break on other errors.
if err != nil || sm == nil {
// Keep this update just in case since we could have removed dmap entries.
mb.first.seq = seq
break
}
@@ -1112,11 +1119,13 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Delete the message here.
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
mb.bytes -= sz
bytes += sz
mb.msgs--
purged++
if mb.msgs > 0 {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
mb.bytes -= sz
bytes += sz
mb.msgs--
purged++
}
// Update fss
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq, &smv)
@@ -1998,7 +2007,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.mu.Lock()
// See if the sequence numbers is still relevant.
// See if the sequence number is still relevant.
if seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
@@ -2170,7 +2179,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
if mb.cacheNotLoaded() {
wasLoaded := mb.cacheAlreadyLoaded()
if !wasLoaded {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
@@ -2252,11 +2262,12 @@ func (mb *msgBlock) compact() {
// We will write to a new file and mv/rename it in case of failure.
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
defer os.Remove(mfn)
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
os.Remove(mfn)
return
}
if err := os.Rename(mfn, mb.mfn); err != nil {
os.Remove(mfn)
return
}
@@ -2265,7 +2276,11 @@ func (mb *msgBlock) compact() {
mb.removeIndexFileLocked()
mb.deleteDmap()
mb.rebuildStateLocked()
mb.loadMsgsWithLock()
// If we entered with the msgs loaded make sure to reload them.
if wasLoaded {
mb.loadMsgsWithLock()
}
}
// Nil out our dmap.
@@ -2485,23 +2500,34 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
var purged, bytes uint64
mb.mu.Lock()
checkDmap := len(mb.dmap) > 0
var smv StoreMsg
for seq := mb.last.seq; seq > sm.seq; seq-- {
if checkDmap {
if _, ok := mb.dmap[seq]; ok {
// Delete and skip to next.
delete(mb.dmap, seq)
if len(mb.dmap) == 0 {
mb.dmap = nil
checkDmap = false
}
continue
}
}
// We should have a valid msg to calculate removal stats.
_, rl, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
mb.mu.Unlock()
return 0, 0, err
if m, err := mb.cacheLookup(seq, &smv); err == nil {
if mb.msgs > 0 {
rl := fileStoreMsgSize(m.subj, m.hdr, m.msg)
mb.msgs--
mb.bytes -= rl
mb.rbytes -= rl
// For return accounting.
purged++
bytes += uint64(rl)
}
}
purged++
bytes += uint64(rl)
}
// Truncate our msgs and close file.
@@ -2517,11 +2543,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index)
}
// Do local mb stat updates.
mb.msgs -= purged
mb.bytes -= bytes
mb.rbytes -= bytes
// Update our last msg.
mb.last.seq = sm.seq
mb.last.ts = sm.ts
@@ -3603,6 +3624,12 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
if err != nil || fsm == nil {
return nil, err
}
// Deleted messages that are decoded return a 0 for seqeunce.
if fsm.seq == 0 {
return nil, errDeletedMsg
}
if seq != fsm.seq {
recycleMsgBlockBuf(mb.cache.buf)
mb.cache.buf = nil
@@ -4040,10 +4067,16 @@ func (mb *msgBlock) readIndexInfo() error {
// Check if this is a short write index file.
if bi < 0 || bi+checksumSize > len(buf) {
defer os.Remove(mb.ifn)
os.Remove(mb.ifn)
return fmt.Errorf("short index file")
}
// Check for consistency if accounting. If something is off bail and we will rebuild.
if mb.msgs != (mb.last.seq-mb.first.seq+1)-dmapLen {
os.Remove(mb.ifn)
return fmt.Errorf("accounting inconsistent")
}
// Checksum
copy(mb.lchk[0:], buf[bi:bi+checksumSize])
bi += checksumSize
@@ -4151,6 +4184,10 @@ func compareFn(subject string) func(string, string) bool {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return fs.Purge()
@@ -4198,7 +4235,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
mb.loadMsgsWithLock()
shouldExpire = true
}
if sequence > 0 && sequence <= l {
if sequence > 1 && sequence <= l {
l = sequence - 1
}
@@ -4207,10 +4244,13 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
// Do fast in place remove.
// Stats
fs.state.Msgs--
fs.state.Bytes -= rl
mb.msgs--
mb.bytes -= rl
if mb.msgs > 0 {
fs.state.Msgs--
fs.state.Bytes -= rl
mb.msgs--
mb.bytes -= rl
purged++
}
// FSS updates.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq, &smv)
@@ -4231,7 +4271,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
mb.dmap[seq] = struct{}{}
}
purged++
if maxp > 0 && purged >= maxp {
break
}
@@ -4391,10 +4431,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
}
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
if smb.msgs > 0 {
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
}
// Update fss
fs.removePerSubject(sm.subj)
smb.removeSeqPerSubject(sm.subj, mseq, &smv)

View File

@@ -3825,3 +3825,85 @@ func TestFileStoreRememberLastMsgTime(t *testing.T) {
require_True(t, lt == fs.State().LastTime)
require_True(t, seq == 6)
}
func (fs *fileStore) getFirstBlock() *msgBlock {
fs.mu.RLock()
defer fs.mu.RUnlock()
if len(fs.blks) == 0 {
return nil
}
return fs.blks[0]
}
func TestFileStoreRebuildStateDmapAccountingBug(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024},
StreamConfig{Name: "TEST", Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
for i := 0; i < 100; i++ {
_, _, err = fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
// Delete 2-40.
for i := 2; i <= 40; i++ {
_, err := fs.RemoveMsg(uint64(i))
require_NoError(t, err)
}
mb := fs.getFirstBlock()
require_True(t, mb != nil)
check := func() {
t.Helper()
mb.mu.RLock()
defer mb.mu.RUnlock()
dmapLen := uint64(len(mb.dmap))
if mb.msgs != (mb.last.seq-mb.first.seq+1)-dmapLen {
t.Fatalf("Consistency check failed: %d != %d -> last %d first %d len(dmap) %d",
mb.msgs, (mb.last.seq-mb.first.seq+1)-dmapLen, mb.last.seq, mb.first.seq, dmapLen)
}
}
check()
mb.mu.Lock()
mb.compact()
mb.mu.Unlock()
// Now delete first.
_, err = fs.RemoveMsg(1)
require_NoError(t, err)
mb.mu.Lock()
_, err = mb.rebuildStateLocked()
require_NoError(t, err)
mb.mu.Unlock()
check()
}
func TestFileStorePurgeExWithSubject(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "TEST", Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
for i := 0; i < 100; i++ {
_, _, err = fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
// This should purge all.
fs.PurgeEx("foo", 1, 0)
require_True(t, fs.State().Msgs == 0)
}

View File

@@ -428,6 +428,10 @@ func (ms *memStore) expireMsgs() {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()
@@ -455,7 +459,7 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6
ss.Msgs -= keep
}
last := ss.Last
if sequence > 0 {
if sequence > 1 {
last = sequence - 1
}
ms.mu.Lock()

View File

@@ -391,3 +391,17 @@ func TestMemStoreStreamTruncate(t *testing.T) {
t.Fatalf("Expected deleted to be %+v, got %+v\n", expected, state.Deleted)
}
}
func TestMemStorePurgeExWithSubject(t *testing.T) {
ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
require_NoError(t, err)
for i := 0; i < 100; i++ {
_, _, err = ms.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
// This should purge all.
ms.PurgeEx("foo", 1, 0)
require_True(t, ms.State().Msgs == 0)
}

View File

@@ -61,6 +61,8 @@ var (
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0.
ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed")
)
// StoreMsg is the stored message format for messages that are retained by the Store layer.