diff --git a/server/mqtt.go b/server/mqtt.go index 6286fccb..891f0733 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1299,7 +1299,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Create the stream for retained messages. cfg := &StreamConfig{ Name: mqttRetainedMsgsStreamName, - Subjects: []string{mqttRetainedMsgsStreamSubject + as.domainTk + ">"}, + Subjects: []string{mqttRetainedMsgsStreamSubject + ">"}, Storage: FileStorage, Retention: LimitsPolicy, Replicas: replicas, @@ -1321,7 +1321,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc } // Doing this check outside of above if/else due to possible race when // creating the stream. - wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" + wantedSubj := mqttRetainedMsgsStreamSubject + ">" if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { // Update only the Subjects at this stage, not MaxMsgsPer yet. si.Config.Subjects = []string{wantedSubj} @@ -1381,7 +1381,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc Stream: mqttRetainedMsgsStreamName, Config: ConsumerConfig{ Durable: rmDurName, - FilterSubject: mqttRetainedMsgsStreamSubject + as.domainTk + ">", + FilterSubject: mqttRetainedMsgsStreamSubject + ">", DeliverSubject: rmsubj, ReplayPolicy: ReplayInstant, AckPolicy: AckNone, @@ -2364,7 +2364,7 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms return } for _, sub := range result.psubs { - subj := mqttRetainedMsgsStreamSubject + as.domainTk + string(sub.subject) + subj := mqttRetainedMsgsStreamSubject + string(sub.subject) jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj) if err != nil || jsm == nil { continue @@ -2530,7 +2530,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log * continue } // Store the message again, this time with the new per-key subject. - subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject + subject := mqttRetainedMsgsStreamSubject + rmsg.Subject if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil { log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err) errors++ @@ -3723,7 +3723,7 @@ func (c *client) mqttHandlePubRetain() { Source: c.opts.Username, } rmBytes, _ := json.Marshal(rm) - smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes) + smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, -1, rmBytes) if err == nil { // Update the new sequence rf := &mqttRetainedMsgRef{