From 69e9c6cdddbbbaeac9508bb9c6b7854aabc7ed78 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 20 May 2021 13:51:37 -0600 Subject: [PATCH] [FIXED] MQTT: session fails if the number of servers is below cluster size Say with a cluster of 3, all MQTT assets are created with a replicas value of 3. However, when a server is shut down, then any new MQTT client will fail to connect because we try to create a session stream with R(3), which leads to insufficient resources. The longer-term solution should be for the server to allow the creation of an asset with an R() value that is bigger than the current number of running servers as long as there is quorum. For now, we will reduce the R() value for the sessions if we get an "insufficient resources" error. Note that the other assets will still use the computed R() based on cluster size. So the first time that a client on a given account is started, we will still need to have R() == cluster size (at least for R(3)). Partially resolves #2226 Signed-off-by: Ivan Kozlovic --- server/mqtt.go | 17 +++++++++++---- server/mqtt_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 5ed0a72e..bc26bb4a 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1880,12 +1880,21 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt accName := jsa.c.acc.GetName() return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err) } +CREATE_STREAM: // Send a request to create the stream for this session. si, err := jsa.createStream(cfg) - // If there is an error and not simply "already used" (which means that the - // stream already exists) then we fail. - if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) { - return formatError("create session stream", err) + if err != nil { + // Check for insufficient resources. 
If that is the case, and if possible, try + // again with a lower replicas value. + if cfg.Replicas > 1 && err.Error() == jsInsufficientErr.Description { + cfg.Replicas-- + goto CREATE_STREAM + } + // If there is an error and not simply "already used" (which means that the + // stream already exists) then we fail. + if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) { + return formatError("create session stream", err) + } } if err != nil { // Since we have returned if error is not "stream already exist", then diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 18cd6872..1acc9027 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -683,21 +683,47 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) { } func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) { + return testMQTTConnectRetry(t, ci, host, port, 0) +} + +func testMQTTConnectRetry(t testing.TB, ci *mqttConnInfo, host string, port int, retryCount int) (net.Conn, *mqttReader) { t.Helper() + retry := func(c net.Conn) bool { + if retryCount == 0 { + return false + } + if c != nil { + c.Close() + } + time.Sleep(time.Second) + retryCount-- + return true + } + addr := fmt.Sprintf("%s:%d", host, port) +RETRY: c, err := net.Dial("tcp", addr) if err != nil { + if retry(c) { + goto RETRY + } t.Fatalf("Error creating mqtt connection: %v", err) } proto := mqttCreateConnectProto(ci) if _, err := testMQTTWrite(c, proto); err != nil { + if retry(c) { + goto RETRY + } t.Fatalf("Error writing connect: %v", err) } buf, err := testMQTTRead(c) if err != nil { + if retry(c) { + goto RETRY + } t.Fatalf("Error reading: %v", err) } br := &mqttReader{reader: c} @@ -3003,6 +3029,33 @@ func TestMQTTClusterReplicasCount(t *testing.T) { } } +func TestMQTTClusterSessionReplicasAdjustment(t *testing.T) { + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3) + defer cl.shutdown() + o := cl.opts[0] + + mc, rc := 
testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + mc.Close() + + // Shutdown one of the server. + cl.servers[1].Shutdown() + + // Make sure there is a meta leader + cl.waitOnPeerCount(2) + cl.waitOnLeader() + + // Now try to create a new session. With R(3) this would fail, but now server will + // adjust it down to R(2). + o = cl.opts[2] + // We may still get failures because of some JS APIs may timeout while things + // settle, so try again for a certain amount of times. + mc, rc = testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) +} + func TestMQTTClusterPlacement(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 2) defer sc.shutdown()