From 4b59efd6e77f6bf876f78364fcc65669e42c2024 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 28 Sep 2023 11:10:53 -0700 Subject: [PATCH 1/3] [FIXED] Flapping TestMQTTLockedSession --- server/mqtt_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index f8a28fea..51629bc9 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -4667,6 +4667,19 @@ func TestMQTTLockedSession(t *testing.T) { t.Fatalf("account session manager not found") } + // It is possible, however unlikely, to have received CONNACK while + // mqttProcessConnect is still running, and the session remains locked. Wait + // for it to finish. + stillLocked := true + for stillLocked { + asm.mu.RLock() + _, stillLocked = asm.sessLocked["sub"] + asm.mu.RUnlock() + if stillLocked { + time.Sleep(1 * time.Millisecond) + } + } + // Get the session for "sub" cli := testMQTTGetClient(t, s, "sub") sess := cli.mqtt.sess From a05d4416ef01403178a29433492e07e663bde896 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 28 Sep 2023 12:02:35 -0700 Subject: [PATCH 2/3] PR feedback: nit --- server/mqtt_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 51629bc9..6ab76f19 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -4670,11 +4670,11 @@ func TestMQTTLockedSession(t *testing.T) { // It is possible, however unlikely, to have received CONNACK while // mqttProcessConnect is still running, and the session remains locked. Wait // for it to finish. - stillLocked := true - for stillLocked { + for stillLocked := true; stillLocked; { asm.mu.RLock() _, stillLocked = asm.sessLocked["sub"] asm.mu.RUnlock() + if stillLocked { time.Sleep(1 * time.Millisecond) } From 214711654e61deaa1f5ca77290b0a83db2e6c65d Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 28 Sep 2023 12:42:18 -0700 Subject: [PATCH 3/3] PR feedback: use checkFor --- server/mqtt_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 6ab76f19..5712be0e 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -4654,7 +4654,8 @@ func TestMQTTLockedSession(t *testing.T) { s := testMQTTRunServer(t, o) defer testMQTTShutdownServer(s) - ci := &mqttConnInfo{clientID: "sub", cleanSess: false} + subClientID := "sub" + ci := &mqttConnInfo{clientID: subClientID, cleanSess: false} c, r := testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port) defer c.Close() testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) @@ -4670,18 +4671,17 @@ func TestMQTTLockedSession(t *testing.T) { // It is possible, however unlikely, to have received CONNACK while // mqttProcessConnect is still running, and the session remains locked. Wait // for it to finish. - for stillLocked := true; stillLocked; { + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { asm.mu.RLock() - _, stillLocked = asm.sessLocked["sub"] - asm.mu.RUnlock() - - if stillLocked { - time.Sleep(1 * time.Millisecond) + defer asm.mu.RUnlock() + if _, stillLocked := asm.sessLocked[subClientID]; stillLocked { + return fmt.Errorf("session still locked") } - } + return nil + }) // Get the session for "sub" - cli := testMQTTGetClient(t, s, "sub") + cli := testMQTTGetClient(t, s, subClientID) sess := cli.mqtt.sess // Pretend that the session above is locked.