Call up to upper layers on Stop() for accounting purposes

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-08-01 17:36:40 -07:00
parent b8d1ac9475
commit 97827be97d
2 changed files with 11 additions and 3 deletions

View File

@@ -5421,13 +5421,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
var purged, bytes uint64
// We have to delete interior messages.
fs.mu.Lock()
// Same as purge all.
if lseq := fs.state.LastSeq; seq > lseq {
fs.mu.Unlock()
return fs.purge(seq)
}
// We have to delete interior messages.
smb := fs.selectMsgBlock(seq)
if smb == nil {
fs.mu.Unlock()
@@ -6345,6 +6345,9 @@ func (fs *fileStore) Stop() error {
fs.cancelSyncTimer()
fs.cancelAgeChk()
// We should update the upper usage layer on a stop.
cb, bytes := fs.scb, int64(fs.state.Bytes)
var _cfs [256]ConsumerStore
cfs := append(_cfs[:0], fs.cfs...)
fs.cfs = nil
@@ -6354,6 +6357,10 @@ func (fs *fileStore) Stop() error {
o.Stop()
}
if bytes > 0 && cb != nil {
cb(0, -bytes, 0, _EMPTY_)
}
return nil
}

View File

@@ -1144,11 +1144,12 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
// Delete is same as Stop for memory store.
func (ms *memStore) Delete() error {
ms.Purge()
return ms.Stop()
}
func (ms *memStore) Stop() error {
// These can't come back, so stop is same as Delete.
ms.Purge()
ms.mu.Lock()
if ms.ageChk != nil {
ms.ageChk.Stop()