From a744cb8cd218e6f4472677c012af9aa74b4f42cf Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 31 May 2023 19:35:12 -0600 Subject: [PATCH] Fixed delivery of retained messages after transfer. I was running a manual test moving from dev to this branch and noticed that the consumer would receive only 1 message of the 10 messages sent as retained. So I modified the test to verify that we receive them all and we did not. The reason was that after the transfer we need to refresh the state of the stream (stream info) since we attempt to load all messages based on the state's sequences. I have also modified a bit the code to update the MaxMsgsPer once all messages have been transferred. Signed-off-by: Ivan Kozlovic --- server/mqtt.go | 61 ++++++++++++++++++++++++++++++--------------- server/mqtt_test.go | 19 ++++++++++++-- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 00cbbfcd..018d3577 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1170,17 +1170,40 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc if err != nil { return nil, err } - as.transferRetainedToPerKeySubjectStream(s) } - } else { - wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" - if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { - si.Config.Subjects = []string{wantedSubj} - if _, err := jsa.updateStream(&si.Config); err != nil { + } + // Doing this check outside of above if/else due to possible race when + // creating the stream. + wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">" + if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj { + // Update only the Subjects at this stage, not MaxMsgsPer yet. + si.Config.Subjects = []string{wantedSubj} + if si, err = jsa.updateStream(&si.Config); err != nil { + return nil, fmt.Errorf("failed to update stream config: %w", err) + } + } + // Try to transfer regardless if we have already updated the stream or not + // in case not all messages were transferred and the server was restarted. + if as.transferRetainedToPerKeySubjectStream(s) { + // We need another lookup to have up-to-date si.State values in order + // to load all retained messages. + si, err = lookupStream(mqttRetainedMsgsStreamName, "retained messages") + if err != nil { + return nil, err + } + } + // Now, if the stream does not have MaxMsgsPer set to 1, and there are no + // more messages on the single $MQTT.rmsgs subject, update the stream again. + if si.Config.MaxMsgsPer != 1 { + _, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs") + // Looking for an error indicated that there is no such message. + if err != nil && IsNatsErr(err, JSNoMessageFoundErr) { + si.Config.MaxMsgsPer = 1 + // We will need an up-to-date si, so don't use local variable here. + if si, err = jsa.updateStream(&si.Config); err != nil { return nil, fmt.Errorf("failed to update stream config: %w", err) } } - as.transferRetainedToPerKeySubjectStream(s) } var lastSeq uint64 @@ -2305,21 +2328,10 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve retry = false } -func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) { +func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) bool { jsa := &as.jsa var count, errors int - // Set retry to true, will be set to false on success. - defer func() { - if errors > 0 { - next := mqttDefaultTransferRetry - log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) - time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) - } else if count > 0 { - log.Noticef("Transfer of %d MQTT retained messages done!", count) - } - }() - for { // Try and look up messages on the original undivided "$MQTT.rmsgs" subject. // If nothing is returned here, we assume to have migrated all old messages. @@ -2331,7 +2343,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log * } log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err) errors++ - return + break } // Unmarshal the message so that we can obtain the subject name. var rmsg mqttRetainedMsg @@ -2355,6 +2367,15 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log * } count++ } + if errors > 0 { + next := mqttDefaultTransferRetry + log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) + time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) + } else if count > 0 { + log.Noticef("Transfer of %d MQTT retained messages done!", count) + } + // Signal if there was any activity (either some transferred or some errors) + return errors > 0 || count > 0 } ////////////////////////////////////////////////////////////////////////////// diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 7c5888d9..31f59da9 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1979,6 +1979,11 @@ func testMQTTCheckPubMsgNoAck(t testing.TB, c net.Conn, r *mqttReader, topic str } func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16) { + flags, pi, _ := testMQTTGetPubMsgEx(t, c, r, topic, payload) + return flags, pi +} + +func testMQTTGetPubMsgEx(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16, string) { t.Helper() b, pl := testMQTTReadPacket(t, r) if pt := b & mqttPacketMask; pt != mqttPacketPub { @@ -1991,7 +1996,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa if err != nil { t.Fatal(err) } - if ptopic != topic { + if topic != _EMPTY_ && ptopic != topic { t.Fatalf("Expected topic %q, got %q", topic, ptopic) } var pi uint16 @@ -2011,7 +2016,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa t.Fatalf("Expected payload %q, got %q", payload, ppayload) } r.pos += msgLen - return pflags, pi + return pflags, pi, ptopic } func testMQTTSendPubAck(t testing.TB, c net.Conn, pi uint16) { @@ -3038,6 +3043,16 @@ func TestMQTTRetainedMsgMigration(t *testing.T) { defer mc.Close() testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "+", qos: 0}}, []byte{0}) + topics := map[string]struct{}{} + for i := 0; i < 100; i++ { + _, _, topic := testMQTTGetPubMsgEx(t, mc, rc, _EMPTY_, []byte("bar")) + topics[topic] = struct{}{} + } + if len(topics) != 100 { + t.Fatalf("Unexpected topics: %v", topics) + } + // Now look at the stream, there should be 100 messages on the new // divided subjects and none on the old undivided subject. si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{