From 007565ffd02a9ea20ebc503200848f537e8994a5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 31 May 2023 11:31:52 +0100 Subject: [PATCH] Migrate old retained messages to new subjects Signed-off-by: Neil Twigg --- server/mqtt.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/server/mqtt.go b/server/mqtt.go index 90bebdfb..7c28ff5e 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -145,6 +145,7 @@ const ( mqttJSAIdTokenPos = 3 mqttJSATokenPos = 4 mqttJSAStreamCreate = "SC" + mqttJSAStreamUpdate = "SU" mqttJSAStreamLookup = "SL" mqttJSAStreamDel = "SD" mqttJSAConsumerCreate = "CC" @@ -1169,7 +1170,17 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc if err != nil { return nil, err } + as.transferRetainedToPerKeySubjectStream(s) } + } else { + wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" + if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { + si.Config.Subjects = []string{wantedSubj} + if _, err := jsa.updateStream(&si.Config); err != nil { + return nil, fmt.Errorf("failed to update stream config: %w", err) + } + } + as.transferRetainedToPerKeySubjectStream(s) } var lastSeq uint64 @@ -1354,6 +1365,19 @@ func (jsa *mqttJSA) createStream(cfg *StreamConfig) (*StreamInfo, bool, error) { return scr.StreamInfo, scr.DidCreate, scr.ToError() } +func (jsa *mqttJSA) updateStream(cfg *StreamConfig) (*StreamInfo, error) { + cfgb, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + scri, err := jsa.newRequest(mqttJSAStreamUpdate, fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), 0, cfgb) + if err != nil { + return nil, err + } + scr := scri.(*JSApiStreamUpdateResponse) + return scr.StreamInfo, scr.ToError() +} + func (jsa *mqttJSA) lookupStream(name string) (*StreamInfo, error) { slri, err := jsa.newRequest(mqttJSAStreamLookup, fmt.Sprintf(JSApiStreamInfoT, name), 0, nil) if err != nil { @@ -1386,6 +1410,20 @@ func (jsa *mqttJSA) loadLastMsgFor(streamName string, subject string) (*StoredMs return lmr.Message, lmr.ToError() } +func (jsa *mqttJSA) loadNextMsgFor(streamName string, subject string) (*StoredMsg, error) { + mreq := &JSApiMsgGetRequest{NextFor: subject} + req, err := json.Marshal(mreq) + if err != nil { + return nil, err + } + lmri, err := jsa.newRequest(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), 0, req) + if err != nil { + return nil, err + } + lmr := lmri.(*JSApiMsgGetResponse) + return lmr.Message, lmr.ToError() +} + func (jsa *mqttJSA) loadMsg(streamName string, seq uint64) (*StoredMsg, error) { mreq := &JSApiMsgGetRequest{Seq: seq} req, err := json.Marshal(mreq) @@ -1465,6 +1503,12 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl resp.Error = NewJSInvalidJSONError() } ch <- resp + case mqttJSAStreamUpdate: + var resp = &JSApiStreamUpdateResponse{} + if err := json.Unmarshal(msg, resp); err != nil { + resp.Error = NewJSInvalidJSONError() + } + ch <- resp case mqttJSAStreamLookup: var resp = &JSApiStreamInfoResponse{} if err := json.Unmarshal(msg, &resp); err != nil { @@ -2261,6 +2305,57 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve retry = false } +func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) { + jsa := &as.jsa + var count, errors int + + // Set retry to true, will be set to false on success. + defer func() { + if errors > 0 { + next := mqttDefaultTransferRetry + log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) + time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) + } else if count > 0 { + log.Noticef("Transfer of %d MQTT retained messages done!", count) + } + }() + + for { + // Try and look up messages on the original undivided "$MQTT.rmsgs" subject. + // If nothing is returned here, we assume to have migrated all old messages. + smsg, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs") + if err != nil { + if IsNatsErr(err, JSNoMessageFoundErr) { + // We've ran out of messages to transfer so give up. + break + } + log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err) + return + } + // Unmarshal the message so that we can obtain the subject name. + var rmsg mqttRetainedMsg + if err := json.Unmarshal(smsg.Data, &rmsg); err != nil { + log.Warnf(" Unable to unmarshal retained message with sequence %d, skipping", smsg.Sequence) + errors++ + continue + } + // Store the message again, this time with the new per-key subject. + subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject + if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil { + log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err) + errors++ + continue + } + // Delete the original message. + if err := jsa.deleteMsg(mqttRetainedMsgsStreamName, smsg.Sequence, true); err != nil { + log.Errorf(" Unable to clean up the retained message with sequence %d: %v", smsg.Sequence, err) + errors++ + continue + } + count++ + } +} + ////////////////////////////////////////////////////////////////////////////// // // MQTT session related functions