mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Force server name to be set if mqtt{} defined
This will solve the issue of naming the durable per server for
the "retained messages" stream in situation where a cluster
of servers would not have JetStream defined but connect to another
cluster that has it. All the servers within the cluster without
JetStream would cause the durable's delivery subject to be updated
to the last server starting the durable.
Updated the check for mqtt requiring JetStream if running in
standalone mode to check that no leafnode configuration is present.
Replaced use of fmt.Errorf() when the string was static with
errors created with errors.New(). Updated tests that were checking
for errors to use those errors instead of repeating the string.
Added test that has a hub cluster with JS enabled and a remote server
that has mqtt{} without JetStream and ensure that this works.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -176,6 +176,20 @@ var (
|
||||
errMQTTTopicFilterCannotBeEmpty = errors.New("topic filter cannot be empty")
|
||||
errMQTTMalformedVarInt = errors.New("malformed variable int")
|
||||
errMQTTSecondConnectPacket = errors.New("received a second CONNECT packet")
|
||||
errMQTTServerNameMustBeSet = errors.New("mqtt requires server name to be explicitly set")
|
||||
errMQTTUserMixWithUsersNKeys = errors.New("mqtt authentication username not compatible with presence of users/nkeys")
|
||||
errMQTTTokenMixWIthUsersNKeys = errors.New("mqtt authentication token not compatible with presence of users/nkeys")
|
||||
errMQTTAckWaitMustBePositive = errors.New("ack wait must be a positive value")
|
||||
errMQTTStandaloneNeedsJetStream = errors.New("mqtt requires JetStream to be enabled if running in standalone mode")
|
||||
errMQTTConnFlagReserved = errors.New("connect flags reserved bit not set to 0")
|
||||
errMQTTWillAndRetainFlag = errors.New("if Will flag is set to 0, Will Retain flag must be 0 too")
|
||||
errMQTTPasswordFlagAndNoUser = errors.New("password flag set but username flag is not")
|
||||
errMQTTCIDEmptyNeedsCleanFlag = errors.New("when client ID is empty, clean session flag must be set to 1")
|
||||
errMQTTEmptyWillTopic = errors.New("empty Will topic not allowed")
|
||||
errMQTTEmptyUsername = errors.New("empty user name not allowed")
|
||||
errMQTTTopicIsEmpty = errors.New("topic cannot be empty")
|
||||
errMQTTPacketIdentifierIsZero = errors.New("packet identifier cannot be 0")
|
||||
errMQTTUnsupportedCharacters = errors.New("characters ' ' and '.' not supported for MQTT topics")
|
||||
)
|
||||
|
||||
type srvMQTT struct {
|
||||
@@ -508,6 +522,11 @@ func validateMQTTOptions(o *Options) error {
|
||||
if mo.Port == 0 {
|
||||
return nil
|
||||
}
|
||||
// We have to force the server name to be explicitly set. There are conditions
|
||||
// where we need a unique, repeatable name.
|
||||
if o.ServerName == _EMPTY_ {
|
||||
return errMQTTServerNameMustBeSet
|
||||
}
|
||||
// If there is a NoAuthUser, we need to have Users defined and
|
||||
// the user to be present.
|
||||
if mo.NoAuthUser != _EMPTY_ {
|
||||
@@ -518,18 +537,22 @@ func validateMQTTOptions(o *Options) error {
|
||||
// Token/Username not possible if there are users/nkeys
|
||||
if len(o.Users) > 0 || len(o.Nkeys) > 0 {
|
||||
if mo.Username != _EMPTY_ {
|
||||
return fmt.Errorf("mqtt authentication username not compatible with presence of users/nkeys")
|
||||
return errMQTTUserMixWithUsersNKeys
|
||||
}
|
||||
if mo.Token != _EMPTY_ {
|
||||
return fmt.Errorf("mqtt authentication token not compatible with presence of users/nkeys")
|
||||
return errMQTTTokenMixWIthUsersNKeys
|
||||
}
|
||||
}
|
||||
if mo.AckWait < 0 {
|
||||
return fmt.Errorf("ack wait must be a positive value")
|
||||
return errMQTTAckWaitMustBePositive
|
||||
}
|
||||
// If standalone and there is no JS enabled, then it won't work...
|
||||
if o.Cluster.Port == 0 && o.Gateway.Port == 0 && !o.JetStream {
|
||||
return fmt.Errorf("mqtt requires JetStream to be enabled if running in standalone mode")
|
||||
// If strictly standalone and there is no JS enabled, then it won't work...
|
||||
// For leafnodes, we could either have remote(s) and it would be ok, or no
|
||||
// remote but accept from a remote side that has "hub" property set, which
|
||||
// then would ok too. So we fail only if we have no leafnode config at all.
|
||||
if !o.JetStream && o.Cluster.Port == 0 && o.Gateway.Port == 0 &&
|
||||
o.LeafNode.Port == 0 && len(o.LeafNode.Remotes) == 0 {
|
||||
return errMQTTStandaloneNeedsJetStream
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -974,16 +997,10 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
// Using ephemeral consumer is too risky because if this server were to be
|
||||
// disconnected from the rest for few seconds, then the leader would remove
|
||||
// the consumer, so even after a reconnect, we would not longer receive
|
||||
// retained messages. So delete any existing durable that we have for that
|
||||
// and recreate here. In non cluster mode, we will use the retained messages
|
||||
// stream name as the name, since it will be unique for this account and
|
||||
// we will know what is the name on restart.
|
||||
rmDurName := mqttRetainedMsgsStreamName
|
||||
// In cluster mode, we will add our id (server name hash) since there may
|
||||
// be many of those durables (1 per node).
|
||||
if s.JetStreamIsClustered() {
|
||||
rmDurName += "_" + jsa.id
|
||||
}
|
||||
// retained messages. Delete any existing durable that we have for that
|
||||
// and recreate here.
|
||||
// The name for the durable is $MQTT_rmsgs_<server name hash> (which is jsa.id)
|
||||
rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
|
||||
resp, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName)
|
||||
// If error other than "not found" then fail, otherwise proceed with creating
|
||||
// the durable consumer.
|
||||
@@ -2189,7 +2206,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
|
||||
// Spec [MQTT-3.1.2-3]
|
||||
if cp.flags&mqttConnFlagReserved != 0 {
|
||||
return 0, nil, fmt.Errorf("connect flags reserved bit not set to 0")
|
||||
return 0, nil, errMQTTConnFlagReserved
|
||||
}
|
||||
|
||||
var hasWill bool
|
||||
@@ -2203,7 +2220,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
}
|
||||
// Spec [MQTT-3.1.2-15]
|
||||
if wretain {
|
||||
return 0, nil, fmt.Errorf("if Will flag is set to 0, Will Retain flag must be 0 too")
|
||||
return 0, nil, errMQTTWillAndRetainFlag
|
||||
}
|
||||
} else {
|
||||
// Spec [MQTT-3.1.2-14]
|
||||
@@ -2219,7 +2236,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
hasPassword := cp.flags&mqttConnFlagPasswordFlag != 0
|
||||
// Spec [MQTT-3.1.2-22]
|
||||
if !hasUser && hasPassword {
|
||||
return 0, nil, fmt.Errorf("password flag set but username flag is not")
|
||||
return 0, nil, errMQTTPasswordFlagAndNoUser
|
||||
}
|
||||
|
||||
// Keep alive
|
||||
@@ -2244,7 +2261,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
// Spec [MQTT-3.1.3-7]
|
||||
if cp.clientID == _EMPTY_ {
|
||||
if cp.flags&mqttConnFlagCleanSession == 0 {
|
||||
return mqttConnAckRCIdentifierRejected, nil, fmt.Errorf("when client ID is empty, clean session flag must be set to 1")
|
||||
return mqttConnAckRCIdentifierRejected, nil, errMQTTCIDEmptyNeedsCleanFlag
|
||||
}
|
||||
// Spec [MQTT-3.1.3-6]
|
||||
cp.clientID = nuid.Next()
|
||||
@@ -2267,10 +2284,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
return 0, nil, err
|
||||
}
|
||||
if len(topic) == 0 {
|
||||
return 0, nil, fmt.Errorf("empty Will topic not allowed")
|
||||
return 0, nil, errMQTTEmptyWillTopic
|
||||
}
|
||||
if !utf8.Valid(topic) {
|
||||
return 0, nil, fmt.Errorf("invalide utf8 for Will topic %q", topic)
|
||||
return 0, nil, fmt.Errorf("invalid utf8 for Will topic %q", topic)
|
||||
}
|
||||
cp.will.topic = topic
|
||||
// Convert MQTT topic to NATS subject
|
||||
@@ -2291,7 +2308,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
|
||||
return 0, nil, err
|
||||
}
|
||||
if c.opts.Username == _EMPTY_ {
|
||||
return mqttConnAckRCBadUserOrPassword, nil, fmt.Errorf("empty user name not allowed")
|
||||
return mqttConnAckRCBadUserOrPassword, nil, errMQTTEmptyUsername
|
||||
}
|
||||
// Spec [MQTT-3.1.3-11]
|
||||
if !utf8.ValidString(c.opts.Username) {
|
||||
@@ -2580,7 +2597,7 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish) error {
|
||||
return err
|
||||
}
|
||||
if len(pp.topic) == 0 {
|
||||
return fmt.Errorf("topic cannot be empty")
|
||||
return errMQTTTopicIsEmpty
|
||||
}
|
||||
// Convert the topic to a NATS subject. This call will also check that
|
||||
// there is no MQTT wildcards (Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1])
|
||||
@@ -2858,7 +2875,7 @@ func mqttParsePubAck(r *mqttReader, pl int) (uint16, error) {
|
||||
return 0, err
|
||||
}
|
||||
if pi == 0 {
|
||||
return 0, fmt.Errorf("packet identifier cannot be 0")
|
||||
return 0, errMQTTPacketIdentifierIsZero
|
||||
}
|
||||
return pi, nil
|
||||
}
|
||||
@@ -3558,7 +3575,7 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) {
|
||||
}
|
||||
case btsep, ' ':
|
||||
// As of now, we cannot support '.' or ' ' in the MQTT topic/filter.
|
||||
return nil, fmt.Errorf("characters ' ' and '.' not supported for MQTT topics")
|
||||
return nil, errMQTTUnsupportedCharacters
|
||||
case mqttSingleLevelWC, mqttMultiLevelWC:
|
||||
if !wcOk {
|
||||
// Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]
|
||||
|
||||
@@ -199,6 +199,7 @@ func TestMQTTWriter(t *testing.T) {
|
||||
|
||||
func testMQTTDefaultOptions() *Options {
|
||||
o := DefaultOptions()
|
||||
o.ServerName = nuid.Next()
|
||||
o.Cluster.Port = 0
|
||||
o.Gateway.Name = ""
|
||||
o.Gateway.Port = 0
|
||||
@@ -224,6 +225,7 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server {
|
||||
s.SetLogger(l, true, true)
|
||||
go s.Start()
|
||||
if err := s.readyForConnections(3 * time.Second); err != nil {
|
||||
testMQTTShutdownServer(s)
|
||||
t.Fatal(err)
|
||||
}
|
||||
return s
|
||||
@@ -261,8 +263,25 @@ func testMQTTDefaultTLSOptions(t *testing.T, verify bool) *Options {
|
||||
return o
|
||||
}
|
||||
|
||||
func TestMQTTServerNameRequired(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
mqtt {
|
||||
port: -1
|
||||
}
|
||||
`))
|
||||
defer removeFile(t, conf)
|
||||
o, err := ProcessConfigFile(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
if _, err := NewServer(o); err == nil || err.Error() != errMQTTServerNameMustBeSet.Error() {
|
||||
t.Fatalf("Expected error about requiring server name to be set, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTStandaloneRequiresJetStream(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
tls {
|
||||
@@ -276,7 +295,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), "standalone") {
|
||||
if _, err := NewServer(o); err == nil || err.Error() != errMQTTStandaloneNeedsJetStream.Error() {
|
||||
t.Fatalf("Expected error about requiring JetStream in standalone mode, got %v", err)
|
||||
}
|
||||
}
|
||||
@@ -284,6 +303,7 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) {
|
||||
func TestMQTTConfig(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
jetstream: enabled
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
tls {
|
||||
@@ -306,33 +326,33 @@ func TestMQTTValidateOptions(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
getOpts func() *Options
|
||||
err string
|
||||
err error
|
||||
}{
|
||||
{"mqtt disabled", func() *Options { return nmqtto.Clone() }, ""},
|
||||
{"mqtt disabled", func() *Options { return nmqtto.Clone() }, nil},
|
||||
{"mqtt username not allowed if users specified", func() *Options {
|
||||
o := mqtto.Clone()
|
||||
o.Users = []*User{{Username: "abc", Password: "pwd"}}
|
||||
o.MQTT.Username = "b"
|
||||
o.MQTT.Password = "pwd"
|
||||
return o
|
||||
}, "mqtt authentication username not compatible with presence of users/nkeys"},
|
||||
}, errMQTTUserMixWithUsersNKeys},
|
||||
{"mqtt token not allowed if users specified", func() *Options {
|
||||
o := mqtto.Clone()
|
||||
o.Nkeys = []*NkeyUser{{Nkey: "abc"}}
|
||||
o.MQTT.Token = "mytoken"
|
||||
return o
|
||||
}, "mqtt authentication token not compatible with presence of users/nkeys"},
|
||||
}, errMQTTTokenMixWIthUsersNKeys},
|
||||
{"ack wait should be >=0", func() *Options {
|
||||
o := mqtto.Clone()
|
||||
o.MQTT.AckWait = -10 * time.Second
|
||||
return o
|
||||
}, "ack wait must be a positive value"},
|
||||
}, errMQTTAckWaitMustBePositive},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
err := validateMQTTOptions(test.getOpts())
|
||||
if test.err == "" && err != nil {
|
||||
if test.err == nil && err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
} else if test.err != "" && (err == nil || !strings.Contains(err.Error(), test.err)) {
|
||||
} else if test.err != nil && (err == nil || err.Error() != test.err.Error()) {
|
||||
t.Fatalf("Expected error to contain %q, got %v", test.err, err)
|
||||
}
|
||||
})
|
||||
@@ -1467,22 +1487,22 @@ func TestMQTTParseConnect(t *testing.T) {
|
||||
{"error on protocol level", []byte{0, 4, 'M', 'Q', 'T', 'T'}, 6, eofr, "protocol level"},
|
||||
{"unacceptable protocol version", []byte{0, 4, 'M', 'Q', 'T', 'T', 10}, 7, nil, "unacceptable protocol version"},
|
||||
{"error on flags", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel}, 7, eofr, "flags"},
|
||||
{"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, "connect flags reserved bit not set to 0"},
|
||||
{"reserved flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1}, 8, nil, errMQTTConnFlagReserved.Error()},
|
||||
{"will qos without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 3}, 8, nil, "if Will flag is set to 0, Will QoS must be 0 too"},
|
||||
{"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, "if Will flag is set to 0, Will Retain flag must be 0 too"},
|
||||
{"will retain without will flag", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 1 << 5}, 8, nil, errMQTTWillAndRetainFlag.Error()},
|
||||
{"will qos", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 3<<3 | 1<<2}, 8, nil, "if Will flag is set to 1, Will QoS can be 0, 1 or 2"},
|
||||
{"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, "password flag set but username flag is not"},
|
||||
{"no user but password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagPasswordFlag}, 8, nil, errMQTTPasswordFlagAndNoUser.Error()},
|
||||
{"missing keep alive", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0}, 8, nil, "keep alive"},
|
||||
{"missing client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1}, 10, nil, "client ID"},
|
||||
{"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, "when client ID is empty, clean session flag must be set to 1"},
|
||||
{"empty client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 0}, 12, nil, errMQTTCIDEmptyNeedsCleanFlag.Error()},
|
||||
{"invalid utf8 client ID", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, 0, 0, 1, 0, 1, 241}, 13, nil, "invalid utf8 for client ID"},
|
||||
{"missing will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, nil, "Will topic"},
|
||||
{"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty Will topic not allowed"},
|
||||
{"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalide utf8 for Will topic"},
|
||||
{"empty will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyWillTopic.Error()},
|
||||
{"invalid utf8 will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for Will topic"},
|
||||
{"invalid wildcard will topic", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, '#'}, 15, nil, "wildcards not allowed"},
|
||||
{"error on will message", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagWillFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a', 0, 3}, 17, eofr, "Will message"},
|
||||
{"error on username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0}, 12, eofr, "user name"},
|
||||
{"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, "empty user name not allowed"},
|
||||
{"empty username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 0}, 14, nil, errMQTTEmptyUsername.Error()},
|
||||
{"invalid utf8 username", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 241}, 15, nil, "invalid utf8 for user name"},
|
||||
{"error on password", []byte{0, 4, 'M', 'Q', 'T', 'T', mqttProtoLevel, mqttConnFlagUsernameFlag | mqttConnFlagPasswordFlag | mqttConnFlagCleanSession, 0, 0, 0, 0, 0, 1, 'a'}, 15, eofr, "password"},
|
||||
} {
|
||||
@@ -1989,10 +2009,10 @@ func TestMQTTParsePub(t *testing.T) {
|
||||
{"qos not supported", 0x4, nil, 0, nil, "not supported"},
|
||||
{"packet in buffer error", 0, nil, 10, eofr, "error ensuring protocol is loaded"},
|
||||
{"error on topic", 0, []byte{0, 3, 'f', 'o'}, 4, eofr, "topic"},
|
||||
{"empty topic", 0, []byte{0, 0}, 2, nil, "topic cannot be empty"},
|
||||
{"empty topic", 0, []byte{0, 0}, 2, nil, errMQTTTopicIsEmpty.Error()},
|
||||
{"wildcards topic", 0, []byte{0, 1, '#'}, 3, nil, "wildcards not allowed"},
|
||||
{"error on packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o'}, 5, eofr, "packet identifier"},
|
||||
{"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, "packet identifier cannot be 0"},
|
||||
{"invalid packet identifier", mqttPubQos1, []byte{0, 3, 'f', 'o', 'o', 0, 0}, 7, nil, errMQTTPacketIdentifierIsZero.Error()},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
r := &mqttReader{reader: test.reader}
|
||||
@@ -2018,7 +2038,7 @@ func TestMQTTParsePubAck(t *testing.T) {
|
||||
}{
|
||||
{"packet in buffer error", nil, 10, eofr, "error ensuring protocol is loaded"},
|
||||
{"error reading packet identifier", []byte{0}, 1, eofr, "packet identifier"},
|
||||
{"invalid packet identifier", []byte{0, 0}, 2, nil, "packet identifier cannot be 0"},
|
||||
{"invalid packet identifier", []byte{0, 0}, 2, nil, errMQTTPacketIdentifierIsZero.Error()},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
r := &mqttReader{reader: test.reader}
|
||||
@@ -3030,6 +3050,90 @@ func TestMQTTClusterPlacement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) {
|
||||
getClusterOpts := func(name string, i int) *Options {
|
||||
o := testMQTTDefaultOptions()
|
||||
o.ServerName = name
|
||||
o.Cluster.Name = "hub"
|
||||
o.Cluster.Host = "127.0.0.1"
|
||||
o.Cluster.Port = 2790 + i
|
||||
o.Routes = RoutesFromStr("nats://127.0.0.1:2791,nats://127.0.0.1:2792,nats://127.0.0.1:2793")
|
||||
o.LeafNode.Host = "127.0.0.1"
|
||||
o.LeafNode.Port = -1
|
||||
return o
|
||||
}
|
||||
o1 := getClusterOpts("S1", 1)
|
||||
s1 := testMQTTRunServer(t, o1)
|
||||
defer testMQTTShutdownServer(s1)
|
||||
|
||||
o2 := getClusterOpts("S2", 2)
|
||||
s2 := testMQTTRunServer(t, o2)
|
||||
defer testMQTTShutdownServer(s2)
|
||||
|
||||
o3 := getClusterOpts("S3", 3)
|
||||
s3 := testMQTTRunServer(t, o3)
|
||||
defer testMQTTShutdownServer(s3)
|
||||
|
||||
cluster := []*Server{s1, s2, s3}
|
||||
checkClusterFormed(t, cluster...)
|
||||
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
|
||||
for _, s := range cluster {
|
||||
if s.JetStreamIsLeader() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("no leader yet")
|
||||
})
|
||||
|
||||
// Now define a leafnode that has mqtt enabled, but no JS. This should still work.
|
||||
lno := testMQTTDefaultOptions()
|
||||
// Make sure jetstream is not explicitly defined here.
|
||||
lno.JetStream = false
|
||||
// Use RoutesFromStr() to make an array of urls
|
||||
urls := RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d,nats://127.0.0.1:%d,nats://127.0.0.1:%d",
|
||||
o1.LeafNode.Port, o2.LeafNode.Port, o3.LeafNode.Port))
|
||||
lno.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
ln := RunServer(lno)
|
||||
defer ln.Shutdown()
|
||||
|
||||
// Now connect to leafnode and subscribe
|
||||
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, lno.MQTT.Host, lno.MQTT.Port)
|
||||
defer mc.Close()
|
||||
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
|
||||
testMQTTFlush(t, mc, nil, rc)
|
||||
|
||||
connectAndPublish := func(o *Options) {
|
||||
mp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mp.Close()
|
||||
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
|
||||
|
||||
testMQTTPublish(t, mp, rp, 1, false, false, "foo", 1, []byte("msg"))
|
||||
}
|
||||
// Connect a publisher from leafnode and publish, verify message is received.
|
||||
connectAndPublish(lno)
|
||||
testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg"))
|
||||
|
||||
// Connect from one server in the cluster check it works from there too.
|
||||
connectAndPublish(o3)
|
||||
testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg"))
|
||||
|
||||
// Connect from a server in the hub and subscribe
|
||||
mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", cleanSess: true}, o2.MQTT.Host, o2.MQTT.Port)
|
||||
defer mc2.Close()
|
||||
testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
|
||||
testMQTTFlush(t, mc2, nil, rc2)
|
||||
|
||||
// Connect a publisher from leafnode and publish, verify message is received.
|
||||
connectAndPublish(lno)
|
||||
testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg"))
|
||||
|
||||
// Connect from one server in the cluster check it works from there too.
|
||||
connectAndPublish(o1)
|
||||
testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg"))
|
||||
}
|
||||
|
||||
func TestMQTTParseUnsub(t *testing.T) {
|
||||
eofr := testNewEOFReader()
|
||||
for _, test := range []struct {
|
||||
@@ -3324,6 +3428,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) {
|
||||
template := `
|
||||
port: -1
|
||||
jetstream: enabled
|
||||
server_name: mqtt
|
||||
authorization {
|
||||
mqtt_perms = {
|
||||
publish = ["%s"]
|
||||
@@ -4545,6 +4650,7 @@ func TestMQTTMaxAckPendingOverLimit(t *testing.T) {
|
||||
func TestMQTTConfigReload(t *testing.T) {
|
||||
template := `
|
||||
jetstream: true
|
||||
server_name: mqtt
|
||||
mqtt {
|
||||
port: -1
|
||||
ack_wait: %s
|
||||
|
||||
@@ -1230,6 +1230,7 @@ func TestAcceptError(t *testing.T) {
|
||||
|
||||
func TestServerShutdownDuringStart(t *testing.T) {
|
||||
o := DefaultOptions()
|
||||
o.ServerName = "server"
|
||||
o.DisableShortFirstPing = true
|
||||
o.Accounts = []*Account{NewAccount("$SYS")}
|
||||
o.SystemAccount = "$SYS"
|
||||
|
||||
Reference in New Issue
Block a user