From e19e97b0a617a1e62a48c86c25c8511dfd29d566 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 7 Sep 2023 12:47:57 +0100 Subject: [PATCH] Use write lock for memory store filtered state Signed-off-by: Neil Twigg --- server/memstore.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/server/memstore.go b/server/memstore.go index 0547fee6..8c8cae87 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -299,8 +299,11 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { // FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence. func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { - ms.mu.RLock() - defer ms.mu.RUnlock() + // This needs to be a write lock, as filteredStateLocked can + // mutate the per-subject state. + ms.mu.Lock() + defer ms.mu.Unlock() + return ms.filteredStateLocked(sseq, subj, false) } @@ -512,8 +515,10 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { // NumPending will return the number of pending messages matching the filter subject starting at sequence. func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) { - ms.mu.RLock() - defer ms.mu.RUnlock() + // This needs to be a write lock, as filteredStateLocked can + // mutate the per-subject state. + ms.mu.Lock() + defer ms.mu.Unlock() ss := ms.filteredStateLocked(sseq, filter, lastPerSubject) return ss.Msgs, ms.state.LastSeq @@ -880,8 +885,10 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error var sm *StoreMsg var ok bool - ms.mu.RLock() - defer ms.mu.RUnlock() + // This needs to be a write lock, as filteredStateLocked can + // mutate the per-subject state. + ms.mu.Lock() + defer ms.mu.Unlock() if subject == _EMPTY_ || subject == fwcs { sm, ok = ms.msgs[ms.state.LastSeq]