Commit Graph

89 Commits

Author SHA1 Message Date
Lev Brouk
de1282c98d Fixed a crash in MQTT outgoing PUBREL
This really was a cut/paste/typo error.

The effect was that when there was a pending PUBREL in JetStream, we would sometimes attempt to deliver it immediately once the client connected, cpending was already initialized, but the pubrel map was not (yet).
2023-10-10 18:08:18 -07:00
Lev
beee6fc72a [FIXED] MQTT PUBREL header incompatibility (#4616)
https://hivemq.github.io/mqtt-cli/docs/test/ pointed out the
incompatibility.
2023-10-05 08:07:50 -07:00
Derek Collison
c5b98f5c79 Make server shutdown an atomic and check inside unsubscribe to avoid unnecessary work.
Signed-off-by: Derek Collison <derek@nats.io>
2023-09-26 17:53:58 -07:00
Derek Collison
7ce47fd182 Move server running state to atomic to avoid contention at NRG layer.
Signed-off-by: Derek Collison <derek@nats.io>
2023-09-25 11:18:15 -07:00
Lev Brouk
759715a2ba [FIXED] MQTT: panic in an error log 2023-09-05 14:39:28 -07:00
Lev Brouk
b24941e6c6 [FIXED] MQTT: more generic names for outgoing stream, etc. 2023-09-04 05:32:58 -07:00
Lev Brouk
8de48339d3 Fixed: MQTT: more consistent name for PUBREL durable 2023-08-31 12:46:13 -07:00
Ivan Kozlovic
8bd68b550d [FIXED] MQTT: Retain flag did not always have the correct value.
As per specification MQTT-3.3.1-8, we are now setting the RETAIN
flag when delivering to new subscriptions and clear the flag in
all other conditions.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2023-08-29 12:39:59 -06:00
Lev
dbd2cb61da [FIXED] MQTT: Removed the use of tkDomain from retained msg subjects (#4440)
(Partially?) addresses
https://github.com/nats-io/nats-server/pull/4349#discussion_r1306576048

@kozlovic @neilalexander I did not remove the use of `domainTk` in the
session subject since it seems to have significance to it; removing it
failed `TestMQTTSessionsDifferentDomains` and I did not understand the
specifics of the issue enough. Please let me know your thoughts.
2023-08-29 11:13:02 -07:00
Lev Brouk
ad2e9d7b8d MQTT QoS2 support 2023-08-28 11:52:01 -07:00
Lev Brouk
b9ea85b5d0 MQTT: Removed the use of tkDomain from retained msg subjects 2023-08-28 04:13:50 -07:00
Derek Collison
d5a91f43f3 Merge branch 'main' into dev 2023-07-13 07:29:40 -07:00
Derek Collison
1f39d744dd Only discard messages from MQTT QoS0 from internal jetstream clients if really a QoS1 jetstream publish.
Signed-off-by: Derek Collison <derek@nats.io>
2023-07-12 16:00:59 -07:00
Ivan Kozlovic
1ac99fd5db [CHANGED] MQTT: Support for topics with . character.
The `.` character will be transformed to `//` in NATS subject. For
instance an MQTT message published on `spBv1.0/plant1` would
be received by a NATS subscriber as `spBv1//0.plant1`.

Conversely, a NATS message published on `spBv1//0.plant1` would
be received by an MQTT subscriber as `spBv1.0/plant1`.

Resolves #1879
Resolves #3482

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2023-06-13 13:06:41 -06:00
Neil Twigg
afe7f485ea Take the account session lock when deleting from map
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-13 17:47:23 +01:00
Neil Twigg
3b07f4342e Remove unnecessary return, refactor permission check so that it doesn't hold locks longer than needed
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-13 14:34:05 +01:00
Neil Twigg
3fef0edd76 No longer need to manually delete last message on replace
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-13 10:38:30 +01:00
Neil Twigg
a5c0711488 Fix checking retained permissions on config reload
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-13 10:38:30 +01:00
Neil Twigg
8db804ead9 Don't keep MQTT retained message content in memory
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-13 10:38:30 +01:00
Ivan Kozlovic
a744cb8cd2 Fixed delivery of retained messages after transfer.
I was running a manual test moving from dev to this branch and
noticed that the consumer would receive only 1 message of the 10
messages sent as retained. So I modified the test to verify that
we receive them all and we did not.

The reason was that after the transfer we need to refresh the state
of the stream (stream info) since we attempt to load all messages
based on the state's sequences.

I have also modified a bit the code to update the MaxMsgsPer once
all messages have been transferred.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2023-06-01 10:00:18 +01:00
Neil Twigg
4f797a54e0 Add test for MQTT retained message migration
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-01 10:00:18 +01:00
Neil Twigg
007565ffd0 Migrate old retained messages to new subjects
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-01 10:00:18 +01:00
Neil Twigg
74690388f5 Per-subject limits for MQTT retained messages
Signed-off-by: Neil Twigg <neil@nats.io>
2023-06-01 10:00:18 +01:00
Ivan Kozlovic
ab281cc7e6 Updates based on PR feedback
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2023-05-16 10:18:11 -06:00
Derek Collison
4c26cbb3de Merge branch 'main' into dev 2023-05-12 12:38:20 -07:00
Waldemar Quevedo
286a1632ca Use monotonic time for measuring time internally
Signed-off-by: Waldemar Quevedo <wally@nats.io>
2023-05-12 08:27:46 -07:00
Ivan Kozlovic
3d495435c0 MQTT: Fixed issue that could cause time out storing messages
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2023-04-03 09:32:28 -06:00
Derek Collison
d9933b1f7a Fix for MQTT Spec 4.7.2-1
Signed-off-by: Derek Collison <derek@nats.io>
2023-02-28 20:43:46 -08:00
Neil Twigg
68961ffedd Refactor ipQueue to use generics, reduce allocations 2023-02-21 14:50:09 +00:00
Derek Collison
4a3c27a251 Fix MQTT test for consumer replica override.
This was ill-advised by me, not understanding that the messages stream for MQTT was interested policy based.
Interest policy based streams require consumers to match the replica count.

Signed-off-by: Derek Collison <derek@nats.io>
2023-01-25 17:58:57 -08:00
Neil Twigg
14d0ba1c65 Fix some lint errors after move to golangci-lint 2022-12-30 20:00:08 +00:00
Ivan Kozlovic
dde94987ce [FIXED] MQTT: Subjects mapping were not handled
A simple configuration like this:
```
...
mappings = {
  foo: bar
}

mqtt {
   port: 1883
}
```
would cause an MQTT subscription on "bar" to not receive messages
published on "foo".

In otherwords, the subject transformation was not done when parsing
a PUBLISH packet.

This PR also handles the case of service imports where transformation
occurs after the initial publish parsing.

Resolves #3547

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-10-13 16:00:05 -06:00
Ivan Kozlovic
170ff49837 [ADDED] JetStream: peer (the hash of server name) in statsz/jsz
A request to `$SYS.REQ.SERVER.PING.JSZ` would now return something
like this:
```
...
    "meta_cluster": {
      "name": "local",
      "leader": "A",
      "peer": "NUmM6cRx",
      "replicas": [
        {
          "name": "B",
          "current": true,
          "active": 690369000,
          "peer": "b2oh2L6w"
        },
        {
          "name": "Server name unknown at this time (peerID: jZ6RvVRH)",
          "current": false,
          "offline": true,
          "active": 0,
          "peer": "jZ6RvVRH"
        }
      ],
      "cluster_size": 3
    }
```
Note the "peer" field following the "leader" field that contains
the server name. The new field is the node ID, which is a hash of
the server name.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-09-16 15:31:37 -06:00
Ivan Kozlovic
3c9a7cc6e5 Move to Go 1.19, remote io/util, fix data race and a flapper
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-08-05 09:55:37 -06:00
Ivan Kozlovic
6460519cf5 [FIXED] MQTT: Possible panic when clients misbehave
If a client with a given client ID is connected and while connected
another client tries to reuse the same client ID, the spec says that
the old client be closed and the new one accepted.
However, the server protects from this flapping happening all the time
by rejecting new clients that try to connect at a very fast pace.

However, the server was closing a misbehaving client after a second
delay (to prevent immediate reconnect if the client library does that)
but was not blocking the read loop and the compounding issue was that
if that misbehaving client is REALLY misbehaving and not waiting for
the CONNACK to send more protocols (for instance SUB) the server would
panic because the client was not fully configured.

To prevent that, the server will now "block" this misbehaving client
in its readLoop before closing the connection, preventing processing
of possible protocols that follow the CONNECT.

Resolves #3313

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-07-31 12:20:38 -06:00
Derek Collison
9400733606 Allow for MQTT QoS-1 consumers to be auto cleanup after inactive threshold of time.
Signed-off-by: Derek Collison <derek@nats.io>
2022-06-14 17:37:45 -07:00
Ivan Kozlovic
b344519176 [FIXED] MQTT: Same session ID in different domains were considered duplicates
There is a mechanism to detect if a connection somewhere in the
cluster is using the session ID of an existing one, and if so,
close one as a duplicate.
However, when different domains are used, they should not be considered
duplicates.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-05-25 11:16:51 -06:00
Ivan Kozlovic
66b1b51182 [FIXED] MQTT: Errors deleting consumers will now prevent deletion of session
When there was a failure to delete a QoS1 consumer, the session
would still be deleted, which would cause orphaned consumers.

In case of error, the session record will not be deleted, which means
that it is still possible to restart the session and then close
it (with the clean flag).

Relates to #3116

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-05-23 11:28:18 -06:00
Ivan Kozlovic
da256ea15a Added consumer_memory_storage to make consumer memory based
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-05-18 15:53:23 -06:00
Ivan Kozlovic
1ddc5bd9f6 Added consumer_replicas (similar to stream_replicas)
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-05-18 15:53:23 -06:00
Ivan Kozlovic
5d3b1743e3 [ADDED] MQTT: Stream/Consumer replica count override
Ability to override the stream and consumers replica count, which is by default
determined based on the cluster size.

```
mqtt {
  port: 1883
  stream_replicas: 5
  consumer_replicas: 1
}
```

The above would allow *new* MQTT streams to be created with a replicas
factor of 5 (it will be an error if the cluster does not have that
many nodes, and error will occur at runtime when the first client
on a given account connects), and new consumers would be R=1.

The MQTT existing streams/consumers for an account are not modified.

The stream_replicas can also obviously be reduced to 1 for a cluster
of 3 nodes if one desire to have those streams as R=1.

A value of 0 or negative is considered letting the server pick
the value (from 1 to 3 depending on standalone/cluster size).

There is another property that allows the consumers to be created
with memory storage instead of file:
```
mqtt {
  ..
  consumer_memory_storage: true
}
```

Those new settings are global and apply to new streams/consumers
only.

Related to #3116

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

Update warning

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-05-18 15:50:23 -06:00
Ivan Kozlovic
c3da392832 Changes to IPQueues
Removed the warnings, instead have a sync.Map where they are
registered/unregistered and can be inspected with an undocumented
monitor page.
Added the notion of "in progress" which is the number of messages
that have beend pop()'ed. When recycle() is invoked this count
goes down.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-03-17 17:53:06 -06:00
Ivan Kozlovic
29c40c874c Adding logger for IPQueue
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-01-13 13:14:00 -07:00
Ivan Kozlovic
b44e9e01b6 Replaced MQTT's send queue
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-01-13 13:03:53 -07:00
Matthias Hanel
3e8b66286d Js leaf deny (#2693)
Along a leaf node connection, unless the system account is shared AND the JetStream domain name is identical, the default JetStream traffic (without a domain set) will be denied.

As a consequence, all clients that wants to access a domain that is not the one in the server they are connected to, a domain name must be specified.
Affected from this change are setups where: a leaf node had no local JetStream OR the server the leaf node connected to had no local JetStream. 
One of the two accounts that are connected via a leaf node remote, must have no JetStream enabled.
The side that does not have JetStream enabled, will loose JetStream access and it's clients must set `nats.Domain` manually.

For workarounds on how to restore the old behavior, look at:
https://github.com/nats-io/nats-server/pull/2693#issuecomment-996212582

New config values added:
`default_js_domain` is a mapping from account to domain, settable when JetStream is not enabled in an account.
`extension_hint` are hints for non clustered server to start in clustered mode (and be usable to extend)
`js_domain` is a way to set the JetStream domain to use for mqtt.

Signed-off-by: Matthias Hanel <mh@synadia.com>
2021-12-16 16:53:20 -05:00
Ivan Kozlovic
2e07c3f614 [ADDED] MQTT: Support for Websocket
Clients will need to connect to the Websocket port and have `/mqtt`
as the URL path.

Resolves #2433

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2021-12-06 16:13:13 -07:00
Ivan Kozlovic
9f30bf00e0 [FIXED] Corrupted headers receiving from consumer with meta-only
When a consumer is configured with "meta-only" option, and the
stream was backed by a memory store, a memory corruption could
happen causing the application to receive corrupted headers.

Also replaced most of usage of `append(a[:0:0], a...)` to make
copies. This was based on this wiki:
https://github.com/go101/go101/wiki/How-to-efficiently-clone-a-slice%3F

But since Go 1.15, it is actually faster to call make+copy instead.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2021-12-01 10:50:15 -07:00
Ivan Kozlovic
25647a1fda [IMPROVED] MQTT: add client id to client connection string
This way, any log statement for a client will include the client id,
similar to how the server now logs information about NATS clients
(such as language, version, connection name).

Also adding a debug statement once the client has successfully connected.

Here is how this will look like for a client with client id "client_0".
```
[69591] 2021/10/06 10:06:50.837977 [DBG] [::1]:57415 - mid:18 - Client connection created
[69591] 2021/10/06 10:06:50.839871 [DBG] [::1]:57415 - mid:18 - "client_0" - Client connected
[69591] 2021/10/06 10:07:00.627307 [DBG] [::1]:57415 - mid:18 - "client_0" - Client connection closed: Client Closed
```
All log statements will be affected, for instance here is an auth error:
```
[69591] 2021/10/06 10:09:48.618964 [DBG] [::1]:57424 - mid:23 - Client connection created
[69591] 2021/10/06 10:09:48.619015 [ERR] [::1]:57424 - mid:23 - "client_0" - authentication error - User "mqtt"
[69591] 2021/10/06 10:09:48.619026 [DBG] [::1]:57424 - mid:23 - "client_0" - Client connection closed: Authentication Failure
[69591] 2021/10/06 10:09:48.619038 [ERR] [::1]:57424 - mid:23 - "client_0" - unable to connect: authentication error
```

Resolves #2587

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2021-10-06 10:59:50 -06:00
Derek Collison
cfbc69b12c Allow clustered JetStream to allow duplicate stream creation like single server mode.
Resolves #2528

Signed-off-by: Derek Collison <derek@nats.io>
2021-09-15 20:18:44 -07:00
Ivan Kozlovic
4cc2677573 MQTT: delete the session record even on restart with clean flag
When a session record exists and is currently not connected, if
the user reconnects with the same client ID but with now the
clean flag set, we are required to clear the state. In earlier
implementation (where we were using a stream per session), we
were not deleting the stream to be created right away. Since now
we just have a message per session, it is ok to delete that
message when clearing the existing session that is going to be
replaced.

Also made apiDispatch execute in place for any connection that
is not ROUTER or GATEWAY.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2021-09-14 16:33:48 -06:00