diff --git a/server/filestore.go b/server/filestore.go index 00e02d31..21b71912 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -65,22 +65,23 @@ type FileConsumerInfo struct { } type fileStore struct { - mu sync.RWMutex - state StreamState - scb func(int64) - ageChk *time.Timer - syncTmr *time.Timer - cfg FileStreamInfo - fcfg FileStoreConfig - lmb *msgBlock - blks []*msgBlock - hh hash.Hash64 - wmb *bytes.Buffer - fch chan struct{} - qch chan struct{} - cfs []*consumerFileStore - closed bool - sips int + mu sync.RWMutex + state StreamState + scb func(int64) + ageChk *time.Timer + syncTmr *time.Timer + cfg FileStreamInfo + fcfg FileStoreConfig + lmb *msgBlock + blks []*msgBlock + hh hash.Hash64 + wmb *bytes.Buffer + fch chan struct{} + qch chan struct{} + cfs []*consumerFileStore + closed bool + expiring bool + sips int } // Represents a message store block and its data. @@ -714,7 +715,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { sm, _ := mb.fetchMsg(seq) // We have the message here, so we can delete it. if sm != nil { - fs.deleteMsgFromBlock(mb, seq, sm, secure) + if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil { + return false, err + } } return sm != nil, nil } @@ -770,7 +773,7 @@ func (mb *msgBlock) selectNextFirst() { } } -func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) { +func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) error { // Update global accounting. msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) @@ -778,17 +781,34 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored mb.mu.Lock() // Make sure the seq still exists. + if mb.cache == nil { + // Rare condition but can happen. We will load with lock held at this point. + buf, err := ioutil.ReadFile(mb.mfn) + if err == nil { + err = mb.indexCacheBuf(buf) + } + if err != nil { + mb.mu.Unlock() + fs.mu.Unlock() + return err + } + if len(buf) > 0 { + mb.cloads++ + mb.startCacheExpireTimer() + } + } + if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { mb.mu.Unlock() fs.mu.Unlock() - return + return nil } // Now check dmap if it is there. if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { mb.mu.Unlock() fs.mu.Unlock() - return + return nil } } @@ -846,6 +866,8 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored delta := int64(msz) fs.scb(-delta) } + + return nil } // Lock should be held. @@ -888,6 +910,21 @@ func (fs *fileStore) expireMsgsLocked() { // Will expire msgs that are too old. func (fs *fileStore) expireMsgs() { + // Make sure this is only running one at a time. + fs.mu.Lock() + if fs.expiring { + fs.mu.Unlock() + return + } + fs.expiring = true + fs.mu.Unlock() + + defer func() { + fs.mu.Lock() + fs.expiring = false + fs.mu.Unlock() + }() + now := time.Now().UnixNano() minAge := now - int64(fs.cfg.MaxAge) @@ -1361,8 +1398,14 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { func (mb *msgBlock) loadMsgs() error { mb.mu.RLock() mfn := mb.mfn + hasCache := mb.cache != nil mb.mu.RUnlock() + // Someone else may have filled this in by the time we get here. + if hasCache { + return nil + } + // Load in the whole block. buf, err := ioutil.ReadFile(mfn) if err != nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index 48febb74..74be9488 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -607,7 +607,10 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) { os.MkdirAll(storeDir, 0755) defer os.RemoveAll(storeDir) - fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}) + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir, ReadCacheExpire: 1 * time.Millisecond}, + StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -615,7 +618,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) { // Store some messages. Does not really matter how many. subj, msg := "foo", []byte("Hello World") - toStore := 100 + toStore := 1000 for i := 0; i < toStore; i++ { fs.StoreMsg(subj, nil, msg) }