Fix for issue #2488.

When we triggered a filestore msg block compact we were not properly dealing with interior deletes.
Subsequent lookups past the skipped messages would cause an error and stop delivering messages.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-09 09:53:22 -07:00
parent 9e5526cc9d
commit f75371022d
2 changed files with 66 additions and 6 deletions

View File

@@ -1854,6 +1854,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.dmap[seq] = struct{}{}
// Check if <50% utilization and minimum size met.
if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes {
// FIXME(dlc) - Might want this out of band.
mb.compact()
}
}
@@ -1938,6 +1939,9 @@ func (mb *msgBlock) compact() {
return false
}
// For skip msgs.
var smh [msgHdrSize]byte
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize >= lbuf {
return
@@ -1960,9 +1964,20 @@ func (mb *msgBlock) compact() {
firstSet = true
mb.first.seq = seq
}
} else if firstSet {
// This is an interior delete that we need to make sure we have a placeholder for.
le.PutUint32(smh[0:], emptyRecordLen)
le.PutUint64(smh[4:], seq|ebit)
le.PutUint64(smh[12:], 0)
le.PutUint16(smh[20:], 0)
nbuf = append(nbuf, smh[:]...)
mb.hh.Reset()
mb.hh.Write(smh[4:20])
checksum := mb.hh.Sum(nil)
nbuf = append(nbuf, checksum...)
}
// Always set last.
mb.last.seq = seq
mb.last.seq = seq &^ ebit
// Advance to next record.
index += rl
}
@@ -1986,7 +2001,9 @@ func (mb *msgBlock) compact() {
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
}
os.Rename(mfn, mb.mfn)
if err := os.Rename(mfn, mb.mfn); err != nil {
return
}
// Close cache and open FDs and index file.
mb.clearCacheAndOffset()
@@ -2747,6 +2764,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return mb
}
}
return nil
}
@@ -3168,8 +3186,10 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, err
}
if seq != mseq {
mb.cache.buf = nil
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}
sm := &fileStoredMsg{
subj: subj,
hdr: hdr,
@@ -3179,7 +3199,6 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb: mb,
off: int64(bi),
}
return sm, nil
}

View File

@@ -3266,9 +3266,6 @@ func TestFileStoreSparseCompaction(t *testing.T) {
if ta >= tb {
t.Fatalf("Expected total after to be less then before, got %d vs %d", tb, ta)
}
if ta != ua {
t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua)
}
}
// Actual testing here.
@@ -3314,3 +3311,47 @@ func TestFileStoreSparseCompaction(t *testing.T) {
eraseMsgs(500, 502, 504, 506, 508, 510)
compact()
}
func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage}
var fs *fileStore
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 1; i <= 1000; i++ {
if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, []byte("OK")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Now do interior deletes.
for _, seq := range []uint64{500, 600, 700, 800} {
removed, err := fs.RemoveMsg(seq)
if err != nil || !removed {
t.Fatalf("Got an error on remove of %d: %v", seq, err)
}
}
_, _, _, _, err = fs.LoadMsg(900)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do compact by hand, make sure we can still access msgs past the interior deletes.
fs.mu.RLock()
lmb := fs.lmb
lmb.dirtyCloseWithRemove(false)
lmb.compact()
fs.mu.RUnlock()
_, _, _, _, err = fs.LoadMsg(900)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}