diff --git a/server/mqtt.go b/server/mqtt.go index 1cc8b402..4324ff96 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -176,6 +176,20 @@ var ( errMQTTTopicFilterCannotBeEmpty = errors.New("topic filter cannot be empty") errMQTTMalformedVarInt = errors.New("malformed variable int") errMQTTSecondConnectPacket = errors.New("received a second CONNECT packet") + errMQTTServerNameMustBeSet = errors.New("mqtt requires server name to be explicitly set") + errMQTTUserMixWithUsersNKeys = errors.New("mqtt authentication username not compatible with presence of users/nkeys") + errMQTTTokenMixWIthUsersNKeys = errors.New("mqtt authentication token not compatible with presence of users/nkeys") + errMQTTAckWaitMustBePositive = errors.New("ack wait must be a positive value") + errMQTTStandaloneNeedsJetStream = errors.New("mqtt requires JetStream to be enabled if running in standalone mode") + errMQTTConnFlagReserved = errors.New("connect flags reserved bit not set to 0") + errMQTTWillAndRetainFlag = errors.New("if Will flag is set to 0, Will Retain flag must be 0 too") + errMQTTPasswordFlagAndNoUser = errors.New("password flag set but username flag is not") + errMQTTCIDEmptyNeedsCleanFlag = errors.New("when client ID is empty, clean session flag must be set to 1") + errMQTTEmptyWillTopic = errors.New("empty Will topic not allowed") + errMQTTEmptyUsername = errors.New("empty user name not allowed") + errMQTTTopicIsEmpty = errors.New("topic cannot be empty") + errMQTTPacketIdentifierIsZero = errors.New("packet identifier cannot be 0") + errMQTTUnsupportedCharacters = errors.New("characters ' ' and '.' not supported for MQTT topics") ) type srvMQTT struct { @@ -508,6 +522,11 @@ func validateMQTTOptions(o *Options) error { if mo.Port == 0 { return nil } + // We have to force the server name to be explicitly set. There are conditions + // where we need a unique, repeatable name. + if o.ServerName == _EMPTY_ { + return errMQTTServerNameMustBeSet + } // If there is a NoAuthUser, we need to have Users defined and // the user to be present. if mo.NoAuthUser != _EMPTY_ { @@ -518,18 +537,22 @@ func validateMQTTOptions(o *Options) error { // Token/Username not possible if there are users/nkeys if len(o.Users) > 0 || len(o.Nkeys) > 0 { if mo.Username != _EMPTY_ { - return fmt.Errorf("mqtt authentication username not compatible with presence of users/nkeys") + return errMQTTUserMixWithUsersNKeys } if mo.Token != _EMPTY_ { - return fmt.Errorf("mqtt authentication token not compatible with presence of users/nkeys") + return errMQTTTokenMixWIthUsersNKeys } } if mo.AckWait < 0 { - return fmt.Errorf("ack wait must be a positive value") + return errMQTTAckWaitMustBePositive } - // If standalone and there is no JS enabled, then it won't work... - if o.Cluster.Port == 0 && o.Gateway.Port == 0 && !o.JetStream { - return fmt.Errorf("mqtt requires JetStream to be enabled if running in standalone mode") + // If strictly standalone and there is no JS enabled, then it won't work... + // For leafnodes, we could either have remote(s) and it would be ok, or no + // remote but accept from a remote side that has "hub" property set, which + // then would ok too. So we fail only if we have no leafnode config at all. + if !o.JetStream && o.Cluster.Port == 0 && o.Gateway.Port == 0 && + o.LeafNode.Port == 0 && len(o.LeafNode.Remotes) == 0 { + return errMQTTStandaloneNeedsJetStream } return nil } @@ -974,16 +997,10 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Using ephemeral consumer is too risky because if this server were to be // disconnected from the rest for few seconds, then the leader would remove // the consumer, so even after a reconnect, we would not longer receive - // retained messages. So delete any existing durable that we have for that - // and recreate here. In non cluster mode, we will use the retained messages - // stream name as the name, since it will be unique for this account and - // we will know what is the name on restart. - rmDurName := mqttRetainedMsgsStreamName - // In cluster mode, we will add our id (server name hash) since there may - // be many of those durables (1 per node). - if s.JetStreamIsClustered() { - rmDurName += "_" + jsa.id - } + // retained messages. Delete any existing durable that we have for that + // and recreate here. + // The name for the durable is $MQTT_rmsgs_ (which is jsa.id) + rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id resp, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName) // If error other than "not found" then fail, otherwise proceed with creating // the durable consumer. @@ -2189,7 +2206,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt // Spec [MQTT-3.1.2-3] if cp.flags&mqttConnFlagReserved != 0 { - return 0, nil, fmt.Errorf("connect flags reserved bit not set to 0") + return 0, nil, errMQTTConnFlagReserved } var hasWill bool @@ -2203,7 +2220,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt } // Spec [MQTT-3.1.2-15] if wretain { - return 0, nil, fmt.Errorf("if Will flag is set to 0, Will Retain flag must be 0 too") + return 0, nil, errMQTTWillAndRetainFlag } } else { // Spec [MQTT-3.1.2-14] @@ -2219,7 +2236,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt hasPassword := cp.flags&mqttConnFlagPasswordFlag != 0 // Spec [MQTT-3.1.2-22] if !hasUser && hasPassword { - return 0, nil, fmt.Errorf("password flag set but username flag is not") + return 0, nil, errMQTTPasswordFlagAndNoUser } // Keep alive @@ -2244,7 +2261,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt // Spec [MQTT-3.1.3-7] if cp.clientID == _EMPTY_ { if cp.flags&mqttConnFlagCleanSession == 0 { - return mqttConnAckRCIdentifierRejected, nil, fmt.Errorf("when client ID is empty, clean session flag must be set to 1") + return mqttConnAckRCIdentifierRejected, nil, errMQTTCIDEmptyNeedsCleanFlag } // Spec [MQTT-3.1.3-6] cp.clientID = nuid.Next() @@ -2267,10 +2284,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt return 0, nil, err } if len(topic) == 0 { - return 0, nil, fmt.Errorf("empty Will topic not allowed") + return 0, nil, errMQTTEmptyWillTopic } if !utf8.Valid(topic) { - return 0, nil, fmt.Errorf("invalide utf8 for Will topic %q", topic) + return 0, nil, fmt.Errorf("invalid utf8 for Will topic %q", topic) } cp.will.topic = topic // Convert MQTT topic to NATS subject @@ -2291,7 +2308,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt return 0, nil, err } if c.opts.Username == _EMPTY_ { - return mqttConnAckRCBadUserOrPassword, nil, fmt.Errorf("empty user name not allowed") + return mqttConnAckRCBadUserOrPassword, nil, errMQTTEmptyUsername } // Spec [MQTT-3.1.3-11] if !utf8.ValidString(c.opts.Username) { @@ -2580,7 +2597,7 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish) error { return err } if len(pp.topic) == 0 { - return fmt.Errorf("topic cannot be empty") + return errMQTTTopicIsEmpty } // Convert the topic to a NATS subject. This call will also check that // there is no MQTT wildcards (Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]) @@ -2858,7 +2875,7 @@ func mqttParsePubAck(r *mqttReader, pl int) (uint16, error) { return 0, err } if pi == 0 { - return 0, fmt.Errorf("packet identifier cannot be 0") + return 0, errMQTTPacketIdentifierIsZero } return pi, nil } @@ -3558,7 +3575,7 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) { } case btsep, ' ': // As of now, we cannot support '.' or ' ' in the MQTT topic/filter. - return nil, fmt.Errorf("characters ' ' and '.' not supported for MQTT topics") + return nil, errMQTTUnsupportedCharacters case mqttSingleLevelWC, mqttMultiLevelWC: if !wcOk { // Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1] diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 7fe76d96..19b8c48f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -199,6 +199,7 @@ func TestMQTTWriter(t *testing.T) { func testMQTTDefaultOptions() *Options { o := DefaultOptions() + o.ServerName = nuid.Next() o.Cluster.Port = 0 o.Gateway.Name = "" o.Gateway.Port = 0 @@ -224,6 +225,7 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { s.SetLogger(l, true, true) go s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { + testMQTTShutdownServer(s) t.Fatal(err) } return s @@ -261,8 +263,25 @@ func testMQTTDefaultTLSOptions(t *testing.T, verify bool) *Options { return o } +func TestMQTTServerNameRequired(t *testing.T) { + conf := createConfFile(t, []byte(` + mqtt { + port: -1 + } + `)) + defer removeFile(t, conf) + o, err := ProcessConfigFile(conf) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + if _, err := NewServer(o); err == nil || err.Error() != errMQTTServerNameMustBeSet.Error() { + t.Fatalf("Expected error about requiring server name to be set, got %v", err) + } +} + func TestMQTTStandaloneRequiresJetStream(t *testing.T) { conf := createConfFile(t, []byte(` + server_name: mqtt mqtt { port: -1 tls { @@ -276,7 +295,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) { if err != nil { t.Fatalf("Error processing config file: %v", err) } - if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), "standalone") { + if _, err := NewServer(o); err == nil || err.Error() != errMQTTStandaloneNeedsJetStream.Error() { t.Fatalf("Expected error about requiring JetStream in standalone mode, got %v", err) } } @@ -284,6 +303,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) { func TestMQTTConfig(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled + server_name: mqtt mqtt { port: -1 tls { @@ -306,33 +326,33 @@ func TestMQTTValidateOptions(t *testing.T) { for _, test := range []struct { name string getOpts func() *Options - err string + err error }{ - {"mqtt disabled", func() *Options { return nmqtto.Clone() }, ""}, + {"mqtt disabled", func() *Options { return nmqtto.Clone() }, nil}, {"mqtt username not allowed if users specified", func() *Options { o := mqtto.Clone() o.Users = []*User{{Username: "abc", Password: "pwd"}} o.MQTT.Username = "b" o.MQTT.Password = "pwd" return o - }, "mqtt authentication username not compatible with presence of users/nkeys"}, + }, errMQTTUserMixWithUsersNKeys}, {"mqtt token not allowed if users specified", func() *Options { o := mqtto.Clone() o.Nkeys = []*NkeyUser{{Nkey: "abc"}} o.MQTT.Token = "mytoken" return o - }, "mqtt authentication token not compatible with presence of users/nkeys"}, + }, errMQTTTokenMixWIthUsersNKeys}, {"ack wait should be >=0", func() *Options { o := mqtto.Clone() o.MQTT.AckWait = -10 * time.Second return o - }, "ack wait must be a positive value"}, + }, errMQTTAckWaitMustBePositive}, } { t.Run(test.name, func(t *testing.T) { err := validateMQTTOptions(test.getOpts()) - if test.err == "" && err != nil { + if test.err == nil && err != nil { t.Fatalf("Unexpected error: %v", err) - } else if test.err != "" && (err == nil || !strings.Contains(err.Error(), test.err)) { + } else if test.err != nil && (err == nil || err.Error() != test.err.Error()) { t.Fatalf("Expected error to contain %q, got %v", test.err, err) } }) @@ -1467,22 +1487,22 @@ func TestMQTTParseConnect(t *testing.T) { {"error on protocol level", []byte{0, 4, 'M', 'Q', 'T', 'T'}, 6, eofr, "protocol level"}, {"unacceptable protocol version", []byte{0, 4, 'M', 'Q', 'T', 'T', 10}, 7, nil, "unacceptable protocol version"}, {"error on flags", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel}, 7, eofr, "flags"}, - {"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, "connect flags reserved bit not set to 0"}, + {"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, errMQTTConnFlagReserved.Error()}, {"will qos without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 3}, 8, nil, "if Will flag is set to 0, Will QoS must be 0 too"}, - {"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, "if Will flag is set to 0, Will Retain flag must be 0 too"}, + {"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, errMQTTWillAndRetainFlag.Error()}, {"will qos", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 3<<3 | 1<<2}, 8, nil, "if Will flag is set to 1, Will QoS can be 0, 1 or 2"}, - {"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, "password flag set but username flag is not"}, + {"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, errMQTTPasswordFlagAndNoUser.Error()}, {"missing keep alive", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0}, 8, nil, "keep alive"}, {"missing client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1}, 10, nil, "client ID"}, - {"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, "when client ID is empty, clean session flag must be set to 1"}, + {"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, errMQTTCIDEmptyNeedsCleanFlag.Error()}, {"invalid utf8 client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 1, 241}, 13, nil, "invalid utf8 for client ID"}, {"missing will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, nil, "Will topic"}, - {"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty Will topic not allowed"}, - {"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalide utf8 for Will topic"}, + {"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyWillTopic.Error()}, + {"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for Will topic"}, {"invalid wildcard will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, '#'}, 15, nil, "wildcards not allowed"}, {"error on will message", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a', 0, 3}, 17, eofr, "Will message"}, {"error on username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, eofr, "user name"}, - {"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty user name not allowed"}, + {"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyUsername.Error()}, {"invalid utf8 username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for user name"}, {"error on password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagPasswordFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a'}, 15, eofr, "password"}, } { @@ -1989,10 +2009,10 @@ func TestMQTTParsePub(t *testing.T) { {"qos not supported", 0x4, nil, 0, nil, "not supported"}, {"packet in buffer error", 0, nil, 10, eofr, "error ensuring protocol is loaded"}, {"error on topic", 0, []byte{0, 3, 'f', 'o'}, 4, eofr, "topic"}, - {"empty topic", 0, []byte{0, 0}, 2, nil, "topic cannot be empty"}, + {"empty topic", 0, []byte{0, 0}, 2, nil, errMQTTTopicIsEmpty.Error()}, {"wildcards topic", 0, []byte{0, 1, '#'}, 3, nil, "wildcards not allowed"}, {"error on packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o'}, 5, eofr, "packet identifier"}, - {"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, "packet identifier cannot be 0"}, + {"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, errMQTTPacketIdentifierIsZero.Error()}, } { t.Run(test.name, func(t *testing.T) { r := &mqttReader{reader: test.reader} @@ -2018,7 +2038,7 @@ func TestMQTTParsePubAck(t *testing.T) { }{ {"packet in buffer error", nil, 10, eofr, "error ensuring protocol is loaded"}, {"error reading packet identifier", []byte{0}, 1, eofr, "packet identifier"}, - {"invalid packet identifier", []byte{0, 0}, 2, nil, "packet identifier cannot be 0"}, + {"invalid packet identifier", []byte{0, 0}, 2, nil, errMQTTPacketIdentifierIsZero.Error()}, } { t.Run(test.name, func(t *testing.T) { r := &mqttReader{reader: test.reader} @@ -3030,6 +3050,90 @@ func TestMQTTClusterPlacement(t *testing.T) { } } +func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { + getClusterOpts := func(name string, i int) *Options { + o := testMQTTDefaultOptions() + o.ServerName = name + o.Cluster.Name = "hub" + o.Cluster.Host = "127.0.0.1" + o.Cluster.Port = 2790 + i + o.Routes = RoutesFromStr("nats://127.0.0.1:2791,nats://127.0.0.1:2792,nats://127.0.0.1:2793") + o.LeafNode.Host = "127.0.0.1" + o.LeafNode.Port = -1 + return o + } + o1 := getClusterOpts("S1", 1) + s1 := testMQTTRunServer(t, o1) + defer testMQTTShutdownServer(s1) + + o2 := getClusterOpts("S2", 2) + s2 := testMQTTRunServer(t, o2) + defer testMQTTShutdownServer(s2) + + o3 := getClusterOpts("S3", 3) + s3 := testMQTTRunServer(t, o3) + defer testMQTTShutdownServer(s3) + + cluster := []*Server{s1, s2, s3} + checkClusterFormed(t, cluster...) + checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { + for _, s := range cluster { + if s.JetStreamIsLeader() { + return nil + } + } + return fmt.Errorf("no leader yet") + }) + + // Now define a leafnode that has mqtt enabled, but no JS. This should still work. + lno := testMQTTDefaultOptions() + // Make sure jetstream is not explicitly defined here. + lno.JetStream = false + // Use RoutesFromStr() to make an array of urls + urls := RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d,nats://127.0.0.1:%d,nats://127.0.0.1:%d", + o1.LeafNode.Port, o2.LeafNode.Port, o3.LeafNode.Port)) + lno.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + ln := RunServer(lno) + defer ln.Shutdown() + + // Now connect to leafnode and subscribe + mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, lno.MQTT.Host, lno.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, rc) + + connectAndPublish := func(o *Options) { + mp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mp.Close() + testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false) + + testMQTTPublish(t, mp, rp, 1, false, false, "foo", 1, []byte("msg")) + } + // Connect a publisher from leafnode and publish, verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o3) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from a server in the hub and subscribe + mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", cleanSess: true}, o2.MQTT.Host, o2.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc2, nil, rc2) + + // Connect a publisher from leafnode and publish, verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o1) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) +} + func TestMQTTParseUnsub(t *testing.T) { eofr := testNewEOFReader() for _, test := range []struct { @@ -3324,6 +3428,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) { template := ` port: -1 jetstream: enabled + server_name: mqtt authorization { mqtt_perms = { publish = ["%s"] @@ -4545,6 +4650,7 @@ func TestMQTTMaxAckPendingOverLimit(t *testing.T) { func TestMQTTConfigReload(t *testing.T) { template := ` jetstream: true + server_name: mqtt mqtt { port: -1 ack_wait: %s diff --git a/server/server_test.go b/server/server_test.go index dfe76673..3ed4b2ad 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1230,6 +1230,7 @@ func TestAcceptError(t *testing.T) { func TestServerShutdownDuringStart(t *testing.T) { o := DefaultOptions() + o.ServerName = "server" o.DisableShortFirstPing = true o.Accounts = []*Account{NewAccount("$SYS")} o.SystemAccount = "$SYS"