diff --git a/server/mqtt.go b/server/mqtt.go
index eb147b62..4d5f08b0 100644
--- a/server/mqtt.go
+++ b/server/mqtt.go
@@ -1883,12 +1883,21 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt
 		accName := jsa.c.acc.GetName()
 		return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err)
 	}
+CREATE_STREAM:
 	// Send a request to create the stream for this session.
 	si, err := jsa.createStream(cfg)
-	// If there is an error and not simply "already used" (which means that the
-	// stream already exists) then we fail.
-	if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) {
-		return formatError("create session stream", err)
+	if err != nil {
+		// Check for insufficient resources. If that is the case, and if possible, try
+		// again with a lower replicas value (one fewer per attempt, never below 1).
+		if cfg.Replicas > 1 && err.Error() == jsInsufficientErr.Description {
+			cfg.Replicas--
+			goto CREATE_STREAM
+		}
+		// If there is an error and not simply "already used" (which means that the
+		// stream already exists) then we fail.
+		if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) {
+			return formatError("create session stream", err)
+		}
 	}
 	if err != nil {
 		// Since we have returned if error is not "stream already exist", then
diff --git a/server/mqtt_test.go b/server/mqtt_test.go
index 18cd6872..1acc9027 100644
--- a/server/mqtt_test.go
+++ b/server/mqtt_test.go
@@ -683,21 +683,47 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) {
 }
 
 func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) {
+	return testMQTTConnectRetry(t, ci, host, port, 0) // retryCount 0: fail the test on the first error
+}
+
+func testMQTTConnectRetry(t testing.TB, ci *mqttConnInfo, host string, port int, retryCount int) (net.Conn, *mqttReader) {
 	t.Helper()
 
+	retry := func(c net.Conn) bool {
+		if retryCount == 0 {
+			return false
+		}
+		if c != nil {
+			c.Close()
+		}
+		time.Sleep(time.Second) // pause before the caller jumps back to RETRY
+		retryCount--
+		return true
+	}
+
 	addr := fmt.Sprintf("%s:%d", host, port)
+RETRY:
 	c, err := net.Dial("tcp", addr)
 	if err != nil {
+		if retry(c) {
+			goto RETRY
+		}
 		t.Fatalf("Error creating mqtt connection: %v", err)
 	}
 
 	proto := mqttCreateConnectProto(ci)
 	if _, err := testMQTTWrite(c, proto); err != nil {
+		if retry(c) {
+			goto RETRY
+		}
 		t.Fatalf("Error writing connect: %v", err)
 	}
 
 	buf, err := testMQTTRead(c)
 	if err != nil {
+		if retry(c) {
+			goto RETRY
+		}
 		t.Fatalf("Error reading: %v", err)
 	}
 	br := &mqttReader{reader: c}
@@ -3003,6 +3029,33 @@ func TestMQTTClusterReplicasCount(t *testing.T) {
 	}
 }
 
+func TestMQTTClusterSessionReplicasAdjustment(t *testing.T) {
+	cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3)
+	defer cl.shutdown()
+	o := cl.opts[0]
+
+	mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
+	defer mc.Close()
+	testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
+	mc.Close()
+
+	// Shut down one of the servers.
+	cl.servers[1].Shutdown()
+
+	// Wait on a peer count of 2 and make sure there is a meta leader.
+	cl.waitOnPeerCount(2)
+	cl.waitOnLeader()
+
+	// Now try to create a new session. With R(3) this would fail, but now the server will
+	// adjust it down to R(2).
+	o = cl.opts[2]
+	// We may still get failures because some JS APIs may time out while things
+	// settle, so retry the connection up to 5 times.
+	mc, rc = testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
+	defer mc.Close()
+	testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
+}
+
 func TestMQTTClusterPlacement(t *testing.T) {
 	sc := createJetStreamSuperCluster(t, 3, 2)
 	defer sc.shutdown()