diff --git a/server/mqtt.go b/server/mqtt.go index 891f0733..d0c23e20 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -2341,6 +2341,7 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi // Need to use the subject for the retained message, not the `sub` subject. // We can find the published retained message in rm.sub.subject. + // Set the RETAIN flag: [MQTT-3.3.1-8]. flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg) if trace { pp := mqttPublish{ @@ -4153,7 +4154,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re return } - var retained bool var topic []byte if pc.isMqtt() { // This is an MQTT publisher directly connected to this server. @@ -4169,7 +4169,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re if len(pc.pa.mapped) > 0 && len(pc.pa.psi) > 0 { topic = natsSubjectToMQTTTopic(subject) } - retained = mqttIsRetained(pc.mqtt.pp.flags) } else { // Non MQTT client, could be NATS publisher, or ROUTER, etc.. @@ -4188,7 +4187,7 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re } // Message never has a packet identifier nor is marked as duplicate. - pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, retained, topic, msg) + pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, topic, msg) } // This is the callback attached to a JS durable subscription for a MQTT QoS 1+ @@ -4264,7 +4263,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r } originalTopic := natsSubjectToMQTTTopic(strippedSubj) - pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, false, originalTopic, msg) + pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, originalTopic, msg) } func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) { @@ -4323,11 +4322,11 @@ func isMQTTReservedSubscription(subject string) bool { // Common function to mqtt delivery callbacks to serialize and send the message // to the `cc` client. -func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup, retained bool, topic, msg []byte) { +func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) { sw := mqttWriter{} w := &sw - flags := mqttSerializePublishMsg(w, pi, qos, dup, retained, topic, msg) + flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg) cc.mu.Lock() if sub.mqtt.prm != nil { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 00b35953..87ab3c92 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -3848,6 +3848,11 @@ func TestMQTTWillRetain(t *testing.T) { s := testMQTTRunServer(t, o) defer testMQTTShutdownServer(s) + mces, res := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mces.Close() + testMQTTCheckConnAck(t, res, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mces, res, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS}) + willTopic := []byte("will/topic") willMsg := []byte("bye") @@ -3869,7 +3874,7 @@ func TestMQTTWillRetain(t *testing.T) { // Wait for the server to process the connection close, which will // cause the "will" message to be published (and retained). - checkClientsCount(t, s, 0) + checkClientsCount(t, s, 1) // Create subscription on will topic and expect will message. mcs, rs := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) @@ -3878,7 +3883,7 @@ func TestMQTTWillRetain(t *testing.T) { testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS}) pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "will/topic", willMsg) - if pflags&mqttPubFlagRetain == 0 { + if !mqttIsRetained(pflags) { t.Fatalf("expected retain flag to be set, it was not: %v", pflags) } // Expected QoS will be the lesser of the pub/sub QoS. @@ -3889,6 +3894,17 @@ func TestMQTTWillRetain(t *testing.T) { if qos := mqttGetQoS(pflags); qos != expectedQoS { t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos) } + + // The existing subscription (prior to sending the will) should receive + // the will but the retain flag should not be set. + pflags, _ = testMQTTGetPubMsg(t, mces, res, "will/topic", willMsg) + if mqttIsRetained(pflags) { + t.Fatalf("expected retain flag to not be set, it was: %v", pflags) + } + // Expected QoS will be the lesser of the pub/sub QoS. + if qos := mqttGetQoS(pflags); qos != expectedQoS { + t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos) + } }) } } @@ -3949,7 +3965,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) { testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "foo", []byte("bye")) - if pflags&mqttPubFlagRetain == 0 { + if !mqttIsRetained(pflags) { t.Fatalf("expected retain flag to be set, it was not: %v", pflags) } if qos := mqttGetQoS(pflags); qos != 1 { @@ -4032,7 +4048,7 @@ func TestMQTTPublishRetain(t *testing.T) { if test.subGetsIt { pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo", []byte(test.expectedValue)) - if pflags&mqttPubFlagRetain == 0 { + if !mqttIsRetained(pflags) { t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags) } } else { @@ -4045,6 +4061,52 @@ func TestMQTTPublishRetain(t *testing.T) { } } +func TestMQTTRetainFlag(t *testing.T) { + o := testMQTTDefaultOptions() + s := testMQTTRunServer(t, o) + defer testMQTTShutdownServer(s) + + mc1, rs1 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc1.Close() + testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false) + testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag set")) + testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag set")) + testMQTTFlush(t, mc1, nil, rs1) + + mc2, rs2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rs2, mqttConnAckRCConnectionAccepted, false) + + testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/0", qos: 0}}, []byte{0}) + pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag set")) + if !mqttIsRetained(pflags) { + t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags) + } + + testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/1", qos: 1}}, []byte{1}) + pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag set")) + if !mqttIsRetained(pflags) { + t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags) + } + + // For existing subscriptions, RETAIN flag should not be set: [MQTT-3.3.1-9]. + testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag not set")) + testMQTTFlush(t, mc1, nil, rs1) + + pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag not set")) + if mqttIsRetained(pflags) { + t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags) + } + + testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag not set")) + testMQTTFlush(t, mc1, nil, rs1) + + pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag not set")) + if mqttIsRetained(pflags) { + t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags) + } +} + func TestMQTTPublishRetainPermViolation(t *testing.T) { o := testMQTTDefaultOptions() o.Users = []*User{