mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Merge pull request #3801 from nats-io/filtered-deleted
[FIXED] Filtered state for all subjects when the first seqs are deleted.
This commit is contained in:
@@ -1521,10 +1521,17 @@ func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, fi
|
||||
// This will traverse a message block and generate the filtered pending.
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (total, first, last uint64) {
|
||||
isAll := filter == _EMPTY_ || filter == fwcs
|
||||
|
||||
// First check if we can optimize this part.
|
||||
// This means we want all and the starting sequence was before this block.
|
||||
if isAll && seq <= mb.first.seq {
|
||||
return mb.msgs, mb.first.seq, mb.last.seq
|
||||
}
|
||||
|
||||
// Make sure we have fss loaded.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
|
||||
isAll := filter == _EMPTY_ || filter == fwcs
|
||||
subs := []string{filter}
|
||||
// If we have a wildcard match against all tracked subjects we know about.
|
||||
if wc || isAll {
|
||||
@@ -1664,20 +1671,6 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
|
||||
return ss
|
||||
}
|
||||
|
||||
// If subj is empty or we are not tracking multiple subjects.
|
||||
if subj == _EMPTY_ || subj == fwcs {
|
||||
total := lseq - sseq + 1
|
||||
if state := fs.State(); len(state.Deleted) > 0 {
|
||||
for _, dseq := range state.Deleted {
|
||||
if dseq >= sseq && dseq <= lseq {
|
||||
total--
|
||||
}
|
||||
}
|
||||
}
|
||||
ss.Msgs, ss.First, ss.Last = total, sseq, lseq
|
||||
return ss
|
||||
}
|
||||
|
||||
wc := subjectHasWildcard(subj)
|
||||
|
||||
// Tracking subject state.
|
||||
|
||||
@@ -4994,3 +4994,49 @@ func TestFileStoreMsgBlkFailOnKernelFaultLostDataReporting(t *testing.T) {
|
||||
require_True(t, state.Lost != nil)
|
||||
require_True(t, len(state.Lost.Msgs) == 93)
|
||||
}
|
||||
|
||||
func TestFileStoreAllFilteredStateWithDeleted(t *testing.T) {
|
||||
storeDir := t.TempDir()
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir, BlockSize: 1024},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
subj, msg := "foo", []byte("Hello World")
|
||||
for i := 0; i < 100; i++ {
|
||||
_, _, err := fs.StoreMsg(subj, nil, msg)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
remove := func(seqs ...uint64) {
|
||||
for _, seq := range seqs {
|
||||
ok, err := fs.RemoveMsg(seq)
|
||||
require_NoError(t, err)
|
||||
require_True(t, ok)
|
||||
}
|
||||
}
|
||||
|
||||
checkFilteredState := func(start, msgs, first, last int) {
|
||||
fss := fs.FilteredState(uint64(start), _EMPTY_)
|
||||
if fss.Msgs != uint64(msgs) {
|
||||
t.Fatalf("Expected %d msgs, got %d", msgs, fss.Msgs)
|
||||
}
|
||||
if fss.First != uint64(first) {
|
||||
t.Fatalf("Expected %d to be first, got %d", first, fss.First)
|
||||
}
|
||||
if fss.Last != uint64(last) {
|
||||
t.Fatalf("Expected %d to be last, got %d", last, fss.Last)
|
||||
}
|
||||
}
|
||||
|
||||
checkFilteredState(1, 100, 1, 100)
|
||||
remove(2)
|
||||
checkFilteredState(2, 98, 3, 100)
|
||||
remove(3, 4, 5)
|
||||
checkFilteredState(2, 95, 6, 100)
|
||||
checkFilteredState(6, 95, 6, 100)
|
||||
remove(8, 10, 12, 14, 16, 18)
|
||||
checkFilteredState(7, 88, 7, 100)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user