[FIXED] MQTT: Removed the use of tkDomain from retained msg subjects (#4440)

(Partially?) addresses
https://github.com/nats-io/nats-server/pull/4349#discussion_r1306576048

@kozlovic @neilalexander I did not remove the use of `domainTk` in the
session subject since it seems to have significance to it; removing it
failed `TestMQTTSessionsDifferentDomains` and I did not understand the
specifics of the issue enough. Please let me know your thoughts.
This commit is contained in:
Lev
2023-08-29 11:13:02 -07:00
committed by GitHub

View File

@@ -1299,7 +1299,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Create the stream for retained messages.
cfg := &StreamConfig{
Name: mqttRetainedMsgsStreamName,
Subjects: []string{mqttRetainedMsgsStreamSubject + as.domainTk + ">"},
Subjects: []string{mqttRetainedMsgsStreamSubject + ">"},
Storage: FileStorage,
Retention: LimitsPolicy,
Replicas: replicas,
@@ -1321,7 +1321,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
// Doing this check outside of above if/else due to possible race when
// creating the stream.
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
wantedSubj := mqttRetainedMsgsStreamSubject + ">"
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
// Update only the Subjects at this stage, not MaxMsgsPer yet.
si.Config.Subjects = []string{wantedSubj}
@@ -1381,7 +1381,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + as.domainTk + ">",
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
@@ -2364,7 +2364,7 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms
return
}
for _, sub := range result.psubs {
subj := mqttRetainedMsgsStreamSubject + as.domainTk + string(sub.subject)
subj := mqttRetainedMsgsStreamSubject + string(sub.subject)
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj)
if err != nil || jsm == nil {
continue
@@ -2530,7 +2530,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
continue
}
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject
subject := mqttRetainedMsgsStreamSubject + rmsg.Subject
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
@@ -3723,7 +3723,7 @@ func (c *client) mqttHandlePubRetain() {
Source: c.opts.Username,
}
rmBytes, _ := json.Marshal(rm)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+asm.domainTk+key, -1, rmBytes)
smr, err := asm.jsa.storeMsg(mqttRetainedMsgsStreamSubject+key, -1, rmBytes)
if err == nil {
// Update the new sequence
rf := &mqttRetainedMsgRef{