mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Migrate old retained messages to new subjects
Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -145,6 +145,7 @@ const (
|
||||
mqttJSAIdTokenPos = 3
|
||||
mqttJSATokenPos = 4
|
||||
mqttJSAStreamCreate = "SC"
|
||||
mqttJSAStreamUpdate = "SU"
|
||||
mqttJSAStreamLookup = "SL"
|
||||
mqttJSAStreamDel = "SD"
|
||||
mqttJSAConsumerCreate = "CC"
|
||||
@@ -1169,7 +1170,17 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
as.transferRetainedToPerKeySubjectStream(s)
|
||||
}
|
||||
} else {
|
||||
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
|
||||
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
|
||||
si.Config.Subjects = []string{wantedSubj}
|
||||
if _, err := jsa.updateStream(&si.Config); err != nil {
|
||||
return nil, fmt.Errorf("failed to update stream config: %w", err)
|
||||
}
|
||||
}
|
||||
as.transferRetainedToPerKeySubjectStream(s)
|
||||
}
|
||||
|
||||
var lastSeq uint64
|
||||
@@ -1354,6 +1365,19 @@ func (jsa *mqttJSA) createStream(cfg *StreamConfig) (*StreamInfo, bool, error) {
|
||||
return scr.StreamInfo, scr.DidCreate, scr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) updateStream(cfg *StreamConfig) (*StreamInfo, error) {
|
||||
cfgb, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scri, err := jsa.newRequest(mqttJSAStreamUpdate, fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), 0, cfgb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scr := scri.(*JSApiStreamUpdateResponse)
|
||||
return scr.StreamInfo, scr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) lookupStream(name string) (*StreamInfo, error) {
|
||||
slri, err := jsa.newRequest(mqttJSAStreamLookup, fmt.Sprintf(JSApiStreamInfoT, name), 0, nil)
|
||||
if err != nil {
|
||||
@@ -1386,6 +1410,20 @@ func (jsa *mqttJSA) loadLastMsgFor(streamName string, subject string) (*StoredMs
|
||||
return lmr.Message, lmr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) loadNextMsgFor(streamName string, subject string) (*StoredMsg, error) {
|
||||
mreq := &JSApiMsgGetRequest{NextFor: subject}
|
||||
req, err := json.Marshal(mreq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lmri, err := jsa.newRequest(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), 0, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lmr := lmri.(*JSApiMsgGetResponse)
|
||||
return lmr.Message, lmr.ToError()
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) loadMsg(streamName string, seq uint64) (*StoredMsg, error) {
|
||||
mreq := &JSApiMsgGetRequest{Seq: seq}
|
||||
req, err := json.Marshal(mreq)
|
||||
@@ -1465,6 +1503,12 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
|
||||
resp.Error = NewJSInvalidJSONError()
|
||||
}
|
||||
ch <- resp
|
||||
case mqttJSAStreamUpdate:
|
||||
var resp = &JSApiStreamUpdateResponse{}
|
||||
if err := json.Unmarshal(msg, resp); err != nil {
|
||||
resp.Error = NewJSInvalidJSONError()
|
||||
}
|
||||
ch <- resp
|
||||
case mqttJSAStreamLookup:
|
||||
var resp = &JSApiStreamInfoResponse{}
|
||||
if err := json.Unmarshal(msg, &resp); err != nil {
|
||||
@@ -2261,6 +2305,57 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
|
||||
retry = false
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) {
|
||||
jsa := &as.jsa
|
||||
var count, errors int
|
||||
|
||||
// Set retry to true, will be set to false on success.
|
||||
defer func() {
|
||||
if errors > 0 {
|
||||
next := mqttDefaultTransferRetry
|
||||
log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next)
|
||||
time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) })
|
||||
} else if count > 0 {
|
||||
log.Noticef("Transfer of %d MQTT retained messages done!", count)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
// Try and look up messages on the original undivided "$MQTT.rmsgs" subject.
|
||||
// If nothing is returned here, we assume to have migrated all old messages.
|
||||
smsg, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs")
|
||||
if err != nil {
|
||||
if IsNatsErr(err, JSNoMessageFoundErr) {
|
||||
// We've ran out of messages to transfer so give up.
|
||||
break
|
||||
}
|
||||
log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err)
|
||||
return
|
||||
}
|
||||
// Unmarshal the message so that we can obtain the subject name.
|
||||
var rmsg mqttRetainedMsg
|
||||
if err := json.Unmarshal(smsg.Data, &rmsg); err != nil {
|
||||
log.Warnf(" Unable to unmarshal retained message with sequence %d, skipping", smsg.Sequence)
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
// Store the message again, this time with the new per-key subject.
|
||||
subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject
|
||||
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
|
||||
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
// Delete the original message.
|
||||
if err := jsa.deleteMsg(mqttRetainedMsgsStreamName, smsg.Sequence, true); err != nil {
|
||||
log.Errorf(" Unable to clean up the retained message with sequence %d: %v", smsg.Sequence, err)
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// MQTT session related functions
|
||||
|
||||
Reference in New Issue
Block a user