From 1f39d744dd20263711077c55be6fc716ffb8e5b4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 12 Jul 2023 14:46:06 -0700 Subject: [PATCH] Only discard messages from MQTT QoS0 from internal jetstream clients if really a QoS1 jetstream publish. Signed-off-by: Derek Collison --- server/mqtt.go | 6 +++--- server/mqtt_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 2d214e01..8680e8dd 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -3407,8 +3407,8 @@ func mqttSubscribeTrace(pi uint16, filters []*mqttFilter) string { // message and this is the callback for a QoS1 subscription because in // that case, it will be handled by the other callback. This avoid getting // duplicate deliveries. -func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, _ string, rmsg []byte) { - if pc.kind == JETSTREAM { +func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) { + if pc.kind == JETSTREAM && len(reply) > 0 && strings.HasPrefix(reply, jsAckPre) { return } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index a5068ac7..625c206b 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -6367,6 +6367,49 @@ func TestMQTTSubjectWildcardStart(t *testing.T) { require_True(t, si.State.Msgs == 0) } +// Issue https://github.com/nats-io/nats-server/issues/4291 +func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + server_name: mqtt + jetstream: enabled + mqtt { + listen: 127.0.0.1:-1 + } + `)) + s, o := RunServerWithConfig(conf) + defer testMQTTShutdownServer(s) + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Setup stream with republish on it. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + RePublish: &nats.RePublish{ + Source: "foo", + Destination: "mqtt.foo", + }, + }) + require_NoError(t, err) + + // Create QoS0 subscriber to catch re-publishes. + mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) + + testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "mqtt/foo", qos: 0}}, []byte{0}) + testMQTTFlush(t, mc, nil, r) + + msg := []byte("HELLO WORLD") + _, err = js.Publish("foo", msg) + require_NoError(t, err) + + testMQTTCheckPubMsg(t, mc, r, "mqtt/foo", 0, msg) + testMQTTExpectNothing(t, r) +} + ////////////////////////////////////////////////////////////////////////// // // Benchmarks