diff --git a/server/mqtt.go b/server/mqtt.go index a371268d..a3a31bae 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -837,7 +837,9 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - sess.clear() + if err := sess.clear(); err != nil { + c.Errorf(err.Error()) + } } // Now handle the "will". This function will be a no-op if there is no "will" to send. @@ -2284,7 +2286,10 @@ func (sess *mqttSession) clear() error { } sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0 for _, dur := range durs { - sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))}) + if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) { + sess.mu.Unlock() + return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err) + } } sess.mu.Unlock() if seq > 0 { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 3626d40f..c9f6d033 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -5881,6 +5881,65 @@ func TestMQTTConsumerReplicasExceedsParentStream(t *testing.T) { } } +type unableToDeleteConsLogger struct { + DummyLogger + errCh chan string +} + +func (l *unableToDeleteConsLogger) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, "unable to delete consumer") { + l.errCh <- msg + } +} + +func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { + org := mqttJSAPITimeout + mqttJSAPITimeout = 1000 * time.Millisecond + defer func() { mqttJSAPITimeout = org }() + + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) + defer cl.shutdown() + + o := cl.opts[0] + s1 := cl.servers[0] + // Plug error logger to s1 + l := &unableToDeleteConsLogger{errCh: make(chan string, 10)} + s1.SetLogger(l, false, false) + + nc, js := jsClientConnect(t, s1) + defer nc.Close() + + mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) + + testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, r) + + // Now shutdown server 2, we should lose quorum + cl.servers[1].Shutdown() + + // Close the MQTT client: + testMQTTDisconnect(t, mc, nil) + + // We should have reported that there was an error deleting the consumer + select { + case <-l.errCh: + // OK + case <-time.After(time.Second): + t.Fatal("Server did not report any error") + } + + // Now restart the server 2 so that we can check that the session is still persisted. + cl.restartAllSamePorts() + cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName) + + si, err := js.StreamInfo(mqttSessStreamName) + require_NoError(t, err) + require_True(t, si.State.Msgs == 1) +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks