diff --git a/server/client.go b/server/client.go index 6faec179..414b7e00 100644 --- a/server/client.go +++ b/server/client.go @@ -4035,12 +4035,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Remap to the original subject if internal. if sub.icb != nil && sub.rsi { - subj = subject + dsubj = subject } // Normal delivery mh := c.msgHeader(dsubj, creply, sub) - didDeliver = c.deliverMsg(sub, subj, creply, mh, msg, rplyHasGWPrefix) || didDeliver + didDeliver = c.deliverMsg(sub, dsubj, creply, mh, msg, rplyHasGWPrefix) || didDeliver } // Set these up to optionally filter based on the queue lists. diff --git a/server/mqtt.go b/server/mqtt.go index 279b0d13..4014d2ea 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1698,12 +1698,18 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, // Helper that sets the sub's mqtt fields and possibly serialize retained messages. // Assumes account manager and session lock held. setupSub := func(sub *subscription, qos byte) { - if sub.mqtt == nil { - sub.mqtt = &mqttSub{} + subs := []*subscription{sub} + if len(sub.shadow) > 0 { + subs = append(subs, sub.shadow...) } - sub.mqtt.qos = qos - if fromSubProto { - as.serializeRetainedMsgsForSub(sess, c, sub, trace) + for _, sub := range subs { + if sub.mqtt == nil { + sub.mqtt = &mqttSub{} + } + sub.mqtt.qos = qos + if fromSubProto { + as.serializeRetainedMsgsForSub(sess, c, sub, trace) + } } } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 1acc9027..1dbbec51 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -683,6 +683,7 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) { } func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) { + t.Helper() return testMQTTConnectRetry(t, ci, host, port, 0) } @@ -3193,6 +3194,56 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) } +func TestMQTTImportExport(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + server_name: "mqtt" + jetstream { + store_dir=org_dir + } + accounts { + org { + jetstream: enabled + users: [{user: org, password: pwd}] + imports = [{stream: {account: "device", subject: "foo"}, prefix: "org"}] + } + device { + users: [{user: device, password: pwd}] + exports = [{stream: "foo"}] + } + } + mqtt { + listen: "127.0.0.1:-1" + } + no_auth_user: device + `)) + defer os.Remove(conf) + defer os.RemoveAll("org_dir") + + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + mc1, rc1 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub1", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc1.Close() + testMQTTCheckConnAck(t, rc1, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc1, rc1, []*mqttFilter{{filter: "org/foo", qos: 0}}, []byte{0}) + testMQTTFlush(t, mc1, nil, rc1) + + mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "org/foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc2, nil, rc2) + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + natsPub(t, nc, "foo", []byte("msg")) + + // Verify message is received on receiver side. + testMQTTCheckPubMsg(t, mc1, rc1, "org/foo", 0, []byte("msg")) + testMQTTCheckPubMsg(t, mc2, rc2, "org/foo", 0, []byte("msg")) +} + func TestMQTTSessionMovingDomains(t *testing.T) { tmpl := strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", `leafnodes { listen: 127.0.0.1:-1 }`, 1) tmpl = strings.Replace(tmpl, "store_dir:", "domain: HUB, store_dir:", 1)