Only discard messages from MQTT QoS0 from internal jetstream clients if really a QoS1 jetstream publish.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-07-12 14:46:06 -07:00
parent 0c8552cd34
commit 1f39d744dd
2 changed files with 47 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2021 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -3407,8 +3407,8 @@ func mqttSubscribeTrace(pi uint16, filters []*mqttFilter) string {
// message and this is the callback for a QoS1 subscription because in
// that case, it will be handled by the other callback. This avoid getting
// duplicate deliveries.
func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, _ string, rmsg []byte) {
if pc.kind == JETSTREAM {
func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) {
if pc.kind == JETSTREAM && len(reply) > 0 && strings.HasPrefix(reply, jsAckPre) {
return
}

View File

@@ -1,4 +1,4 @@
// Copyright 2020 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -6367,6 +6367,49 @@ func TestMQTTSubjectWildcardStart(t *testing.T) {
require_True(t, si.State.Msgs == 0)
}
// Issue https://github.com/nats-io/nats-server/issues/4291
func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream: enabled
mqtt {
listen: 127.0.0.1:-1
}
`))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Setup stream with republish on it.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
RePublish: &nats.RePublish{
Source: "foo",
Destination: "mqtt.foo",
},
})
require_NoError(t, err)
// Create QoS0 subscriber to catch re-publishes.
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "mqtt/foo", qos: 0}}, []byte{0})
testMQTTFlush(t, mc, nil, r)
msg := []byte("HELLO WORLD")
_, err = js.Publish("foo", msg)
require_NoError(t, err)
testMQTTCheckPubMsg(t, mc, r, "mqtt/foo", 0, msg)
testMQTTExpectNothing(t, r)
}
//////////////////////////////////////////////////////////////////////////
//
// Benchmarks