Make sure we adjust per subject info when doing a Compact().

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-10 07:21:02 +02:00
parent 9f5e1509b9
commit 0da2a150cc
4 changed files with 65 additions and 2 deletions

View File

@@ -3125,6 +3125,7 @@ func (fs *fileStore) expireMsgs() {
fs.mu.RLock()
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
fs.mu.RUnlock()
for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
}
@@ -4859,6 +4860,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
}
// Now close.
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
deleted++

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -5197,3 +5197,33 @@ func TestFileStoreStreamTruncateResetMultiBlock(t *testing.T) {
require_True(t, state.NumDeleted == 0)
})
}
func TestFileStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 128
fs, err := newFileStore(
fcfg,
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
for i := 0; i < 1000; i++ {
subj := fmt.Sprintf("foo.%d", i)
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}
require_True(t, fs.numMsgBlocks() == 500)
// Compact such that we know we throw blocks away from the beginning.
deleted, err := fs.Compact(501)
require_NoError(t, err)
require_True(t, deleted == 500)
require_True(t, fs.numMsgBlocks() == 250)
// Make sure we adjusted for subjects etc.
state := fs.State()
require_True(t, state.NumSubjects == 500)
})
}

View File

@@ -541,6 +541,7 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
}
}
ms.state.Msgs -= purged

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -495,3 +495,28 @@ func TestMemStoreStreamTruncateReset(t *testing.T) {
require_True(t, state.NumSubjects == 1)
require_True(t, state.NumDeleted == 0)
}
func TestMemStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"foo.*"},
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
for i := 0; i < 1000; i++ {
subj := fmt.Sprintf("foo.%d", i)
_, _, err := ms.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}
// Compact such that we know we throw blocks away from the beginning.
deleted, err := ms.Compact(501)
require_NoError(t, err)
require_True(t, deleted == 500)
// Make sure we adjusted for subjects etc.
state := ms.State()
require_True(t, state.NumSubjects == 500)
}