diff --git a/server/mqtt.go b/server/mqtt.go index 018d3577..308839df 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -221,13 +221,13 @@ type mqttSessionManager struct { type mqttAccountSessionManager struct { mu sync.RWMutex - sessions map[string]*mqttSession // key is MQTT client ID - sessByHash map[string]*mqttSession // key is MQTT client ID hash - sessLocked map[string]struct{} // key is MQTT client ID and indicate that a session can not be taken by a new client at this time - flappers map[string]int64 // When connection connects with client ID already in use - flapTimer *time.Timer // Timer to perform some cleanup of the flappers map - sl *Sublist // sublist allowing to find retained messages for given subscription - retmsgs map[string]*mqttRetainedMsg // retained messages + sessions map[string]*mqttSession // key is MQTT client ID + sessByHash map[string]*mqttSession // key is MQTT client ID hash + sessLocked map[string]struct{} // key is MQTT client ID and indicate that a session can not be taken by a new client at this time + flappers map[string]int64 // When connection connects with client ID already in use + flapTimer *time.Timer // Timer to perform some cleanup of the flappers map + sl *Sublist // sublist allowing to find retained messages for given subscription + retmsgs map[string]*mqttRetainedMsgRef // retained messages jsa mqttJSA rrmLastSeq uint64 // Restore retained messages expected last sequence rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded @@ -293,8 +293,9 @@ type mqttRetainedMsg struct { Msg []byte `json:"msg,omitempty"` Flags byte `json:"flags,omitempty"` Source string `json:"source,omitempty"` +} - // non exported +type mqttRetainedMsgRef struct { sseq uint64 floor uint64 sub *subscription @@ -1604,8 +1605,9 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie seq, _, _ := ackReplyInfo(reply) // Handle this retained message - rm.sseq = seq - as.handleRetainedMsg(rm.Subject, rm) + rf := &mqttRetainedMsgRef{} + rf.sseq = seq + as.handleRetainedMsg(rm.Subject, rf) // If we were recovering (lastSeq > 0), then check if we are done. if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq { @@ -1873,11 +1875,11 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc // or 0 if the record was added instead of updated. // // Lock not held on entry. -func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsg) uint64 { +func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) uint64 { as.mu.Lock() defer as.mu.Unlock() if as.retmsgs == nil { - as.retmsgs = make(map[string]*mqttRetainedMsg) + as.retmsgs = make(map[string]*mqttRetainedMsgRef) as.sl = NewSublistWithCache() } else { // Check if we already had one. If so, update the existing one. @@ -1887,11 +1889,6 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai if rm.sseq <= erm.sseq || rm.sseq <= erm.floor { return 0 } - // Update the existing retained message record with the new rm record. - erm.Origin = rm.Origin - erm.Msg = rm.Msg - erm.Flags = rm.Flags - erm.Source = rm.Source // Capture existing sequence number so we can return it as the old sequence. oldSeq := erm.sseq erm.sseq = rm.sseq @@ -1922,7 +1919,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui var seqToRemove uint64 as.mu.Lock() if as.retmsgs == nil { - as.retmsgs = make(map[string]*mqttRetainedMsg) + as.retmsgs = make(map[string]*mqttRetainedMsgRef) as.sl = NewSublistWithCache() } if erm, ok := as.retmsgs[subject]; ok { @@ -1941,8 +1938,8 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui seqToRemove = erm.sseq } } else if seq != 0 { - rm := &mqttRetainedMsg{Subject: subject, floor: seq} - as.retmsgs[subject] = rm + rf := &mqttRetainedMsgRef{floor: seq} + as.retmsgs[subject] = rf } as.mu.Unlock() return seqToRemove @@ -2193,11 +2190,16 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms return } for _, sub := range result.psubs { - // Since this is a reverse match, the subscription objects here - // contain literals corresponding to the published subjects. - if rm, ok := as.retmsgs[string(sub.subject)]; ok { - *rms = append(*rms, rm) + subj := mqttRetainedMsgsStreamSubject + as.domainTk + string(sub.subject) + jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj) + if err != nil || jsm == nil { + continue } + var rm mqttRetainedMsg + if err := json.Unmarshal(jsm.Data, &rm); err != nil { + continue + } + *rms = append(*rms, &rm) } } @@ -3213,9 +3215,11 @@ func (c *client) mqttHandlePubRetain() { smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes) if err == nil { // Update the new sequence - rm.sseq = smr.Sequence + rf := &mqttRetainedMsgRef{ + sseq: smr.Sequence, + } // Add/update the map - oldSeq := asm.handleRetainedMsg(key, rm) + oldSeq := asm.handleRetainedMsg(key, rf) // If this is a new message on the same subject, delete the old one. if oldSeq != 0 { asm.deleteRetainedMsg(oldSeq) @@ -3256,46 +3260,48 @@ func (s *Server) mqttCheckPubRetainedPerms() { } s.mu.Unlock() - sm.mu.RLock() - defer sm.mu.RUnlock() + /* + sm.mu.RLock() + defer sm.mu.RUnlock() - for _, asm := range sm.sessions { - perms := map[string]*perm{} - deletes := map[string]uint64{} - asm.mu.Lock() - for subject, rm := range asm.retmsgs { - if rm.Source == _EMPTY_ { - continue - } - // Lookup source from global users. - u := users[rm.Source] - if u != nil { - p, ok := perms[rm.Source] - if !ok { - p = generatePubPerms(u.Permissions) - perms[rm.Source] = p + for _, asm := range sm.sessions { + perms := map[string]*perm{} + deletes := map[string]uint64{} + asm.mu.Lock() + for subject, rm := range asm.retmsgs { + if rm.Source == _EMPTY_ { + continue } - // If there is permission and no longer allowed to publish in - // the subject, remove the publish retained message from the map. - if p != nil && !pubAllowed(p, subject) { - u = nil + // Lookup source from global users. + u := users[rm.Source] + if u != nil { + p, ok := perms[rm.Source] + if !ok { + p = generatePubPerms(u.Permissions) + perms[rm.Source] = p + } + // If there is permission and no longer allowed to publish in + // the subject, remove the publish retained message from the map. + if p != nil && !pubAllowed(p, subject) { + u = nil + } + } + + // Not present or permissions have changed such that the source can't + // publish on that subject anymore: remove it from the map. + if u == nil { + delete(asm.retmsgs, subject) + asm.sl.Remove(rm.sub) + deletes[subject] = rm.sseq } } - - // Not present or permissions have changed such that the source can't - // publish on that subject anymore: remove it from the map. - if u == nil { - delete(asm.retmsgs, subject) - asm.sl.Remove(rm.sub) - deletes[subject] = rm.sseq + asm.mu.Unlock() + for subject, seq := range deletes { + asm.deleteRetainedMsg(seq) + asm.notifyRetainedMsgDeleted(subject, seq) } } - asm.mu.Unlock() - for subject, seq := range deletes { - asm.deleteRetainedMsg(seq) - asm.notifyRetainedMsgDeleted(subject, seq) - } - } + */ } // Helper to generate only pub permissions from a Permissions object diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 31f59da9..a4e793eb 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -2975,8 +2975,8 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) { t.Run(test.subject, func(t *testing.T) { for _, a := range test.order { if a.add { - rm := &mqttRetainedMsg{sseq: a.seq} - asm.handleRetainedMsg(test.subject, rm) + rf := &mqttRetainedMsgRef{sseq: a.seq} + asm.handleRetainedMsg(test.subject, rf) } else { asm.handleRetainedMsgDel(test.subject, a.seq) } @@ -2988,8 +2988,8 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) { for _, subject := range []string{"foo.5", "foo.6"} { t.Run("clear_"+subject, func(t *testing.T) { // Now add a new message, which should clear the floor. - rm := &mqttRetainedMsg{sseq: 3} - asm.handleRetainedMsg(subject, rm) + rf := &mqttRetainedMsgRef{sseq: 3} + asm.handleRetainedMsg(subject, rf) check(t, subject, true, 3, 0) // Now do a non network delete and make sure it is gone. asm.handleRetainedMsgDel(subject, 0)