mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -407,8 +407,10 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTConfig(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
jetstream: enabled
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
@@ -417,7 +419,7 @@ func TestMQTTConfig(t *testing.T) {
|
||||
key_file: "./configs/certs/key.pem"
|
||||
}
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
if o.MQTT.TLSConfig == nil {
|
||||
@@ -3466,11 +3468,11 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTImportExport(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream {
|
||||
store_dir=org_dir
|
||||
store_dir = %q
|
||||
}
|
||||
accounts {
|
||||
org {
|
||||
@@ -3487,9 +3489,8 @@ func TestMQTTImportExport(t *testing.T) {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
no_auth_user: device
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
defer os.Remove(conf)
|
||||
defer os.RemoveAll("org_dir")
|
||||
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
@@ -3912,7 +3913,9 @@ func TestMQTTWillRetain(t *testing.T) {
|
||||
func TestMQTTWillRetainPermViolation(t *testing.T) {
|
||||
template := `
|
||||
port: -1
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
server_name: mqtt
|
||||
authorization {
|
||||
mqtt_perms = {
|
||||
@@ -3927,7 +3930,8 @@ func TestMQTTWillRetainPermViolation(t *testing.T) {
|
||||
port: -1
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(template, "foo")))
|
||||
tdir := t.TempDir()
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(template, tdir, "foo")))
|
||||
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
@@ -4006,7 +4010,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) {
|
||||
// Now remove permission to publish on "foo" and check that a new subscription
|
||||
// on "foo" is now not getting the will message because the original user no
|
||||
// longer has permission to do so.
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, "baz"))
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, tdir, "baz"))
|
||||
|
||||
mcs, rs = testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mcs.Close()
|
||||
@@ -5575,11 +5579,13 @@ func TestMQTTStreamInfoReturnsNonEmptySubject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTWebsocketToMQTTPort(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
http: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
mqtt {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
@@ -5587,7 +5593,7 @@ func TestMQTTWebsocketToMQTTPort(t *testing.T) {
|
||||
listen: "127.0.0.1:-1"
|
||||
no_tls: true
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -5609,11 +5615,14 @@ func TestMQTTWebsocketToMQTTPort(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTWebsocket(t *testing.T) {
|
||||
tdir := t.TempDir()
|
||||
template := `
|
||||
listen: "127.0.0.1:-1"
|
||||
http: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
accounts {
|
||||
MQTT {
|
||||
jetstream: enabled
|
||||
@@ -5630,7 +5639,7 @@ func TestMQTTWebsocket(t *testing.T) {
|
||||
no_tls: true
|
||||
}
|
||||
`
|
||||
s, o, conf := runReloadServerWithContent(t, []byte(fmt.Sprintf(template, jwt.ConnectionTypeMqtt, "")))
|
||||
s, o, conf := runReloadServerWithContent(t, []byte(fmt.Sprintf(template, tdir, jwt.ConnectionTypeMqtt, "")))
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
cisub := &mqttConnInfo{clientID: "sub", user: "mqtt", pass: "pwd", ws: true}
|
||||
@@ -5640,7 +5649,7 @@ func TestMQTTWebsocket(t *testing.T) {
|
||||
c.Close()
|
||||
|
||||
ws := fmt.Sprintf(`, "%s"`, jwt.ConnectionTypeMqttWS)
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, jwt.ConnectionTypeMqtt, ws))
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, tdir, jwt.ConnectionTypeMqtt, ws))
|
||||
|
||||
cisub = &mqttConnInfo{clientID: "sub", user: "mqtt", pass: "pwd", ws: true}
|
||||
c, r = testMQTTConnect(t, cisub, o.Websocket.Host, o.Websocket.Port)
|
||||
@@ -5680,11 +5689,13 @@ func (cwc *chunkWriteConn) Write(p []byte) (int, error) {
|
||||
}
|
||||
|
||||
func TestMQTTPartial(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
http: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
mqtt {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
@@ -5692,7 +5703,7 @@ func TestMQTTPartial(t *testing.T) {
|
||||
listen: "127.0.0.1:-1"
|
||||
no_tls: true
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -5731,11 +5742,13 @@ func TestMQTTPartial(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTWebsocketTLS(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
http: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
mqtt {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
@@ -5747,7 +5760,7 @@ func TestMQTTWebsocketTLS(t *testing.T) {
|
||||
ca_file: '../test/configs/certs/ca.pem'
|
||||
}
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -5877,11 +5890,13 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTConnectAndDisconnectEvent(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
http: "127.0.0.1:-1"
|
||||
server_name: "mqtt"
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
accounts {
|
||||
MQTT {
|
||||
jetstream: enabled
|
||||
@@ -5895,7 +5910,7 @@ func TestMQTTConnectAndDisconnectEvent(t *testing.T) {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
system_account: "SYS"
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
defer os.Remove(conf)
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
@@ -6169,15 +6184,18 @@ func TestMQTTStreamReplicasOverride(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTStreamReplicasConfigReload(t *testing.T) {
|
||||
tdir := t.TempDir()
|
||||
tmpl := `
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
stream_replicas: %v
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, 3)))
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, 3)))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6198,7 +6216,7 @@ func TestMQTTStreamReplicasConfigReload(t *testing.T) {
|
||||
t.Fatalf("Did not get the error regarding replicas count")
|
||||
}
|
||||
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, 1))
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, 1))
|
||||
|
||||
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: "mqtt", cleanSess: false}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mc.Close()
|
||||
@@ -6347,15 +6365,18 @@ func TestMQTTConsumerReplicasOverride(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTConsumerMemStorageReload(t *testing.T) {
|
||||
tdir := t.TempDir()
|
||||
tmpl := `
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
consumer_memory_storage: %s
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "false")))
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "false")))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6366,7 +6387,7 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) {
|
||||
defer c.Close()
|
||||
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "true"))
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, "true"))
|
||||
|
||||
testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
|
||||
|
||||
@@ -6450,10 +6471,13 @@ func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
|
||||
|
||||
// Test for auto-cleanup of consumers.
|
||||
func TestMQTTConsumerInactiveThreshold(t *testing.T) {
|
||||
tdir := t.TempDir()
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
@@ -6463,7 +6487,7 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) {
|
||||
# For access to system account.
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
||||
`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "0.2s")))
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "0.2s")))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6489,17 +6513,19 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) {
|
||||
|
||||
// Check reload.
|
||||
// We will not redo existing consumers however.
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "22s"))
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, "22s"))
|
||||
if opts := s.getOpts(); opts.MQTT.ConsumerInactiveThreshold != 22*time.Second {
|
||||
t.Fatalf("Expected reloaded value of %v but got %v", 22*time.Second, opts.MQTT.ConsumerInactiveThreshold)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTSubjectMapping(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
|
||||
mappings = {
|
||||
foo0: bar0
|
||||
@@ -6509,7 +6535,7 @@ func TestMQTTSubjectMapping(t *testing.T) {
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6599,10 +6625,12 @@ func TestMQTTSubjectMapping(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMQTTSubjectMappingWithImportExport(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
|
||||
accounts {
|
||||
A {
|
||||
@@ -6627,7 +6655,7 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) {
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6743,14 +6771,16 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) {
|
||||
// The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +),
|
||||
// with Topic Names beginning with a $ character [MQTT-4.7.2-1]
|
||||
func TestMQTTSubjectWildcardStart(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
@@ -6905,14 +6935,16 @@ func TestMQTTTopicWithDot(t *testing.T) {
|
||||
|
||||
// Issue https://github.com/nats-io/nats-server/issues/4291
|
||||
func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: mqtt
|
||||
jetstream: enabled
|
||||
jetstream {
|
||||
store_dir = %q
|
||||
}
|
||||
mqtt {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
`))
|
||||
`, t.TempDir())))
|
||||
s, o := RunServerWithConfig(conf)
|
||||
defer testMQTTShutdownServer(s)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user