MQTT make session streams domain aware

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-05-06 20:02:00 -06:00
parent 2002ea1ee5
commit a9a49cc2d5
3 changed files with 63 additions and 14 deletions

View File

@@ -6923,8 +6923,6 @@ var jsClusterTemplWithLeafNode = `
routes = [%s]
}
{{mqtt}}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
@@ -6954,7 +6952,7 @@ func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster {
}
func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster {
return c.createLeafNodesWithStartPortAndMQTT(clusterName, numServers, portStart, _EMPTY_)
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart)
}
func (c *cluster) createLeafNode() *Server {
@@ -6988,11 +6986,9 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
return strings.Replace(tmpl, "{{leaf}}", lconf, 1)
}
func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numServers int, portStart int, mqtt string) *cluster {
func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster {
// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode)
tmpl = strings.Replace(tmpl, "{{mqtt}}", mqtt, 1)
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
lc := createJetStreamCluster(c.t, tmpl, clusterName, pre, numServers, portStart, false)
for _, s := range lc.servers {

View File

@@ -228,6 +228,7 @@ type mqttJSA struct {
replies sync.Map
nuid *nuid.NUID
quitCh chan struct{}
domain string
}
type mqttJSPubMsg struct {
@@ -889,6 +890,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
sendq: make(chan *mqttJSPubMsg, 8192),
nuid: nuid.New(),
quitCh: quitCh,
domain: s.getOpts().JetStreamDomain,
},
}
@@ -1860,7 +1862,12 @@ func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opts *Options) (*mqttSession, bool, error) {
hash := string(getHash(clientID))
// Add the JS domain (possibly empty) to the client ID, which will make
// session stream/filter subject be unique per domain. So if an application
// with the same client ID moves to the other domain, then there won't be
// conflict of session message in one domain updating the session's stream
// in others.
hash := string(getHash(as.jsa.domain + clientID))
sname := mqttSessionsStreamNamePrefix + hash
cfg := &StreamConfig{
Name: sname,

View File

@@ -38,11 +38,13 @@ import (
var testMQTTTimeout = 4 * time.Second
var jsClusterTemplWithMQTT = `
var jsClusterTemplWithLeafAndMQTT = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
{{leaf}}
cluster {
name: %s
listen: 127.0.0.1:%d
@@ -2614,8 +2616,12 @@ func TestMQTTSubRestart(t *testing.T) {
testMQTTCheckPubMsg(t, mc, r, "foo", 0, []byte("msg2"))
}
func testMQTTGetClusterTemplaceNoLeaf() string {
return strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", "", 1)
}
func TestMQTTSubPropagation(t *testing.T) {
cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2)
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()
o := cl.opts[0]
@@ -2645,7 +2651,7 @@ func TestMQTTSubPropagation(t *testing.T) {
}
func TestMQTTCluster(t *testing.T) {
cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2)
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()
for _, topTest := range []struct {
@@ -2754,7 +2760,7 @@ func TestMQTTCluster(t *testing.T) {
}
func TestMQTTClusterRetainedMsg(t *testing.T) {
cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", 2)
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()
srv1Opts := cl.opts[0]
@@ -2958,7 +2964,7 @@ func TestMQTTClusterReplicasCount(t *testing.T) {
s = testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)
} else {
cl := createJetStreamClusterWithTemplate(t, jsClusterTemplWithMQTT, "MQTT", test.size)
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", test.size)
defer cl.shutdown()
o = cl.opts[0]
s = cl.randomServer()
@@ -3002,7 +3008,7 @@ func TestMQTTClusterPlacement(t *testing.T) {
defer sc.shutdown()
c := sc.randomCluster()
lnc := c.createLeafNodesWithStartPortAndMQTT("SPOKE", 3, 22111, `mqtt { listen: 127.0.0.1:-1 }`)
lnc := c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafAndMQTT, "SPOKE", 3, 22111)
defer lnc.shutdown()
sc.waitOnPeerCount(9)
@@ -3134,6 +3140,46 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) {
testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg"))
}
func TestMQTTSessionMovingDomains(t *testing.T) {
tmpl := strings.Replace(jsClusterTemplWithLeafAndMQTT, "{{leaf}}", `leafnodes { listen: 127.0.0.1:-1 }`, 1)
tmpl = strings.Replace(tmpl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 22020, true)
defer c.shutdown()
c.waitOnLeader()
tmpl = strings.Replace(jsClusterTemplWithLeafAndMQTT, "store_dir:", "domain: SPOKE, store_dir:", 1)
lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "SPOKE", 3, 22111)
defer lnc.shutdown()
lnc.waitOnPeerCount(3)
connectSubAndDisconnect := func(host string, port int, present bool) {
t.Helper()
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: false}, host, port)
defer mc.Close()
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, present)
testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
testMQTTFlush(t, mc, nil, rc)
testMQTTDisconnect(t, mc, nil)
}
// Create a session on the HUB. Make sure we don't use "clean" session so that
// it is not removed when the client connection closes.
for i := 0; i < 7; i++ {
var present bool
if i > 0 {
present = true
}
connectSubAndDisconnect(c.opts[0].MQTT.Host, c.opts[0].MQTT.Port, present)
}
// Now move to the SPOKE cluster, this is a brand new session there, so should not be present.
connectSubAndDisconnect(lnc.opts[1].MQTT.Host, lnc.opts[1].MQTT.Port, false)
// Move back to HUB cluster. Make it interesting by connecting to a different
// server in that cluster. This should work, and present flag should be true.
connectSubAndDisconnect(c.opts[2].MQTT.Host, c.opts[2].MQTT.Port, true)
}
func TestMQTTParseUnsub(t *testing.T) {
eofr := testNewEOFReader()
for _, test := range []struct {