mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
When stores and load for last for subject where concurrent and competiting for the same msg, we could fail to retrieve a newly placed message.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1467,6 +1467,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
|
||||
wc := subjectHasWildcard(subj)
|
||||
// Are we tracking multiple subject states?
|
||||
if fs.tms {
|
||||
fs.mu.RLock()
|
||||
for _, mb := range fs.blks {
|
||||
// Skip blocks that are less than our starting sequence.
|
||||
if sseq > atomic.LoadUint64(&mb.last.seq) {
|
||||
@@ -1481,6 +1482,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
|
||||
ss.Last = l
|
||||
}
|
||||
}
|
||||
fs.mu.RUnlock()
|
||||
} else {
|
||||
// Fallback to linear scan.
|
||||
var smv StoreMsg
|
||||
@@ -3799,15 +3801,44 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
|
||||
return fs.msgForSeq(seq, sm)
|
||||
}
|
||||
|
||||
// loadLast will load the last message for a subject. Subject should be non empty and not ">".
|
||||
func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) {
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
wc := subjectHasWildcard(subj)
|
||||
// Walk blocks backwards.
|
||||
for i := len(fs.blks) - 1; i >= 0; i-- {
|
||||
mb := fs.blks[i]
|
||||
if mb == nil {
|
||||
continue
|
||||
}
|
||||
mb.mu.Lock()
|
||||
_, _, l := mb.filteredPendingLocked(subj, wc, mb.first.seq)
|
||||
if l > 0 {
|
||||
if mb.cacheNotLoaded() {
|
||||
if err := mb.loadMsgsWithLock(); err != nil {
|
||||
mb.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
lsm, err = mb.cacheLookup(l, sm)
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
if l > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return lsm, err
|
||||
}
|
||||
|
||||
// LoadLastMsg will return the last message we have that matches a given subject.
|
||||
// The subject can be a wildcard.
|
||||
func (fs *fileStore) LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error) {
|
||||
if subject == _EMPTY_ || subject == fwcs {
|
||||
sm, _ = fs.msgForSeq(fs.lastSeq(), sm)
|
||||
} else if ss := fs.FilteredState(1, subject); ss.Msgs > 0 {
|
||||
sm, _ = fs.msgForSeq(ss.Last, sm)
|
||||
} else {
|
||||
sm = nil
|
||||
sm, _ = fs.loadLast(subject, sm)
|
||||
}
|
||||
if sm == nil {
|
||||
return nil, ErrStoreMsgNotFound
|
||||
|
||||
Reference in New Issue
Block a user