MQTT QoS2 support

This commit is contained in:
Lev Brouk
2023-08-28 11:52:01 -07:00
parent f50b772a14
commit ad2e9d7b8d
4 changed files with 1355 additions and 365 deletions

View File

@@ -4223,7 +4223,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.pa.reply = nrr
if changed && c.isMqtt() && c.pa.hdr > 0 {
c.srv.mqttStoreQoS1MsgForAccountOnNewSubject(c.pa.hdr, msg, siAcc.GetName(), to)
c.srv.mqttStoreQoSMsgForAccountOnNewSubject(c.pa.hdr, msg, siAcc.GetName(), to)
}
// FIXME(dlc) - Do L1 cache trick like normal client?

File diff suppressed because it is too large Load Diff

View File

@@ -134,6 +134,18 @@ func testMQTTReadPacket(t testing.TB, r *mqttReader) (byte, int) {
return b, pl
}
func testMQTTReadPIPacket(expectedType byte, t testing.TB, r *mqttReader, expectedPI uint16) {
t.Helper()
b, _ := testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != expectedType {
t.Fatalf("Expected packet %x, got %x", expectedType, pt)
}
rpi, err := r.readUint16("packet identifier")
if err != nil || rpi != expectedPI {
t.Fatalf("Expected PI %v got: %v, err=%v", expectedPI, rpi, err)
}
}
func TestMQTTReader(t *testing.T) {
r := &mqttReader{}
r.reset([]byte{0, 2, 'a', 'b'})
@@ -1699,39 +1711,6 @@ func TestMQTTDontSetPinger(t *testing.T) {
testMQTTPublish(t, mc, r, 0, false, false, "foo", 0, []byte("msg"))
}
func TestMQTTUnsupportedPackets(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
for _, test := range []struct {
name string
packetType byte
}{
{"pubrec", mqttPacketPubRec},
{"pubrel", mqttPacketPubRel},
{"pubcomp", mqttPacketPubComp},
} {
t.Run(test.name, func(t *testing.T) {
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
w := &mqttWriter{}
pt := test.packetType
if test.packetType == mqttPacketPubRel {
pt |= byte(0x2)
}
w.WriteByte(pt)
w.WriteVarInt(2)
w.WriteUint16(1)
mc.Write(w.Bytes())
testMQTTExpectDisconnect(t, mc)
})
}
}
func TestMQTTTopicAndSubjectConversion(t *testing.T) {
for _, test := range []struct {
name string
@@ -1933,13 +1912,13 @@ func TestMQTTSubAck(t *testing.T) {
subs := []*mqttFilter{
{filter: "foo", qos: 0},
{filter: "bar", qos: 1},
{filter: "baz", qos: 2}, // Since we don't support, we should receive a result of 1
{filter: "baz", qos: 2},
{filter: "foo/#/bar", qos: 0}, // Invalid sub, so we should receive a result of mqttSubAckFailure
}
expected := []byte{
0,
1,
1,
2,
mqttSubAckFailure,
}
testMQTTSub(t, 1, mc, r, subs, expected)
@@ -1975,37 +1954,48 @@ func testMQTTExpectNothing(t testing.TB, r *mqttReader) {
r.reader.SetReadDeadline(time.Time{})
}
func testMQTTCheckPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, flags byte, payload []byte) {
func testMQTTCheckPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, expectedFlags byte, payload []byte) uint16 {
t.Helper()
pflags, pi := testMQTTGetPubMsg(t, c, r, topic, payload)
if pflags != flags {
t.Fatalf("Expected flags to be %x, got %x", flags, pflags)
pi := testMQTTCheckPubMsgNoAck(t, c, r, topic, expectedFlags, payload)
if pi == 0 {
return 0
}
if pi > 0 {
testMQTTSendPubAck(t, c, pi)
qos := mqttGetQoS(expectedFlags)
switch qos {
case 1:
testMQTTSendPIPacket(mqttPacketPubAck, t, c, pi)
case 2:
testMQTTSendPIPacket(mqttPacketPubRec, t, c, pi)
}
return pi
}
func testMQTTCheckPubMsgNoAck(t testing.TB, c net.Conn, r *mqttReader, topic string, flags byte, payload []byte) uint16 {
func testMQTTCheckPubMsgNoAck(t testing.TB, c net.Conn, r *mqttReader, topic string, expectedFlags byte, payload []byte) uint16 {
t.Helper()
pflags, pi := testMQTTGetPubMsg(t, c, r, topic, payload)
if pflags != flags {
t.Fatalf("Expected flags to be %x, got %x", flags, pflags)
if pflags != expectedFlags {
t.Fatalf("Expected flags to be %x, got %x", expectedFlags, pflags)
}
return pi
}
func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16) {
t.Helper()
flags, pi, _ := testMQTTGetPubMsgEx(t, c, r, topic, payload)
return flags, pi
}
func testMQTTGetPubMsgEx(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16, string) {
func testMQTTGetPubMsgEx(t testing.TB, _ net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16, string) {
t.Helper()
b, pl := testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != mqttPacketPub {
t.Fatalf("Expected PUBLISH packet %x, got %x", mqttPacketPub, pt)
}
return testMQTTGetPubMsgExEx(t, nil, r, b, pl, topic, payload)
}
func testMQTTGetPubMsgExEx(t testing.TB, _ net.Conn, r *mqttReader, b byte, pl int, topic string, payload []byte) (byte, uint16, string) {
t.Helper()
pflags := b & mqttPacketFlagMask
qos := (pflags & mqttPubFlagQoS) >> 1
start := r.pos
@@ -2036,14 +2026,23 @@ func testMQTTGetPubMsgEx(t testing.TB, c net.Conn, r *mqttReader, topic string,
return pflags, pi, ptopic
}
func testMQTTSendPubAck(t testing.TB, c net.Conn, pi uint16) {
func testMQTTSendPIPacket(packetType byte, t testing.TB, c net.Conn, pi uint16) {
t.Helper()
w := &mqttWriter{}
w.WriteByte(mqttPacketPubAck)
w.WriteByte(packetType)
w.WriteVarInt(2)
w.WriteUint16(pi)
if _, err := testMQTTWrite(c, w.Bytes()); err != nil {
t.Fatalf("Error writing PUBACK: %v", err)
t.Fatalf("Error writing packet type %v: %v", packetType, err)
}
}
func testMQTTPublishNoAcks(t testing.TB, c net.Conn, qos byte, dup, retain bool, topic string, pi uint16, payload []byte) {
t.Helper()
w := &mqttWriter{}
mqttWritePublish(w, qos, dup, retain, topic, pi, payload)
if _, err := testMQTTWrite(c, w.Bytes()); err != nil {
t.Fatalf("Error writing PUBLISH proto: %v", err)
}
}
@@ -2054,12 +2053,8 @@ func testMQTTPublish(t testing.TB, c net.Conn, r *mqttReader, qos byte, dup, ret
if _, err := testMQTTWrite(c, w.Bytes()); err != nil {
t.Fatalf("Error writing PUBLISH proto: %v", err)
}
if qos > 0 {
// Since we don't support QoS 2, we should get disconnected
if qos == 2 {
testMQTTExpectDisconnect(t, c)
return
}
switch qos {
case 1:
b, _ := testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != mqttPacketPubAck {
t.Fatalf("Expected PUBACK packet %x, got %x", mqttPacketPubAck, pt)
@@ -2068,6 +2063,29 @@ func testMQTTPublish(t testing.TB, c net.Conn, r *mqttReader, qos byte, dup, ret
if err != nil || rpi != pi {
t.Fatalf("Error with packet identifier expected=%v got: %v err=%v", pi, rpi, err)
}
case 2:
b, _ := testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != mqttPacketPubRec {
t.Fatalf("Expected PUBREC packet %x, got %x", mqttPacketPubRec, pt)
}
rpi, err := r.readUint16("packet identifier")
if err != nil || rpi != pi {
t.Fatalf("Error with packet identifier expected=%v got: %v err=%v", pi, rpi, err)
}
testMQTTSendPIPacket(mqttPacketPubRel, t, c, pi)
b, _ = testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != mqttPacketPubComp {
t.Fatalf("Expected PUBCOMP packet %x, got %x", mqttPacketPubComp, pt)
}
rpi, err = r.readUint16("packet identifier")
if err != nil || rpi != pi {
t.Fatalf("Error with packet identifier expected=%v got: %v err=%v", pi, rpi, err)
}
testMQTTFlush(t, c, nil, r)
}
}
@@ -2079,7 +2097,7 @@ func TestMQTTParsePub(t *testing.T) {
pl int
err string
}{
{"qos not supported", 0x4, nil, 0, "not supported"},
{"qos not supported", (3 << 1), nil, 0, "QoS=3 is invalid in MQTT"},
{"packet in buffer error", 0, nil, 10, io.ErrUnexpectedEOF.Error()},
{"error on topic", 0, []byte{0, 3, 'f', 'o'}, 4, "topic"},
{"empty topic", 0, []byte{0, 0}, 2, errMQTTTopicIsEmpty.Error()},
@@ -2100,7 +2118,7 @@ func TestMQTTParsePub(t *testing.T) {
}
}
func TestMQTTParsePubAck(t *testing.T) {
func TestMQTTParsePIMsg(t *testing.T) {
for _, test := range []struct {
name string
proto []byte
@@ -2114,7 +2132,7 @@ func TestMQTTParsePubAck(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
r := &mqttReader{}
r.reset(test.proto)
if _, err := mqttParsePubAck(r, test.pl); err == nil || !strings.Contains(err.Error(), test.err) {
if _, err := mqttParsePIPacket(r, test.pl); err == nil || !strings.Contains(err.Error(), test.err) {
t.Fatalf("Expected error %q, got %v", test.err, err)
}
})
@@ -2204,7 +2222,72 @@ func TestMQTTSub(t *testing.T) {
}
}
func TestMQTTSubQoS(t *testing.T) {
func TestMQTTSubQoS2(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
mcp, mpr := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mcp.Close()
testMQTTCheckConnAck(t, mpr, mqttConnAckRCConnectionAccepted, false)
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
topic := "foo/bar/baz"
mqttTopic0 := "foo/#"
mqttTopic1 := "foo/bar/#"
mqttTopic2 := topic
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: mqttTopic0, qos: 0}}, []byte{0})
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: mqttTopic1, qos: 1}}, []byte{1})
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: mqttTopic2, qos: 2}}, []byte{2})
testMQTTFlush(t, mc, nil, r)
for pubQoS, expectedCounts := range map[byte]map[byte]int{
0: {0: 3},
1: {0: 1, 1: 2},
2: {0: 1, 1: 1, 2: 1},
} {
t.Run(fmt.Sprintf("pubQoS %v", pubQoS), func(t *testing.T) {
pubPI := uint16(456)
testMQTTPublish(t, mcp, mpr, pubQoS, false, false, topic, pubPI, []byte("msg"))
qosCounts := map[byte]int{}
delivered := map[uint16]byte{}
// We have 3 subscriptions, each should receive the message, with the
// QoS that maybe "trimmed" to that of the subscription.
for i := 0; i < 3; i++ {
flags, pi := testMQTTGetPubMsg(t, mc, r, topic, []byte("msg"))
delivered[pi] = flags
qosCounts[mqttGetQoS(flags)]++
}
for pi, flags := range delivered {
switch mqttGetQoS(flags) {
case 1:
testMQTTSendPIPacket(mqttPacketPubAck, t, mc, pi)
case 2:
testMQTTSendPIPacket(mqttPacketPubRec, t, mc, pi)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, pi)
testMQTTSendPIPacket(mqttPacketPubComp, t, mc, pi)
}
}
if !reflect.DeepEqual(qosCounts, expectedCounts) {
t.Fatalf("Expected QoS %#v, got %#v", expectedCounts, qosCounts)
}
})
}
}
func TestMQTTSubQoS1(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
@@ -2252,8 +2335,8 @@ func TestMQTTSubQoS(t *testing.T) {
if pi1 == pi2 {
t.Fatalf("packet identifier for message 1: %v should be different from message 2", pi1)
}
testMQTTSendPubAck(t, mc, pi1)
testMQTTSendPubAck(t, mc, pi2)
testMQTTSendPIPacket(mqttPacketPubAck, t, mc, pi1)
testMQTTSendPIPacket(mqttPacketPubAck, t, mc, pi2)
}
func getSubQoS(sub *subscription) int {
@@ -2286,8 +2369,8 @@ func TestMQTTSubDups(t *testing.T) {
// And also with separate SUBSCRIBE protocols
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "bar", qos: 0}}, []byte{0})
// Ask for QoS 2 but server will downgrade to 1
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "bar", qos: 2}}, []byte{1})
// Ask for QoS 1 but server will downgrade to 1
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "bar", qos: 1}}, []byte{1})
testMQTTFlush(t, mc, nil, r)
// Publish and test msg received only once
@@ -2610,26 +2693,25 @@ func TestMQTTSubWithNATSStream(t *testing.T) {
}
func TestMQTTTrackPendingOverrun(t *testing.T) {
sess := &mqttSession{pending: make(map[uint16]*mqttPending)}
sub := &subscription{mqtt: &mqttSub{qos: 1}}
sess := mqttSession{}
sess.ppi = 0xFFFF
pi, _ := sess.trackPending(1, _EMPTY_, sub)
sess.last_pi = 0xFFFF
pi := sess.trackPublishRetained()
if pi != 1 {
t.Fatalf("Expected 1, got %v", pi)
}
p := &mqttPending{}
for i := 1; i <= 0xFFFF; i++ {
sess.pending[uint16(i)] = p
sess.pendingPublish[uint16(i)] = p
}
pi, _ = sess.trackPending(1, _EMPTY_, sub)
pi, _ = sess.trackPublish("test", "test")
if pi != 0 {
t.Fatalf("Expected 0, got %v", pi)
}
delete(sess.pending, 1234)
pi, _ = sess.trackPending(1, _EMPTY_, sub)
delete(sess.pendingPublish, 1234)
pi = sess.trackPublishRetained()
if pi != 1234 {
t.Fatalf("Expected 1234, got %v", pi)
}
@@ -4694,7 +4776,7 @@ func TestMQTTRedeliveryAckWait(t *testing.T) {
}
}
// Ack first message
testMQTTSendPubAck(t, c, 1)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, 1)
// Redelivery should only be for second message now
for i := 0; i < 2; i++ {
flags := mqttPubQos1 | mqttPubFlagDup
@@ -4715,7 +4797,7 @@ func TestMQTTRedeliveryAckWait(t *testing.T) {
t.Fatalf("Unexpected pi to be 2, got %v", pi)
}
// Now ack second message
testMQTTSendPubAck(t, c, 2)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, 2)
// Flush to make sure it is processed before checking client's maps
testMQTTFlush(t, c, nil, r)
@@ -4724,7 +4806,7 @@ func TestMQTTRedeliveryAckWait(t *testing.T) {
mc.mu.Lock()
sess := mc.mqtt.sess
sess.mu.Lock()
lpi := len(sess.pending)
lpi := len(sess.pendingPublish)
var lsseq int
for _, sseqToPi := range sess.cpending {
lsseq += len(sseqToPi)
@@ -4736,6 +4818,253 @@ func TestMQTTRedeliveryAckWait(t *testing.T) {
}
}
// - [MQTT-3.10.4-3] If a Server deletes a Subscription It MUST complete the
// delivery of any QoS 1 or QoS 2 messages which it has started to send to the
// Client.
//
// Test flow:
// - Subscribe to foo, publish 3 QoS2 messages.
// - After one is PUBCOMP-ed, and one is PUBREC-ed, Unsubscribe.
// - See that the remaining 2 are fully delivered.
func TestMQTTQoS2InflightMsgsDeliveredAfterUnsubscribe(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.AckWait = 10 * time.Millisecond
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
var qos2 byte = 2
cisub := &mqttConnInfo{clientID: "sub", cleanSess: true}
c, r := testMQTTConnect(t, cisub, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: qos2}}, []byte{qos2})
cipub := &mqttConnInfo{clientID: "pub", cleanSess: true}
cp, rp := testMQTTConnect(t, cipub, o.MQTT.Host, o.MQTT.Port)
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
// send 3 messages
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", 441, []byte("data1"))
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", 442, []byte("data2"))
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", 443, []byte("data3"))
testMQTTDisconnect(t, cp, nil)
cp.Close()
subPI1 := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data1"))
subPI2 := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data2"))
// subPI3 := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data3"))
_ = testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data3"))
// fully receive first message
testMQTTSendPIPacket(mqttPacketPubRec, t, c, subPI1)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI1)
testMQTTSendPIPacket(mqttPacketPubComp, t, c, subPI1)
// Do not PUBCOMP the 2nd message yet.
testMQTTSendPIPacket(mqttPacketPubRec, t, c, subPI2)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI2)
// Unsubscribe
testMQTTUnsub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: qos2}})
// We expect that PI2 and PI3 will continue to be delivered, from their
// respective states.
gotPI2PubRel := false
// TODO: Currently, we do not get the unacknowledged PUBLISH re-delivered
// after an UNSUBSCRIBE. Ongoing discussion if we should/must.
// gotPI3Publish := false
// gotPI3PubRel := false
for !gotPI2PubRel /* || !gotPI3Publish || !gotPI3PubRel */ {
b, _ /* len */ := testMQTTReadPacket(t, r)
switch b & mqttPacketMask {
case mqttPacketPubRel:
pi, err := r.readUint16("packet identifier")
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
switch pi {
case subPI2:
testMQTTSendPIPacket(mqttPacketPubComp, t, c, pi)
gotPI2PubRel = true
// case subPI3:
// testMQTTSendPIPacket(mqttPacketPubComp, t, c, pi)
// gotPI3PubRel = true
default:
t.Fatalf("Expected PI %v got: %v", subPI2, pi)
}
// case mqttPacketPub:
// _, pi, _ := testMQTTGetPubMsgExEx(t, c, r, b, len, "foo", []byte("data3"))
// if pi != subPI3 {
// t.Fatalf("Expected PI %v got: %v", subPI3, pi)
// }
// gotPI3Publish = true
// testMQTTSendPIPacket(mqttPacketPubRec, t, c, subPI3)
default:
t.Fatalf("Unexpected packet type: %v", b&mqttPacketMask)
}
}
testMQTTExpectNothing(t, r)
}
func TestMQTTQoS2RejectPublishDuplicates(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
var qos2 byte = 2
cisub := &mqttConnInfo{clientID: "sub", cleanSess: true}
c, r := testMQTTConnect(t, cisub, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: qos2}}, []byte{qos2})
cipub := &mqttConnInfo{clientID: "pub", cleanSess: true}
cp, rp := testMQTTConnect(t, cipub, o.MQTT.Host, o.MQTT.Port)
defer cp.Close()
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
// Publish 3 different with same PI before we get any PUBREC back, then
// complete the PUBREL/PUBCOMP flow as needed. Only one message (first
// payload) should be delivered. PUBRECs,
var pubPI uint16 = 444
testMQTTPublishNoAcks(t, cp, qos2, false, false, "foo", pubPI, []byte("data1"))
testMQTTPublishNoAcks(t, cp, qos2, true, false, "foo", pubPI, []byte("data2"))
testMQTTPublishNoAcks(t, cp, qos2, false, false, "foo", pubPI, []byte("data3"))
for i := 0; i < 3; i++ {
// [MQTT-4.3.3-1] The receiver
//
// - MUST respond with a PUBREC containing the Packet Identifier from
// the incoming PUBLISH Packet, having accepted ownership of the
// Application Message.
//
// - Until it has received the corresponding PUBREL packet, the Receiver
// MUST acknowledge any subsequent PUBLISH packet with the same Packet
// Identifier by sending a PUBREC. It MUST NOT cause duplicate messages
// to be delivered to any onward recipients in this case.
testMQTTReadPIPacket(mqttPacketPubRec, t, rp, pubPI)
}
for i := 0; i < 3; i++ {
testMQTTSendPIPacket(mqttPacketPubRel, t, cp, pubPI)
}
for i := 0; i < 3; i++ {
// [MQTT-4.3.3-1] MUST respond to a PUBREL packet by sending a PUBCOMP
// packet containing the same Packet Identifier as the PUBREL.
testMQTTReadPIPacket(mqttPacketPubComp, t, rp, pubPI)
}
// [MQTT-4.3.3-1] After it has sent a PUBCOMP, the receiver MUST treat any
// subsequent PUBLISH packet that contains that Packet Identifier as being a
// new publication.
//
// Publish another message, identical to the first one. Since the server
// already sent us a PUBCOMP, it will deliver this message, for a total of 2
// delivered.
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", pubPI, []byte("data5"))
testMQTTDisconnect(t, cp, nil)
cp.Close()
// Verify we got a total of 2 messages.
subPI1 := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data1"))
subPI2 := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data5"))
for _, pi := range []uint16{subPI1, subPI2} {
testMQTTSendPIPacket(mqttPacketPubRec, t, c, pi)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, pi)
testMQTTSendPIPacket(mqttPacketPubComp, t, c, pi)
}
testMQTTExpectNothing(t, r)
}
func TestMQTTQoS2RetriesPublish(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.AckWait = 10 * time.Millisecond
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
var qos2 byte = 2
cisub := &mqttConnInfo{clientID: "sub", cleanSess: true}
c, r := testMQTTConnect(t, cisub, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: qos2}}, []byte{qos2})
cipub := &mqttConnInfo{clientID: "pub", cleanSess: true}
cp, rp := testMQTTConnect(t, cipub, o.MQTT.Host, o.MQTT.Port)
defer cp.Close()
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
// Publish a message and close the pub connection.
var pubPI uint16 = 444
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", pubPI, []byte("data1"))
testMQTTDisconnect(t, cp, nil)
cp.Close()
// See that we got the message delivered to the sub, but don't PUBREC it
// yet.
subPI := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data1"))
// See that the message is redelivered again 3 times, with the DUP on, before we PUBREC it.
for i := 0; i < 3; i++ {
expectedFlags := mqttPubQoS2 | mqttPubFlagDup
pi := testMQTTCheckPubMsgNoAck(t, c, r, "foo", expectedFlags, []byte("data1"))
if pi != subPI {
t.Fatalf("Expected pi to be %v, got %v", subPI, pi)
}
}
// Finish the exchange and make sure there are no more attempts.
testMQTTSendPIPacket(mqttPacketPubRec, t, c, subPI)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI)
testMQTTSendPIPacket(mqttPacketPubComp, t, c, subPI)
testMQTTExpectNothing(t, r)
}
func TestMQTTQoS2RetriesPubRel(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.AckWait = 10 * time.Millisecond
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
var qos2 byte = 2
cisub := &mqttConnInfo{clientID: "sub", cleanSess: true}
c, r := testMQTTConnect(t, cisub, o.MQTT.Host, o.MQTT.Port)
defer c.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: qos2}}, []byte{qos2})
cipub := &mqttConnInfo{clientID: "pub", cleanSess: true}
cp, rp := testMQTTConnect(t, cipub, o.MQTT.Host, o.MQTT.Port)
defer cp.Close()
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
// Publish a message and close the pub connection.
var pubPI uint16 = 444
testMQTTPublish(t, cp, rp, qos2, false, false, "foo", pubPI, []byte("data1"))
testMQTTDisconnect(t, cp, nil)
cp.Close()
// See that we got the message delivered to the sub, PUBREC it and expect a
// PUBREL from the server.
subPI := testMQTTCheckPubMsgNoAck(t, c, r, "foo", mqttPubQoS2, []byte("data1"))
testMQTTSendPIPacket(mqttPacketPubRec, t, c, subPI)
// See that we get PUBREL redelivered several times, there's no DUP flag to
// check.
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI)
testMQTTReadPIPacket(mqttPacketPubRel, t, r, subPI)
// Finish the exchange and make sure there are no more attempts.
testMQTTSendPIPacket(mqttPacketPubComp, t, c, subPI)
testMQTTExpectNothing(t, r)
}
func TestMQTTAckWaitConfigChange(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.AckWait = 250 * time.Millisecond
@@ -4842,7 +5171,7 @@ func TestMQTTUnsubscribeWithPendingAcks(t *testing.T) {
mc.mu.Lock()
sess := mc.mqtt.sess
sess.mu.Lock()
pal := len(sess.pending)
pal := len(sess.pendingPublish)
sess.mu.Unlock()
mc.mu.Unlock()
if pal != 0 {
@@ -4880,7 +5209,7 @@ func TestMQTTMaxAckPending(t *testing.T) {
testMQTTExpectNothing(t, r)
// Now ack first message
testMQTTSendPubAck(t, c, pi)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, pi)
// Now we should receive message 2
testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg2"))
testMQTTDisconnect(t, c, nil)
@@ -4909,7 +5238,7 @@ func TestMQTTMaxAckPending(t *testing.T) {
testMQTTExpectNothing(t, r)
// Ack and get the next
testMQTTSendPubAck(t, c, pi)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, pi)
testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg4"))
// Make sure this message gets ack'ed
@@ -4918,7 +5247,7 @@ func TestMQTTMaxAckPending(t *testing.T) {
mcli.mu.Lock()
sess := mcli.mqtt.sess
sess.mu.Lock()
np := len(sess.pending)
np := len(sess.pendingPublish)
sess.mu.Unlock()
mcli.mu.Unlock()
if np != 0 {
@@ -4956,7 +5285,7 @@ func TestMQTTMaxAckPending(t *testing.T) {
testMQTTExpectNothing(t, r)
// Ack and get the next
testMQTTSendPubAck(t, c, pi)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, pi)
testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg6"))
}
@@ -4991,7 +5320,7 @@ func TestMQTTMaxAckPendingForMultipleSubs(t *testing.T) {
testMQTTExpectNothing(t, r)
// Ack the first message.
testMQTTSendPubAck(t, c, pi)
testMQTTSendPIPacket(mqttPacketPubAck, t, c, pi)
// Now we should get the second message
testMQTTCheckPubMsg(t, c, r, "bar", mqttPubQos1|mqttPubFlagDup, []byte("msg2"))

View File

@@ -517,21 +517,24 @@ type MQTTOpts struct {
// Set of allowable certificates
TLSPinnedCerts PinnedCertSet
// AckWait is the amount of time after which a QoS 1 message sent to
// a client is redelivered as a DUPLICATE if the server has not
// received the PUBACK on the original Packet Identifier.
// The value has to be positive.
// Zero will cause the server to use the default value (30 seconds).
// Note that changes to this option is applied only to new MQTT subscriptions.
// AckWait is the amount of time after which a QoS 1 or 2 message sent to a
// client is redelivered as a DUPLICATE if the server has not received the
// PUBACK on the original Packet Identifier. The same value applies to
// PubRel redelivery. The value has to be positive. Zero will cause the
// server to use the default value (30 seconds). Note that changes to this
// option is applied only to new MQTT subscriptions (or sessions for
// PubRels).
AckWait time.Duration
// MaxAckPending is the amount of QoS 1 messages the server can send to
// a subscription without receiving any PUBACK for those messages.
// The valid range is [0..65535].
// MaxAckPending is the amount of QoS 1 and 2 messages (combined) the server
// can send to a subscription without receiving any PUBACK for those
// messages. The valid range is [0..65535].
//
// The total of subscriptions' MaxAckPending on a given session cannot
// exceed 65535. Attempting to create a subscription that would bring
// the total above the limit would result in the server returning 0x80
// in the SUBACK for this subscription.
// exceed 65535. Attempting to create a subscription that would bring the
// total above the limit would result in the server returning 0x80 in the
// SUBACK for this subscription.
//
// Due to how the NATS Server handles the MQTT "#" wildcard, each
// subscription ending with "#" will use 2 times the MaxAckPending value.
// Note that changes to this option is applied only to new subscriptions.