mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] MQTT: Retain flag did not always have the correct value.
As per specification MQTT-3.3.1-8, we are now setting the RETAIN flag when delivering to new subscriptions and clear the flag in all other conditions. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2341,6 +2341,7 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi
|
||||
|
||||
// Need to use the subject for the retained message, not the `sub` subject.
|
||||
// We can find the published retained message in rm.sub.subject.
|
||||
// Set the RETAIN flag: [MQTT-3.3.1-8].
|
||||
flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg)
|
||||
if trace {
|
||||
pp := mqttPublish{
|
||||
@@ -4153,7 +4154,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
|
||||
return
|
||||
}
|
||||
|
||||
var retained bool
|
||||
var topic []byte
|
||||
if pc.isMqtt() {
|
||||
// This is an MQTT publisher directly connected to this server.
|
||||
@@ -4169,7 +4169,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
|
||||
if len(pc.pa.mapped) > 0 && len(pc.pa.psi) > 0 {
|
||||
topic = natsSubjectToMQTTTopic(subject)
|
||||
}
|
||||
retained = mqttIsRetained(pc.mqtt.pp.flags)
|
||||
|
||||
} else {
|
||||
// Non MQTT client, could be NATS publisher, or ROUTER, etc..
|
||||
@@ -4188,7 +4187,7 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
|
||||
}
|
||||
|
||||
// Message never has a packet identifier nor is marked as duplicate.
|
||||
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, retained, topic, msg)
|
||||
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, topic, msg)
|
||||
}
|
||||
|
||||
// This is the callback attached to a JS durable subscription for a MQTT QoS 1+
|
||||
@@ -4264,7 +4263,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r
|
||||
}
|
||||
|
||||
originalTopic := natsSubjectToMQTTTopic(strippedSubj)
|
||||
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, false, originalTopic, msg)
|
||||
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, originalTopic, msg)
|
||||
}
|
||||
|
||||
func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) {
|
||||
@@ -4323,11 +4322,11 @@ func isMQTTReservedSubscription(subject string) bool {
|
||||
|
||||
// Common function to mqtt delivery callbacks to serialize and send the message
|
||||
// to the `cc` client.
|
||||
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup, retained bool, topic, msg []byte) {
|
||||
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) {
|
||||
sw := mqttWriter{}
|
||||
w := &sw
|
||||
|
||||
flags := mqttSerializePublishMsg(w, pi, qos, dup, retained, topic, msg)
|
||||
flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg)
|
||||
|
||||
cc.mu.Lock()
|
||||
if sub.mqtt.prm != nil {
|
||||
|
||||
Reference in New Issue
Block a user