From 308be7ecd380dbe47dd0598f600874b3e6d2e89a Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 8 Jun 2021 15:03:12 -0600 Subject: [PATCH] [FIXED] MQTT: panic when using import/export The issue was that the subscription created for the MQTT client was resulting in creation of a shadow subscription which did not have the mqtt specific object attached, which would cause the panic when accessing it in the sub's icb. After that, it was discovered that the wrong subject was passed to deliverMsg(), so fixed that too so that the icb callback gets the proper transformed subject. Resolves #2265 Signed-off-by: Ivan Kozlovic --- server/client.go | 4 ++-- server/mqtt.go | 16 +++++++++----- server/mqtt_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/server/client.go b/server/client.go index 6faec179..414b7e00 100644 --- a/server/client.go +++ b/server/client.go @@ -4035,12 +4035,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Remap to the original subject if internal. if sub.icb != nil && sub.rsi { - subj = subject + dsubj = subject } // Normal delivery mh := c.msgHeader(dsubj, creply, sub) - didDeliver = c.deliverMsg(sub, subj, creply, mh, msg, rplyHasGWPrefix) || didDeliver + didDeliver = c.deliverMsg(sub, dsubj, creply, mh, msg, rplyHasGWPrefix) || didDeliver } // Set these up to optionally filter based on the queue lists. diff --git a/server/mqtt.go b/server/mqtt.go index 279b0d13..4014d2ea 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1698,12 +1698,18 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, // Helper that sets the sub's mqtt fields and possibly serialize retained messages. // Assumes account manager and session lock held. setupSub := func(sub *subscription, qos byte) { - if sub.mqtt == nil { - sub.mqtt = &mqttSub{} + subs := []*subscription{sub} + if len(sub.shadow) > 0 { + subs = append(subs, sub.shadow...) } - sub.mqtt.qos = qos - if fromSubProto { - as.serializeRetainedMsgsForSub(sess, c, sub, trace) + for _, sub := range subs { + if sub.mqtt == nil { + sub.mqtt = &mqttSub{} + } + sub.mqtt.qos = qos + if fromSubProto { + as.serializeRetainedMsgsForSub(sess, c, sub, trace) + } } } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 1acc9027..1dbbec51 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -683,6 +683,7 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) { } func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) { + t.Helper() return testMQTTConnectRetry(t, ci, host, port, 0) } @@ -3193,6 +3194,56 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) } +func TestMQTTImportExport(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + server_name: "mqtt" + jetstream { + store_dir=org_dir + } + accounts { + org { + jetstream: enabled + users: [{user: org, password: pwd}] + imports = [{stream: {account: "device", subject: "foo"}, prefix: "org"}] + } + device { + users: [{user: device, password: pwd}] + exports = [{stream: "foo"}] + } + } + mqtt { + listen: "127.0.0.1:-1" + } + no_auth_user: device + `)) + defer os.Remove(conf) + defer os.RemoveAll("org_dir") + + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + mc1, rc1 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub1", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc1.Close() + testMQTTCheckConnAck(t, rc1, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc1, rc1, []*mqttFilter{{filter: "org/foo", qos: 0}}, []byte{0}) + testMQTTFlush(t, mc1, nil, rc1) + + mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "org/foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc2, nil, rc2) + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + natsPub(t, nc, "foo", []byte("msg")) + + // Verify message is received on receiver side. + testMQTTCheckPubMsg(t, mc1, rc1, "org/foo", 0, []byte("msg")) + testMQTTCheckPubMsg(t, mc2, rc2, "org/foo", 0, []byte("msg")) +} + func TestMQTTSessionMovingDomains(t *testing.T) { tmpl := strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", `leafnodes { listen: 127.0.0.1:-1 }`, 1) tmpl = strings.Replace(tmpl, "store_dir:", "domain: HUB, store_dir:", 1)