diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b83b838b..2b55e8ad 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6923,8 +6923,6 @@ var jsClusterTemplWithLeafNode = ` routes = [%s] } - {{mqtt}} - # For access to system account. accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` @@ -6954,7 +6952,7 @@ func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster { } func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster { - return c.createLeafNodesWithStartPortAndMQTT(clusterName, numServers, portStart, _EMPTY_) + return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart) } func (c *cluster) createLeafNode() *Server { @@ -6988,11 +6986,9 @@ func (c *cluster) createLeafSolicit(tmpl string) string { return strings.Replace(tmpl, "{{leaf}}", lconf, 1) } -func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numServers int, portStart int, mqtt string) *cluster { +func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster { // Create our leafnode cluster template first. - tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode) - tmpl = strings.Replace(tmpl, "{{mqtt}}", mqtt, 1) - + tmpl := c.createLeafSolicit(template) pre := clusterName + "-" lc := createJetStreamCluster(c.t, tmpl, clusterName, pre, numServers, portStart, false) for _, s := range lc.servers { diff --git a/server/mqtt.go b/server/mqtt.go index 4324ff96..676e34d7 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -228,6 +228,7 @@ type mqttJSA struct { replies sync.Map nuid *nuid.NUID quitCh chan struct{} + domain string } type mqttJSPubMsg struct { @@ -889,6 +890,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc sendq: make(chan *mqttJSPubMsg, 8192), nuid: nuid.New(), quitCh: quitCh, + domain: s.getOpts().JetStreamDomain, }, } @@ -1860,7 +1862,12 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opts *Options) (*mqttSession, bool, error) { - hash := string(getHash(clientID)) + // Add the JS domain (possibly empty) to the client ID, which will make + // session stream/filter subject be unique per domain. So if an application + // with the same client ID moves to the other domain, then there won't be + // conflict of session message in one domain updating the session's stream + // in others. + hash := string(getHash(as.jsa.domain + clientID)) sname := mqttSessionsStreamNamePrefix + hash cfg := &StreamConfig{ Name: sname, diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 19b8c48f..75745aea 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -38,11 +38,13 @@ import ( var testMQTTTimeout = 4 * time.Second -var jsClusterTemplWithMQTT = ` +var jsClusterTemplWithLeafAndMQTT = ` listen: 127.0.0.1:-1 server_name: %s jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + {{leaf}} + cluster { name: %s listen: 127.0.0.1:%d @@ -2614,8 +2616,12 @@ func TestMQTTSubRestart(t *testing.T) { testMQTTCheckPubMsg(t, mc, r, "foo", 0, []byte("msg2")) } +func testMQTTGetClusterTemplaceNoLeaf() string { + return strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", "", 1) +} + func TestMQTTSubPropagation(t *testing.T) { - cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2) + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) defer cl.shutdown() o := cl.opts[0] @@ -2645,7 +2651,7 @@ func TestMQTTSubPropagation(t *testing.T) { } func TestMQTTCluster(t *testing.T) { - cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2) + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) defer cl.shutdown() for _, topTest := range []struct { @@ -2754,7 +2760,7 @@ func TestMQTTCluster(t *testing.T) { } func TestMQTTClusterRetainedMsg(t *testing.T) { - cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2) + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) defer cl.shutdown() srv1Opts := cl.opts[0] @@ -2958,7 +2964,7 @@ func TestMQTTClusterReplicasCount(t *testing.T) { s = testMQTTRunServer(t, o) defer testMQTTShutdownServer(s) } else { - cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", test.size) + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", test.size) defer cl.shutdown() o = cl.opts[0] s = cl.randomServer() @@ -3002,7 +3008,7 @@ func TestMQTTClusterPlacement(t *testing.T) { defer sc.shutdown() c := sc.randomCluster() - lnc := c.createLeafNodesWithStartPortAndMQTT("SPOKE", 3, 22111, `mqtt { listen: 127.0.0.1:-1 }`) + lnc := c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafAndMQTT, "SPOKE", 3, 22111) defer lnc.shutdown() sc.waitOnPeerCount(9) @@ -3134,6 +3140,46 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) } +func TestMQTTSessionMovingDomains(t *testing.T) { + tmpl := strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", `leafnodes { listen: 127.0.0.1:-1 }`, 1) + tmpl = strings.Replace(tmpl, "store_dir:", "domain: HUB, store_dir:", 1) + c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 22020, true) + defer c.shutdown() + c.waitOnLeader() + + tmpl = strings.Replace(jsClusterTemplWithLeafAndMQTT, "store_dir:", "domain: SPOKE, store_dir:", 1) + lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "SPOKE", 3, 22111) + defer lnc.shutdown() + lnc.waitOnPeerCount(3) + + connectSubAndDisconnect := func(host string, port int, present bool) { + t.Helper() + mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, present) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, rc) + testMQTTDisconnect(t, mc, nil) + } + + // Create a session on the HUB. Make sure we don't use "clean" session so that + // it is not removed when the client connection closes. + for i := 0; i < 7; i++ { + var present bool + if i > 0 { + present = true + } + connectSubAndDisconnect(c.opts[0].MQTT.Host, c.opts[0].MQTT.Port, present) + } + + // Now move to the SPOKE cluster, this is a brand new session there, so should not be present. + connectSubAndDisconnect(lnc.opts[1].MQTT.Host, lnc.opts[1].MQTT.Port, false) + + // Move back to HUB cluster. Make it interesting by connecting to a different + // server in that cluster. This should work, and present flag should be true. + connectSubAndDisconnect(c.opts[2].MQTT.Host, c.opts[2].MQTT.Port, true) +} + func TestMQTTParseUnsub(t *testing.T) { eofr := testNewEOFReader() for _, test := range []struct {