This was ill-advised on my part: I did not understand that the messages stream for MQTT is interest-policy based.
Interest-policy based streams require consumers to match the stream's replica count.
Signed-off-by: Derek Collison <derek@nats.io>
A simple configuration like this:
```
...
mappings = {
    foo: bar
}
mqtt {
    port: 1883
}
```
would cause an MQTT subscription on "bar" to not receive messages
published on "foo".
In other words, the subject transformation was not applied when parsing
a PUBLISH packet.
This PR also handles the case of service imports where transformation
occurs after the initial publish parsing.
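To illustrate the fixed behavior, here is a minimal sketch using the Eclipse Paho Go client (the client choice and details are illustrative, assuming the configuration above):
```go
package main

import (
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
	c := mqtt.NewClient(opts)
	if tok := c.Connect(); tok.Wait() && tok.Error() != nil {
		panic(tok.Error())
	}
	defer c.Disconnect(250)

	// Subscribe on the mapped subject "bar"...
	c.Subscribe("bar", 0, func(_ mqtt.Client, m mqtt.Message) {
		fmt.Printf("received on %q: %s\n", m.Topic(), m.Payload())
	})

	// ...and publish on "foo". With the fix, the foo->bar mapping is
	// applied when the PUBLISH packet is parsed, so the subscriber
	// above receives the message.
	c.Publish("foo", 0, false, []byte("hello")).Wait()

	time.Sleep(time.Second) // give the delivery a moment
}
```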
Resolves #3547
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
A request to `$SYS.REQ.SERVER.PING.JSZ` now returns something
like this:
```
...
"meta_cluster": {
"name": "local",
"leader": "A",
"peer": "NUmM6cRx",
"replicas": [
{
"name": "B",
"current": true,
"active": 690369000,
"peer": "b2oh2L6w"
},
{
"name": "Server name unknown at this time (peerID: jZ6RvVRH)",
"current": false,
"offline": true,
"active": 0,
"peer": "jZ6RvVRH"
}
],
"cluster_size": 3
}
```
Note the "peer" field following the "leader" field that contains
the server name. The new field is the node ID, which is a hash of
the server name.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
* better error when peer selection fails
It is pretty hard to diagnose what went wrong when not enough peers
were found for an operation. This change now returns counts of the
reasons why peers were discarded.
Changed the error to JSClusterNoPeers as it seems more appropriate
for that operation: not having enough resources is only one of the
conditions for a peer not being considered, a non-matching tag is
another. In addition, JSClusterNoPeers was already used as the error
after one call to selectPeerGroup.
Example:
```
no suitable peers for placement: peer selection cluster 'C' with 3 peers
  offline: 0
  excludeTag: 1
  noTagMatch: 2
  noSpace: 0
  uniqueTag: 0
  misc: 0
```
Example for MQTT:
```
mid:12 - "mqtt" - unable to connect: create sessions stream for account "$G":
no suitable peers for placement: peer selection cluster 'MQTT' with 3 peers
  offline: 0
  excludeTag: 0
  noTagMatch: 0
  noSpace: 0
  uniqueTag: 0
  misc: 0
(10005)
```
Signed-off-by: Matthias Hanel <mh@synadia.com>
* review comment
Signed-off-by: Matthias Hanel <mh@synadia.com>
If a client with a given client ID is connected and, while it is
connected, another client tries to reuse the same client ID, the spec
says that the old client must be closed and the new one accepted.
However, the server protects against this flapping happening all the
time by rejecting new clients that try to connect at a very fast pace.
The server was closing a misbehaving client after a one-second delay
(to prevent an immediate reconnect if the client library does that),
but was not blocking the read loop. The compounding issue was that
if that misbehaving client was REALLY misbehaving and did not wait for
the CONNACK before sending more protocols (for instance SUB), the server
would panic because the client was not fully configured.
To prevent that, the server will now "block" this misbehaving client
in its readLoop before closing the connection, preventing the processing
of possible protocols that follow the CONNECT.
Resolves #3313
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
There is a mechanism to detect if a connection somewhere in the
cluster is using the session ID of an existing one, and if so,
close one as a duplicate.
However, when different domains are used, they should not be considered
duplicates.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When there was a failure to delete a QoS1 consumer, the session
would still be deleted, which would leave orphaned consumers behind.
Now, in case of error, the session record is not deleted, which means
that it is still possible to restart the session and then close
it (with the clean flag).
Relates to #3116
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Ability to override the stream and consumers replica count, which is by default
determined based on the cluster size.
```
mqtt {
    port: 1883
    stream_replicas: 5
    consumer_replicas: 1
}
```
The above would allow *new* MQTT streams to be created with a replicas
factor of 5 (it will be an error if the cluster does not have that
many nodes; the error will occur at runtime when the first client
on a given account connects), and new consumers would be R=1.
Existing MQTT streams/consumers for an account are not modified.
The stream_replicas can also, obviously, be reduced to 1 for a cluster
of 3 nodes if one desires to have those streams as R=1.
A value of 0 or a negative value lets the server pick
the value (from 1 to 3 depending on standalone/cluster size).
There is another property that allows the consumers to be created
with memory storage instead of file:
```
mqtt {
    ...
    consumer_memory_storage: true
}
```
Those new settings are global and apply to new streams/consumers
only.
Related to #3116
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Update warning
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Also had to change all references from `path.` to `filepath.` when
dealing with files, so that it works properly on Windows.
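For illustration, the difference between the two standard library packages:
```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// path always joins with forward slashes, which produces broken
	// on-disk paths on Windows.
	fmt.Println(path.Join("store", "jetstream")) // store/jetstream everywhere

	// filepath uses the OS-specific separator: store\jetstream on
	// Windows, store/jetstream elsewhere.
	fmt.Println(filepath.Join("store", "jetstream"))
}
```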
Also fixed lots of tests to defer the shutdown of the server
after the removal of the storage, and fixed some config files'
directories to use single quotes `'` to surround the file path,
again to work on Windows.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This allows stream placement to overflow to adjacent clusters.
We also do more balanced placement based on resources (store or mem). We can continue to expand this as well.
We also introduce an account requirement that stream configs contain a MaxBytes value.
We now track account limits and server limits more distinctly, and do not reserve server resources based on account limits themselves.
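For illustration, creating a stream with an explicit MaxBytes using the nats.go client (the stream name and limit here are illustrative):
```go
package main

import "github.com/nats-io/nats.go"

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// With the account requirement enabled, omitting MaxBytes here
	// would make the stream creation fail.
	if _, err = js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.>"},
		MaxBytes: 1 << 30, // 1 GiB cap
	}); err != nil {
		panic(err)
	}
}
```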
Signed-off-by: Derek Collison <derek@nats.io>
Across a leaf node connection, unless the system account is shared AND the JetStream domain name is identical, default JetStream traffic (without a domain set) will be denied.
As a consequence, all clients that want to access a domain other than the one of the server they are connected to must specify a domain name.
Affected by this change are setups where the leaf node has no local JetStream OR the server the leaf node connects to has no local JetStream.
One of the two accounts connected via a leaf node remote must have JetStream disabled.
The side that does not have JetStream enabled will lose JetStream access, and its clients must set `nats.Domain` manually.
For workarounds on how to restore the old behavior, look at:
https://github.com/nats-io/nats-server/pull/2693#issuecomment-996212582
New config values added:
`default_js_domain` is a mapping from account to domain, settable when JetStream is not enabled in an account.
`extension_hint` is a hint for a non-clustered server to start in clustered mode (and be usable to extend)
`js_domain` is a way to set the JetStream domain to use for MQTT.
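A sketch of how these settings could look in a configuration file (the account name, domain name, and exact placement are assumptions based on the descriptions above):
```
jetstream {
    domain: hub
    extension_hint: will_extend
}
default_js_domain: {
    ACC: hub
}
mqtt {
    port: 1883
    js_domain: hub
}
```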
Signed-off-by: Matthias Hanel <mh@synadia.com>
This way, any log statement for a client will include the client id,
similar to how the server now logs information about NATS clients
(such as language, version, connection name).
Also added a debug statement once the client has successfully connected.
Here is how this looks for a client with client ID "client_0".
```
[69591] 2021/10/06 10:06:50.837977 [DBG] [::1]:57415 - mid:18 - Client connection created
[69591] 2021/10/06 10:06:50.839871 [DBG] [::1]:57415 - mid:18 - "client_0" - Client connected
[69591] 2021/10/06 10:07:00.627307 [DBG] [::1]:57415 - mid:18 - "client_0" - Client connection closed: Client Closed
```
All log statements will be affected, for instance here is an auth error:
```
[69591] 2021/10/06 10:09:48.618964 [DBG] [::1]:57424 - mid:23 - Client connection created
[69591] 2021/10/06 10:09:48.619015 [ERR] [::1]:57424 - mid:23 - "client_0" - authentication error - User "mqtt"
[69591] 2021/10/06 10:09:48.619026 [DBG] [::1]:57424 - mid:23 - "client_0" - Client connection closed: Authentication Failure
[69591] 2021/10/06 10:09:48.619038 [ERR] [::1]:57424 - mid:23 - "client_0" - unable to connect: authentication error
```
Resolves #2587
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Creating a stream will become idempotent, so assuming that we
should try to transfer the old session streams only on success
will no longer work.
Added a test that checks that the "stream" list is queried only once,
which means the transfer was attempted only once after the second
cluster restart and new connection.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
ClientID has been added to various monitoring objects. Also, added
the ability to filter connections on `client_id`.
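For example, a query against the monitoring endpoint (assuming monitoring is enabled on port 8222; the `mqtt_client` query parameter name is an assumption):
```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Filter the connection list by MQTT client ID.
	resp, err := http.Get("http://localhost:8222/connz?mqtt_client=client_0")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```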
On auth violation, the proper code was not invoked, which meant
that no disconnect event (with auth reason) would be published.
Resolves #2270
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
With the availability of a "max messages per subject" limit for a given
stream, it is possible to replace the individual streams that were
created per session with a single stream that stores each session
as a single message per subject, where the subject is composed of
the session client ID hash.
The first time the new stream is created for a given account,
all existing MQTT session streams will be transferred to the
new mux'ed MQTT session stream.
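A sketch of the limit that makes this possible, using the nats.go client (the stream and subject names are illustrative, not the server's internal ones):
```go
package main

import "github.com/nats-io/nats.go"

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// One message per subject: publishing a new session record on
	// "sessions.<client ID hash>" replaces the previous record.
	if _, err = js.AddStream(&nats.StreamConfig{
		Name:              "SESSIONS",
		Subjects:          []string{"sessions.>"},
		MaxMsgsPerSubject: 1,
	}); err != nil {
		panic(err)
	}
}
```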
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
The issue was that the subscription created for the MQTT client
resulted in the creation of a shadow subscription which did not
have the MQTT-specific object attached, which would cause the
panic when accessing it in the sub's icb.
After that, it was discovered that the wrong subject was passed
to deliverMsg(), so that was fixed too so that the icb callback
gets the proper transformed subject.
Resolves #2265
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Say, with a cluster of 3, all MQTT assets are created with a replica
count of 3. However, when a server is shut down, any new MQTT client
will fail to connect because we try to create a session stream
with R(3), which leads to insufficient resources.
The longer term solution should be for the server to allow the
creation of an asset with a R() value that is bigger than the
current number of running servers as long as there is quorum.
For now, we will reduce the R() value for the sessions if we get
an "insufficient resources" error.
Note that the other assets will still use the computed R() based
on cluster size. So the first time that a client on a given
account is started, we will still need to have R() == cluster size
(at least for R(3)).
Partially resolves #2226
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This will solve the issue of naming the durable per server for
the "retained messages" stream in situations where a cluster
of servers does not have JetStream defined but connects to another
cluster that has it. All the servers within the cluster without
JetStream would cause the durable's delivery subject to be updated
to the last server starting the durable.
Updated the check for mqtt requiring JetStream if running in
standalone mode to check that no leafnode configuration is present.
Replaced use of fmt.Errorf() when the string was static with
errors created with errors.New(). Updated tests that were checking
for errors to use those errors instead of repeating the string.
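A sketch of the idiom (the error variable name and message are illustrative):
```go
package main

import (
	"errors"
	"fmt"
)

// A static sentinel error: allocated once, comparable with errors.Is.
var errMQTTNeedsJetStream = errors.New("mqtt requires JetStream to be enabled")

func checkJetStream(enabled bool) error {
	if !enabled {
		// Before: return fmt.Errorf("mqtt requires JetStream to be enabled"),
		// which tests could only match by repeating the string.
		return errMQTTNeedsJetStream
	}
	return nil
}

func main() {
	if err := checkJetStream(false); errors.Is(err, errMQTTNeedsJetStream) {
		fmt.Println("got expected error:", err)
	}
}
```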
Added a test that has a hub cluster with JS enabled and a remote server
that has mqtt{} without JetStream, and ensures that this works.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Some of the issues that have been fixed would manifest as timeouts on
connect or unexpected memory usage at high publish message rates.
Some details:
- Replies were not always GW routed properly because we were looking
at the wrong connection's rsubs
- GW routed replies would not be found because they were tracked
in the subscription's client object, which may not be the same used
to send the reply
- Increased the mqtt timeout to wait for JS replies since in some
tests it was sometimes taking more than the original 2 seconds
- Incoming gateway messages destined for an MQTT internal subscription
may have been rejected as a no interest if the account had service imports
- Don't use time.After(); instead create an explicit timer so it can
be stopped when not timing out (see the sketch after this list).
- Removed an unnecessary copy of a slice since we were converting it to a string anyway.
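A minimal sketch of the explicit-timer idiom (names are illustrative):
```go
package main

import (
	"fmt"
	"time"
)

func waitForReply(replyCh <-chan string, timeout time.Duration) (string, error) {
	// time.After keeps its underlying timer alive until it fires; an
	// explicit timer can be stopped as soon as the reply arrives.
	t := time.NewTimer(timeout)
	defer t.Stop()

	select {
	case reply := <-replyCh:
		return reply, nil
	case <-t.C:
		return "", fmt.Errorf("timed out after %v", timeout)
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "+OK"
	fmt.Println(waitForReply(ch, 2*time.Second))
}
```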
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
In cluster mode, a sub connects to server 1, another to server 2.
A publisher connects to server 2 and publishes a retained message.
If both subs restart, they would properly receive the retained message.
However, if the publisher sends an empty message that "removes" the
retained message for this topic, and then the consumer that connects to
server 1 restarts, it would still receive the retained message when it
should not.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
In a setup with shared system account and a cluster of leaf nodes,
the JS requests did not contain the origin cluster, which caused
assets to possibly be created in the HUB. With this change, the
assets will be created in the origin cluster.
Also, removed the use of acc.JetStreamEnabled() and instead fail
the start of the server if MQTT is enabled in standalone mode and JS
is not enabled. If JS is enabled, we will get a proper error if the
account has no JS enabled.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Websocket is currently not supported for MQTT clients. When a
client tries to connect with the websocket protocol to the MQTT port,
the error message `mid:9 - not connected` would be logged, which
is not very telling.
The server will now guess if the connection was websocket and report
a more appropriate error message, such as:
```
invalid connection, websocket currently not supported
```
Resolves #2126
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Currently, we use ReadyForConnections in server tests to wait for the
server to be ready. However, when this fails we don't get a clue about
why it failed.
This change adds a new unexported method called readyForConnections that
returns an error describing which check failed. The exported
ReadyForConnections version works exactly as before. The unexported
version gets used in internal tests only.
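A minimal sketch of that split (the Server type and the check shown are illustrative, not the server's actual fields):
```go
package server

import (
	"fmt"
	"net"
	"time"
)

// Illustrative stand-in for the real server type.
type Server struct {
	listener net.Listener
}

// readyForConnections returns an error describing which check failed.
func (s *Server) readyForConnections(d time.Duration) error {
	deadline := time.Now().Add(d)
	for time.Now().Before(deadline) {
		if s.listener != nil { // e.g. the client listener is up
			return nil
		}
		time.Sleep(25 * time.Millisecond)
	}
	return fmt.Errorf("failed to be ready for connections: client listener not up after %s", d)
}

// ReadyForConnections keeps the original boolean behavior.
func (s *Server) ReadyForConnections(d time.Duration) bool {
	return s.readyForConnections(d) == nil
}
```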
Currently in tests, we have calls to os.Remove and os.RemoveAll where we
don't check the returned error. This hides useful error messages when
tests fail to run, such as "too many open files".
This change checks for more filesystem related errors and calls t.Fatal
if there is an error.
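For illustration, a test cleanup that surfaces such errors (the helper name is illustrative):
```go
package server_test

import (
	"os"
	"testing"
)

// removeStorage fails the test on cleanup errors such as
// "too many open files" instead of silently ignoring them.
func removeStorage(t *testing.T, dir string) {
	t.Helper()
	if err := os.RemoveAll(dir); err != nil {
		t.Fatalf("error removing storage %q: %v", dir, err)
	}
}
```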
In error conditions, or when replacing an existing QoS1 subscription
with a QoS0 one, we were unsubscribing the NATS subscription, but that
unsubscribe would not have been propagated across the cluster.
Also fixed a test flapper
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>