[FIXED] MQTT: panic when using import/export

The issue was that the subscription created for the MQTT client
was resulting in creation of a shadow subscription which did not
have the mqtt specific object attached, which would cause the
panic when accessing it in the sub's icb.

After that, it was discovered that the wrong subject was passed
to deliverMsg(), so fixed that too so that the icb callback gets
the proper transformed subject.

Resolves #2265

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-06-08 15:03:12 -06:00
parent 376b60d297
commit 308be7ecd3
3 changed files with 64 additions and 7 deletions

View File

@@ -4035,12 +4035,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Remap to the original subject if internal.
if sub.icb != nil && sub.rsi {
subj = subject
dsubj = subject
}
// Normal delivery
mh := c.msgHeader(dsubj, creply, sub)
didDeliver = c.deliverMsg(sub, subj, creply, mh, msg, rplyHasGWPrefix) || didDeliver
didDeliver = c.deliverMsg(sub, dsubj, creply, mh, msg, rplyHasGWPrefix) || didDeliver
}
// Set these up to optionally filter based on the queue lists.

View File

@@ -1698,12 +1698,18 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
// Helper that sets the sub's mqtt fields and possibly serialize retained messages.
// Assumes account manager and session lock held.
setupSub := func(sub *subscription, qos byte) {
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
subs := []*subscription{sub}
if len(sub.shadow) > 0 {
subs = append(subs, sub.shadow...)
}
sub.mqtt.qos = qos
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
for _, sub := range subs {
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
}
sub.mqtt.qos = qos
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
}
}
}

View File

@@ -683,6 +683,7 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) {
}
func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) {
t.Helper()
return testMQTTConnectRetry(t, ci, host, port, 0)
}
@@ -3193,6 +3194,56 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) {
testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg"))
}
func TestMQTTImportExport(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"
server_name: "mqtt"
jetstream {
store_dir=org_dir
}
accounts {
org {
jetstream: enabled
users: [{user: org, password: pwd}]
imports = [{stream: {account: "device", subject: "foo"}, prefix: "org"}]
}
device {
users: [{user: device, password: pwd}]
exports = [{stream: "foo"}]
}
}
mqtt {
listen: "127.0.0.1:-1"
}
no_auth_user: device
`))
defer os.Remove(conf)
defer os.RemoveAll("org_dir")
s, o := RunServerWithConfig(conf)
defer s.Shutdown()
mc1, rc1 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub1", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc1.Close()
testMQTTCheckConnAck(t, rc1, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc1, rc1, []*mqttFilter{{filter: "org/foo", qos: 0}}, []byte{0})
testMQTTFlush(t, mc1, nil, rc1)
mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", user: "org", pass: "pwd", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc2.Close()
testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "org/foo", qos: 1}}, []byte{1})
testMQTTFlush(t, mc2, nil, rc2)
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
natsPub(t, nc, "foo", []byte("msg"))
// Verify message is received on receiver side.
testMQTTCheckPubMsg(t, mc1, rc1, "org/foo", 0, []byte("msg"))
testMQTTCheckPubMsg(t, mc2, rc2, "org/foo", 0, []byte("msg"))
}
func TestMQTTSessionMovingDomains(t *testing.T) {
tmpl := strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", `leafnodes { listen: 127.0.0.1:-1 }`, 1)
tmpl = strings.Replace(tmpl, "store_dir:", "domain: HUB, store_dir:", 1)