Merge pull request #3486 from nats-io/issue_3484

[FIXED] Filtered Consumers return wrong messages.
This commit is contained in:
Derek Collison
2022-09-22 07:55:22 -07:00
committed by GitHub
2 changed files with 52 additions and 46 deletions

View File

@@ -147,7 +147,6 @@ type msgBlock struct {
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
sfilter string // Single subject filter
sfn string
kfn string
lwits int64
@@ -419,18 +418,6 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk.Stop()
fs.ageChk = nil
}
// Update our sfilter for the last block.
if lmb := fs.lmb; lmb != nil {
lmb.mu.Lock()
if len(fs.cfg.Subjects) == 1 {
lmb.sfilter = fs.cfg.Subjects[0]
} else {
lmb.sfilter = _EMPTY_
}
lmb.mu.Unlock()
}
fs.mu.Unlock()
if cfg.MaxAge != 0 {
@@ -1103,13 +1090,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq = mb.first.seq - 1
}
// If we only have one subject registered we can optimize filtered lookups here.
if len(mb.fss) == 1 {
for sfilter := range mb.fss {
mb.sfilter = sfilter
}
}
return nil, nil
}
@@ -1159,12 +1139,6 @@ func (fs *fileStore) recoverMsgs() error {
if len(fs.blks) > 0 {
sort.Slice(fs.blks, func(i, j int) bool { return fs.blks[i].index < fs.blks[j].index })
fs.lmb = fs.blks[len(fs.blks)-1]
// Update our sfilter for the last block since we could have only see one subject during recovery.
if len(fs.cfg.Subjects) == 1 {
fs.lmb.sfilter = fs.cfg.Subjects[0]
} else {
fs.lmb.sfilter = _EMPTY_
}
} else {
_, err = fs.newMsgBlockForWrite()
}
@@ -1406,8 +1380,12 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
return nil, false, err
}
fseq, isAll, subs := start, filter == _EMPTY_ || filter == mb.sfilter || filter == fwcs, []string{filter}
fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}
// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && len(mb.fss) == 1 {
_, isAll = mb.fss[filter]
}
// Skip scan of mb.fss if number of messages in the block are less than
// 1/2 the number of subjects in mb.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 32
@@ -1796,12 +1774,6 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects()}
// Optimize lookups if we have one subject.
// TODO(dlc) - Make work for mirrors?
if !mb.noTrack && len(fs.cfg.Subjects) == 1 {
mb.sfilter = fs.cfg.Subjects[0]
}
// Lock should be held to quiet race detector.
mb.mu.Lock()
mb.setupWriteCache(rbuf)
@@ -5249,19 +5221,6 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
return nil
}
defer func() {
if !hasLock {
mb.mu.Lock()
defer mb.mu.Unlock()
}
// If we only have one subject registered we can optimize filtered lookups here.
if len(mb.fss) == 1 {
for sfilter := range mb.fss {
mb.sfilter = sfilter
}
}
}()
buf, err := mb.loadPerSubjectInfo()
// On failure re-generate.
if err != nil {

View File

@@ -4623,3 +4623,50 @@ func TestFileStoreFSSExpireNumPendingBug(t *testing.T) {
t.Fatalf("Expected only 1 msg, got %d", fss.Msgs)
}
}
// https://github.com/nats-io/nats-server/issues/3484
func TestFileStoreFilteredFirstMatchingBug(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Subjects: []string{"foo.>"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
_, _, err = fs.StoreMsg("foo.foo", nil, []byte("A"))
require_NoError(t, err)
_, _, err = fs.StoreMsg("foo.foo", nil, []byte("B"))
require_NoError(t, err)
_, _, err = fs.StoreMsg("foo.foo", nil, []byte("C"))
require_NoError(t, err)
fs.mu.RLock()
mb := fs.lmb
fs.mu.RUnlock()
mb.mu.Lock()
// Simulate swapping out the fss state and reading it back in with only one subject
// present in the block.
if mb.fss != nil {
mb.writePerSubjectInfo()
mb.fss = nil
}
// Now load info back in.
mb.readPerSubjectInfo(true)
mb.mu.Unlock()
// Now add in a different subject.
_, _, err = fs.StoreMsg("foo.bar", nil, []byte("X"))
require_NoError(t, err)
// Now see if a filtered load would incorrectly succeed.
sm, _, err := fs.LoadNextMsg("foo.foo", false, 4, nil)
if err == nil || sm != nil {
t.Fatalf("Loaded filtered message with wrong subject, wanted %q got %q", "foo.foo", sm.subj)
}
}