Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2022-12-14 07:51:35 -08:00
2 changed files with 107 additions and 32 deletions

View File

@@ -1205,6 +1205,21 @@ func (fs *fileStore) expireMsgsOnRecover() {
var deleted int
var nts int64
deleteEmptyBlock := func(mb *msgBlock) bool {
// If we are the last keep state to remember first sequence.
if mb == fs.lmb {
// Do this part by hand since not deleting one by one.
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
// Clear any global subject state.
fs.psim = make(map[string]*psi)
return false
}
mb.dirtyCloseWithRemove(true)
deleted++
return true
}
for _, mb := range fs.blks {
mb.mu.Lock()
if minAge < mb.first.ts {
@@ -1216,22 +1231,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
// If we are the last keep state to remember first sequence.
if mb == fs.lmb {
// Do this part by hand since not deleting one by one.
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
// Clear any global subject state.
fs.psim = make(map[string]*psi)
} else {
mb.dirtyCloseWithRemove(true)
deleted++
}
newFirst := mb.last.seq + 1
didRemove := deleteEmptyBlock(mb)
mb.mu.Unlock()
// Update fs first here as well.
fs.state.FirstSeq = newFirst
fs.state.FirstTime = time.Time{}
if !didRemove {
mb.writeIndexInfo()
}
continue
}
@@ -1292,22 +1296,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Check if empty after processing, could happen if tail of messages are all deleted.
isEmpty := mb.msgs == 0
if isEmpty {
mb.dirtyCloseWithRemove(true)
// Update fs first here as well.
fs.state.FirstSeq = mb.last.seq + 1
fs.state.FirstTime = time.Time{}
deleted++
} else {
// Update fs first seq and time.
fs.state.FirstSeq = mb.first.seq
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
needWriteIndex := true
if mb.msgs == 0 {
needWriteIndex = !deleteEmptyBlock(mb)
}
mb.mu.Unlock()
if !isEmpty {
// Make sure to write out our index info.
if needWriteIndex {
mb.writeIndexInfo()
}
break
@@ -1336,6 +1330,8 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Update top level accounting.
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
// Make sure to we properly set the fs first sequence and timestamp.
fs.selectNextFirst()
}
func copyMsgBlocks(src []*msgBlock) []*msgBlock {
@@ -2884,6 +2880,7 @@ func (mb *msgBlock) selectNextFirst() {
}
// Select the next FirstSeq
// Lock should be held.
func (fs *fileStore) selectNextFirst() {
if len(fs.blks) > 0 {
mb := fs.blks[0]
@@ -3054,8 +3051,12 @@ func (fs *fileStore) startAgeChk() {
// Lock should be held.
func (fs *fileStore) resetAgeChk(delta int64) {
if fs.cfg.MaxAge == 0 {
return
}
fireIn := fs.cfg.MaxAge
if delta > 0 {
if delta > 0 && time.Duration(delta) < fireIn {
fireIn = time.Duration(delta)
}
if fs.ageChk != nil {
@@ -3089,10 +3090,15 @@ func (fs *fileStore) expireMsgs() {
fs.mu.Lock()
defer fs.mu.Unlock()
if sm == nil {
// Onky cancel if no message left, not on potential lookup error that would result in sm == nil.
if fs.state.Msgs == 0 {
fs.cancelAgeChk()
} else {
fs.resetAgeChk(sm.ts - minAge)
if sm == nil {
fs.resetAgeChk(0)
} else {
fs.resetAgeChk(sm.ts - minAge)
}
}
}

View File

@@ -4764,3 +4764,72 @@ func TestFileStoreUpdateMaxMsgsPerSubject(t *testing.T) {
t.Fatalf("Expected to have %d stored, got %d", 10, ss.Msgs)
}
}
func TestFileStoreBadFirstAndFailedExpireAfterRestart(t *testing.T) {
storeDir := t.TempDir()
ttl := time.Second
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 256},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage, MaxAge: ttl},
)
require_NoError(t, err)
defer fs.Stop()
// With block size of 256 and subject and message below, seq 8 starts new block.
// Will double check and fail test if not the case since test depends on this.
subj, msg := "foo", []byte("ZZ")
// These are all instant and will expire after 1 sec.
start := time.Now()
for i := 0; i < 7; i++ {
_, _, err := fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
}
// Put two more after a delay.
time.Sleep(500 * time.Millisecond)
seq, _, err := fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
_, _, err = fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
// Make sure that sequence 8 is first in second block, and break test if that is not true.
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.RLock()
first := lmb.first.seq
lmb.mu.RUnlock()
require_True(t, first == 8)
// Instantly remove first one from second block.
// On restart this will trigger expire on recover which will set fs.FirstSeq to the deleted one.
fs.RemoveMsg(seq)
// Stop the filstore and wait til first block expires.
fs.Stop()
time.Sleep(ttl - time.Since(start) + (10 * time.Millisecond))
fs, err = newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 256},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage, MaxAge: ttl},
)
require_NoError(t, err)
defer fs.Stop()
// Check that state is correct for first message which should be 9 and have a proper timestamp.
var state StreamState
fs.FastState(&state)
ts := state.FirstTime
require_True(t, state.Msgs == 1)
require_True(t, state.FirstSeq == 9)
require_True(t, !state.FirstTime.IsZero())
// Wait and make sure expire timer is still working properly.
time.Sleep(ttl)
fs.FastState(&state)
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 10)
require_True(t, state.LastSeq == 9)
require_True(t, state.LastTime == ts)
}