[IMPROVED] Additional markers for dirty state (#4601)

Under certain circumstances we could delay recovery if the state file
pointed to an absent msg block.
Found additional places to mark dirty and optionally kick the flusher.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-27 20:48:32 -07:00
committed by GitHub

View File

@@ -2058,6 +2058,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Clear any global subject state.
fs.psim = make(map[string]*psi)
}
// If we purged anything, make sure we kick flush state loop.
if purged > 0 {
fs.dirty++
fs.kickFlushStateLoop()
}
}
func copyMsgBlocks(src []*msgBlock) []*msgBlock {
@@ -3504,6 +3510,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
if rbytes>>2 > mb.bytes {
mb.compact()
fs.kickFlushStateLoop()
}
}
}
@@ -3655,7 +3662,7 @@ func (mb *msgBlock) compact() {
}
// Remove index file and wipe delete map, then rebuild.
mb.deleteDmap()
mb.dmap.Empty()
mb.rebuildStateLocked()
// If we entered with the msgs loaded make sure to reload them.
@@ -3664,11 +3671,6 @@ func (mb *msgBlock) compact() {
}
}
// Empty out our dmap.
func (mb *msgBlock) deleteDmap() {
mb.dmap.Empty()
}
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
@@ -6379,6 +6381,12 @@ func (fs *fileStore) reset() error {
fs.psim = make(map[string]*psi)
fs.bim = make(map[uint32]*msgBlock)
// If we purged anything, make sure we kick flush state loop.
if purged > 0 {
fs.dirty++
fs.kickFlushStateLoop()
}
fs.mu.Unlock()
if cb != nil {
@@ -6502,6 +6510,7 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Remove from list.
for i, omb := range fs.blks {
if mb == omb {
fs.dirty++
blks := append(fs.blks[:i], fs.blks[i+1:]...)
fs.blks = copyMsgBlocks(blks)
if fs.bim != nil {