mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2236 from nats-io/fix_2226
[FIXED] MQTT: session fails if the number servers below cluster size
This commit is contained in:
@@ -1883,12 +1883,21 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt
|
||||
accName := jsa.c.acc.GetName()
|
||||
return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err)
|
||||
}
|
||||
CREATE_STREAM:
|
||||
// Send a request to create the stream for this session.
|
||||
si, err := jsa.createStream(cfg)
|
||||
// If there is an error and not simply "already used" (which means that the
|
||||
// stream already exists) then we fail.
|
||||
if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) {
|
||||
return formatError("create session stream", err)
|
||||
if err != nil {
|
||||
// Check for insufficient resources. If that is the case, and if possible, try
|
||||
// again with a lower replicas value.
|
||||
if cfg.Replicas > 1 && err.Error() == jsInsufficientErr.Description {
|
||||
cfg.Replicas--
|
||||
goto CREATE_STREAM
|
||||
}
|
||||
// If there is an error and not simply "already used" (which means that the
|
||||
// stream already exists) then we fail.
|
||||
if isErrorOtherThan(err, ErrJetStreamStreamAlreadyUsed) {
|
||||
return formatError("create session stream", err)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// Since we have returned if error is not "stream already exist", then
|
||||
|
||||
@@ -683,21 +683,47 @@ func testMQTTWrite(c net.Conn, buf []byte) (int, error) {
|
||||
}
|
||||
|
||||
func testMQTTConnect(t testing.TB, ci *mqttConnInfo, host string, port int) (net.Conn, *mqttReader) {
|
||||
return testMQTTConnectRetry(t, ci, host, port, 0)
|
||||
}
|
||||
|
||||
func testMQTTConnectRetry(t testing.TB, ci *mqttConnInfo, host string, port int, retryCount int) (net.Conn, *mqttReader) {
|
||||
t.Helper()
|
||||
|
||||
retry := func(c net.Conn) bool {
|
||||
if retryCount == 0 {
|
||||
return false
|
||||
}
|
||||
if c != nil {
|
||||
c.Close()
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
retryCount--
|
||||
return true
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", host, port)
|
||||
RETRY:
|
||||
c, err := net.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
if retry(c) {
|
||||
goto RETRY
|
||||
}
|
||||
t.Fatalf("Error creating mqtt connection: %v", err)
|
||||
}
|
||||
|
||||
proto := mqttCreateConnectProto(ci)
|
||||
if _, err := testMQTTWrite(c, proto); err != nil {
|
||||
if retry(c) {
|
||||
goto RETRY
|
||||
}
|
||||
t.Fatalf("Error writing connect: %v", err)
|
||||
}
|
||||
|
||||
buf, err := testMQTTRead(c)
|
||||
if err != nil {
|
||||
if retry(c) {
|
||||
goto RETRY
|
||||
}
|
||||
t.Fatalf("Error reading: %v", err)
|
||||
}
|
||||
br := &mqttReader{reader: c}
|
||||
@@ -3003,6 +3029,33 @@ func TestMQTTClusterReplicasCount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTClusterSessionReplicasAdjustment(t *testing.T) {
|
||||
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 3)
|
||||
defer cl.shutdown()
|
||||
o := cl.opts[0]
|
||||
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
mc.Close()
|
||||
|
||||
// Shutdown one of the server.
|
||||
cl.servers[1].Shutdown()
|
||||
|
||||
// Make sure there is a meta leader
|
||||
cl.waitOnPeerCount(2)
|
||||
cl.waitOnLeader()
|
||||
|
||||
// Now try to create a new session. With R(3) this would fail, but now server will
|
||||
// adjust it down to R(2).
|
||||
o = cl.opts[2]
|
||||
// We may still get failures because of some JS APIs may timeout while things
|
||||
// settle, so try again for a certain amount of times.
|
||||
mc, rc = testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
}
|
||||
|
||||
func TestMQTTClusterPlacement(t *testing.T) {
|
||||
sc := createJetStreamSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user