From ec774c934772bce8fa4fc3c098720cfc800c2de3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 15 Sep 2021 10:01:11 -0600 Subject: [PATCH] MQTT handle idempotent session stream creation. Creating a stream will become idempotent, so assuming that we should try to transfer the old session streams only on success will no longer work. Added a test that checks that "stream" list is queried only once which means transfer was attempted only once after the second cluster restart and new connection. Signed-off-by: Ivan Kozlovic --- server/mqtt_test.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 08b1b7ed..411cef01 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -5016,10 +5016,6 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { t.Fatalf("Error on publish: %v", err) } - nc.Close() - cl.stopAll() - cl.restartAll() - cl.waitOnStreamLeader(globalAccountName, sessStreamName1) cl.waitOnStreamLeader(globalAccountName, sessStreamName2) @@ -5029,9 +5025,6 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { defer sc.Close() testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, true) - nc, js = jsClientConnect(t, cl.randomServer()) - defer nc.Close() - // Check that old session stream is gone, but the non session stream is still present. var gotIt = false for info := range js.StreamsInfo() { @@ -5065,6 +5058,22 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { if cons, ok := ps2.Cons["foo"]; !ok || !reflect.DeepEqual(cons, ps.Cons["foo"]) { t.Fatalf("Unexpected session record, %+v vs %+v", ps2, ps) } + + // Make sure we don't attempt to transfer again by creating a subscription + // on the "stream names" API, which is used to get the list of streams to transfer + sub := natsSubSync(t, nc, JSApiStreams) + + // Make sure to connect an MQTT client from a different node so that this node + // gets a connection for the account for the first time and tries to create + // all MQTT streams, etc.. + o = cl.opts[1] + sc, sr = testMQTTConnectRetry(t, &mqttConnInfo{clientID: "sub2"}, o.MQTT.Host, o.MQTT.Port, 10) + defer sc.Close() + testMQTTCheckConnAck(t, sr, mqttConnAckRCConnectionAccepted, false) + + if _, err := sub.NextMsg(200 * time.Millisecond); err == nil { + t.Fatal("Looks like attempt to transfer was done again") + } } func TestMQTTConnectAndDisconnectEvent(t *testing.T) {