MQTT: Removed the use of tkDomain from retained msg subjects

This commit is contained in:
Lev Brouk
2023-08-28 03:46:20 -07:00
parent 961c0d7187
commit b9ea85b5d0

View File

@@ -1154,7 +1154,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Create the stream for retained messages.
cfg := &StreamConfig{
Name: mqttRetainedMsgsStreamName,
Subjects: []string{mqttRetainedMsgsStreamSubject + as.domainTk + ">"},
Subjects: []string{mqttRetainedMsgsStreamSubject + ">"},
Storage: FileStorage,
Retention: LimitsPolicy,
Replicas: replicas,
@@ -1176,7 +1176,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
// Doing this check outside of above if/else due to possible race when
// creating the stream.
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
wantedSubj := mqttRetainedMsgsStreamSubject + ">"
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
// Update only the Subjects at this stage, not MaxMsgsPer yet.
si.Config.Subjects = []string{wantedSubj}
@@ -1236,7 +1236,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + as.domainTk + ">",
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
@@ -2188,7 +2188,7 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms
return
}
for _, sub := range result.psubs {
subj := mqttRetainedMsgsStreamSubject + as.domainTk + string(sub.subject)
subj := mqttRetainedMsgsStreamSubject + string(sub.subject)
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj)
if err != nil || jsm == nil {
continue
@@ -2353,7 +2353,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
continue
}
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject
subject := mqttRetainedMsgsStreamSubject + rmsg.Subject
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
@@ -3210,7 +3210,7 @@ func (c *client) mqttHandlePubRetain() {
Source: c.opts.Username,
}
rmBytes, _ := json.Marshal(rm)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, -1, rmBytes)
if err == nil {
// Update the new sequence
rf := &mqttRetainedMsgRef{