diff --git a/server/mqtt.go b/server/mqtt.go index 75f3f8d4..e96f1885 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -4115,7 +4115,8 @@ func mqttFilterToNATSSubject(filter []byte) ([]byte, error) { // - '/' is replaced with './' if the last or next character in mt is '/' // For instance, foo//bar would become foo./.bar // - '/' is replaced with '.' for all other conditions (foo/bar -> foo.bar) -// - '.' and ' ' cause an error to be returned. +// - '.' is replaced with '//'. +// - ' ' cause an error to be returned. // // If there is no need to convert anything (say "foo" remains "foo"), then // the no memory is allocated and the returned slice is the original `mt`. @@ -4154,9 +4155,15 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) { } res = append(res, btsep) } - case btsep, ' ': - // As of now, we cannot support '.' or ' ' in the MQTT topic/filter. + case ' ': + // As of now, we cannot support ' ' in the MQTT topic/filter. return nil, errMQTTUnsupportedCharacters + case btsep: + if !cp { + makeCopy(i) + } + res = append(res, mqttTopicLevelSep, mqttTopicLevelSep) + j++ case mqttSingleLevelWC, mqttMultiLevelWC: if !wcOk { // Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1] @@ -4195,16 +4202,22 @@ func natsSubjectToMQTTTopic(subject string) []byte { for i := 0; i < len(subject); i++ { switch subject[i] { case mqttTopicLevelSep: - if !(i == 0 && i < end && subject[i+1] == btsep) { - topic[j] = mqttTopicLevelSep - j++ + if i < end { + switch c := subject[i+1]; c { + case btsep, mqttTopicLevelSep: + if c == btsep { + topic[j] = mqttTopicLevelSep + } else { + topic[j] = btsep + } + j++ + i++ + default: + } } case btsep: topic[j] = mqttTopicLevelSep j++ - if i < end && subject[i+1] == mqttTopicLevelSep { - i++ - } default: topic[j] = subject[i] j++ diff --git a/server/mqtt_test.go b/server/mqtt_test.go index a4e793eb..3168e943 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1750,6 +1750,7 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) { {"///foo/", "///foo/", "/././.foo./", ""}, {"///foo//", "///foo//", "/././.foo././", ""}, {"///foo///", "///foo///", "/././.foo./././", ""}, + {"//.foo.//", "//.foo.//", "/././/foo//././", ""}, {"foo/bar", "foo/bar", "foo.bar", ""}, {"/foo/bar", "/foo/bar", "/.foo.bar", ""}, {"/foo/bar/", "/foo/bar/", "/.foo.bar./", ""}, @@ -1762,17 +1763,31 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) { {"foo//bar", "foo//bar", "foo./.bar", ""}, {"foo///bar", "foo///bar", "foo././.bar", ""}, {"foo////bar", "foo////bar", "foo./././.bar", ""}, + {".", ".", "//", ""}, + {"..", "..", "////", ""}, + {"...", "...", "//////", ""}, + {"./", "./", "//./", ""}, + {".//.", "./a/.", "//.a.//", ""}, + {"././.", "././.", "//.//.//", ""}, + {"././/.", "././/.", "//.//././/", ""}, + {".foo", ".foo", "//foo", ""}, + {"foo.", "foo.", "foo//", ""}, + {".foo.", ".foo.", "//foo//", ""}, + {"foo../bar/", "foo../bar/", "foo////.bar./", ""}, + {"foo../bar/.", "foo../bar/.", "foo////.bar.//", ""}, + {"/foo/", "/foo/", "/.foo./", ""}, + {"./foo/.", "./foo/.", "//.foo.//", ""}, + {"foo.bar/baz", "foo.bar/baz", "foo//bar.baz", ""}, // These should produce errors {"foo/+", "foo/+", "", "wildcards not allowed in publish"}, {"foo/#", "foo/#", "", "wildcards not allowed in publish"}, {"foo bar", "foo bar", "", "not supported"}, - {"foo.bar", "foo.bar", "", "not supported"}, } { t.Run(test.name, func(t *testing.T) { res, err := mqttTopicToNATSPubSubject([]byte(test.mqttTopic)) if test.err != _EMPTY_ { if err == nil || !strings.Contains(err.Error(), test.err) { - t.Fatalf("Expected error %q, got %q", test.err, err.Error()) + t.Fatalf("Expected error %q, got %v", test.err, err) } return } @@ -1813,6 +1828,7 @@ func TestMQTTFilterConversion(t *testing.T) { {"single level wildcard", "foo//+//", "foo./.*././"}, {"single level wildcard", "foo//+//bar", "foo./.*./.bar"}, {"single level wildcard", "foo///+///bar", "foo././.*././.bar"}, + {"single level wildcard", "foo.bar///+///baz", "foo//bar././.*././.baz"}, {"multi level wildcard", "#", ">"}, {"multi level wildcard", "/#", "/.>"}, @@ -1821,6 +1837,7 @@ func TestMQTTFilterConversion(t *testing.T) { {"multi level wildcard", "foo//#", "foo./.>"}, {"multi level wildcard", "foo///#", "foo././.>"}, {"multi level wildcard", "foo/bar/#", "foo.bar.>"}, + {"multi level wildcard", "foo/bar.baz/#", "foo.bar//baz.>"}, } { t.Run(test.name, func(t *testing.T) { res, err := mqttFilterToNATSSubject([]byte(test.mqttTopic)) @@ -6454,6 +6471,47 @@ func TestMQTTSubjectWildcardStart(t *testing.T) { require_True(t, si.State.Msgs == 0) } +func TestMQTTTopicWithDot(t *testing.T) { + o := testMQTTDefaultOptions() + s := testMQTTRunServer(t, o) + defer testMQTTShutdownServer(s) + + nc := natsConnect(t, s.ClientURL(), nats.UserInfo("mqtt", "pwd")) + defer nc.Close() + + sub := natsSubSync(t, nc, "*.*") + + c1, r1 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn1", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer c1.Close() + testMQTTCheckConnAck(t, r1, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant1", qos: 0}}, []byte{0}) + testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant2", qos: 1}}, []byte{1}) + + c2, r2 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn2", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer c2.Close() + testMQTTCheckConnAck(t, r2, mqttConnAckRCConnectionAccepted, false) + + testMQTTPublish(t, c2, r2, 0, false, false, "spBv1.0/plant1", 0, []byte("msg1")) + testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg1")) + msg := natsNexMsg(t, sub, time.Second) + require_Equal(t, msg.Subject, "spBv1//0.plant1") + + testMQTTPublish(t, c2, r2, 1, false, false, "spBv1.0/plant2", 1, []byte("msg2")) + testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", mqttPubQos1, []byte("msg2")) + msg = natsNexMsg(t, sub, time.Second) + require_Equal(t, msg.Subject, "spBv1//0.plant2") + + natsPub(t, nc, "spBv1//0.plant1", []byte("msg3")) + testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg3")) + msg = natsNexMsg(t, sub, time.Second) + require_Equal(t, msg.Subject, "spBv1//0.plant1") + + natsPub(t, nc, "spBv1//0.plant2", []byte("msg4")) + testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", 0, []byte("msg4")) + msg = natsNexMsg(t, sub, time.Second) + require_Equal(t, msg.Subject, "spBv1//0.plant2") +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks