[FIXED] MQTT: Errors deleting consumers will now prevent deletion of session

When there was a failure to delete a QoS1 consumer, the session
would still be deleted, which would cause orphaned consumers.

In case of error, the session record will not be deleted, which means
that it is still possible to restart the session and then close
it (with the clean flag).

Relates to #3116

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-05-23 11:28:18 -06:00
parent 75129609bb
commit 66b1b51182
2 changed files with 66 additions and 2 deletions

View File

@@ -837,7 +837,9 @@ func (s *Server) mqttHandleClosedClient(c *client) {
// This needs to be done outside of any lock.
if doClean {
sess.clear()
if err := sess.clear(); err != nil {
c.Errorf(err.Error())
}
}
// Now handle the "will". This function will be a no-op if there is no "will" to send.
@@ -2284,7 +2286,10 @@ func (sess *mqttSession) clear() error {
}
sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0
for _, dur := range durs {
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))})
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
sess.mu.Unlock()
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
sess.mu.Unlock()
if seq > 0 {

View File

@@ -5881,6 +5881,65 @@ func TestMQTTConsumerReplicasExceedsParentStream(t *testing.T) {
}
}
type unableToDeleteConsLogger struct {
DummyLogger
errCh chan string
}
func (l *unableToDeleteConsLogger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "unable to delete consumer") {
l.errCh <- msg
}
}
func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
org := mqttJSAPITimeout
mqttJSAPITimeout = 1000 * time.Millisecond
defer func() { mqttJSAPITimeout = org }()
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()
o := cl.opts[0]
s1 := cl.servers[0]
// Plug error logger to s1
l := &unableToDeleteConsLogger{errCh: make(chan string, 10)}
s1.SetLogger(l, false, false)
nc, js := jsClientConnect(t, s1)
defer nc.Close()
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
testMQTTFlush(t, mc, nil, r)
// Now shutdown server 2, we should lose quorum
cl.servers[1].Shutdown()
// Close the MQTT client:
testMQTTDisconnect(t, mc, nil)
// We should have reported that there was an error deleting the consumer
select {
case <-l.errCh:
// OK
case <-time.After(time.Second):
t.Fatal("Server did not report any error")
}
// Now restart the server 2 so that we can check that the session is still persisted.
cl.restartAllSamePorts()
cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName)
si, err := js.StreamInfo(mqttSessStreamName)
require_NoError(t, err)
require_True(t, si.State.Msgs == 1)
}
//////////////////////////////////////////////////////////////////////////
//
// Benchmarks