MQTT: Fixed issue that could cause time out storing messages

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-03-25 16:46:59 -06:00
parent fe5d6bede4
commit 3d495435c0
2 changed files with 8 additions and 1 deletions

View File

@@ -3047,6 +3047,13 @@ func (s *Server) mqttProcessPub(c *client, pp *mqttPublish) error {
// Unless we have a publish permission error, if the message is QoS1, then we
// need to store the message (and deliver it to JS durable consumers).
if _, permIssue := c.processInboundClientMsg(msgToSend); !permIssue && mqttGetQoS(pp.flags) > 0 {
// We need to call flushClients now since this we may have called c.addToPCD
// with destination clients (possibly a route). Without calling flushClients
// the following call may then be stuck waiting for a reply that may never
// come because the destination is not flushed (due to c.out.fsp > 0,
// see addToPCD and writeLoop for details).
c.flushClients(0)
// Store this QoS1 message.
_, err = c.mqtt.sess.jsa.storeMsg(mqttStreamSubjectPrefix+string(c.pa.subject), c.pa.hdr, msgToSend)
}
c.pa.subject, c.pa.mapped, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.reply = nil, nil, -1, 0, nil, nil

View File

@@ -2791,7 +2791,7 @@ func TestMQTTCluster(t *testing.T) {
}
})
}
if topTest.restart {
if !t.Failed() && topTest.restart {
cl.stopAll()
cl.restartAll()