Updates to index cache handling for message blocks.

We can have partial caches and we can also remove the idx cache. This was causing a bug where we would get the wrong slotInfo from the cache.idx. This code fixes the bug and detects idx partials.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-10 12:31:51 -08:00
parent 55419c291b
commit 21a35546c1
2 changed files with 59 additions and 10 deletions

View File

@@ -826,12 +826,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
mb.mu.Lock()
// Check cache. This will be very rare, will hold lock on this one.
// Check cache. This will be very rare.
if mb.cache == nil || mb.cache.idx == nil {
mb.mu.Unlock()
fs.mu.Unlock()
if err := mb.loadMsgs(); err != nil {
fs.mu.Unlock()
return false, err
}
fs.mu.Lock()
@@ -940,7 +939,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if mb.cache == nil || mb.cache.idx == nil {
if mb.cache == nil || slot >= len(mb.cache.idx) {
return 0, 0, false, errPartialCache
}
bi := mb.cache.idx[slot]
@@ -1969,16 +1968,18 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) {
}
}
// Update cache activity.
mb.llts = time.Now().UnixNano()
bi, _, hashChecked, _ := mb.slotInfo(int(seq - mb.cache.fseq))
// Check if partial cache and we miss.
if mb.cache.off > 0 && bi <= uint32(mb.cache.off) {
if mb.cache.off > 0 {
return nil, errPartialCache
}
bi, _, hashChecked, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
return nil, errPartialCache
}
// Update cache activity.
mb.llts = time.Now().UnixNano()
// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {

View File

@@ -1358,6 +1358,54 @@ func TestFileStorePartialCacheExpiration(t *testing.T) {
}
}
func TestFileStorePartialIndexes(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
cexp := 10 * time.Millisecond
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, CacheExpire: cexp}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
toSend := 5
for i := 0; i < toSend; i++ {
fs.StoreMsg("foo", nil, []byte("ok-1"))
}
// Now wait til the cache expires, including the index.
fs.mu.Lock()
mb := fs.blks[0]
fs.mu.Unlock()
// Force idx to expire by resetting last remove ts.
mb.mu.Lock()
mb.lrts = mb.lrts - int64(defaultCacheIdxExpiration*2)
mb.mu.Unlock()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
mb.mu.Lock()
defer mb.mu.Unlock()
if mb.cache == nil || len(mb.cache.idx) == 0 {
return nil
}
return fmt.Errorf("Index not empty")
})
// Create a partial cache by adding more msgs.
for i := 0; i < toSend; i++ {
fs.StoreMsg("foo", nil, []byte("ok-2"))
}
// If we now load in a message in second half if we do not
// detect idx is a partial correctly this will panic.
if _, _, _, _, err := fs.LoadMsg(8); err != nil {
t.Fatalf("Error loading %d: %v", 1, err)
}
}
func TestFileStoreSnapshot(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)