mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
@@ -2913,6 +2913,7 @@ func TestMQTTPublishRetain(t *testing.T) {
|
||||
defer mc1.Close()
|
||||
testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTPublish(t, mc1, rs1, 0, false, test.retained, "foo", 0, []byte(test.sentValue))
|
||||
testMQTTFlush(t, mc1, nil, rs1)
|
||||
|
||||
mc2, rs2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc2.Close()
|
||||
@@ -2960,6 +2961,7 @@ func TestMQTTPublishRetainPermViolation(t *testing.T) {
|
||||
defer mc1.Close()
|
||||
testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTPublish(t, mc1, rs1, 0, false, true, "bar", 0, []byte("retained"))
|
||||
testMQTTFlush(t, mc1, nil, rs1)
|
||||
|
||||
mc2, rs2 := testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc2.Close()
|
||||
@@ -3357,6 +3359,7 @@ func TestMQTTPersistRetainedMsg(t *testing.T) {
|
||||
testMQTTPublish(t, c, r, 0, false, true, "baz", 1, []byte("baz1"))
|
||||
// Remove bar
|
||||
testMQTTPublish(t, c, r, 1, false, true, "bar", 1, nil)
|
||||
testMQTTFlush(t, c, nil, r)
|
||||
testMQTTDisconnect(t, c, nil)
|
||||
c.Close()
|
||||
|
||||
@@ -3676,6 +3679,21 @@ func TestMQTTMaxAckPending(t *testing.T) {
|
||||
testMQTTSendPubAck(t, c, pi)
|
||||
testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg4"))
|
||||
|
||||
// Make sure this message gets ack'ed
|
||||
mcli := testMQTTGetClient(t, s, cisub.clientID)
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
mcli.mu.Lock()
|
||||
sess := mcli.mqtt.sess
|
||||
sess.mu.Lock()
|
||||
np := len(sess.pending)
|
||||
sess.mu.Unlock()
|
||||
mcli.mu.Unlock()
|
||||
if np != 0 {
|
||||
return fmt.Errorf("Still %v pending messages", np)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Check that change to config does not prevent restart of sub.
|
||||
cp.Close()
|
||||
c.Close()
|
||||
|
||||
Reference in New Issue
Block a user