When removing a msg and we need to load the msg block and incur IO, unlock fs lock to avoid stalling other activity on other blocks.

E.g removing and adding msgs at the same time.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-05-02 08:56:43 -07:00
parent ff6c80350b
commit 4a58feff27

View File

@@ -2716,11 +2716,27 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// Now just load regardless.
// TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
mb.mu.Unlock()
fsUnlock()
// We do not want to block possible activity within another msg block.
// We have to unlock both locks and acquire the mb lock in the loadMsgs() call to avoid a deadlock if another
// go routine was trying to get fs then this mb lock at the same time. E.g. another call to remove for same block.
mb.mu.Unlock()
fsUnlock()
if err := mb.loadMsgs(); err != nil {
return false, err
}
fsLock()
// We need to check if things changed out from underneath us.
if fs.closed {
fsUnlock()
return false, ErrStoreClosed
}
mb.mu.Lock()
if mb.closed || seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
return false, nil
}
// cacheLookup below will do dmap check so no need to repeat here.
}
var smv StoreMsg
@@ -2728,6 +2744,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
if err != nil {
mb.mu.Unlock()
fsUnlock()
// Mimic err behavior from above check to dmap. No error returned if already removed.
if err == errDeletedMsg {
err = nil
}
return false, err
}
// Grab size