mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fix for MQTT Spec 4.7.2-1
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -91,6 +91,7 @@ const (
|
||||
mqttTopicLevelSep = '/'
|
||||
mqttSingleLevelWC = '+'
|
||||
mqttMultiLevelWC = '#'
|
||||
mqttReservedPre = '$'
|
||||
|
||||
// This is appended to the sid of a subscription that is
|
||||
// created on the upper level subject because of the MQTT
|
||||
@@ -304,6 +305,8 @@ type mqttSub struct {
|
||||
prm *mqttWriter
|
||||
// This is the JS durable name this subscription is attached to.
|
||||
jsDur string
|
||||
// If this subscription needs to be checked for being reserved. E.g. # or * or */
|
||||
reserved bool
|
||||
}
|
||||
|
||||
type mqtt struct {
|
||||
@@ -1992,6 +1995,7 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
|
||||
sub.mqtt = &mqttSub{}
|
||||
}
|
||||
sub.mqtt.qos = qos
|
||||
sub.mqtt.reserved = isMQTTReservedSubscription(string(sub.subject))
|
||||
if fromSubProto {
|
||||
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
|
||||
}
|
||||
@@ -3216,6 +3220,7 @@ func mqttWritePublish(w *mqttWriter, qos byte, dup, retain bool, subject string,
|
||||
if retain {
|
||||
flags |= mqttPubFlagRetain
|
||||
}
|
||||
|
||||
w.WriteByte(mqttPacketPub | flags)
|
||||
pkLen := 2 + len(subject) + len(payload)
|
||||
if qos > 0 {
|
||||
@@ -3419,8 +3424,14 @@ func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, _
|
||||
// the client may change an existing subscription at any time.
|
||||
sess.mu.Lock()
|
||||
subQos := sub.mqtt.qos
|
||||
isReserved := mqttCheckReserved(sub, subject)
|
||||
sess.mu.Unlock()
|
||||
|
||||
// We have a wildcard subscription and this subject starts with '$' so ignore per Spec [MQTT-4.7.2-1].
|
||||
if isReserved {
|
||||
return
|
||||
}
|
||||
|
||||
var retained bool
|
||||
var topic []byte
|
||||
|
||||
@@ -3506,22 +3517,58 @@ func mqttDeliverMsgCbQos1(sub *subscription, pc *client, _ *Account, subject, re
|
||||
sess.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// This is a QoS1 message for a QoS1 subscription, so get the pi and keep
|
||||
// track of ack subject.
|
||||
pQoS := byte(1)
|
||||
pi, dup := sess.trackPending(pQoS, reply, sub)
|
||||
|
||||
// Check for reserved subject violation.
|
||||
strippedSubj := string(subject[len(mqttStreamSubjectPrefix):])
|
||||
if isReserved := mqttCheckReserved(sub, strippedSubj); isReserved {
|
||||
sess.mu.Unlock()
|
||||
if pi > 0 {
|
||||
cc.mqttProcessPubAck(pi)
|
||||
}
|
||||
return
|
||||
}
|
||||
sess.mu.Unlock()
|
||||
|
||||
if pi == 0 {
|
||||
// We have reached max pending, don't send the message now.
|
||||
// JS will cause a redelivery and if by then the number of pending
|
||||
// messages has fallen below threshold, the message will be resent.
|
||||
return
|
||||
}
|
||||
topic := natsSubjectToMQTTTopic(string(subject[len(mqttStreamSubjectPrefix):]))
|
||||
|
||||
topic := natsSubjectToMQTTTopic(strippedSubj)
|
||||
|
||||
pc.mqttDeliver(cc, sub, pi, dup, retained, topic, msg)
|
||||
}
|
||||
|
||||
// The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +)
|
||||
// with Topic Names beginning with a $ character, Spec [MQTT-4.7.2-1].
|
||||
// We will return true if there is a violation.
|
||||
func mqttCheckReserved(sub *subscription, subject string) bool {
|
||||
// If the subject does not start with $ nothing to do here.
|
||||
if !sub.mqtt.reserved || len(subject) == 0 || subject[0] != mqttReservedPre {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if a sub is a reserved wildcard. E.g. '#', '*', or '*/" prefix.
|
||||
func isMQTTReservedSubscription(subject string) bool {
|
||||
if len(subject) == 1 && subject[0] == fwc || subject[0] == pwc {
|
||||
return true
|
||||
}
|
||||
// Match "*.<>"
|
||||
if len(subject) > 1 && subject[0] == pwc && subject[1] == btsep {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Common function to mqtt delivery callbacks to serialize and send the message
|
||||
// to the `cc` client.
|
||||
func (c *client) mqttDeliver(cc *client, sub *subscription, pi uint16, dup, retained bool, topic, msg []byte) {
|
||||
@@ -3723,6 +3770,7 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
|
||||
}
|
||||
sub.mqtt.qos = qos
|
||||
sub.mqtt.jsDur = cc.Durable
|
||||
sub.mqtt.reserved = isMQTTReservedSubscription(subject)
|
||||
sess.mu.Unlock()
|
||||
return cc, sub, nil
|
||||
}
|
||||
|
||||
@@ -6244,6 +6244,129 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) {
|
||||
check(nc, "$MQTT.msgs.foo")
|
||||
}
|
||||
|
||||
// Issue https://github.com/nats-io/nats-server/issues/3924
|
||||
// The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +),
|
||||
// with Topic Names beginning with a $ character [MQTT-4.7.2-1]
|
||||
func TestMQTTSubjectWildcardStart(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
`))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
nc := natsConnect(t, s.ClientURL())
|
||||
defer nc.Close()
|
||||
|
||||
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
mc1, r1 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc1.Close()
|
||||
testMQTTCheckConnAck(t, r1, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
mc2, r2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc2.Close()
|
||||
testMQTTCheckConnAck(t, r2, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
mc3, r3 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc3.Close()
|
||||
testMQTTCheckConnAck(t, r3, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
// These will fail already with the bug due to messages already being queue up before the subAck.
|
||||
testMQTTSub(t, 1, mc1, r1, []*mqttFilter{{filter: "*", qos: 0}}, []byte{0})
|
||||
testMQTTFlush(t, mc1, nil, r1)
|
||||
|
||||
testMQTTSub(t, 1, mc2, r2, []*mqttFilter{{filter: "#", qos: 1}}, []byte{1})
|
||||
testMQTTFlush(t, mc2, nil, r2)
|
||||
|
||||
testMQTTSub(t, 1, mc3, r3, []*mqttFilter{{filter: "*/foo", qos: 1}}, []byte{1})
|
||||
testMQTTFlush(t, mc2, nil, r2)
|
||||
|
||||
// Just as a barrier
|
||||
natsFlush(t, nc)
|
||||
|
||||
// Now publish
|
||||
|
||||
// NATS Publish
|
||||
msg := []byte("HELLO WORLD")
|
||||
natsPubReq(t, nc, "foo", _EMPTY_, msg)
|
||||
|
||||
// Check messages received
|
||||
testMQTTCheckPubMsg(t, mc1, r1, "foo", 0, msg)
|
||||
testMQTTExpectNothing(t, r1)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc2, r2, "foo", 0, msg)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
// Anything that starts with $ is reserved against wildcard subjects like above.
|
||||
natsPubReq(t, nc, "$JS.foo", _EMPTY_, msg)
|
||||
testMQTTExpectNothing(t, r1)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
// Now do MQTT QoS-0
|
||||
testMQTTPublish(t, mc, r, 0, false, false, "foo", 0, msg)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc1, r1, "foo", 0, msg)
|
||||
testMQTTExpectNothing(t, r1)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc2, r2, "foo", 0, msg)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
testMQTTPublish(t, mc, r, 0, false, false, "$JS/foo", 1, msg)
|
||||
|
||||
testMQTTExpectNothing(t, r1)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
// Now do MQTT QoS-1
|
||||
msg = []byte("HELLO WORLD - RETAINED")
|
||||
testMQTTPublish(t, mc, r, 1, false, false, "$JS/foo", 4, msg)
|
||||
|
||||
testMQTTExpectNothing(t, r1)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
testMQTTPublish(t, mc, r, 1, false, false, "foo", 2, msg)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc1, r1, "foo", 0, msg)
|
||||
testMQTTExpectNothing(t, r1)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc2, r2, "foo", 2, msg)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
testMQTTPublish(t, mc, r, 1, false, false, "foo/foo", 3, msg)
|
||||
|
||||
testMQTTExpectNothing(t, r1)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc2, r2, "foo/foo", 2, msg)
|
||||
testMQTTExpectNothing(t, r2)
|
||||
|
||||
testMQTTCheckPubMsg(t, mc3, r3, "foo/foo", 2, msg)
|
||||
testMQTTExpectNothing(t, r3)
|
||||
|
||||
// Make sure we did not retain the messages prefixed with $.
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
si, err := js.StreamInfo(mqttStreamName)
|
||||
require_NoError(t, err)
|
||||
|
||||
require_True(t, si.State.Msgs == 0)
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Benchmarks
|
||||
|
||||
Reference in New Issue
Block a user