From f1730593c056e70a04763c8c446d4bd14b4df4dc Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 5 May 2021 12:49:11 -0600 Subject: [PATCH] Force server name to be set if mqtt{} defined This will solve the issue of naming the durable per server for the "retained messages" stream in situation where a cluster of servers would not have JetStream defined but connect to another cluster that has it. All the servers within the cluster without JetStream would cause the durable's delivery subject to be updated to the last server starting the durable. Updated the check for mqtt requiring JetStream if running in standalone mode to check that no leafnode configuration is present. Replaced use of fmt.Errorf() when the string was static with errors created with errors.New(). Updated tests that were checking for errors to use those errors instead of repeating the string. Added test that has a hub cluster with JS enabled and a remote server that has mqtt{} without JetStream and ensure that this works. Signed-off-by: Ivan Kozlovic --- server/mqtt.go | 69 ++++++++++++-------- server/mqtt_test.go | 142 ++++++++++++++++++++++++++++++++++++------ server/server_test.go | 1 + 3 files changed, 168 insertions(+), 44 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 1cc8b402..4324ff96 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -176,6 +176,20 @@ var ( errMQTTTopicFilterCannotBeEmpty = errors.New("topic filter cannot be empty") errMQTTMalformedVarInt = errors.New("malformed variable int") errMQTTSecondConnectPacket = errors.New("received a second CONNECT packet") + errMQTTServerNameMustBeSet = errors.New("mqtt requires server name to be explicitly set") + errMQTTUserMixWithUsersNKeys = errors.New("mqtt authentication username not compatible with presence of users/nkeys") + errMQTTTokenMixWIthUsersNKeys = errors.New("mqtt authentication token not compatible with presence of users/nkeys") + errMQTTAckWaitMustBePositive = errors.New("ack wait must be a positive value") + errMQTTStandaloneNeedsJetStream = errors.New("mqtt requires JetStream to be enabled if running in standalone mode") + errMQTTConnFlagReserved = errors.New("connect flags reserved bit not set to 0") + errMQTTWillAndRetainFlag = errors.New("if Will flag is set to 0, Will Retain flag must be 0 too") + errMQTTPasswordFlagAndNoUser = errors.New("password flag set but username flag is not") + errMQTTCIDEmptyNeedsCleanFlag = errors.New("when client ID is empty, clean session flag must be set to 1") + errMQTTEmptyWillTopic = errors.New("empty Will topic not allowed") + errMQTTEmptyUsername = errors.New("empty user name not allowed") + errMQTTTopicIsEmpty = errors.New("topic cannot be empty") + errMQTTPacketIdentifierIsZero = errors.New("packet identifier cannot be 0") + errMQTTUnsupportedCharacters = errors.New("characters ' ' and '.' not supported for MQTT topics") ) type srvMQTT struct { @@ -508,6 +522,11 @@ func validateMQTTOptions(o *Options) error { if mo.Port == 0 { return nil } + // We have to force the server name to be explicitly set. There are conditions + // where we need a unique, repeatable name. + if o.ServerName == _EMPTY_ { + return errMQTTServerNameMustBeSet + } // If there is a NoAuthUser, we need to have Users defined and // the user to be present. if mo.NoAuthUser != _EMPTY_ { @@ -518,18 +537,22 @@ func validateMQTTOptions(o *Options) error { // Token/Username not possible if there are users/nkeys if len(o.Users) > 0 || len(o.Nkeys) > 0 { if mo.Username != _EMPTY_ { - return fmt.Errorf("mqtt authentication username not compatible with presence of users/nkeys") + return errMQTTUserMixWithUsersNKeys } if mo.Token != _EMPTY_ { - return fmt.Errorf("mqtt authentication token not compatible with presence of users/nkeys") + return errMQTTTokenMixWIthUsersNKeys } } if mo.AckWait < 0 { - return fmt.Errorf("ack wait must be a positive value") + return errMQTTAckWaitMustBePositive } - // If standalone and there is no JS enabled, then it won't work... - if o.Cluster.Port == 0 && o.Gateway.Port == 0 && !o.JetStream { - return fmt.Errorf("mqtt requires JetStream to be enabled if running in standalone mode") + // If strictly standalone and there is no JS enabled, then it won't work... + // For leafnodes, we could either have remote(s) and it would be ok, or no + // remote but accept from a remote side that has "hub" property set, which + // then would ok too. So we fail only if we have no leafnode config at all. + if !o.JetStream && o.Cluster.Port == 0 && o.Gateway.Port == 0 && + o.LeafNode.Port == 0 && len(o.LeafNode.Remotes) == 0 { + return errMQTTStandaloneNeedsJetStream } return nil } @@ -974,16 +997,10 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Using ephemeral consumer is too risky because if this server were to be // disconnected from the rest for few seconds, then the leader would remove // the consumer, so even after a reconnect, we would not longer receive - // retained messages. So delete any existing durable that we have for that - // and recreate here. In non cluster mode, we will use the retained messages - // stream name as the name, since it will be unique for this account and - // we will know what is the name on restart. - rmDurName := mqttRetainedMsgsStreamName - // In cluster mode, we will add our id (server name hash) since there may - // be many of those durables (1 per node). - if s.JetStreamIsClustered() { - rmDurName += "_" + jsa.id - } + // retained messages. Delete any existing durable that we have for that + // and recreate here. + // The name for the durable is $MQTT_rmsgs_ (which is jsa.id) + rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id resp, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName) // If error other than "not found" then fail, otherwise proceed with creating // the durable consumer. @@ -2189,7 +2206,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt // Spec [MQTT-3.1.2-3] if cp.flags&mqttConnFlagReserved != 0 { - return 0, nil, fmt.Errorf("connect flags reserved bit not set to 0") + return 0, nil, errMQTTConnFlagReserved } var hasWill bool @@ -2203,7 +2220,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt } // Spec [MQTT-3.1.2-15] if wretain { - return 0, nil, fmt.Errorf("if Will flag is set to 0, Will Retain flag must be 0 too") + return 0, nil, errMQTTWillAndRetainFlag } } else { // Spec [MQTT-3.1.2-14] @@ -2219,7 +2236,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt hasPassword := cp.flags&mqttConnFlagPasswordFlag != 0 // Spec [MQTT-3.1.2-22] if !hasUser && hasPassword { - return 0, nil, fmt.Errorf("password flag set but username flag is not") + return 0, nil, errMQTTPasswordFlagAndNoUser } // Keep alive @@ -2244,7 +2261,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt // Spec [MQTT-3.1.3-7] if cp.clientID == _EMPTY_ { if cp.flags&mqttConnFlagCleanSession == 0 { - return mqttConnAckRCIdentifierRejected, nil, fmt.Errorf("when client ID is empty, clean session flag must be set to 1") + return mqttConnAckRCIdentifierRejected, nil, errMQTTCIDEmptyNeedsCleanFlag } // Spec [MQTT-3.1.3-6] cp.clientID = nuid.Next() @@ -2267,10 +2284,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt return 0, nil, err } if len(topic) == 0 { - return 0, nil, fmt.Errorf("empty Will topic not allowed") + return 0, nil, errMQTTEmptyWillTopic } if !utf8.Valid(topic) { - return 0, nil, fmt.Errorf("invalide utf8 for Will topic %q", topic) + return 0, nil, fmt.Errorf("invalid utf8 for Will topic %q", topic) } cp.will.topic = topic // Convert MQTT topic to NATS subject @@ -2291,7 +2308,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt return 0, nil, err } if c.opts.Username == _EMPTY_ { - return mqttConnAckRCBadUserOrPassword, nil, fmt.Errorf("empty user name not allowed") + return mqttConnAckRCBadUserOrPassword, nil, errMQTTEmptyUsername } // Spec [MQTT-3.1.3-11] if !utf8.ValidString(c.opts.Username) { @@ -2580,7 +2597,7 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish) error { return err } if len(pp.topic) == 0 { - return fmt.Errorf("topic cannot be empty") + return errMQTTTopicIsEmpty } // Convert the topic to a NATS subject. This call will also check that // there is no MQTT wildcards (Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]) @@ -2858,7 +2875,7 @@ func mqttParsePubAck(r *mqttReader, pl int) (uint16, error) { return 0, err } if pi == 0 { - return 0, fmt.Errorf("packet identifier cannot be 0") + return 0, errMQTTPacketIdentifierIsZero } return pi, nil } @@ -3558,7 +3575,7 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) { } case btsep, ' ': // As of now, we cannot support '.' or ' ' in the MQTT topic/filter. - return nil, fmt.Errorf("characters ' ' and '.' not supported for MQTT topics") + return nil, errMQTTUnsupportedCharacters case mqttSingleLevelWC, mqttMultiLevelWC: if !wcOk { // Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1] diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 7fe76d96..19b8c48f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -199,6 +199,7 @@ func TestMQTTWriter(t *testing.T) { func testMQTTDefaultOptions() *Options { o := DefaultOptions() + o.ServerName = nuid.Next() o.Cluster.Port = 0 o.Gateway.Name = "" o.Gateway.Port = 0 @@ -224,6 +225,7 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { s.SetLogger(l, true, true) go s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { + testMQTTShutdownServer(s) t.Fatal(err) } return s @@ -261,8 +263,25 @@ func testMQTTDefaultTLSOptions(t *testing.T, verify bool) *Options { return o } +func TestMQTTServerNameRequired(t *testing.T) { + conf := createConfFile(t, []byte(` + mqtt { + port: -1 + } + `)) + defer removeFile(t, conf) + o, err := ProcessConfigFile(conf) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + if _, err := NewServer(o); err == nil || err.Error() != errMQTTServerNameMustBeSet.Error() { + t.Fatalf("Expected error about requiring server name to be set, got %v", err) + } +} + func TestMQTTStandaloneRequiresJetStream(t *testing.T) { conf := createConfFile(t, []byte(` + server_name: mqtt mqtt { port: -1 tls { @@ -276,7 +295,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) { if err != nil { t.Fatalf("Error processing config file: %v", err) } - if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), "standalone") { + if _, err := NewServer(o); err == nil || err.Error() != errMQTTStandaloneNeedsJetStream.Error() { t.Fatalf("Expected error about requiring JetStream in standalone mode, got %v", err) } } @@ -284,6 +303,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) { func TestMQTTConfig(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled + server_name: mqtt mqtt { port: -1 tls { @@ -306,33 +326,33 @@ func TestMQTTValidateOptions(t *testing.T) { for _, test := range []struct { name string getOpts func() *Options - err string + err error }{ - {"mqtt disabled", func() *Options { return nmqtto.Clone() }, ""}, + {"mqtt disabled", func() *Options { return nmqtto.Clone() }, nil}, {"mqtt username not allowed if users specified", func() *Options { o := mqtto.Clone() o.Users = []*User{{Username: "abc", Password: "pwd"}} o.MQTT.Username = "b" o.MQTT.Password = "pwd" return o - }, "mqtt authentication username not compatible with presence of users/nkeys"}, + }, errMQTTUserMixWithUsersNKeys}, {"mqtt token not allowed if users specified", func() *Options { o := mqtto.Clone() o.Nkeys = []*NkeyUser{{Nkey: "abc"}} o.MQTT.Token = "mytoken" return o - }, "mqtt authentication token not compatible with presence of users/nkeys"}, + }, errMQTTTokenMixWIthUsersNKeys}, {"ack wait should be >=0", func() *Options { o := mqtto.Clone() o.MQTT.AckWait = -10 * time.Second return o - }, "ack wait must be a positive value"}, + }, errMQTTAckWaitMustBePositive}, } { t.Run(test.name, func(t *testing.T) { err := validateMQTTOptions(test.getOpts()) - if test.err == "" && err != nil { + if test.err == nil && err != nil { t.Fatalf("Unexpected error: %v", err) - } else if test.err != "" && (err == nil || !strings.Contains(err.Error(), test.err)) { + } else if test.err != nil && (err == nil || err.Error() != test.err.Error()) { t.Fatalf("Expected error to contain %q, got %v", test.err, err) } }) @@ -1467,22 +1487,22 @@ func TestMQTTParseConnect(t *testing.T) { {"error on protocol level", []byte{0, 4, 'M', 'Q', 'T', 'T'}, 6, eofr, "protocol level"}, {"unacceptable protocol version", []byte{0, 4, 'M', 'Q', 'T', 'T', 10}, 7, nil, "unacceptable protocol version"}, {"error on flags", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel}, 7, eofr, "flags"}, - {"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, "connect flags reserved bit not set to 0"}, + {"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, errMQTTConnFlagReserved.Error()}, {"will qos without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 3}, 8, nil, "if Will flag is set to 0, Will QoS must be 0 too"}, - {"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, "if Will flag is set to 0, Will Retain flag must be 0 too"}, + {"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, errMQTTWillAndRetainFlag.Error()}, {"will qos", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 3<<3 | 1<<2}, 8, nil, "if Will flag is set to 1, Will QoS can be 0, 1 or 2"}, - {"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, "password flag set but username flag is not"}, + {"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, errMQTTPasswordFlagAndNoUser.Error()}, {"missing keep alive", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0}, 8, nil, "keep alive"}, {"missing client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1}, 10, nil, "client ID"}, - {"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, "when client ID is empty, clean session flag must be set to 1"}, + {"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, errMQTTCIDEmptyNeedsCleanFlag.Error()}, {"invalid utf8 client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 1, 241}, 13, nil, "invalid utf8 for client ID"}, {"missing will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, nil, "Will topic"}, - {"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty Will topic not allowed"}, - {"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalide utf8 for Will topic"}, + {"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyWillTopic.Error()}, + {"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for Will topic"}, {"invalid wildcard will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, '#'}, 15, nil, "wildcards not allowed"}, {"error on will message", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a', 0, 3}, 17, eofr, "Will message"}, {"error on username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, eofr, "user name"}, - {"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty user name not allowed"}, + {"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyUsername.Error()}, {"invalid utf8 username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for user name"}, {"error on password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagPasswordFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a'}, 15, eofr, "password"}, } { @@ -1989,10 +2009,10 @@ func TestMQTTParsePub(t *testing.T) { {"qos not supported", 0x4, nil, 0, nil, "not supported"}, {"packet in buffer error", 0, nil, 10, eofr, "error ensuring protocol is loaded"}, {"error on topic", 0, []byte{0, 3, 'f', 'o'}, 4, eofr, "topic"}, - {"empty topic", 0, []byte{0, 0}, 2, nil, "topic cannot be empty"}, + {"empty topic", 0, []byte{0, 0}, 2, nil, errMQTTTopicIsEmpty.Error()}, {"wildcards topic", 0, []byte{0, 1, '#'}, 3, nil, "wildcards not allowed"}, {"error on packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o'}, 5, eofr, "packet identifier"}, - {"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, "packet identifier cannot be 0"}, + {"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, errMQTTPacketIdentifierIsZero.Error()}, } { t.Run(test.name, func(t *testing.T) { r := &mqttReader{reader: test.reader} @@ -2018,7 +2038,7 @@ func TestMQTTParsePubAck(t *testing.T) { }{ {"packet in buffer error", nil, 10, eofr, "error ensuring protocol is loaded"}, {"error reading packet identifier", []byte{0}, 1, eofr, "packet identifier"}, - {"invalid packet identifier", []byte{0, 0}, 2, nil, "packet identifier cannot be 0"}, + {"invalid packet identifier", []byte{0, 0}, 2, nil, errMQTTPacketIdentifierIsZero.Error()}, } { t.Run(test.name, func(t *testing.T) { r := &mqttReader{reader: test.reader} @@ -3030,6 +3050,90 @@ func TestMQTTClusterPlacement(t *testing.T) { } } +func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { + getClusterOpts := func(name string, i int) *Options { + o := testMQTTDefaultOptions() + o.ServerName = name + o.Cluster.Name = "hub" + o.Cluster.Host = "127.0.0.1" + o.Cluster.Port = 2790 + i + o.Routes = RoutesFromStr("nats://127.0.0.1:2791,nats://127.0.0.1:2792,nats://127.0.0.1:2793") + o.LeafNode.Host = "127.0.0.1" + o.LeafNode.Port = -1 + return o + } + o1 := getClusterOpts("S1", 1) + s1 := testMQTTRunServer(t, o1) + defer testMQTTShutdownServer(s1) + + o2 := getClusterOpts("S2", 2) + s2 := testMQTTRunServer(t, o2) + defer testMQTTShutdownServer(s2) + + o3 := getClusterOpts("S3", 3) + s3 := testMQTTRunServer(t, o3) + defer testMQTTShutdownServer(s3) + + cluster := []*Server{s1, s2, s3} + checkClusterFormed(t, cluster...) + checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { + for _, s := range cluster { + if s.JetStreamIsLeader() { + return nil + } + } + return fmt.Errorf("no leader yet") + }) + + // Now define a leafnode that has mqtt enabled, but no JS. This should still work. + lno := testMQTTDefaultOptions() + // Make sure jetstream is not explicitly defined here. + lno.JetStream = false + // Use RoutesFromStr() to make an array of urls + urls := RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d,nats://127.0.0.1:%d,nats://127.0.0.1:%d", + o1.LeafNode.Port, o2.LeafNode.Port, o3.LeafNode.Port)) + lno.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + ln := RunServer(lno) + defer ln.Shutdown() + + // Now connect to leafnode and subscribe + mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, lno.MQTT.Host, lno.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, rc) + + connectAndPublish := func(o *Options) { + mp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mp.Close() + testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false) + + testMQTTPublish(t, mp, rp, 1, false, false, "foo", 1, []byte("msg")) + } + // Connect a publisher from leafnode and publish, verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o3) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from a server in the hub and subscribe + mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", cleanSess: true}, o2.MQTT.Host, o2.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc2, nil, rc2) + + // Connect a publisher from leafnode and publish, verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o1) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) +} + func TestMQTTParseUnsub(t *testing.T) { eofr := testNewEOFReader() for _, test := range []struct { @@ -3324,6 +3428,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) { template := ` port: -1 jetstream: enabled + server_name: mqtt authorization { mqtt_perms = { publish = ["%s"] @@ -4545,6 +4650,7 @@ func TestMQTTMaxAckPendingOverLimit(t *testing.T) { func TestMQTTConfigReload(t *testing.T) { template := ` jetstream: true + server_name: mqtt mqtt { port: -1 ack_wait: %s diff --git a/server/server_test.go b/server/server_test.go index dfe76673..3ed4b2ad 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1230,6 +1230,7 @@ func TestAcceptError(t *testing.T) { func TestServerShutdownDuringStart(t *testing.T) { o := DefaultOptions() + o.ServerName = "server" o.DisableShortFirstPing = true o.Accounts = []*Account{NewAccount("$SYS")} o.SystemAccount = "$SYS"