MQTT handle idempotent session stream creation.

Creating a stream will become idempotent, so assuming that we
should try to transfer the old session streams only on success
will no longer work.

Added a test that checks that "stream" list is queried only once
which means transfer was attempted only once after the second
cluster restart and new connection.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-09-15 10:01:11 -06:00
parent d6f9a06f05
commit ec774c9347

View File

@@ -5016,10 +5016,6 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) {
t.Fatalf("Error on publish: %v", err)
}
nc.Close()
cl.stopAll()
cl.restartAll()
cl.waitOnStreamLeader(globalAccountName, sessStreamName1)
cl.waitOnStreamLeader(globalAccountName, sessStreamName2)
@@ -5029,9 +5025,6 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) {
defer sc.Close()
testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, true)
nc, js = jsClientConnect(t, cl.randomServer())
defer nc.Close()
// Check that old session stream is gone, but the non session stream is still present.
var gotIt = false
for info := range js.StreamsInfo() {
@@ -5065,6 +5058,22 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) {
if cons, ok := ps2.Cons["foo"]; !ok || !reflect.DeepEqual(cons, ps.Cons["foo"]) {
t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps)
}
// Make sure we don't attempt to transfer again by creating a subscription
// on the "stream names" API, which is used to get the list of streams to transfer
sub := natsSubSync(t, nc, JSApiStreams)
// Make sure to connect an MQTT client from a different node so that this node
// gets a connection for the account for the first time and tries to create
// all MQTT streams, etc..
o = cl.opts[1]
sc, sr = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub2"}, o.MQTT.Host, o.MQTT.Port, 10)
defer sc.Close()
testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, false)
if _, err := sub.NextMsg(200 * time.Millisecond); err == nil {
t.Fatal("Looks like attempt to transfer was done again")
}
}
func TestMQTTConnectAndDisconnectEvent(t *testing.T) {