mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[CHANGED] MQTT: Support for topics with . character.
The `.` character will be transformed to `//` in NATS subject. For instance an MQTT message published on `spBv1.0/plant1` would be received by a NATS subscriber as `spBv1//0.plant1`. Conversely, a NATS message published on `spBv1//0.plant1` would be received by an MQTT subscriber as `spBv1.0/plant1`. Resolves #1879 Resolves #3482 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -4115,7 +4115,8 @@ func mqttFilterToNATSSubject(filter []byte) ([]byte, error) {
|
||||
// - '/' is replaced with './' if the last or next character in mt is '/'
|
||||
// For instance, foo//bar would become foo./.bar
|
||||
// - '/' is replaced with '.' for all other conditions (foo/bar -> foo.bar)
|
||||
// - '.' and ' ' cause an error to be returned.
|
||||
// - '.' is replaced with '//'.
|
||||
// - ' ' cause an error to be returned.
|
||||
//
|
||||
// If there is no need to convert anything (say "foo" remains "foo"), then
|
||||
// the no memory is allocated and the returned slice is the original `mt`.
|
||||
@@ -4154,9 +4155,15 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) {
|
||||
}
|
||||
res = append(res, btsep)
|
||||
}
|
||||
case btsep, ' ':
|
||||
// As of now, we cannot support '.' or ' ' in the MQTT topic/filter.
|
||||
case ' ':
|
||||
// As of now, we cannot support ' ' in the MQTT topic/filter.
|
||||
return nil, errMQTTUnsupportedCharacters
|
||||
case btsep:
|
||||
if !cp {
|
||||
makeCopy(i)
|
||||
}
|
||||
res = append(res, mqttTopicLevelSep, mqttTopicLevelSep)
|
||||
j++
|
||||
case mqttSingleLevelWC, mqttMultiLevelWC:
|
||||
if !wcOk {
|
||||
// Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]
|
||||
@@ -4195,16 +4202,22 @@ func natsSubjectToMQTTTopic(subject string) []byte {
|
||||
for i := 0; i < len(subject); i++ {
|
||||
switch subject[i] {
|
||||
case mqttTopicLevelSep:
|
||||
if !(i == 0 && i < end && subject[i+1] == btsep) {
|
||||
topic[j] = mqttTopicLevelSep
|
||||
j++
|
||||
if i < end {
|
||||
switch c := subject[i+1]; c {
|
||||
case btsep, mqttTopicLevelSep:
|
||||
if c == btsep {
|
||||
topic[j] = mqttTopicLevelSep
|
||||
} else {
|
||||
topic[j] = btsep
|
||||
}
|
||||
j++
|
||||
i++
|
||||
default:
|
||||
}
|
||||
}
|
||||
case btsep:
|
||||
topic[j] = mqttTopicLevelSep
|
||||
j++
|
||||
if i < end && subject[i+1] == mqttTopicLevelSep {
|
||||
i++
|
||||
}
|
||||
default:
|
||||
topic[j] = subject[i]
|
||||
j++
|
||||
|
||||
@@ -1750,6 +1750,7 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
|
||||
{"///foo/", "///foo/", "/././.foo./", ""},
|
||||
{"///foo//", "///foo//", "/././.foo././", ""},
|
||||
{"///foo///", "///foo///", "/././.foo./././", ""},
|
||||
{"//.foo.//", "//.foo.//", "/././/foo//././", ""},
|
||||
{"foo/bar", "foo/bar", "foo.bar", ""},
|
||||
{"/foo/bar", "/foo/bar", "/.foo.bar", ""},
|
||||
{"/foo/bar/", "/foo/bar/", "/.foo.bar./", ""},
|
||||
@@ -1762,17 +1763,31 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
|
||||
{"foo//bar", "foo//bar", "foo./.bar", ""},
|
||||
{"foo///bar", "foo///bar", "foo././.bar", ""},
|
||||
{"foo////bar", "foo////bar", "foo./././.bar", ""},
|
||||
{".", ".", "//", ""},
|
||||
{"..", "..", "////", ""},
|
||||
{"...", "...", "//////", ""},
|
||||
{"./", "./", "//./", ""},
|
||||
{".//.", "./a/.", "//.a.//", ""},
|
||||
{"././.", "././.", "//.//.//", ""},
|
||||
{"././/.", "././/.", "//.//././/", ""},
|
||||
{".foo", ".foo", "//foo", ""},
|
||||
{"foo.", "foo.", "foo//", ""},
|
||||
{".foo.", ".foo.", "//foo//", ""},
|
||||
{"foo../bar/", "foo../bar/", "foo////.bar./", ""},
|
||||
{"foo../bar/.", "foo../bar/.", "foo////.bar.//", ""},
|
||||
{"/foo/", "/foo/", "/.foo./", ""},
|
||||
{"./foo/.", "./foo/.", "//.foo.//", ""},
|
||||
{"foo.bar/baz", "foo.bar/baz", "foo//bar.baz", ""},
|
||||
// These should produce errors
|
||||
{"foo/+", "foo/+", "", "wildcards not allowed in publish"},
|
||||
{"foo/#", "foo/#", "", "wildcards not allowed in publish"},
|
||||
{"foo bar", "foo bar", "", "not supported"},
|
||||
{"foo.bar", "foo.bar", "", "not supported"},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
res, err := mqttTopicToNATSPubSubject([]byte(test.mqttTopic))
|
||||
if test.err != _EMPTY_ {
|
||||
if err == nil || !strings.Contains(err.Error(), test.err) {
|
||||
t.Fatalf("Expected error %q, got %q", test.err, err.Error())
|
||||
t.Fatalf("Expected error %q, got %v", test.err, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1813,6 +1828,7 @@ func TestMQTTFilterConversion(t *testing.T) {
|
||||
{"single level wildcard", "foo//+//", "foo./.*././"},
|
||||
{"single level wildcard", "foo//+//bar", "foo./.*./.bar"},
|
||||
{"single level wildcard", "foo///+///bar", "foo././.*././.bar"},
|
||||
{"single level wildcard", "foo.bar///+///baz", "foo//bar././.*././.baz"},
|
||||
|
||||
{"multi level wildcard", "#", ">"},
|
||||
{"multi level wildcard", "/#", "/.>"},
|
||||
@@ -1821,6 +1837,7 @@ func TestMQTTFilterConversion(t *testing.T) {
|
||||
{"multi level wildcard", "foo//#", "foo./.>"},
|
||||
{"multi level wildcard", "foo///#", "foo././.>"},
|
||||
{"multi level wildcard", "foo/bar/#", "foo.bar.>"},
|
||||
{"multi level wildcard", "foo/bar.baz/#", "foo.bar//baz.>"},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
res, err := mqttFilterToNATSSubject([]byte(test.mqttTopic))
|
||||
@@ -6454,6 +6471,47 @@ func TestMQTTSubjectWildcardStart(t *testing.T) {
|
||||
require_True(t, si.State.Msgs == 0)
|
||||
}
|
||||
|
||||
func TestMQTTTopicWithDot(t *testing.T) {
|
||||
o := testMQTTDefaultOptions()
|
||||
s := testMQTTRunServer(t, o)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
nc := natsConnect(t, s.ClientURL(), nats.UserInfo("mqtt", "pwd"))
|
||||
defer nc.Close()
|
||||
|
||||
sub := natsSubSync(t, nc, "*.*")
|
||||
|
||||
c1, r1 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn1", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer c1.Close()
|
||||
testMQTTCheckConnAck(t, r1, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant1", qos: 0}}, []byte{0})
|
||||
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant2", qos: 1}}, []byte{1})
|
||||
|
||||
c2, r2 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn2", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer c2.Close()
|
||||
testMQTTCheckConnAck(t, r2, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
testMQTTPublish(t, c2, r2, 0, false, false, "spBv1.0/plant1", 0, []byte("msg1"))
|
||||
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg1"))
|
||||
msg := natsNexMsg(t, sub, time.Second)
|
||||
require_Equal(t, msg.Subject, "spBv1//0.plant1")
|
||||
|
||||
testMQTTPublish(t, c2, r2, 1, false, false, "spBv1.0/plant2", 1, []byte("msg2"))
|
||||
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", mqttPubQos1, []byte("msg2"))
|
||||
msg = natsNexMsg(t, sub, time.Second)
|
||||
require_Equal(t, msg.Subject, "spBv1//0.plant2")
|
||||
|
||||
natsPub(t, nc, "spBv1//0.plant1", []byte("msg3"))
|
||||
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg3"))
|
||||
msg = natsNexMsg(t, sub, time.Second)
|
||||
require_Equal(t, msg.Subject, "spBv1//0.plant1")
|
||||
|
||||
natsPub(t, nc, "spBv1//0.plant2", []byte("msg4"))
|
||||
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", 0, []byte("msg4"))
|
||||
msg = natsNexMsg(t, sub, time.Second)
|
||||
require_Equal(t, msg.Subject, "spBv1//0.plant2")
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Benchmarks
|
||||
|
||||
Reference in New Issue
Block a user