diff --git a/server/filestore.go b/server/filestore.go index fb63c026..e9e08a86 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -147,7 +147,6 @@ type msgBlock struct { rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk. msgs uint64 // User visible message count. fss map[string]*SimpleState - sfilter string // Single subject filter sfn string kfn string lwits int64 @@ -419,18 +418,6 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk.Stop() fs.ageChk = nil } - - // Update our sfilter for the last block. - if lmb := fs.lmb; lmb != nil { - lmb.mu.Lock() - if len(fs.cfg.Subjects) == 1 { - lmb.sfilter = fs.cfg.Subjects[0] - } else { - lmb.sfilter = _EMPTY_ - } - lmb.mu.Unlock() - } - fs.mu.Unlock() if cfg.MaxAge != 0 { @@ -1103,13 +1090,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq = mb.first.seq - 1 } - // If we only have one subject registered we can optimize filtered lookups here. - if len(mb.fss) == 1 { - for sfilter := range mb.fss { - mb.sfilter = sfilter - } - } - return nil, nil } @@ -1159,12 +1139,6 @@ func (fs *fileStore) recoverMsgs() error { if len(fs.blks) > 0 { sort.Slice(fs.blks, func(i, j int) bool { return fs.blks[i].index < fs.blks[j].index }) fs.lmb = fs.blks[len(fs.blks)-1] - // Update our sfilter for the last block since we could have only see one subject during recovery. - if len(fs.cfg.Subjects) == 1 { - fs.lmb.sfilter = fs.cfg.Subjects[0] - } else { - fs.lmb.sfilter = _EMPTY_ - } } else { _, err = fs.newMsgBlockForWrite() } @@ -1406,8 +1380,12 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor return nil, false, err } - fseq, isAll, subs := start, filter == _EMPTY_ || filter == mb.sfilter || filter == fwcs, []string{filter} + fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter} + // If we only have 1 subject currently and it matches our filter we can also set isAll. + if !isAll && len(mb.fss) == 1 { + _, isAll = mb.fss[filter] + } // Skip scan of mb.fss if number of messages in the block are less than // 1/2 the number of subjects in mb.fss. Or we have a wc and lots of fss entries. const linearScanMaxFSS = 32 @@ -1796,12 +1774,6 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()} - // Optimize lookups if we have one subject. - // TODO(dlc) - Make work for mirrors? - if !mb.noTrack && len(fs.cfg.Subjects) == 1 { - mb.sfilter = fs.cfg.Subjects[0] - } - // Lock should be held to quiet race detector. mb.mu.Lock() mb.setupWriteCache(rbuf) @@ -5249,19 +5221,6 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error { return nil } - defer func() { - if !hasLock { - mb.mu.Lock() - defer mb.mu.Unlock() - } - // If we only have one subject registered we can optimize filtered lookups here. - if len(mb.fss) == 1 { - for sfilter := range mb.fss { - mb.sfilter = sfilter - } - } - }() - buf, err := mb.loadPerSubjectInfo() // On failure re-generate. if err != nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index b7e7a30b..f6bd449e 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4623,3 +4623,50 @@ func TestFileStoreFSSExpireNumPendingBug(t *testing.T) { t.Fatalf("Expected only 1 msg, got %d", fss.Msgs) } } + +// https://github.com/nats-io/nats-server/issues/3484 +func TestFileStoreFilteredFirstMatchingBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.>"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo.foo", nil, []byte("A")) + require_NoError(t, err) + + _, _, err = fs.StoreMsg("foo.foo", nil, []byte("B")) + require_NoError(t, err) + + _, _, err = fs.StoreMsg("foo.foo", nil, []byte("C")) + require_NoError(t, err) + + fs.mu.RLock() + mb := fs.lmb + fs.mu.RUnlock() + + mb.mu.Lock() + // Simulate swapping out the fss state and reading it back in with only one subject + // present in the block. + if mb.fss != nil { + mb.writePerSubjectInfo() + mb.fss = nil + } + // Now load info back in. + mb.readPerSubjectInfo(true) + mb.mu.Unlock() + + // Now add in a different subject. + _, _, err = fs.StoreMsg("foo.bar", nil, []byte("X")) + require_NoError(t, err) + + // Now see if a filtered load would incorrectly succeed. + sm, _, err := fs.LoadNextMsg("foo.foo", false, 4, nil) + if err == nil || sm != nil { + t.Fatalf("Loaded filtered message with wrong subject, wanted %q got %q", "foo.foo", sm.subj) + } +}