From 3b07f4342eecca797f8dee347df8a62df9971529 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 13 Jun 2023 14:33:37 +0100 Subject: [PATCH] Remove unnecessary return, refactor permission check so that it doesn't hold locks longer than needed Signed-off-by: Neil Twigg --- server/mqtt.go | 56 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index d79a33eb..c0b77ac8 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -23,6 +23,7 @@ import ( "io" "net" "net/http" + "sort" "strconv" "strings" "sync" @@ -1875,7 +1876,7 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc // or 0 if the record was added instead of updated. // // Lock not held on entry. -func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) uint64 { +func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) { as.mu.Lock() defer as.mu.Unlock() if as.retmsgs == nil { @@ -1887,10 +1888,9 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai // If the new sequence is below the floor or the existing one, // then ignore the new one. if rm.sseq <= erm.sseq || rm.sseq <= erm.floor { - return 0 + return } // Capture existing sequence number so we can return it as the old sequence. - oldSeq := erm.sseq erm.sseq = rm.sseq // Clear the floor erm.floor = 0 @@ -1900,13 +1900,11 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai erm.sub = &subscription{subject: []byte(key)} as.sl.Insert(erm.sub) } - return oldSeq } } rm.sub = &subscription{subject: []byte(key)} as.retmsgs[key] = rm as.sl.Insert(rm.sub) - return 0 } // Removes the retained message for the given `subject` if present, and returns the @@ -3256,15 +3254,43 @@ func (s *Server) mqttCheckPubRetainedPerms() { } s.mu.Unlock() + // First get a list of all of the sessions. sm.mu.RLock() - defer sm.mu.RUnlock() - + asms := make([]*mqttAccountSessionManager, 0, len(sm.sessions)) for _, asm := range sm.sessions { + asms = append(asms, asm) + } + sm.mu.RUnlock() + + type retainedMsg struct { + subj string + rmsg *mqttRetainedMsgRef + } + + // For each session we will obtain a list of retained messages. + var _rms [128]retainedMsg + rms := _rms[:0] + for _, asm := range asms { + // Get all of the retained messages. Then we will sort them so + // that they are in sequence order, which should help the file + // store to not have to load out-of-order blocks so often. + asm.mu.Lock() + rms = rms[:0] // reuse slice + for subj, rf := range asm.retmsgs { + rms = append(rms, retainedMsg{ + subj: subj, + rmsg: rf, + }) + } + asm.mu.Unlock() + sort.Slice(rms, func(i, j int) bool { + return rms[i].rmsg.sseq < rms[j].rmsg.sseq + }) + perms := map[string]*perm{} deletes := map[string]uint64{} - asm.mu.Lock() - for subject, rf := range asm.retmsgs { - jsm, err := asm.jsa.loadMsg(mqttRetainedMsgsStreamName, rf.sseq) + for _, rf := range rms { + jsm, err := asm.jsa.loadMsg(mqttRetainedMsgsStreamName, rf.rmsg.sseq) if err != nil || jsm == nil { continue } @@ -3285,7 +3311,7 @@ func (s *Server) mqttCheckPubRetainedPerms() { } // If there is permission and no longer allowed to publish in // the subject, remove the publish retained message from the map. - if p != nil && !pubAllowed(p, subject) { + if p != nil && !pubAllowed(p, rf.subj) { u = nil } } @@ -3293,12 +3319,12 @@ func (s *Server) mqttCheckPubRetainedPerms() { // Not present or permissions have changed such that the source can't // publish on that subject anymore: remove it from the map. if u == nil { - delete(asm.retmsgs, subject) - asm.sl.Remove(rf.sub) - deletes[subject] = rf.sseq + delete(asm.retmsgs, rf.subj) + asm.sl.Remove(rf.rmsg.sub) + deletes[rf.subj] = rf.rmsg.sseq } } - asm.mu.Unlock() + for subject, seq := range deletes { asm.deleteRetainedMsg(seq) asm.notifyRetainedMsgDeleted(subject, seq)