diff --git a/server/filestore.go b/server/filestore.go index 7b56d889..be61e3ce 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1868,12 +1868,24 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) seqStart, _ = fs.selectMsgBlockWithIndex(sseq) } - tsa := [32]string{} - fsa := [32]string{} + var tsa, fsa [32]string fts := tokenizeSubjectIntoSlice(fsa[:0], filter) isAll := filter == _EMPTY_ || filter == fwcs wc := subjectHasWildcard(filter) + // See if filter was provided but its the only subject. + if !isAll && !wc && len(fs.psim) == 1 && fs.psim[filter] != nil { + isAll = true + } + + // If we are isAll and have no deleted we can do a simpler calculation. + if isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs { + if sseq == 0 { + return fs.state.Msgs, validThrough + } + return fs.state.LastSeq - sseq + 1, validThrough + } + isMatch := func(subj string) bool { if isAll { return true @@ -1900,6 +1912,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) var t uint64 if isAll && sseq <= mb.first.seq { if lastPerSubject { + mb.ensurePerSubjectInfoLoaded() for subj := range mb.fss { if !seen[subj] { total++ @@ -2023,16 +2036,20 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.mu.Lock() // Check if we should include all of this block in adjusting. If so work with metadata. if sseq > mb.last.seq { - // We need to adjust for all matches in this block. - // We will scan fss state vs messages themselves. - // Make sure we have fss loaded. - mb.ensurePerSubjectInfoLoaded() - for subj, ss := range mb.fss { - if isMatch(subj) { - if lastPerSubject { - adjust++ - } else { - adjust += ss.Msgs + if isAll && !lastPerSubject { + adjust += mb.msgs + } else { + // We need to adjust for all matches in this block. + // We will scan fss state vs messages themselves. + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + for subj, ss := range mb.fss { + if isMatch(subj) { + if lastPerSubject { + adjust++ + } else { + adjust += ss.Msgs + } } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index e99385c0..939a37c5 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5457,3 +5457,45 @@ func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) { } }) } + +func TestFileStoreNumPendingLargeNumBlks(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 128, // Small on purpose to create alot of blks. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage}) + require_NoError(t, err) + + subj, msg := "zzz", bytes.Repeat([]byte("X"), 100) + numMsgs := 10_000 + + for i := 0; i < numMsgs; i++ { + fs.StoreMsg(subj, nil, msg) + } + + start := time.Now() + total, _ := fs.NumPending(4000, "zzz", false) + require_True(t, time.Since(start) < 5*time.Millisecond) + require_True(t, total == 6001) + + start = time.Now() + total, _ = fs.NumPending(6000, "zzz", false) + require_True(t, time.Since(start) < 5*time.Millisecond) + require_True(t, total == 4001) + + // Now delete a message in first half and second half. + fs.RemoveMsg(1000) + fs.RemoveMsg(9000) + + start = time.Now() + total, _ = fs.NumPending(4000, "zzz", false) + require_True(t, time.Since(start) < 50*time.Millisecond) + require_True(t, total == 6000) + + start = time.Now() + total, _ = fs.NumPending(6000, "zzz", false) + require_True(t, time.Since(start) < 50*time.Millisecond) + require_True(t, total == 4000) +}