mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Don't keep MQTT retained message content in memory
Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
126
server/mqtt.go
126
server/mqtt.go
@@ -221,13 +221,13 @@ type mqttSessionManager struct {
|
||||
|
||||
type mqttAccountSessionManager struct {
|
||||
mu sync.RWMutex
|
||||
sessions map[string]*mqttSession // key is MQTT client ID
|
||||
sessByHash map[string]*mqttSession // key is MQTT client ID hash
|
||||
sessLocked map[string]struct{} // key is MQTT client ID and indicate that a session can not be taken by a new client at this time
|
||||
flappers map[string]int64 // When connection connects with client ID already in use
|
||||
flapTimer *time.Timer // Timer to perform some cleanup of the flappers map
|
||||
sl *Sublist // sublist allowing to find retained messages for given subscription
|
||||
retmsgs map[string]*mqttRetainedMsg // retained messages
|
||||
sessions map[string]*mqttSession // key is MQTT client ID
|
||||
sessByHash map[string]*mqttSession // key is MQTT client ID hash
|
||||
sessLocked map[string]struct{} // key is MQTT client ID and indicate that a session can not be taken by a new client at this time
|
||||
flappers map[string]int64 // When connection connects with client ID already in use
|
||||
flapTimer *time.Timer // Timer to perform some cleanup of the flappers map
|
||||
sl *Sublist // sublist allowing to find retained messages for given subscription
|
||||
retmsgs map[string]*mqttRetainedMsgRef // retained messages
|
||||
jsa mqttJSA
|
||||
rrmLastSeq uint64 // Restore retained messages expected last sequence
|
||||
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
|
||||
@@ -293,8 +293,9 @@ type mqttRetainedMsg struct {
|
||||
Msg []byte `json:"msg,omitempty"`
|
||||
Flags byte `json:"flags,omitempty"`
|
||||
Source string `json:"source,omitempty"`
|
||||
}
|
||||
|
||||
// non exported
|
||||
type mqttRetainedMsgRef struct {
|
||||
sseq uint64
|
||||
floor uint64
|
||||
sub *subscription
|
||||
@@ -1604,8 +1605,9 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
|
||||
seq, _, _ := ackReplyInfo(reply)
|
||||
|
||||
// Handle this retained message
|
||||
rm.sseq = seq
|
||||
as.handleRetainedMsg(rm.Subject, rm)
|
||||
rf := &mqttRetainedMsgRef{}
|
||||
rf.sseq = seq
|
||||
as.handleRetainedMsg(rm.Subject, rf)
|
||||
|
||||
// If we were recovering (lastSeq > 0), then check if we are done.
|
||||
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
|
||||
@@ -1873,11 +1875,11 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc
|
||||
// or 0 if the record was added instead of updated.
|
||||
//
|
||||
// Lock not held on entry.
|
||||
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsg) uint64 {
|
||||
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) uint64 {
|
||||
as.mu.Lock()
|
||||
defer as.mu.Unlock()
|
||||
if as.retmsgs == nil {
|
||||
as.retmsgs = make(map[string]*mqttRetainedMsg)
|
||||
as.retmsgs = make(map[string]*mqttRetainedMsgRef)
|
||||
as.sl = NewSublistWithCache()
|
||||
} else {
|
||||
// Check if we already had one. If so, update the existing one.
|
||||
@@ -1887,11 +1889,6 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai
|
||||
if rm.sseq <= erm.sseq || rm.sseq <= erm.floor {
|
||||
return 0
|
||||
}
|
||||
// Update the existing retained message record with the new rm record.
|
||||
erm.Origin = rm.Origin
|
||||
erm.Msg = rm.Msg
|
||||
erm.Flags = rm.Flags
|
||||
erm.Source = rm.Source
|
||||
// Capture existing sequence number so we can return it as the old sequence.
|
||||
oldSeq := erm.sseq
|
||||
erm.sseq = rm.sseq
|
||||
@@ -1922,7 +1919,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui
|
||||
var seqToRemove uint64
|
||||
as.mu.Lock()
|
||||
if as.retmsgs == nil {
|
||||
as.retmsgs = make(map[string]*mqttRetainedMsg)
|
||||
as.retmsgs = make(map[string]*mqttRetainedMsgRef)
|
||||
as.sl = NewSublistWithCache()
|
||||
}
|
||||
if erm, ok := as.retmsgs[subject]; ok {
|
||||
@@ -1941,8 +1938,8 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui
|
||||
seqToRemove = erm.sseq
|
||||
}
|
||||
} else if seq != 0 {
|
||||
rm := &mqttRetainedMsg{Subject: subject, floor: seq}
|
||||
as.retmsgs[subject] = rm
|
||||
rf := &mqttRetainedMsgRef{floor: seq}
|
||||
as.retmsgs[subject] = rf
|
||||
}
|
||||
as.mu.Unlock()
|
||||
return seqToRemove
|
||||
@@ -2193,11 +2190,16 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms
|
||||
return
|
||||
}
|
||||
for _, sub := range result.psubs {
|
||||
// Since this is a reverse match, the subscription objects here
|
||||
// contain literals corresponding to the published subjects.
|
||||
if rm, ok := as.retmsgs[string(sub.subject)]; ok {
|
||||
*rms = append(*rms, rm)
|
||||
subj := mqttRetainedMsgsStreamSubject + as.domainTk + string(sub.subject)
|
||||
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj)
|
||||
if err != nil || jsm == nil {
|
||||
continue
|
||||
}
|
||||
var rm mqttRetainedMsg
|
||||
if err := json.Unmarshal(jsm.Data, &rm); err != nil {
|
||||
continue
|
||||
}
|
||||
*rms = append(*rms, &rm)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3213,9 +3215,11 @@ func (c *client) mqttHandlePubRetain() {
|
||||
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes)
|
||||
if err == nil {
|
||||
// Update the new sequence
|
||||
rm.sseq = smr.Sequence
|
||||
rf := &mqttRetainedMsgRef{
|
||||
sseq: smr.Sequence,
|
||||
}
|
||||
// Add/update the map
|
||||
oldSeq := asm.handleRetainedMsg(key, rm)
|
||||
oldSeq := asm.handleRetainedMsg(key, rf)
|
||||
// If this is a new message on the same subject, delete the old one.
|
||||
if oldSeq != 0 {
|
||||
asm.deleteRetainedMsg(oldSeq)
|
||||
@@ -3256,46 +3260,48 @@ func (s *Server) mqttCheckPubRetainedPerms() {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
sm.mu.RLock()
|
||||
defer sm.mu.RUnlock()
|
||||
/*
|
||||
sm.mu.RLock()
|
||||
defer sm.mu.RUnlock()
|
||||
|
||||
for _, asm := range sm.sessions {
|
||||
perms := map[string]*perm{}
|
||||
deletes := map[string]uint64{}
|
||||
asm.mu.Lock()
|
||||
for subject, rm := range asm.retmsgs {
|
||||
if rm.Source == _EMPTY_ {
|
||||
continue
|
||||
}
|
||||
// Lookup source from global users.
|
||||
u := users[rm.Source]
|
||||
if u != nil {
|
||||
p, ok := perms[rm.Source]
|
||||
if !ok {
|
||||
p = generatePubPerms(u.Permissions)
|
||||
perms[rm.Source] = p
|
||||
for _, asm := range sm.sessions {
|
||||
perms := map[string]*perm{}
|
||||
deletes := map[string]uint64{}
|
||||
asm.mu.Lock()
|
||||
for subject, rm := range asm.retmsgs {
|
||||
if rm.Source == _EMPTY_ {
|
||||
continue
|
||||
}
|
||||
// If there is permission and no longer allowed to publish in
|
||||
// the subject, remove the publish retained message from the map.
|
||||
if p != nil && !pubAllowed(p, subject) {
|
||||
u = nil
|
||||
// Lookup source from global users.
|
||||
u := users[rm.Source]
|
||||
if u != nil {
|
||||
p, ok := perms[rm.Source]
|
||||
if !ok {
|
||||
p = generatePubPerms(u.Permissions)
|
||||
perms[rm.Source] = p
|
||||
}
|
||||
// If there is permission and no longer allowed to publish in
|
||||
// the subject, remove the publish retained message from the map.
|
||||
if p != nil && !pubAllowed(p, subject) {
|
||||
u = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Not present or permissions have changed such that the source can't
|
||||
// publish on that subject anymore: remove it from the map.
|
||||
if u == nil {
|
||||
delete(asm.retmsgs, subject)
|
||||
asm.sl.Remove(rm.sub)
|
||||
deletes[subject] = rm.sseq
|
||||
}
|
||||
}
|
||||
|
||||
// Not present or permissions have changed such that the source can't
|
||||
// publish on that subject anymore: remove it from the map.
|
||||
if u == nil {
|
||||
delete(asm.retmsgs, subject)
|
||||
asm.sl.Remove(rm.sub)
|
||||
deletes[subject] = rm.sseq
|
||||
asm.mu.Unlock()
|
||||
for subject, seq := range deletes {
|
||||
asm.deleteRetainedMsg(seq)
|
||||
asm.notifyRetainedMsgDeleted(subject, seq)
|
||||
}
|
||||
}
|
||||
asm.mu.Unlock()
|
||||
for subject, seq := range deletes {
|
||||
asm.deleteRetainedMsg(seq)
|
||||
asm.notifyRetainedMsgDeleted(subject, seq)
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
// Helper to generate only pub permissions from a Permissions object
|
||||
|
||||
@@ -2975,8 +2975,8 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) {
|
||||
t.Run(test.subject, func(t *testing.T) {
|
||||
for _, a := range test.order {
|
||||
if a.add {
|
||||
rm := &mqttRetainedMsg{sseq: a.seq}
|
||||
asm.handleRetainedMsg(test.subject, rm)
|
||||
rf := &mqttRetainedMsgRef{sseq: a.seq}
|
||||
asm.handleRetainedMsg(test.subject, rf)
|
||||
} else {
|
||||
asm.handleRetainedMsgDel(test.subject, a.seq)
|
||||
}
|
||||
@@ -2988,8 +2988,8 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) {
|
||||
for _, subject := range []string{"foo.5", "foo.6"} {
|
||||
t.Run("clear_"+subject, func(t *testing.T) {
|
||||
// Now add a new message, which should clear the floor.
|
||||
rm := &mqttRetainedMsg{sseq: 3}
|
||||
asm.handleRetainedMsg(subject, rm)
|
||||
rf := &mqttRetainedMsgRef{sseq: 3}
|
||||
asm.handleRetainedMsg(subject, rf)
|
||||
check(t, subject, true, 3, 0)
|
||||
// Now do a non network delete and make sure it is gone.
|
||||
asm.handleRetainedMsgDel(subject, 0)
|
||||
|
||||
Reference in New Issue
Block a user