diff --git a/server/mqtt.go b/server/mqtt.go index 9353563a..ed1893e3 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -3047,6 +3047,13 @@ func (s *Server) mqttProcessPub(c *client, pp *mqttPublish) error { // Unless we have a publish permission error, if the message is QoS1, then we // need to store the message (and deliver it to JS durable consumers). if _, permIssue := c.processInboundClientMsg(msgToSend); !permIssue && mqttGetQoS(pp.flags) > 0 { + // We need to call flushClients now since this we may have called c.addToPCD + // with destination clients (possibly a route). Without calling flushClients + // the following call may then be stuck waiting for a reply that may never + // come because the destination is not flushed (due to c.out.fsp > 0, + // see addToPCD and writeLoop for details). + c.flushClients(0) + // Store this QoS1 message. _, err = c.mqtt.sess.jsa.storeMsg(mqttStreamSubjectPrefix+string(c.pa.subject), c.pa.hdr, msgToSend) } c.pa.subject, c.pa.mapped, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.reply = nil, nil, -1, 0, nil, nil diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 0c659df0..31bee2cf 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -2791,7 +2791,7 @@ func TestMQTTCluster(t *testing.T) { } }) } - if topTest.restart { + if !t.Failed() && topTest.restart { cl.stopAll() cl.restartAll()