MQTT Implementation Overview
Revision 1.1
Authors: Ivan Kozlovic, Lev Brouk
NATS Server currently supports most of MQTT 3.1.1. This document describes how it is implemented.
It is strongly recommended to review the MQTT v3.1.1 specifications and get a detailed understanding before proceeding with this document.
1. Concepts
Server, client
In the MQTT specification there are concepts of Client and Server, used
somewhat interchangeably with those of Sender and Receiver. A Server
acts as a Receiver when it gets PUBLISH messages from a Sender
Client, and acts as a Sender when it delivers them to subscribed
Clients.
In the NATS server implementation there are also concepts (types) server and
client. client is an internal representation of a (connected) client and
runs its own read and write loops. Both of these have an mqtt field that if
set makes them behave as MQTT-compliant.
The code and comments may sometimes be confusing, as they refer to server and
client ambiguously between MQTT and NATS.
Connection, client ID, session
When an MQTT client connects to a server, it must send a CONNECT packet to
create an MQTT Connection. The packet must include a Client Identifier.
The server will then create or load a previously saved Session for (the hash
of) the client ID.
Packets, messages, and subscriptions
The low-level unit of transmission in MQTT is a Packet. Examples of packets
are: CONNECT, SUBSCRIBE, SUBACK, PUBLISH, PUBCOMP, etc.
An MQTT Message starts with a PUBLISH packet that a client sends to the
server. It is then matched against the current MQTT Subscriptions and is
delivered to them as appropriate. During the message delivery the server acts as
an MQTT client, and the receiver acts as an MQTT server.
Internally we use NATS Messages and NATS Subscriptions to facilitate
message delivery. This may be somewhat confusing as the code refers to msg and
sub. What may be even more confusing is that some MQTT packets (specifically,
PUBREL) are represented as NATS messages, and that the original MQTT packet
"metadata" may be encoded as NATS message headers.
Quality of Service (QoS), publish identifier (PI)
MQTT specifies 3 levels of quality of service (QoS):
- 0 for at most once. A single delivery attempt.
- 1 for at least once. Will try to redeliver until acknowledged by the receiver.
- 2 for exactly once. See the [SPEC REF] for the acknowledgement flow.
QoS 1 and 2 messages need to be identified with publish identifiers (PIs). A PI is a 16-bit integer that must uniquely identify a message for the duration of the required exchange of acknowledgment packets.
Note that the QoS applies separately to the transmission of a message from a sender client to the server, and from the server to the receiving client. There are no protocol-level acknowledgements between the receiver and the original sender. The sender passes the ownership of messages to the server, and the server then delivers them at the maximum possible QoS to the receivers (subscribers). The PIs for in-flight outgoing messages are issued and stored per session.
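To make per-session PI issuance concrete, here is a minimal Go sketch of an allocator. It assumes a wrapping 16-bit counter that skips 0 (MQTT 3.1.1 requires packet identifiers to be non-zero) and skips any PI still awaiting acknowledgement. The piAllocator type and its methods are hypothetical; they are not the server's bumpPI() or trackPublish().

```go
package main

import "fmt"

// piAllocator is a hypothetical per-session allocator of publish
// identifiers (PIs). PIs are 16-bit and must be non-zero.
type piAllocator struct {
	last     uint16          // last PI handed out
	inFlight map[uint16]bool // PIs still awaiting their acknowledgement flow
}

func newPIAllocator() *piAllocator {
	return &piAllocator{inFlight: make(map[uint16]bool)}
}

// next returns an unused, non-zero PI and marks it in flight.
// It returns 0 if all 65535 identifiers are currently in use.
func (a *piAllocator) next() uint16 {
	for i := 0; i < 0xFFFF; i++ {
		a.last++ // wraps from 65535 to 0
		if a.last == 0 {
			a.last = 1 // 0 is not a valid PI
		}
		if !a.inFlight[a.last] {
			a.inFlight[a.last] = true
			return a.last
		}
	}
	return 0
}

// release frees a PI once its acknowledgement exchange is complete.
func (a *piAllocator) release(pi uint16) {
	delete(a.inFlight, pi)
}

func main() {
	a := newPIAllocator()
	first := a.next()
	second := a.next()
	a.release(first)
	fmt.Println(first, second) // prints "1 2"
}
```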
Retained message
A Retained Message is not part of any MQTT session and is not removed when the session that produced it goes away. Instead, the server needs to persist a single retained message per topic. When a subscription is started, the server needs to send the “matching” retained messages, that is, messages that would have been delivered to the new subscription had that subscription been running prior to the publication of this message.
Retained messages are removed when the server receives a retained message with an empty body. Still, this retained message, which serves as a “delete” of a retained message, will be processed as a normal published message.
Retained messages can have QoS.
Will message
The CONNECT packet can contain information about a Will Message that needs to
be sent to any client subscribing on the Will topic/subject in the event that
the client is disconnected implicitly, that is, not as a result of the client
sending the DISCONNECT packet.
Will messages can have the retain flag and QoS.
2. Use of JetStream
The MQTT implementation relies heavily on JetStream. We use it to:
- Persist (and restore) the Session state.
- Store and retrieve Retained messages.
- Persist incoming QoS 1 and 2 messages, and re-deliver if needed.
- Store and de-duplicate incoming QoS 2 messages.
- Persist and re-deliver outgoing QoS 2 PUBREL packets.
Here is the overview of how we set up and use JetStream streams, consumers, and internal NATS subscriptions.
JetStream API
All interactions with JetStream are performed via mqttJSA that sends NATS
requests to JetStream. Most are processed synchronously and await a response,
some (e.g. jsa.sendAck()) are sent asynchronously. JetStream API is usually
referred to as jsa in the code. No special locking is required to use jsa,
however the asynchronous use of JetStream may create race conditions with
delivery callbacks.
Streams
We create the following streams unless they already exist. Failing to ensure the streams would prevent the client from connecting.
Each stream is created with a replica value that is determined by the size of the cluster but limited to 3. It can also be overridden by the stream_replicas option in the MQTT configuration block.
The streams are created the first time an Account Session Manager is initialized
and are used by all sessions in it. Note that to avoid race conditions, some
subscriptions are created first. The streams are never deleted. See
mqttCreateAccountSessionManager() for details.
- $MQTT_sess stores persisted Session records. It filters on the $MQTT.sess.> subject and has a “limits” policy with a MaxMsgsPer setting of 1.
- $MQTT_msgs is used for QoS 1 and 2 message delivery. It filters on the $MQTT.msgs.> subject and has an “interest” policy.
- $MQTT_rmsgs stores Retained Messages. They are all stored (and filtered) on a single subject, $MQTT.rmsg. This stream has a “limits” policy.
- $MQTT_qos2in stores and deduplicates Incoming QoS 2 Messages. It filters on $MQTT.qos2.in.> and has a "limits" policy with MaxMsgsPer of 1.
- $MQTT_out stores Outgoing QoS 2 PUBREL packets. It filters on $MQTT.out.> and has an "interest" retention policy.
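The stream layout and the replica rule above can be summarized in a short Go sketch. The streamSpec type, the mqttStreams table and streamReplicas() are hypothetical. MaxMsgsPer is shown as 0 (no per-subject limit) wherever the text above does not give a value, and treating a non-positive stream_replicas value as "not set" is an assumption.

```go
package main

import "fmt"

// streamSpec summarizes one MQTT stream; illustrative only, not a server type.
type streamSpec struct {
	Name       string
	Subjects   string
	Retention  string
	MaxMsgsPer int64 // 0 means no per-subject limit in this sketch
}

var mqttStreams = []streamSpec{
	{"$MQTT_sess", "$MQTT.sess.>", "limits", 1},
	{"$MQTT_msgs", "$MQTT.msgs.>", "interest", 0},
	{"$MQTT_rmsgs", "$MQTT.rmsg", "limits", 0},
	{"$MQTT_qos2in", "$MQTT.qos2.in.>", "limits", 1},
	{"$MQTT_out", "$MQTT.out.>", "interest", 0},
}

// streamReplicas applies the stated rule: the cluster size capped at 3,
// unless a positive stream_replicas override is configured.
func streamReplicas(clusterSize, override int) int {
	if override > 0 {
		return override
	}
	r := clusterSize
	if r > 3 {
		r = 3
	}
	if r < 1 {
		r = 1 // a standalone server still needs one replica
	}
	return r
}

func main() {
	for _, s := range mqttStreams {
		fmt.Printf("%-13s %-16s %-8s maxMsgsPer=%d replicas=%d\n",
			s.Name, s.Subjects, s.Retention, s.MaxMsgsPer, streamReplicas(5, 0))
	}
}
```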
Consumers and Internal NATS Subscriptions
Account Scope
- A durable consumer for Retained Messages - $MQTT_rmsgs_<server name hash>
- A subscription to handle all jsa replies for the account.
- A subscription to replies to "session persist" requests, so that we can detect the use of a session with the same client ID anywhere in the cluster.
- 2 subscriptions to support retained messages: $MQTT.sub.<nuid> for the messages themselves, and one to receive replies to "delete retained message" JS API (on the JS reply subject var).
Session Scope
When a new QoS 2 MQTT subscription is detected in a session, we ensure that
there is a durable consumer for QoS 2 PUBRELs out for delivery -
$MQTT_PUBREL_<session id hash>
Subscription Scope
For all MQTT subscriptions, regardless of their QoS, we create internal NATS subscriptions to:
- subject (directly encoded from topic). This subscription is used to deliver QoS 0 messages, and messages originating from NATS.
- if needed, subject fwc, which complements subject for topics like topic.# to include topic itself; see Subject Wildcards.
For QoS 1 or 2 MQTT subscriptions we ensure:
- A durable consumer for messages out for delivery - <session ID hash>_<nuid>
- An internal subscription to $MQTT.sub.<nuid> to deliver the messages to the receiving client.
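As a sketch of the naming patterns above, the hypothetical planSubscription() function below lists the internal objects a single MQTT subscription would need at a given QoS. It assumes the MQTT topic has already been converted to a NATS subject, and it takes plain strings in place of the real session ID hash and nuid.

```go
package main

import (
	"fmt"
	"strings"
)

// subResources lists the internal objects one MQTT subscription needs.
type subResources struct {
	NATSSubjects    []string // plain NATS subscriptions (QoS 0 and NATS-originated messages)
	MsgConsumer     string   // durable consumer name on $MQTT_msgs (QoS 1 and 2 only)
	DeliverySubject string   // subject that consumer delivers to (QoS 1 and 2 only)
	PubRelConsumer  string   // per-session PUBREL consumer (QoS 2 only)
}

// planSubscription is hypothetical. subject is the NATS form of the MQTT
// topic filter; sessHash and nuid stand in for the session ID hash and a
// unique identifier.
func planSubscription(subject string, qos byte, sessHash, nuid string) subResources {
	r := subResources{NATSSubjects: []string{subject}}
	// "subject fwc": a filter such as "foo.>" also needs "foo" itself.
	if strings.HasSuffix(subject, ".>") {
		r.NATSSubjects = append(r.NATSSubjects, strings.TrimSuffix(subject, ".>"))
	}
	if qos >= 1 {
		r.MsgConsumer = sessHash + "_" + nuid
		r.DeliverySubject = "$MQTT.sub." + nuid
	}
	if qos == 2 {
		r.PubRelConsumer = "$MQTT_PUBREL_" + sessHash
	}
	return r
}

func main() {
	fmt.Printf("%+v\n", planSubscription("foo.>", 2, "a1b2", "XyZ"))
	fmt.Printf("%+v\n", planSubscription("foo.bar", 0, "a1b2", "Abc"))
}
```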
(Old) Notes
As indicated before, for a QoS1 or QoS2 subscription, the server will create a JetStream consumer with the appropriate subject filter. If the subscription already existed, then only the NATS subscription is created for the JetStream consumer’s delivery subject.
Note that JS consumers can be created with a “Replicas” override, which, from recent discussion, is problematic with “Interest” policy streams such as “$MQTT_msgs”.
We do handle situations where a subscription on the same subject filter is sent with a different QoS, as per MQTT specifications. If the existing subscription was on QoS 1 or 2, and the “new” one is for QoS 0, then we delete the existing JS consumer.
Subscriptions that are QoS 0 have a NATS subscription with the callback function
being mqttDeliverMsgCbQos0(); while QoS 1 and 2 have a NATS subscription with
callback mqttDeliverMsgCbQos12(). Both those functions have comments that
describe the reason for their existence and what they are doing. For instance
the mqttDeliverMsgCbQos0() callback will reject any producing client that is
of type JETSTREAM, so that it handles only messages that do not come from JetStream (JetStream carries the QoS 1 and 2 messages).
Both these functions end up calling mqttDeliver(), which will first enqueue the buffer of possible retained messages before delivering any new message. The message being delivered is serialized in MQTT format and enqueued to the client’s outbound buffer, and a call to addToPCD is made so that it is flushed out of the readloop.
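The split of work between the two callbacks can be modeled as a pair of predicates. This is a simplified Go sketch, not the server's code: it assumes the delivery QoS is the lower of the publish QoS and the subscription's maximum QoS, and that a JetStream copy of a message exists only when it was published with QoS 1 or 2. The function names are hypothetical.

```go
package main

import "fmt"

// effectiveQoS is the QoS a message is delivered with: the lower of the
// publish QoS and the subscription's maximum QoS.
func effectiveQoS(pubQoS, subMaxQoS byte) byte {
	if pubQoS < subMaxQoS {
		return pubQoS
	}
	return subMaxQoS
}

// qos0CbDelivers models the QoS 0 callback: ignore messages produced by
// JetStream, and ignore messages that need a higher QoS, because those reach
// the subscription through its JetStream consumer instead.
func qos0CbDelivers(fromJetStream bool, pubQoS, subMaxQoS byte) bool {
	return !fromJetStream && effectiveQoS(pubQoS, subMaxQoS) == 0
}

// qos12CbDelivers models the QoS 1/2 callback: it only sees JetStream copies,
// and only subscriptions with max QoS 1 or 2 have consumers.
func qos12CbDelivers(fromJetStream bool, pubQoS, subMaxQoS byte) bool {
	return fromJetStream && effectiveQoS(pubQoS, subMaxQoS) > 0
}

func main() {
	for pub := byte(0); pub <= 2; pub++ {
		for sub := byte(0); sub <= 2; sub++ {
			viaJS := pub > 0 && qos12CbDelivers(true, pub, sub)
			fmt.Printf("publish QoS %d, sub max QoS %d: QoS0 callback=%v, QoS12 callback=%v\n",
				pub, sub, qos0CbDelivers(false, pub, sub), viaJS)
		}
	}
}
```

The table printed by main shows that every publish/subscription QoS combination is handled by exactly one of the two paths.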
3. Lifecycles
Connection, Session
An MQTT connection is created when a listening MQTT server receives a CONNECT
packet. See mqttProcessConnect(). A connection is associated with a session.
Steps:
- Ensure that we have an AccountSessionManager so we can have an mqttSession. Lazily initialize JetStream streams, and internal consumers and subscriptions. See getOrCreateMQTTAccountSessionManager().
- Find and disconnect any previous session/client for the same ID. See mqttProcessConnect().
- Ensure we have an mqttSession - create a new or load a previously persisted one. If the clean flag is set in CONNECT, clean the session. See mqttSession.clear().
- Initialize session's subscriptions, if any.
- Always send back a CONNACK packet. If there were errors in previous steps, include the error.
An MQTT connection can be closed for a number of reasons, including receiving a
DISCONNECT from the client, explicit internal errors processing MQTT packets,
or the server receiving another CONNECT packet with the same client ID. See
mqttHandleClosedClient() and mqttHandleWill(). Steps:
- Send out the Will Message if applicable (if not caused by a DISCONNECT packet).
- Delete the JetStream consumers for QoS 1 and 2 packet delivery through JS API calls (if the "clean" session flag is set).
- Delete the session record from the “$MQTT_sess” stream, based on the recorded stream sequence (if the "clean" session flag is set).
- Close the client connection.
On an explicit disconnect, that is, the client sends the DISCONNECT packet, the server will NOT send the Will, as per specifications.
For sessions that had the “clean” flag, the JS consumers corresponding to QoS 1 subscriptions are deleted through JS API calls, the session record is then deleted (based on recorded stream sequence) from the “$MQTT_sess” stream.
Finally, the client connection is closed.
Sessions are persisted on disconnect, and on subscription changes.
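The conditional close steps can be condensed into a small Go function. closeActions() is hypothetical and only returns descriptive strings in the order listed above; it does not model errors or the persistence that happens on disconnect.

```go
package main

import "fmt"

// closeActions returns, in order, the steps taken when an MQTT connection
// closes, following the list above.
func closeActions(explicitDisconnect, hasWill, cleanSession bool) []string {
	var actions []string
	// The Will is only sent when the close was not caused by DISCONNECT.
	if hasWill && !explicitDisconnect {
		actions = append(actions, "send will message")
	}
	if cleanSession {
		actions = append(actions,
			"delete QoS 1 and 2 JetStream consumers",
			"delete session record from $MQTT_sess")
	}
	return append(actions, "close client connection")
}

func main() {
	// Network failure on a clean session that registered a Will.
	fmt.Println(closeActions(false, true, true))
	// Explicit DISCONNECT on a persistent session.
	fmt.Println(closeActions(true, true, false))
}
```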
Subscription
Receiving an MQTT SUBSCRIBE packet creates new subscriptions, or updates
existing subscriptions in a session. Each SUBSCRIBE packet may contain several
specific subscriptions (topic + QoS in each). We always respond with a
SUBACK, which may indicate which subscriptions errored out.
For each subscription in the packet, we:
1. Ignore it if topic starts with $MQTT.sub..
2. Set up QoS 0 message delivery - an internal NATS subscription on topic.
3. Replay any retained messages for topic, once as QoS 0.
4. If we already have a subscription on topic, update its QoS.
5. If this is a QoS 2 subscription in the session, ensure we have the PUBREL consumer for the session.
6. If this is a QoS 1 or 2 subscription, ensure we have the Message consumer for this subscription (or delete one if it exists and this is now a QoS 0 sub).
7. Add an extra subscription for the top-level wildcard case.
8. Update the session, persist it if changed.
When a session is restored (no clean flag), we go through the same steps to re-subscribe to its stored subscriptions, except step #8, which would have been redundant.
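The per-subscription steps can be traced with a hypothetical Go function that returns the actions taken for one topic/QoS pair. It assumes the extra top-level wildcard subscription is needed for filters ending in /#, and that a restored session (restoring == true) skips step #8. The action strings are descriptive only; the step numbers in the comments follow the order given above.

```go
package main

import (
	"fmt"
	"strings"
)

// subscribeSteps lists the actions for one topic/QoS pair of a SUBSCRIBE.
// hadSub and oldQoS describe an existing subscription on the same topic;
// restoring is true when re-subscribing a restored session.
func subscribeSteps(topic string, qos byte, hadSub bool, oldQoS byte, restoring bool) []string {
	if strings.HasPrefix(topic, "$MQTT.sub.") {
		return []string{"ignore reserved topic"} // step 1
	}
	steps := []string{
		"subscribe on NATS subject for QoS 0 delivery", // step 2
		"replay retained messages",                     // step 3
	}
	if hadSub {
		steps = append(steps, fmt.Sprintf("update QoS %d -> %d", oldQoS, qos)) // step 4
	}
	if qos == 2 {
		steps = append(steps, "ensure session PUBREL consumer") // step 5
	}
	if qos > 0 {
		steps = append(steps, "ensure message consumer") // step 6
	} else if hadSub && oldQoS > 0 {
		steps = append(steps, "delete message consumer") // step 6, downgraded to QoS 0
	}
	if strings.HasSuffix(topic, "/#") {
		steps = append(steps, "add top-level wildcard subscription") // step 7
	}
	if !restoring {
		steps = append(steps, "update and persist session") // step 8
	}
	return steps
}

func main() {
	for _, s := range subscribeSteps("sensors/#", 2, false, 0, false) {
		fmt.Println(s)
	}
}
```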
When we get an UNSUBSCRIBE packet, it can contain multiple subscriptions to
unsubscribe. The parsing will generate a slice of mqttFilter objects that
contain the “filter” (the subscription's topic, possibly with wildcards) and
the QoS value. The server goes through the list and deletes the JS consumer (if
QoS 1 or 2) and unsubscribes the NATS subscription, either on the delivery
subject (if it was a QoS 1 or 2) or on the actual topic/subject. In case of the
“#” wildcard, the server will also remove the “level up” subscriptions that NATS
had to create.
Again, we update the session and persist it as needed in the $MQTT_sess
stream.
Message
1. Detect an incoming PUBLISH packet, parse and check the message QoS. Fill out
   the session's mqttPublish struct that contains information about the
   published message. (see mqttParse(), mqttParsePub())
2. Process the message according to its QoS (see mqttProcessPub())
   - QoS 0:
     - Initiate message delivery
   - QoS 1:
     - Initiate message delivery
     - Send back a PUBACK
   - QoS 2:
     - Store the message in the $MQTT_qos2in stream, using a PI-specific
       subject. Since MaxMsgsPer is set to 1, we will ignore duplicates on the PI.
     - Send back a PUBREC
     - "Wait" for a PUBREL, then initiate message delivery
     - Remove the previously stored QoS2 message
     - Send back a PUBCOMP
3. Initiate message delivery (see mqttInitiateMsgDelivery())
   - Convert the MQTT topic into a NATS subject using the
     mqttTopicToNATSPubSubject() function. If there is a known subject
     mapping, then we select the new subject using the selectMappedSubject()
     function and then convert this subject back into an MQTT topic using the
     natsSubjectToMQTTTopic() function.
   - Re-serialize the PUBLISH packet received as a NATS message. Use NATS
     headers for the metadata, and the deliverable MQTT PUBLISH packet as the
     contents.
   - Publish the message as subject (and subject fwc if applicable, see
     Subject Wildcards). Use the "standard" NATS c.processInboundClientMsg()
     to do that. processInboundClientMsg() will distribute the message to any
     NATS subscriptions (including routes, gateways, leafnodes) and the
     relevant MQTT subscriptions.
   - Check for retained messages, process as needed. See
     c.processInboundClientMsg() calling c.mqttHandlePubRetain() for MQTT
     clients.
   - If the message QoS is 1 or 2, store it in the $MQTT_msgs stream as
     $MQTT.msgs.<subject> for "at least once" delivery with retries.
4. Let NATS and JetStream deliver to the internal subscriptions, and to the
   receiving clients. See mqttDeliverMsgCb...()
   - The NATS message posted to subject (and subject fwc) will be delivered
     to each relevant internal subscription by calling mqttDeliverMsgCbQoS0().
     The function has access to both the publishing and the receiving clients.
     - Ignore all irrelevant invocations. Specifically, do nothing if the
       message needs to be delivered with a higher QoS - that will be handled
       by the other, ...QoS12 callback. Note that if the original message was
       published with a QoS 1 or 2, but the subscription has its maximum QoS
       set to 0, the message will be delivered by this callback.
     - Ignore "reserved" subscriptions, as per MQTT spec.
     - Decode delivery topic from the NATS subject.
     - Write (enqueue) outgoing PUBLISH packet.
     - DONE for QoS 0
   - The NATS message posted to JetStream as $MQTT.msgs.subject will be
     consumed by subscription-specific consumers. Note that MQTT subscriptions
     with max QoS 0 do not have JetStream consumers. They are handled by the
     QoS0 callback.
     The consumers will deliver it to the $MQTT.sub.<nuid> subject for their
     respective NATS subscriptions by calling mqttDeliverMsgCbQoS12(). This
     callback too has access to both the publishing and the receiving clients.
     - Ignore "reserved" subscriptions, as per MQTT spec.
     - See if this is a re-delivery from JetStream by checking sess.cpending
       for the JS reply subject. If so, use the existing PI and treat this as
       a duplicate redelivery.
     - Otherwise, assign the message a new PI (see trackPublish() and
       bumpPI()) and store it in sess.cpending and sess.pendingPublish, along
       with the JS reply subject that can be used to remove this pending
       message from the consumer once it's delivered to the recipient.
     - Decode delivery topic from the NATS subject.
     - Write (enqueue) outgoing PUBLISH packet.
5. QoS 1: "Wait" for a PUBACK. See mqttProcessPubAck().
   - When received, remove the PI from the tracking maps, send an ACK to the
     consumer to remove the message.
   - DONE for QoS 1
6. QoS 2: "Wait" for a PUBREC. When received, we need to do all the same
   things as in the QoS 1 PUBACK case, but we need to send out a PUBREL, and
   continue using the same PI until the delivery flow is complete and we get
   back a PUBCOMP. For that, we add the PI to sess.pendingPubRel, and to
   sess.cpending with the PubRel consumer durable name.
   We also compose a headers-only NATS message signifying a PUBREL out for
   delivery, and store it in the $MQTT_qos2out stream, as
   $MQTT.qos2.out.<session-id>.
7. QoS 2: Deliver PUBREL. The PubRel session-specific consumer will publish to
   the internal subscription on $MQTT.qos2.delivery, calling
   mqttDeliverPubRelCb(). We store the ACK reply subject in cpending to remove
   the JS message on PUBCOMP, then compose and send out a PUBREL packet.
8. QoS 2: "Wait" for a PUBCOMP. See mqttProcessPubComp().
   - When received, remove the PI from the tracking maps, send an ACK to the
     consumer to remove the PUBREL message.
   - DONE for QoS 2
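The outbound PI bookkeeping in the steps above can be modeled in memory. The outSession type below is a hypothetical simplification: cpending is a single map keyed by JS reply subject (the text above indicates the server also tracks the consumer durable name), PI allocation ignores wrap-around collisions, and the acked slice stands in for the ACKs sent to JetStream.

```go
package main

import "fmt"

// outSession is a hypothetical, simplified model of the per-session maps.
type outSession struct {
	lastPI         uint16
	cpending       map[string]uint16 // JS ack reply subject -> PI
	pendingPublish map[uint16]string // PI -> JS reply, awaiting PUBACK/PUBREC
	pendingPubRel  map[uint16]string // PI -> PUBREL JS reply, awaiting PUBCOMP
	acked          []string          // JS reply subjects we sent ACKs to
}

func newOutSession() *outSession {
	return &outSession{
		cpending:       make(map[string]uint16),
		pendingPublish: make(map[uint16]string),
		pendingPubRel:  make(map[uint16]string),
	}
}

// trackPublish returns the PI for a message delivered by a consumer.
// A JetStream redelivery (same reply subject) reuses the existing PI.
func (s *outSession) trackPublish(jsReply string) (pi uint16, dup bool) {
	if existing, ok := s.cpending[jsReply]; ok {
		return existing, true
	}
	s.lastPI++
	if s.lastPI == 0 {
		s.lastPI = 1 // 0 is not a valid PI
	}
	s.cpending[jsReply] = s.lastPI
	s.pendingPublish[s.lastPI] = jsReply
	return s.lastPI, false
}

// ackPublish drops a delivered message from the maps and ACKs it.
func (s *outSession) ackPublish(pi uint16) {
	if reply, ok := s.pendingPublish[pi]; ok {
		delete(s.pendingPublish, pi)
		delete(s.cpending, reply)
		s.acked = append(s.acked, reply)
	}
}

// onPubAck completes a QoS 1 delivery.
func (s *outSession) onPubAck(pi uint16) { s.ackPublish(pi) }

// onPubRec does the PUBACK work, then keeps the PI reserved for the PUBREL.
func (s *outSession) onPubRec(pi uint16) {
	s.ackPublish(pi)
	s.pendingPubRel[pi] = "" // reply subject known once the PUBREL is delivered
}

// onPubRelDelivered records the JS reply of the stored PUBREL message.
func (s *outSession) onPubRelDelivered(pi uint16, jsReply string) {
	s.pendingPubRel[pi] = jsReply
	s.cpending[jsReply] = pi
}

// onPubComp completes a QoS 2 delivery and ACKs the PUBREL message.
func (s *outSession) onPubComp(pi uint16) {
	reply, ok := s.pendingPubRel[pi]
	if !ok {
		return
	}
	delete(s.pendingPubRel, pi)
	if reply != "" {
		delete(s.cpending, reply)
		s.acked = append(s.acked, reply)
	}
}

func main() {
	s := newOutSession()
	pi, _ := s.trackPublish("$JS.ACK.msg.1")
	s.onPubRec(pi)
	s.onPubRelDelivered(pi, "$JS.ACK.pubrel.1")
	s.onPubComp(pi)
	fmt.Println(s.acked, len(s.cpending), len(s.pendingPubRel))
}
```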
Retained messages
When we process an inbound PUBLISH and submit it to
processInboundClientMsg() function, for MQTT clients it will invoke
mqttHandlePubRetain() which checks if the published message is “retained” or
not.
If it is, then we construct a record representing the retained message and store
it in the $MQTT_rmsgs stream, under the single $MQTT.rmsg subject. The stored
record (in JSON) contains information about the subject, topic, MQTT flags, user
that produced this message and the message content itself. Once it is stored,
the stream sequence is remembered in the in-memory structure that holds retained
messages.
Note that when creating an account session manager, the retained messages stream is read from scratch through the use of a JS consumer to load all the messages. The associated subscription will process the recovered retained messages as well as any new ones that come from the network.
A retained message is added to a map and a subscription is created and inserted into a sublist that will be used to perform a ReverseMatch() when a subscription is started and we want to find all retained messages that the subscription would have received if it had been running prior to the message being published.
If a retained message on topic “foo” already exists, then the server has to delete the old message at the stream sequence we saved when storing it.
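The bookkeeping for replacing or deleting a retained message can be sketched with a map from subject to stream sequence. retainedIndex is hypothetical; it assumes a retained PUBLISH with an empty body is not stored itself and only removes the previous record, and it leaves the actual stream delete to the caller.

```go
package main

import "fmt"

// retainedIndex maps a subject to the stream sequence of its retained record.
type retainedIndex struct {
	seqs map[string]uint64
}

// handleRetained processes a retained PUBLISH. storeSeq is the sequence the
// new record was stored at (ignored for an empty body). It returns the
// sequence of the record that must now be deleted from the stream, or 0.
func (ri *retainedIndex) handleRetained(subject string, body []byte, storeSeq uint64) uint64 {
	old := ri.seqs[subject] // 0 if there was no retained message
	if len(body) == 0 {
		delete(ri.seqs, subject) // empty body: remove the retained message
	} else {
		ri.seqs[subject] = storeSeq // only one retained message per topic
	}
	return old
}

func main() {
	ri := &retainedIndex{seqs: make(map[string]uint64)}
	fmt.Println(ri.handleRetained("foo", []byte("v1"), 10)) // 0: nothing to delete
	fmt.Println(ri.handleRetained("foo", []byte("v2"), 11)) // 10: delete the old record
	fmt.Println(ri.handleRetained("foo", nil, 0))           // 11: retained message cleared
	fmt.Println(len(ri.seqs))                               // 0
}
```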
This could have been done by storing retained messages under
$MQTT.rmsg.<subject> instead of all under a single subject, and making use of
the MaxMsgsPer field set to 1. The MaxMsgsPer option was introduced well after
MQTT support became available. Changes to the sessions were made in PR
#2501, with a conversion of
existing streams such as $MQTT_sess_<sess ID> into a single stream with unique
subjects, but the changes were not made to the retained messages stream.
There are also subscriptions for the handling of retained messages, which are messages that the publisher asks the MQTT server to retain so that they are delivered to matching subscriptions when those start. There is a single message per topic. Retained messages are deleted when the user sends a retained message (there is a flag in the PUBLISH protocol) on a given topic with an empty body. The difficulty with retained messages is handling them in a cluster, since all servers need to be aware of their presence so that they can deliver them to subscriptions that those servers may become the leader for.
$MQTT_rmsgs, which has a “limits” policy, holds retained messages, all under the single $MQTT.rmsg subject. Not sure why I did not use MaxMsgsPer for this stream and filter on $MQTT.rmsg.>.
The first step when processing a new subscription is to gather the retained
messages that would be a match for this subscription. To do so, the server will
serialize into a buffer all messages matched by a ReverseMatch on the account
session manager’s sublist. We use the subjects of the returned subscriptions to
look up the appropriate retained messages in a map (see
serializeRetainedMsgsForSub() for details).
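The lookup can be illustrated with a naive Go sketch that scans all retained topics and applies MQTT filter matching. This is a swapped-in technique for illustration only: the server instead performs a ReverseMatch() on a sublist keyed by NATS subjects, and topicMatches() and retainedFor() are hypothetical names that work directly on MQTT topics.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// topicMatches reports whether an MQTT topic matches an MQTT filter.
// "+" matches exactly one level; a trailing "#" matches the parent level
// and any number of levels below it.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // "#" is always the last level of a valid filter
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// retainedFor returns, sorted, the retained topics that a new subscription
// on filter would have received.
func retainedFor(filter string, retained map[string][]byte) []string {
	var out []string
	for topic := range retained {
		if topicMatches(filter, topic) {
			out = append(out, topic)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	retained := map[string][]byte{
		"foo":         []byte("a"),
		"foo/bar":     []byte("b"),
		"foo/bar/baz": []byte("c"),
		"other":       []byte("d"),
	}
	fmt.Println(retainedFor("foo/#", retained)) // [foo foo/bar foo/bar/baz]
	fmt.Println(retainedFor("foo/+", retained)) // [foo/bar]
}
```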
4. Implementation Notes
Hooking into NATS I/O
Starting the accept loop
The MQTT accept loop is started when the server detects that an MQTT port has
been defined in the configuration file. It works similarly to all other accept
loops. Note that for MQTT over websocket, the websocket port has to be defined
and MQTT clients will connect to that port instead of the MQTT port and need to
provide /mqtt as part of the URL to redirect the creation of the client to an
MQTT client (with websocket support) instead of a regular NATS client with
websocket. See the branching done in startWebsocketServer(). See startMQTT().
Starting the read/write loops
When a TCP connection is accepted, the internal goroutine will invoke
createMQTTClient(). This function will set a c.mqtt object that will make it
become an MQTT client (through the isMqtt() helper function). The readLoop()
and writeLoop() are started similarly to other clients. However, the read loop
will branch out to mqttParse() instead when detecting that this is an MQTT
client.
Session Management
Account Session Manager
mqttAccountSessionManager is an object that holds the state of all sessions in
an account. It also manages the lifecycle of JetStream streams and internal
subscriptions for processing JS API replies, session updates, etc. See
mqttCreateAccountSessionManager(). It is lazily initialized upon the first
MQTT CONNECT packet received. Account session manager is referred to as asm
in the code.
Note that creating the account session manager (and attempting to create the streams) is done only once per account on a given server, since once created the account session manager for a given account would be found in the sessions map of the mqttSessionManager object.
Find and disconnect previous session/client
Once all that is done, we now go to the creation of the session object itself. For that, we first need to make sure that it does not already exist, that is, that it is not already registered on this server or anywhere in the cluster. Note that MQTT dictates that if a session with the same ID connects, the OLD session needs to be closed, not the new one being created. NATS Server complies with this requirement.
Once a session is detected to already exist, the old one (as described above) is closed and the new one accepted. However, the session ID is kept in a flappers map so that we detect situations where sessions with the same ID are started multiple times, each causing the previous one to be closed. When that detection occurs, the newly created session is put in “jail” for a second to avoid a very rapid succession of connects and disconnects. This has already been seen by users; there was an issue where we would schedule the connection to be closed instead of waiting in place, which was causing a panic.
We also protect from multiple clients on a given server trying to connect with the same ID at the “same time” while the processing of a CONNECT of a session is not yet finished. This is done with the use of a sessLocked map, keyed by the session ID.
Create or restore the session
If everything is good up to that point, the server will either create or restore
a session from the stream. This is done in the createOrRestoreSession()
function. The client/session ID is hashed and added to the session’s stream
subject along with the JS domain to prevent clients connecting from different
domains from “polluting” the session stream of a given domain.
Since each session constitutes a subject and the stream has a maximum of 1 message per subject, we attempt to load the last message on the formed subject. If we don’t find it, then the session object is created “empty”, while if we find a record, we create the session object based on the record persisted on the stream.
If the session was restored from the JS stream, we keep track of the stream
sequence where the record was located. When we save the session (even if it
already exists) we will use this sequence number to set the
JSExpectedLastSubjSeq header, so that different servers in a (super)cluster can
detect a race between clients trying to use the same session ID, since only one
of the writes should succeed. On success, the session’s new
sequence is remembered by the server that did the write.
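The create-or-restore path and the conditional write can be modeled in memory. This Go sketch is hypothetical throughout: sessionSubject() uses SHA-256 and a $MQTT.sess.<domain>.<hash> layout as stand-ins for the server's own hash and subject format, and memStream only imitates a MaxMsgsPer=1 stream with an expected-last-subject-sequence check, where 0 means "no record yet" in this model.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
)

var errWrongLastSeq = errors.New("wrong last subject sequence")

// sessionSubject is a stand-in for the session subject format.
func sessionSubject(domain, clientID string) string {
	sum := sha256.Sum256([]byte(clientID))
	hash := hex.EncodeToString(sum[:8])
	if domain == "" {
		return "$MQTT.sess." + hash
	}
	return "$MQTT.sess." + domain + "." + hash
}

type record struct {
	seq  uint64
	data []byte
}

// memStream imitates a stream that keeps only the last message per subject.
type memStream struct {
	seq  uint64
	last map[string]record
}

// publishExpect stores data only if the subject's last sequence equals expected.
func (st *memStream) publishExpect(subj string, data []byte, expected uint64) (uint64, error) {
	if st.last[subj].seq != expected { // missing subject reads as sequence 0
		return 0, errWrongLastSeq
	}
	st.seq++
	st.last[subj] = record{seq: st.seq, data: data}
	return st.seq, nil
}

type session struct {
	subject string
	seq     uint64 // sequence of the record this server last loaded or wrote
}

// createOrRestore loads the last record on the session subject, if any.
func createOrRestore(st *memStream, domain, clientID string) (*session, bool) {
	subj := sessionSubject(domain, clientID)
	if r, ok := st.last[subj]; ok {
		return &session{subject: subj, seq: r.seq}, true
	}
	return &session{subject: subj}, false
}

// persist writes the session, expecting the sequence we last saw.
func (s *session) persist(st *memStream, data []byte) error {
	seq, err := st.publishExpect(s.subject, data, s.seq)
	if err != nil {
		return err
	}
	s.seq = seq
	return nil
}

func main() {
	st := &memStream{last: make(map[string]record)}
	a, _ := createOrRestore(st, "", "client-1") // server A creates the session
	_ = a.persist(st, []byte("v1"))
	b, _ := createOrRestore(st, "", "client-1") // server B restores it
	fmt.Println(a.persist(st, []byte("v2")))    // <nil>
	fmt.Println(b.persist(st, []byte("v3")))    // wrong last subject sequence
}
```

Whichever server writes second with a stale sequence is rejected, which is the race detection the paragraph above relies on.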
When created or restored, the CONNACK can now be sent back to the client, and if there were any recovered subscriptions, they are now processed.
Processing QoS acks: PUBACK, PUBREC, PUBCOMP
When the server delivers a message with QoS 1 or 2 (also a PUBREL for QoS 2) to a subscribed client, the client will send back an acknowledgement. See mqttProcessPubAck(), mqttProcessPubRec(), and mqttProcessPubComp().
While the specific logic for each packet differs, these handlers all update the
session's PI mappings (cpending, pendingPublish, pendingPubRel), and if
needed send an ACK to JetStream to remove the message from its consumer and stop
the re-delivery attempts.
Subject Wildcards
Note that MQTT subscriptions have wildcards too. The “+” wildcard is equivalent
to NATS’s “*” wildcard; however, MQTT’s wildcard “#” is similar to “>”, except
that it also includes the level above. That is, a subscription on “foo/#” would
receive messages on “foo/bar/baz”, but also on “foo”.
So, for MQTT subscriptions ending with a '#' we are forced to create 2
internal NATS subscriptions, one on “foo” and one on “foo.>”.
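The conversion can be sketched in Go. mqttFilterToNATS() is a hypothetical, simplified stand-in: it only maps "/" to ".", "+" to "*" and "#" to ">", and it ignores the escaping of characters such as "." or spaces inside MQTT topic levels that a complete conversion needs.

```go
package main

import (
	"fmt"
	"strings"
)

// mqttFilterToNATS converts an MQTT topic filter into the NATS subject(s) to
// subscribe on. A trailing "#" below the root yields a second subject for the
// parent level, since NATS ">" does not match the parent itself.
func mqttFilterToNATS(filter string) []string {
	levels := strings.Split(filter, "/")
	for i, l := range levels {
		switch l {
		case "+":
			levels[i] = "*" // single-level wildcard
		case "#":
			levels[i] = ">" // multi-level wildcard
		}
	}
	subj := strings.Join(levels, ".")
	if len(levels) > 1 && levels[len(levels)-1] == ">" {
		parent := strings.Join(levels[:len(levels)-1], ".")
		return []string{subj, parent}
	}
	return []string{subj}
}

func main() {
	fmt.Println(mqttFilterToNATS("foo/#"))     // [foo.> foo]
	fmt.Println(mqttFilterToNATS("foo/+/baz")) // [foo.*.baz]
	fmt.Println(mqttFilterToNATS("#"))         // [>]
}
```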
5. Known issues
- "active" redelivery for QoS from JetStream (compliant, just a note)
- JetStream QoS redelivery happens out of (original) order
- finish delivery of in-flight messages after UNSUB
- finish delivery of in-flight messages after a reconnect
- consider replacing $MQTT_msgs with $MQTT_out.
- consider using unique $MQTT.rmsg.> and MaxMsgsPer for retained messages.
- add a cli command to list/clean old sessions