Use write lock for memory store filtered state

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-09-07 12:47:57 +01:00
parent 11f0ea99a4
commit e19e97b0a6

View File

@@ -299,8 +299,11 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.filteredStateLocked(sseq, subj, false)
}
@@ -512,8 +515,10 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
// NumPending will return the number of pending messages matching the filter subject starting at sequence.
func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) {
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()
ss := ms.filteredStateLocked(sseq, filter, lastPerSubject)
return ss.Msgs, ms.state.LastSeq
@@ -880,8 +885,10 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
var sm *StoreMsg
var ok bool
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()
if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]