From 885f88a9cd46a831cfaf372a8ea2be2111126017 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 4 Sep 2023 06:46:16 -0700 Subject: [PATCH] [ADDED] README-MQTT.md: MQTT implementation notes --- server/README-MQTT.md | 595 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 server/README-MQTT.md diff --git a/server/README-MQTT.md b/server/README-MQTT.md new file mode 100644 index 00000000..e61a0991 --- /dev/null +++ b/server/README-MQTT.md @@ -0,0 +1,595 @@ +**MQTT Implementation Overview** + +Revision 1.1 + +Authors: Ivan Kozlovic, Lev Brouk + +NATS Server currently supports most of MQTT 3.1.1. This document describes how +it is implementated. + +It is strongly recommended to review the [MQTT v3.1.1 +specifications](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html) +and get a detailed understanding before proceeding with this document. + +# Contents + +1. [Concepts](#1-concepts) + - [Server, client](#server-client) + - [Connection, client ID, session](#connection-client-id-session) + - [Packets, messages, and subscriptions](#packets-messages-and-subscriptions) + - [Quality of Service (QoS), publish identifier (PI)](#quality-of-service-qos-publish-identifier-pi) + - [Retained message](#retained-message) + - [Will message](#will-message) +2. [Use of JetStream](#2-use-of-jetstream) + - [JetStream API](#jetstream-api) + - [Streams](#streams) + - [Consumers and Internal NATS Subscriptions](#consumers-and-internal-nats-subscriptions) +3. [Lifecycles](#3-lifecycles) + - [Connection, Session](#connection-session) + - [Subscription](#subscription) + - [Message](#message) + - [Retained messages](#retained-messages) +4. [Implementation Notes](#4-implementation-notes) + - [Hooking into NATS I/O](#hooking-into-nats-io) + - [Session Management](#session-management) + - [Processing QoS acks: PUBACK, PUBREC, PUBCOMP](#processing-qos-acks-puback-pubrec-pubcomp) + - [Subject Wildcards](#subject-wildcards) +5. [Known issues](#5-known-issues) + +# 1. Concepts + +## Server, client + +In the MQTT specification there are concepts of **Client** and **Server**, used +somewhat interchangeably with those of **Sender** and **Receiver**. A **Server** +acts as a **Receiver** when it gets `PUBLISH` messages from a **Sender** +**Client**, and acts as a **Sender** when it delivers them to subscribed +**Clients**. + +In the NATS server implementation there are also concepts (types) `server` and +`client`. `client` is an internal representation of a (connected) client and +runs its own read and write loops. Both of these have an `mqtt` field that if +set makes them behave as MQTT-compliant. + +The code and comments may sometimes be confusing as they refer to `server` and +`client` sometimes ambiguously between MQTT and NATS. + +## Connection, client ID, session + +When an MQTT client connects to a server, it must send a `CONNECT` packet to +create an **MQTT Connection**. The packet must include a **Client Identifier**. +The server will then create or load a previously saved **Session** for the (hash +of) the client ID. + +## Packets, messages, and subscriptions + +The low level unit of transmission in MQTT is a **Packet**. Examples of packets +are: `CONNECT`, `SUBSCRIBE`, `SUBACK`, `PUBLISH`, `PUBCOMP`, etc. + +An **MQTT Message** starts with a `PUBLISH` packet that a client sends to the +server. It is then matched against the current **MQTT Subscriptions** and is +delivered to them as appropriate. During the message delivery the server acts as +an MQTT client, and the receiver acts as an MQTT server. + +Internally we use **NATS Messages** and **NATS Subscriptions** to facilitate +message delivery. This may be somewhat confusing as the code refers to `msg` and +`sub`. What may be even more confusing is that some MQTT packets (specifically, +`PUBREL`) are represented as NATS messages, and that the original MQTT packet +"metadata" may be encoded as NATS message headers. + +## Quality of Service (QoS), publish identifier (PI) + +MQTT specifies 3 levels of quality of service (**QoS**): + +- `0` for at most once. A single delivery attempt. +- `1` for at least once. Will try to redeliver until acknowledged by the + receiver. +- `2` for exactly once. See the [SPEC REF] for the acknowledgement flow. + +QoS 1 and 2 messages need to be identified with publish identifiers (**PI**s). A +PI is a 16-bit integer that must uniquely identify a message for the duration of +the required exchange of acknowledgment packets. + +Note that the QoS applies separately to the transmission of a message from a +sender client to the server, and from the server to the receiving client. There +is no protocol-level acknowledgements between the receiver and the original +sender. The sender passes the ownership of messages to the server, and the +server then delivers them at maximum possible QoS to the receivers +(subscribers). The PIs for in-flight outgoing messages are issued and stored per +session. + +## Retained message + +A **Retained Message** is not part of any MQTT session and is not removed when the +session that produced it goes away. Instead, the server needs to persist a +_single_ retained message per topic. When a subscription is started, the server +needs to send the “matching” retained messages, that is, messages that would +have been delivered to the new subscription should that subscription had been +running prior to the publication of this message. + +Retained messages are removed when the server receives a retained message with +an empty body. Still, this retained message that serves as a “delete” of a +retained message will be processed as a normal published message. + +Retained messages can have QoS. + +## Will message + +The `CONNECT` packet can contain information about a **Will Message** that needs to +be sent to any client subscribing on the Will topic/subject in the event that +the client is disconnected implicitly, that is, not as a result as the client +sending the `DISCONNECT` packet. + +Will messages can have the retain flag and QoS. + +# 2. Use of JetStream + +The MQTT implementation relies heavily on JetStream. We use it to: + +- Persist (and restore) the [Session](#connection-client-id-session) state. +- Store and retrieve [Retained messages](#retained-message). +- Persist incoming [QoS 1 and + 2](#quality-of-service-qos-publish-identifier-pi) messages, and + re-deliver if needed. +- Store and de-duplicate incoming [QoS + 2](#quality-of-service-qos-publish-identifier-pi) messages. +- Persist and re-deliver outgoing [QoS + 2](#quality-of-service-qos-publish-identifier-pi) `PUBREL` packets. + +Here is the overview of how we set up and use JetStream **streams**, +**consumers**, and **internal NATS subscriptions**. + +## JetStream API + +All interactions with JetStream are performed via `mqttJSA` that sends NATS +requests to JetStream. Most are processed syncronously and await a response, +some (e.g. `jsa.sendAck()`) are sent asynchronously. JetStream API is usually +referred to as `jsa` in the code. No special locking is required to use `jsa`, +however the asynchronous use of JetStream may create race conditions with +delivery callbacks. + +## Streams + +We create the following streams unless they already exist. Failing to ensure the +streams would prevent the client from connecting. + +Each stream is created with a replica value that is determined by the size of +the cluster but limited to 3. It can also be overwritten by the stream_replicas +option in the MQTT configuration block. + +The streams are created the first time an Account Session Manager is initialized +and are used by all sessions in it. Note that to avoid race conditions, some +subscriptions are created first. The streams are never deleted. See +`mqttCreateAccountSessionManager()` for details. + +1. `$MQTT_sess` stores persisted **Session** records. It filters on + `"$MQTT.sess.>` subject and has a “limits” policy with `MaxMsgsPer` setting + of 1. +2. `$MQTT_msgs` is used for **QoS 1 and 2 message delivery**. + It filters on `$MQTT.msgs.>` subject and has an “interest” policy. +3. `$MQTT_rmsgs` stores **Retained Messages**. They are all + stored (and filtered) on a single subject `$MQTT.rmsg`. This stream has a + limits policy. +4. `$MQTT_qos2in` stores and deduplicates **Incoming QoS 2 Messages**. It + filters on `$MQTT.qos2.in.>` and has a "limits" policy with `MaxMsgsPer` of + 1. +5. `$MQTT_out` stores **Outgoing QoS 2** `PUBREL` packets. It filters on + `$MQTT.out.>` and has a "interest" retention policy. + +## Consumers and Internal NATS Subscriptions + +### Account Scope + +- A durable consumer for [Retained Messages](#retained-message) - + `$MQTT_rmsgs_` +- A subscription to handle all [jsa](#jetstream-api) replies for the account. +- A subscription to replies to "session persist" requests, so that we can detect + the use of a session with the same client ID anywhere in the cluster. +- 2 subscriptions to support [retained messages](#retained-message): + `$MQTT.sub.` for the messages themselves, and one to receive replies to + "delete retained message" JS API (on the JS reply subject var). + +### Session Scope + +When a new QoS 2 MQTT subscription is detected in a session, we ensure that +there is a durable consumer for [QoS +2](#quality-of-service-qos-publish-identifier-pi) `PUBREL`s out for delivery - +`$MQTT_PUBREL_` + +### Subscription Scope + +For all MQTT subscriptions, regardless of their QoS, we create internal NATS subscriptions to + +- `subject` (directly encoded from `topic`). This subscription is used to + deliver QoS 0 messages, and messages originating from NATS. +- if needed, `subject fwc` complements `subject` for topics like `topic.#` to + include `topic` itself, see [top-level wildcards](#subject-wildcards) + +For QoS 1 or 2 MQTT subscriptions we ensure: + +- A durable consumer for messages out for delivery - `_` +- An internal subscription to `$MQTT.sub.` to deliver the messages to the + receiving client. + +### (Old) Notes + +As indicated before, for a QoS1 or QoS2 subscription, the server will create a +JetStream consumer with the appropriate subject filter. If the subscription +already existed, then only the NATS subscription is created for the JetStream +consumer’s delivery subject. + +Note that JS consumers can be created with an “Replicas” override, which from +recent discussion is problematic with “Interest” policy streams, which +“$MQTT_msgs” is. + +We do handle situations where a subscription on the same subject filter is sent +with a different QoS as per MQTT specifications. If the existing was on QoS 1 or +2, and the “new” is for QoS 0, then we delete the existing JS consumer. + +Subscriptions that are QoS 0 have a NATS subscription with the callback function +being `mqttDeliverMsgCbQos0()`; while QoS 1 and 2 have a NATS subscription with +callback `mqttDeliverMsgCbQos12()`. Both those functions have comments that +describe the reason for their existence and what they are doing. For instance +the `mqttDeliverMsgCbQos0()` callback will reject any producing client that is +of type JETSTREAM, so that it handles only non JetStream (QoS 1 and 2) messages. + +Both these functions end-up calling mqttDeliver() which will first enqueue the +possible retained messages buffer before delivering any new message. The message +itself being delivered is serialized in MQTT format and enqueued to the client’s +outbound buffer and call to addToPCD is made so that it is flushed out of the +readloop. + +# 3. Lifecycles + +## Connection, Session + +An MQTT connection is created when a listening MQTT server receives a `CONNECT` +packet. See `mqttProcessConnect()`. A connection is associated with a session. +Steps: + +1. Ensure that we have an `AccountSessionManager` so we can have an + `mqttSession`. Lazily initialize JetStream streams, and internal consumers + and subscriptions. See `getOrCreateMQTTAccountSessionManager()`. +2. Find and disconnect any previous session/client for the same ID. See + `mqttProcessConnect()`. +3. Ensure we have an `mqttSession` - create a new or load a previously persisted + one. If the clean flag is set in `CONNECT`, clean the session. see + `mqttSession.clear()` +4. Initialize session's subscriptions, if any. +5. Always send back a `CONNACK` packet. If there were errors in previous steps, + include the error. + +An MQTT connection can be closed for a number of reasons, including receiving a +`DISCONNECT` from the client, explicit internal errors processing MQTT packets, +or the server receiving another `CONNECT` packet with the same client ID. See +`mqttHandleClosedClient()` and `mqttHandleWill()`. Steps: + +1. Send out the Will Message if applicable (if not caused by a `DISCONNECT` packet) +2. Delete the the JetStream consumers for to QoS 1 and 2 packet delivery through + JS API calls (if "clean" session flag is set) +3. Delete the session record from the “$MQTT_sess” stream, based on recorded + stream sequence. (if "clean" session flag is set) +4. Close the client connection. + +On an explicit disconnect, that is, the client sends the DISCONNECT packet, the +server will NOT send the Will, as per specifications. + +For sessions that had the “clean” flag, the JS consumers corresponding to QoS 1 +subscriptions are deleted through JS API calls, the session record is then +deleted (based on recorded stream sequence) from the “$MQTT_sess” stream. + +Finally, the client connection is closed + +Sessions are persisted on disconnect, and on subscriptions changes. + +## Subscription + +Receiving an MQTT `SUBSCRIBE` packet creates new subscriptions, or updates +existing subscriptions in a session. Each `SUBSCRIBE` packet may contain several +specific subscriptions (`topic` + QoS in each). We always respond with a +`SUBACK`, which may indicate which subscriptions errored out. + +For each subscription in the packet, we: + +1. Ignore it if `topic` starts with `$MQTT.sub.`. +2. Set up QoS 0 message delivery - an internal NATS subscription on `topic`. +3. Replay any retained messages for `topic`, once as QoS 0. +4. If we already have a subscription on `topic`, update its QoS +5. If this is a QoS 2 subscription in the session, ensure we have the [PUBREL + consumer](#session-scope) for the session. +6. If this is a QoS 1 or 2 subscription, ensure we have the [Message + consumer](#subscription-scope) for this subscription (or delete one if it + exists and this is now a QoS 0 sub). +7. Add an extra subscription for the [top-level wildcard](#subject-wildcards) case. +8. Update the session, persist it if changed. + +When a session is restored (no clean flag), we go through the same steps to +re-subscribe to its stored subscription, except step #8 which would have been +redundant. + +When we get an `UNSUBSCRIBE` packet, it can contain multiple subscriptions to +unsubscribe. The parsing will generate a slice of mqttFilter objects that +contain the “filter” (the topic with possibly wildcard of the subscription) and +the QoS value. The server goes through the list and deletes the JS consumer (if +QoS 1 or 2) and unsubscribes the NATS subscription for the delivery subject (if +it was a QoS 1 or 2) or on the actual topic/subject. In case of the “#” +wildcard, the server will handle the “level up” subscriptions that NATS had to +create. + +Again, we update the session and persist it as needed in the `$MQTT_sess` +stream. + +## Message + +1. Detect an incoming PUBLISH packet, parse and check the message QoS. Fill out + the session's `mqttPublish` struct that contains information about the + published message. (see `mqttParse()`, `mqttParsePub()`) +2. Process the message according to its QoS (see `mqttProcessPub()`) + + - QoS 0: + - Initiate message delivery + - QoS 1: + - Initiate message delivery + - Send back a `PUBACK` + - QoS 2: + - Store the message in `$MQTT_qos2in` stream, using a PI-specific subject. + Since `MaxMsgsPer` is set to 1, we will ignore duplicates on the PI. + - Send back a `PUBREC` + - "Wait" for a `PUBREL`, then initiate message delivery + - Remove the previously stored QoS2 message + - Send back a `PUBCOMP` + +3. Initiate message delivery (see `mqttInitiateMsgDelivery()`) + + - Convert the MQTT `topic` into a NATS `subject` using + `mqttTopicToNATSPubSubject()` function. If there is a known subject + mapping, then we select the new subject using `selectMappedSubject()` + function and then convert back this subject into an MQTT topic using + `natsSubjectToMQTTTopic()` function. + - Re-serialize the `PUBLISH` packet received as a NATS message. Use NATS + headers for the metadata, and the deliverable MQTT `PUBLISH` packet as the + contents. + - Publish the messages as `subject` (and `subject fwc` if applicable, see + [subject wildcards](#subject-wildcards)). Use the "standard" NATS + `c.processInboundClientMsg()` to do that. `processInboundClientMsg()` will + distribute the message to any NATS subscriptions (including routes, + gateways, leafnodes) and the relevant MQTT subscriptions. + - Check for retained messages, process as needed. See + `c.processInboundClientMsg()` calling `c.mqttHandlePubRetain()` For MQTT + clients. + - If the message QoS is 1 or 2, store it in `$MQTT_msgs` stream as + `$MQTT.msgs.` for "at least once" delivery with retries. + +4. Let NATS and JetStream deliver to the internal subscriptions, and to the + receiving clients. See `mqttDeliverMsgCb...()` + + - The NATS message posted to `subject` (and `subject fwc`) will be delivered + to each relevant internal subscription by calling `mqttDeliverMsgCbQoS0()`. + The function has access to both the publishing and the receiving clients. + + - Ignore all irrelevant invocations. Specifically, do nothing if the + message needs to be delivered with a higher QoS - that will be handled by + the other, `...QoS12` callback. Note that if the original message was + publuished with a QoS 1 or 2, but the subscription has its maximum QoS + set to 0, the message will be delivered by this callback. + - Ignore "reserved" subscriptions, as per MQTT spec. + - Decode delivery `topic` from the NATS `subject`. + - Write (enqueue) outgoing `PUBLISH` packet. + - **DONE for QoS 0** + + - The NATS message posted to JetStream as `$MQTT.msgs.subject` will be + consumed by subscription-specific consumers. Note that MQTT subscriptions + with max QoS 0 do not have JetStream consumers. They are handled by the + QoS0 callback. + + The consumers will deliver it to the `$MQTT.sub.` + subject for their respective NATS subscriptions by calling + `mqttDeliverMsgCbQoS12()`. This callback too has access to both the + publishing and the receiving clients. + + - Ignore "reserved" subscriptions, as per MQTT spec. + - See if this is a re-delivery from JetStream by checking `sess.cpending` + for the JS reply subject. If so, use the existing PI and treat this as a + duplicate redelivery. + - Otherwise, assign the message a new PI (see `trackPublish()` and + `bumpPI()`) and store it in `sess.cpending` and `sess.pendingPublish`, + along with the JS reply subject that can be used to remove this pending + message from the consumer once it's delivered to the receipient. + - Decode delivery `topic` from the NATS `subject`. + - Write (enqueue) outgoing `PUBLISH` packet. + +5. QoS 1: "Wait" for a `PUBACK`. See `mqttProcessPubAck()`. + + - When received, remove the PI from the tracking maps, send an ACK to + consumer to remove the message. + - **DONE for QoS 1** + +6. QoS 2: "Wait" for a `PUBREC`. When received, we need to do all the same + things as in the QoS 1 `PUBACK` case, but we need to send out a `PUBREL`, and + continue using the same PI until the delivery flow is complete and we get + back a `PUBCOMP`. For that, we add the PI to `sess.pendingPubRel`, and to + `sess.cpending` with the PubRel consumer durable name. + + We also compose and store a headers-only NATS message signifying a `PUBREL` + out for delivery, and store it in the `$MQTT_qos2out` stream, as + `$MQTT.qos2.out.`. + +7. QoS 2: Deliver `PUBREL`. The PubRel session-specific consumer will publish to + internal subscription on `$MQTT.qos2.delivery`, calling + `mqttDeliverPubRelCb()`. We store the ACK reply subject in `cpending` to + remove the JS message on `PUBCOMP`, compose and send out a `PUBREL` packet. + +8. QoS 2: "Wait" for a `PUBCOMP`. See `mqttProcessPubComp()`. + - When received, remove the PI from the tracking maps, send an ACK to + consumer to remove the `PUBREL` message. + - **DONE for QoS 2** + +## Retained messages + +When we process an inbound `PUBLISH` and submit it to +`processInboundClientMsg()` function, for MQTT clients it will invoke +`mqttHandlePubRetain()` which checks if the published message is “retained” or +not. + +If it is, then we construct a record representing the retained message and store +it in the `$MQTT_rmsg` stream, under the single `$MQTT.rmsg` subject. The stored +record (in JSON) contains information about the subject, topic, MQTT flags, user +that produced this message and the message content itself. It is stored and the +stream sequence is remembered in the memory structure that contains retained +messages. + +Note that when creating an account session manager, the retained messages stream +is read from scratch to load all the messages through the use of a JS consumer. +The associated subscription will process the recovered retained messages or any +new that comes from the network. + +A retained message is added to a map and a subscription is created and inserted +into a sublist that will be used to perform a ReverseMatch() when a subscription +is started and we want to find all retained messages that the subscription would +have received if it had been running prior to the message being published. + +If a retained message on topic “foo” already exists, then the server has to +delete the old message at the stream sequence we saved when storing it. + +This could have been done with having retained messages stored under +`$MQTT.rmsg.` as opposed to all under a single subject, and make use of +the `MaxMsgsPer` field set to 1. The `MaxMsgsPer` option was introduced well into +the availability of MQTT and changes to the sessions was made in [PR +#2501](https://github.com/nats-io/nats-server/pull/2501), with a conversion of +existing streams such as `$MQTT*sess*` into a single stream with unique +subjects, but the changes were not made to the retained messages stream. + +There are also subscriptions for the handling of retained messages which are +messages that are asked by the publisher to be retained by the MQTT server to be +delivered to matching subscriptions when they start. There is a single message +per topic. Retained messages are deleted when the user sends a retained message +(there is a flag in the PUBLISH protocol) on a given topic with an empty body. +The difficulty with retained messages is to handle them in a cluster since all +servers need to be aware of their presence so that they can deliver them to +subscriptions that those servers may become the leader for. + +- `$MQTT_rmsgs` which has a “limits” policy and holds retained messages, all + under `$MQTT.rmsg` single subject. Not sure why I did not use MaxMsgsPer for + this stream and not filter `$MQTT.rmsg.>`. + +The first step when processing a new subscription is to gather the retained +messages that would be a match for this subscription. To do so, the server will +serialize into a buffer all messages for the account session manager’s sublist’s +ReverseMatch result. We use the returned subscriptions’ subject to find from a +map appropriate retained message (see `serializeRetainedMsgsForSub()` for +details). + +# 4. Implementation Notes + +## Hooking into NATS I/O + +### Starting the accept loop + +The MQTT accept loop is started when the server detects that an MQTT port has +been defined in the configuration file. It works similarly to all other accept +loops. Note that for MQTT over websocket, the websocket port has to be defined +and MQTT clients will connect to that port instead of the MQTT port and need to +provide `/mqtt` as part of the URL to redirect the creation of the client to an +MQTT client (with websocket support) instead of a regular NATS with websocket. +See the branching done in `startWebsocketServer()`. See `startMQTT()`. + +### Starting the read/write loops + +When a TCP connection is accepted, the internal go routine will invoke +`createMQTTClient()`. This function will set a `c.mqtt` object that will make it +become an MQTT client (through the `isMqtt()` helper function). The `readLoop()` +and `writeLoop()` are started similarly to other clients. However, the read loop +will branch out to `mqttParse()` instead when detecting that this is an MQTT +client. + +## Session Management + +### Account Session Manager + +`mqttAccountSessionManager` is an object that holds the state of all sessions in +an account. It also manages the lifecycle of JetStream streams and internal +subscriptions for processing JS API replies, session updates, etc. See +`mqttCreateAccountSessionManager()`. It is lazily initialized upon the first +MQTT `CONNECT` packet received. Account session manager is referred to as `asm` +in the code. + +Note that creating the account session manager (and attempting to create the +streams) is done only once per account on a given server, since once created the +account session manager for a given account would be found in the sessions map +of the mqttSessionManager object. + +### Find and disconnect previous session/client + +Once all that is done, we now go to the creation of the session object itself. +For that, we first need to make sure that it does not already exist, meaning +that it is registered on the server - or anywhere in the cluster. Note that MQTT +dictates that if a session with the same ID connects, the OLD session needs to +be closed, not the new one being created. NATS Server complies with this +requirement. + +Once a session is detected to already exists, the old one (as described above) +is closed and the new one accepted, however, the session ID is maintained in a +flappers map so that we detect situations where sessions with the same ID are +started multiple times causing the previous one to be closed. When that +detection occurs, the newly created session is put in “jail” for a second to +avoid a very rapid succession of connect/disconnect. This has already been seen +by users since there was some issue there where we would schedule the connection +closed instead of waiting in place which was causing a panic. + +We also protect from multiple clients on a given server trying to connect with +the same ID at the “same time” while the processing of a CONNECT of a session is +not yet finished. This is done with the use of a sessLocked map, keyed by the +session ID. + +### Create or restore the session + +If everything is good up to that point, the server will either create or restore +a session from the stream. This is done in the `createOrRestoreSession()` +function. The client/session ID is hashed and added to the session’s stream +subject along with the JS domain to prevent clients connecting from different +domains to “pollute” the session stream of a given domain. + +Since each session constitutes a subject and the stream has a maximum of 1 +message per subject, we attempt to load the last message on the formed subject. +If we don’t find it, then the session object is created “empty”, while if we +find a record, we create the session object based on the record persisted on the +stream. + +If the session was restored from the JS stream, we keep track of the stream +sequence where the record was located. When we save the session (even if it +already exists) we will use this sequence number to set the +`JSExpectedLastSubjSeq` header so that we handle possibly different servers in a +(super)cluster to detect the race of clients trying to use the same session ID, +since only one of the write should succeed. On success, the session’s new +sequence is remembered by the server that did the write. + +When created or restored, the CONNACK can now be sent back to the client, and if +there were any recovered subscriptions, they are now processed. + +## Processing QoS acks: PUBACK, PUBREC, PUBCOMP + +When the server delivers a message with QoS 1 or 2 (also a `PUBREL` for QoS 2) to a subscribed client, the client will send back an acknowledgement. See `mqttProcessPubAck()`, `mqttProcessPubRec()`, and `mqttProcessPubComp()` + +While the specific logic for each packet differs, these handlers all update the +session's PI mappings (`cpending`, `pendingPublish`, `pendingPubRel`), and if +needed send an ACK to JetStream to remove the message from its consumer and stop +the re-delivery attempts. + +## Subject Wildcards + +Note that MQTT subscriptions have wildcards too, the `“+”` wildcard is equivalent +to NATS’s `“*”` wildcard, however, MQTT’s wildcard `“#”` is similar to `“>”`, except +that it also includes the level above. That is, a subscription on `“foo/#”` would +receive messages on `“foo/bar/baz”`, but also on `“foo”`. + +So, for MQTT subscriptions enging with a `'#'` we are forced to create 2 +internal NATS subscriptions, one on `“foo”` and one on `“foo.>”`. + +# 5. Known issues +- "active" redelivery for QoS from JetStream (compliant, just a note) +- JetStream QoS redelivery happens out of (original) order +- finish delivery of in-flight messages after UNSUB +- finish delivery of in-flight messages after a reconnect +- consider replacing `$MQTT_msgs` with `$MQTT_out`. +- consider using unique `$MQTT.rmsg.>` and `MaxMsgsPer` for retained messages. +- add a cli command to list/clean old sessions