mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[FIXED] Expire on recover could not update global per subject map (#4439)
When expiring complete blocks on recover make sure to update global subject index psim. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1315,6 +1315,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
fs.psim = make(map[string]*psi)
|
||||
return false
|
||||
}
|
||||
// Make sure we do subject cleanup as well.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
for subj := range mb.fss {
|
||||
fs.removePerSubject(subj)
|
||||
}
|
||||
mb.dirtyCloseWithRemove(true)
|
||||
deleted++
|
||||
return true
|
||||
|
||||
@@ -4662,6 +4662,40 @@ func TestFileStoreFSSCloseAndKeepOnExpireOnRecoverBug(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileStoreExpireOnRecoverSubjectAccounting(t *testing.T) {
|
||||
const msgLen = 19
|
||||
msg := bytes.Repeat([]byte("A"), msgLen)
|
||||
|
||||
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
|
||||
fcfg.BlockSize = 100
|
||||
ttl := 200 * time.Millisecond
|
||||
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxAge: ttl}
|
||||
|
||||
fs, err := newFileStore(fcfg, scfg)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
// These are in first block.
|
||||
fs.StoreMsg("A", nil, msg)
|
||||
fs.StoreMsg("B", nil, msg)
|
||||
time.Sleep(ttl / 2)
|
||||
// This one in 2nd block.
|
||||
fs.StoreMsg("C", nil, msg)
|
||||
fs.Stop()
|
||||
|
||||
time.Sleep(ttl/2 + 10*time.Millisecond)
|
||||
|
||||
fs, err = newFileStore(fcfg, scfg)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
// Make sure we take into account PSIM when throwing a whole block away.
|
||||
if state := fs.State(); state.NumSubjects != 1 {
|
||||
t.Fatalf("Expected 1 subject, got %d", state.NumSubjects)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileStoreFSSBadStateBug(t *testing.T) {
|
||||
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
|
||||
fs, err := newFileStore(
|
||||
|
||||
Reference in New Issue
Block a user