Run expiration only once at a time and double check cache, fixes #1482

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-06-18 12:14:25 -07:00
parent fa744fdcda
commit a13402cd01
2 changed files with 68 additions and 22 deletions

View File

@@ -65,22 +65,23 @@ type FileConsumerInfo struct {
}
type fileStore struct {
mu sync.RWMutex
state StreamState
scb func(int64)
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
wmb *bytes.Buffer
fch chan struct{}
qch chan struct{}
cfs []*consumerFileStore
closed bool
sips int
mu sync.RWMutex
state StreamState
scb func(int64)
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
wmb *bytes.Buffer
fch chan struct{}
qch chan struct{}
cfs []*consumerFileStore
closed bool
expiring bool
sips int
}
// Represents a message store block and its data.
@@ -714,7 +715,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
sm, _ := mb.fetchMsg(seq)
// We have the message here, so we can delete it.
if sm != nil {
fs.deleteMsgFromBlock(mb, seq, sm, secure)
if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil {
return false, err
}
}
return sm != nil, nil
}
@@ -770,7 +773,7 @@ func (mb *msgBlock) selectNextFirst() {
}
}
func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) {
func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) error {
// Update global accounting.
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
@@ -778,17 +781,34 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
mb.mu.Lock()
// Make sure the seq still exists.
if mb.cache == nil {
// Rare condition but can happen. We will load with lock held at this point.
buf, err := ioutil.ReadFile(mb.mfn)
if err == nil {
err = mb.indexCacheBuf(buf)
}
if err != nil {
mb.mu.Unlock()
fs.mu.Unlock()
return err
}
if len(buf) > 0 {
mb.cloads++
mb.startCacheExpireTimer()
}
}
if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
mb.mu.Unlock()
fs.mu.Unlock()
return
return nil
}
// Now check dmap if it is there.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
mb.mu.Unlock()
fs.mu.Unlock()
return
return nil
}
}
@@ -846,6 +866,8 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
delta := int64(msz)
fs.scb(-delta)
}
return nil
}
// Lock should be held.
@@ -888,6 +910,21 @@ func (fs *fileStore) expireMsgsLocked() {
// Will expire msgs that are too old.
func (fs *fileStore) expireMsgs() {
// Make sure this is only running one at a time.
fs.mu.Lock()
if fs.expiring {
fs.mu.Unlock()
return
}
fs.expiring = true
fs.mu.Unlock()
defer func() {
fs.mu.Lock()
fs.expiring = false
fs.mu.Unlock()
}()
now := time.Now().UnixNano()
minAge := now - int64(fs.cfg.MaxAge)
@@ -1361,8 +1398,14 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
func (mb *msgBlock) loadMsgs() error {
mb.mu.RLock()
mfn := mb.mfn
hasCache := mb.cache != nil
mb.mu.RUnlock()
// Someone else may have filled this in by the time we get here.
if hasCache {
return nil
}
// Load in the whole block.
buf, err := ioutil.ReadFile(mfn)
if err != nil {

View File

@@ -607,7 +607,10 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) {
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge})
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, ReadCacheExpire: 1 * time.Millisecond},
StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -615,7 +618,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) {
// Store some messages. Does not really matter how many.
subj, msg := "foo", []byte("Hello World")
toStore := 100
toStore := 1000
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, nil, msg)
}