diff --git a/server/filestore.go b/server/filestore.go index ef170eba..b7674985 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2058,6 +2058,12 @@ func (fs *fileStore) expireMsgsOnRecover() { // Clear any global subject state. fs.psim = make(map[string]*psi) } + + // If we purged anything, make sure we kick flush state loop. + if purged > 0 { + fs.dirty++ + fs.kickFlushStateLoop() + } } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3504,6 +3510,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen) if rbytes>>2 > mb.bytes { mb.compact() + fs.kickFlushStateLoop() } } } @@ -3655,7 +3662,7 @@ func (mb *msgBlock) compact() { } // Remove index file and wipe delete map, then rebuild. - mb.deleteDmap() + mb.dmap.Empty() mb.rebuildStateLocked() // If we entered with the msgs loaded make sure to reload them. @@ -3664,11 +3671,6 @@ func (mb *msgBlock) compact() { } } -// Empty out our dmap. -func (mb *msgBlock) deleteDmap() { - mb.dmap.Empty() -} - // Grab info from a slot. // Lock should be held. func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { @@ -6379,6 +6381,12 @@ func (fs *fileStore) reset() error { fs.psim = make(map[string]*psi) fs.bim = make(map[uint32]*msgBlock) + // If we purged anything, make sure we kick flush state loop. + if purged > 0 { + fs.dirty++ + fs.kickFlushStateLoop() + } + fs.mu.Unlock() if cb != nil { @@ -6502,6 +6510,7 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Remove from list. for i, omb := range fs.blks { if mb == omb { + fs.dirty++ blks := append(fs.blks[:i], fs.blks[i+1:]...) fs.blks = copyMsgBlocks(blks) if fs.bim != nil {