diff --git a/server/filestore.go b/server/filestore.go index f3a30662..f804f4a5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1467,6 +1467,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { wc := subjectHasWildcard(subj) // Are we tracking multiple subject states? if fs.tms { + fs.mu.RLock() for _, mb := range fs.blks { // Skip blocks that are less than our starting sequence. if sseq > atomic.LoadUint64(&mb.last.seq) { @@ -1481,6 +1482,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { ss.Last = l } } + fs.mu.RUnlock() } else { // Fallback to linear scan. var smv StoreMsg @@ -3799,15 +3801,44 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) { return fs.msgForSeq(seq, sm) } +// loadLast will load the last message for a subject. Subject should be non empty and not ">". +func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + wc := subjectHasWildcard(subj) + // Walk blocks backwards. + for i := len(fs.blks) - 1; i >= 0; i-- { + mb := fs.blks[i] + if mb == nil { + continue + } + mb.mu.Lock() + _, _, l := mb.filteredPendingLocked(subj, wc, mb.first.seq) + if l > 0 { + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + mb.mu.Unlock() + return nil, err + } + } + lsm, err = mb.cacheLookup(l, sm) + } + mb.mu.Unlock() + if l > 0 { + break + } + } + return lsm, err +} + // LoadLastMsg will return the last message we have that matches a given subject. // The subject can be a wildcard. func (fs *fileStore) LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error) { if subject == _EMPTY_ || subject == fwcs { sm, _ = fs.msgForSeq(fs.lastSeq(), sm) - } else if ss := fs.FilteredState(1, subject); ss.Msgs > 0 { - sm, _ = fs.msgForSeq(ss.Last, sm) } else { - sm = nil + sm, _ = fs.loadLast(subject, sm) } if sm == nil { return nil, ErrStoreMsgNotFound