diff --git a/server/filestore.go b/server/filestore.go index 7e75d5a2..98887882 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1521,10 +1521,17 @@ func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, fi // This will traverse a message block and generate the filtered pending. // Lock should be held. func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (total, first, last uint64) { + isAll := filter == _EMPTY_ || filter == fwcs + + // First check if we can optimize this part. + // This means we want all and the starting sequence was before this block. + if isAll && seq <= mb.first.seq { + return mb.msgs, mb.first.seq, mb.last.seq + } + // Make sure we have fss loaded. mb.ensurePerSubjectInfoLoaded() - isAll := filter == _EMPTY_ || filter == fwcs subs := []string{filter} // If we have a wildcard match against all tracked subjects we know about. if wc || isAll { @@ -1664,20 +1671,6 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { return ss } - // If subj is empty or we are not tracking multiple subjects. - if subj == _EMPTY_ || subj == fwcs { - total := lseq - sseq + 1 - if state := fs.State(); len(state.Deleted) > 0 { - for _, dseq := range state.Deleted { - if dseq >= sseq && dseq <= lseq { - total-- - } - } - } - ss.Msgs, ss.First, ss.Last = total, sseq, lseq - return ss - } - wc := subjectHasWildcard(subj) // Tracking subject state. diff --git a/server/filestore_test.go b/server/filestore_test.go index 8502d0f2..925cb845 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4994,3 +4994,49 @@ func TestFileStoreMsgBlkFailOnKernelFaultLostDataReporting(t *testing.T) { require_True(t, state.Lost != nil) require_True(t, len(state.Lost.Msgs) == 93) } + +func TestFileStoreAllFilteredStateWithDeleted(t *testing.T) { + storeDir := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir, BlockSize: 1024}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 100; i++ { + _, _, err := fs.StoreMsg(subj, nil, msg) + require_NoError(t, err) + } + + remove := func(seqs ...uint64) { + for _, seq := range seqs { + ok, err := fs.RemoveMsg(seq) + require_NoError(t, err) + require_True(t, ok) + } + } + + checkFilteredState := func(start, msgs, first, last int) { + fss := fs.FilteredState(uint64(start), _EMPTY_) + if fss.Msgs != uint64(msgs) { + t.Fatalf("Expected %d msgs, got %d", msgs, fss.Msgs) + } + if fss.First != uint64(first) { + t.Fatalf("Expected %d to be first, got %d", first, fss.First) + } + if fss.Last != uint64(last) { + t.Fatalf("Expected %d to be last, got %d", last, fss.Last) + } + } + + checkFilteredState(1, 100, 1, 100) + remove(2) + checkFilteredState(2, 98, 3, 100) + remove(3, 4, 5) + checkFilteredState(2, 95, 6, 100) + checkFilteredState(6, 95, 6, 100) + remove(8, 10, 12, 14, 16, 18) + checkFilteredState(7, 88, 7, 100) +}