Take the account session lock when deleting from map

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-06-13 16:36:37 +01:00
parent 3b07f4342e
commit afe7f485ea

View File

@@ -3274,7 +3274,7 @@ func (s *Server) mqttCheckPubRetainedPerms() {
// Get all of the retained messages. Then we will sort them so
// that they are in sequence order, which should help the file
// store to not have to load out-of-order blocks so often.
asm.mu.Lock()
asm.mu.RLock()
rms = rms[:0] // reuse slice
for subj, rf := range asm.retmsgs {
rms = append(rms, retainedMsg{
@@ -3282,7 +3282,7 @@ func (s *Server) mqttCheckPubRetainedPerms() {
rmsg: rf,
})
}
asm.mu.Unlock()
asm.mu.RUnlock()
sort.Slice(rms, func(i, j int) bool {
return rms[i].rmsg.sseq < rms[j].rmsg.sseq
})
@@ -3319,8 +3319,10 @@ func (s *Server) mqttCheckPubRetainedPerms() {
// Not present or permissions have changed such that the source can't
// publish on that subject anymore: remove it from the map.
if u == nil {
asm.mu.Lock()
delete(asm.retmsgs, rf.subj)
asm.sl.Remove(rf.rmsg.sub)
asm.mu.Unlock()
deletes[rf.subj] = rf.rmsg.sseq
}
}