From 66b1b51182d288de9ed986b60be137197f504428 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 23 May 2022 11:28:18 -0600 Subject: [PATCH] [FIXED] MQTT: Errors deleting consumers will now prevent deletion of session When there was a failure to delete a QoS1 consumer, the session would still be deleted, which would cause orphaned consumers. In case of error, the session record will not be deleted, which means that it is still possible to restart the session and then close it (with the clean flag). Relates to #3116 Signed-off-by: Ivan Kozlovic --- server/mqtt.go | 9 +++++-- server/mqtt_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index a371268d..a3a31bae 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -837,7 +837,9 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - sess.clear() + if err := sess.clear(); err != nil { + c.Errorf(err.Error()) + } } // Now handle the "will". This function will be a no-op if there is no "will" to send. @@ -2284,7 +2286,10 @@ func (sess *mqttSession) clear() error { } sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0 for _, dur := range durs { - sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))}) + if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) { + sess.mu.Unlock() + return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err) + } } sess.mu.Unlock() if seq > 0 { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 3626d40f..c9f6d033 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -5881,6 +5881,65 @@ func TestMQTTConsumerReplicasExceedsParentStream(t *testing.T) { } } +type unableToDeleteConsLogger struct { + DummyLogger + errCh chan string +} + +func (l *unableToDeleteConsLogger) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, "unable to delete consumer") { + l.errCh <- msg + } +} + +func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { + org := mqttJSAPITimeout + mqttJSAPITimeout = 1000 * time.Millisecond + defer func() { mqttJSAPITimeout = org }() + + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) + defer cl.shutdown() + + o := cl.opts[0] + s1 := cl.servers[0] + // Plug error logger to s1 + l := &unableToDeleteConsLogger{errCh: make(chan string, 10)} + s1.SetLogger(l, false, false) + + nc, js := jsClientConnect(t, s1) + defer nc.Close() + + mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) + + testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, r) + + // Now shutdown server 2, we should lose quorum + cl.servers[1].Shutdown() + + // Close the MQTT client: + testMQTTDisconnect(t, mc, nil) + + // We should have reported that there was an error deleting the consumer + select { + case <-l.errCh: + // OK + case <-time.After(time.Second): + t.Fatal("Server did not report any error") + } + + // Now restart the server 2 so that we can check that the session is still persisted. + cl.restartAllSamePorts() + cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName) + + si, err := js.StreamInfo(mqttSessStreamName) + require_NoError(t, err) + require_True(t, si.State.Msgs == 1) +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks