diff --git a/server/mqtt.go b/server/mqtt.go index 00cbbfcd..018d3577 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1170,17 +1170,40 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc if err != nil { return nil, err } - as.transferRetainedToPerKeySubjectStream(s) } - } else { - wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" - if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { - si.Config.Subjects = []string{wantedSubj} - if _, err := jsa.updateStream(&si.Config); err != nil { + } + // Doing this check outside of above if/else due to possible race when + // creating the stream. + wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" + if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { + // Update only the Subjects at this stage, not MaxMsgsPer yet. + si.Config.Subjects = []string{wantedSubj} + if si, err = jsa.updateStream(&si.Config); err != nil { + return nil, fmt.Errorf("failed to update stream config: %w", err) + } + } + // Try to transfer regardless if we have already updated the stream or not + // in case not all messages were transferred and the server was restarted. + if as.transferRetainedToPerKeySubjectStream(s) { + // We need another lookup to have up-to-date si.State values in order + // to load all retained messages. + si, err = lookupStream(mqttRetainedMsgsStreamName, "retained messages") + if err != nil { + return nil, err + } + } + // Now, if the stream does not have MaxMsgsPer set to 1, and there are no + // more messages on the single $MQTT.rmsgs subject, update the stream again. + if si.Config.MaxMsgsPer != 1 { + _, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs") + // Looking for an error indicated that there is no such message. + if err != nil && IsNatsErr(err, JSNoMessageFoundErr) { + si.Config.MaxMsgsPer = 1 + // We will need an up-to-date si, so don't use local variable here. + if si, err = jsa.updateStream(&si.Config); err != nil { return nil, fmt.Errorf("failed to update stream config: %w", err) } } - as.transferRetainedToPerKeySubjectStream(s) } var lastSeq uint64 @@ -2305,21 +2328,10 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve retry = false } -func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) { +func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) bool { jsa := &as.jsa var count, errors int - // Set retry to true, will be set to false on success. - defer func() { - if errors > 0 { - next := mqttDefaultTransferRetry - log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) - time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) - } else if count > 0 { - log.Noticef("Transfer of %d MQTT retained messages done!", count) - } - }() - for { // Try and look up messages on the original undivided "$MQTT.rmsgs" subject. // If nothing is returned here, we assume to have migrated all old messages. @@ -2331,7 +2343,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log * } log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err) errors++ - return + break } // Unmarshal the message so that we can obtain the subject name. var rmsg mqttRetainedMsg @@ -2355,6 +2367,15 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log * } count++ } + if errors > 0 { + next := mqttDefaultTransferRetry + log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) + time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) + } else if count > 0 { + log.Noticef("Transfer of %d MQTT retained messages done!", count) + } + // Signal if there was any activity (either some transferred or some errors) + return errors > 0 || count > 0 } ////////////////////////////////////////////////////////////////////////////// diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 7c5888d9..31f59da9 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1979,6 +1979,11 @@ func testMQTTCheckPubMsgNoAck(t testing.TB, c net.Conn, r *mqttReader, topic str } func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16) { + flags, pi, _ := testMQTTGetPubMsgEx(t, c, r, topic, payload) + return flags, pi +} + +func testMQTTGetPubMsgEx(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16, string) { t.Helper() b, pl := testMQTTReadPacket(t, r) if pt := b & mqttPacketMask; pt != mqttPacketPub { @@ -1991,7 +1996,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa if err != nil { t.Fatal(err) } - if ptopic != topic { + if topic != _EMPTY_ && ptopic != topic { t.Fatalf("Expected topic %q, got %q", topic, ptopic) } var pi uint16 @@ -2011,7 +2016,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa t.Fatalf("Expected payload %q, got %q", payload, ppayload) } r.pos += msgLen - return pflags, pi + return pflags, pi, ptopic } func testMQTTSendPubAck(t testing.TB, c net.Conn, pi uint16) { @@ -3038,6 +3043,16 @@ func TestMQTTRetainedMsgMigration(t *testing.T) { defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "+", qos: 0}}, []byte{0}) + topics := map[string]struct{}{} + for i := 0; i < 100; i++ { + _, _, topic := testMQTTGetPubMsgEx(t, mc, rc, _EMPTY_, []byte("bar")) + topics[topic] = struct{}{} + } + if len(topics) != 100 { + t.Fatalf("Unexpected topics: %v", topics) + } + // Now look at the stream, there should be 100 messages on the new // divided subjects and none on the old undivided subject. si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{