In a normal message get, the returned format is RFC3339Nano, which
is what JSON marshaling produces. However, for direct get we had to
pass a string to construct the header, and we were using
time.Time.String(), which uses a different layout. So use
time.Time.MarshalJSON() to be consistent with the non-direct
message get.
Libraries that already parsed the non-RFC3339Nano time format can
be updated, since none should have been released yet (the feature
in the server is not released yet).
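For reference, a minimal standalone sketch of the two layouts (plain Go, nothing NATS-specific):
```
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Now()

	// time.Time.String() uses "2006-01-02 15:04:05.999999999 -0700 MST",
	// which is not the layout JSON marshaling produces.
	fmt.Println(t.String())

	// time.Time.MarshalJSON() emits a quoted RFC3339Nano string, matching
	// what the normal message get returns.
	b, _ := t.MarshalJSON()
	fmt.Println(string(b))

	// The same layout, unquoted:
	fmt.Println(t.Format(time.RFC3339Nano))
}
```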
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
* Fix race between stream stop and monitorStream
monitorCluster stops the stream; when doing so, monitorStream
needs to be stopped as well to avoid miscounting the store size.
In a test, the stop and reset of the store size happened first and
was then followed by storing more messages via monitorStream.
Signed-off-by: Matthias Hanel <mh@synadia.com>
* Fixed consumer restart on source filter update
When a stream source's filter subject was updated, the internal consumer
was not re-created.
If the upstream stream contains a tail of previously filtered messages,
these will now be delivered.
Signed-off-by: Matthias Hanel <mh@synadia.com>
* Added check for source/mirror filter subjects
When the origin stream exists, the source/mirror filter subject
will be checked against the stream's subjects.
If there is no overlap, an error will be returned.
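A sketch of the new check from the client's point of view (placeholder names; js, t and require_NoError are assumed from the server's test context, as in the test snippet further below):
```
_, err := js.AddStream(&nats.StreamConfig{
	Name:     "ORDERS",
	Subjects: []string{"orders.>"},
})
require_NoError(t, err)

// "billing.>" has no overlap with "orders.>", so this should now fail.
_, err = js.AddStream(&nats.StreamConfig{
	Name:   "M",
	Mirror: &nats.StreamSource{Name: "ORDERS", FilterSubject: "billing.>"},
})
if err == nil {
	t.Fatalf("expected an error for a filter subject with no overlap")
}
```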
Signed-off-by: Matthias Hanel <mh@synadia.com>
* Adding account purge operation
The new request is available on the system account.
The subject to send the request to is $JS.API.ACCOUNT.PURGE.*,
with the name of the account to purge in place of the wildcard.
Also added directory cleanup code so that servers do not end up
with empty stream directories, or account directories that only
contain streams.
Also added ACCOUNT to the leaf node domain rewrite table.
Addresses #3186 and #3306 by providing a way to get rid of the
streams of existing and non-existent accounts.
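A minimal sketch of issuing the request, assuming nc is a connection authenticated against the system account and "ONE" stands in for the account to purge:
```
// "ONE" replaces the wildcard in $JS.API.ACCOUNT.PURGE.*.
resp, err := nc.Request("$JS.API.ACCOUNT.PURGE.ONE", nil, 2*time.Second)
if err != nil {
	log.Fatal(err)
}
fmt.Println(string(resp.Data))
```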
Signed-off-by: Matthias Hanel <mh@synadia.com>
- Send snapshot only if leader
- When processing a snapshot, start with a smaller inactivity interval
that doubles up to 10sec, or use 10sec directly once we get a message.
The reason is that the snapshot request may be sent while the leader
has not yet set up the subscription that receives such requests (or the
subscription has not fully propagated through the cluster); a sketch of
this backoff follows the list.
- Don't remember the snapshot file on error.
- Do not consider ourselves current if we have not had any activity.
- Stabilize stream scale-up under heavy active publishing.
- Due to the publish pressure, move the check that followers' direct subs spin up until after we stop publishing.
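An illustrative sketch of that backoff (not the server's actual code; msgCh and resendSnapshotRequest are hypothetical stand-ins):
```
func snapshotRequestLoop(msgCh <-chan struct{}, resendSnapshotRequest func()) {
	const maxInterval = 10 * time.Second
	interval := 500 * time.Millisecond // assumed starting value
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-msgCh:
			// Once we get a message, use the 10s interval directly.
			interval = maxInterval
		case <-timer.C:
			// No activity: the leader may not have set up (or fully
			// propagated) its subscription yet, so resend the request
			// and double the interval, capped at 10s.
			resendSnapshotRequest()
			if interval *= 2; interval > maxInterval {
				interval = maxInterval
			}
		}
		timer.Reset(interval)
	}
}
```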
Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
The `JSStreamNameExistErr` description will now mention that the
stream exists with a different configuration, because that is the
error clients get when trying to add a stream with a different
configuration (otherwise the add is a no-op and clients don't get
an error).
Since that error was also used in the restore case, a new error is
added that uses the same description prefix "stream name already in
use" but adds ", cannot restore" to indicate that the restore failed
because the stream already exists.
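A sketch of the resulting client-visible behavior (placeholder configs; js, t and require_NoError from the test context):
```
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
require_NoError(t, err)

// Same configuration: a no-op, no error returned.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
require_NoError(t, err)

// Different configuration: the error description now notes that the
// stream exists with a different configuration.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"bar"}})
if err == nil {
	t.Fatalf("expected an error for an existing stream with a different configuration")
}
```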
Resolves #3273
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
With the previous commit this should no longer be possible, but we
had otherwise gotten a report of a panic when accessing the mirror
configuration.
The following test was able to produce the panic:
```
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
	defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "SOURCE"})
require_NoError(t, err)

cfg := &nats.StreamConfig{
	Name:   "M",
	Mirror: &nats.StreamSource{Name: "SOURCE"},
}
_, err = js.AddStream(cfg)
require_NoError(t, err)

// Delete the origin stream, then clear the mirror on the downstream stream.
err = js.DeleteStream("SOURCE")
require_NoError(t, err)

cfg.Mirror = nil
_, err = js.UpdateStream(cfg)
require_NoError(t, err)

// Give the mirror machinery time to run and access the nil mirror config.
time.Sleep(5 * time.Second)
```
Again, now that we reject the mirror config update, the panic
should no longer happen, but this adds preventive code in case we
allow such updates in the future.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Updates to stream mirror config are rejected in cluster mode, but
were not in standalone. This PR adds the check in standalone mode.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
We used to ignore the header if the sequence was 0, but now treat a
header that is present and set to 0 as a requirement that the
stream be empty.
This relates to client PR: https://github.com/nats-io/nats.go/pull/958
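A sketch of the new semantics, assuming the header in question is Nats-Expected-Last-Sequence (nats.ExpectLastSequence in the Go client):
```
// Publishing into an empty stream: an expected last sequence of 0 holds.
_, err := js.Publish("foo", []byte("first"), nats.ExpectLastSequence(0))
require_NoError(t, err)

// The stream is no longer empty, so 0 now fails instead of being ignored.
_, err = js.Publish("foo", []byte("second"), nats.ExpectLastSequence(0))
if err == nil {
	t.Fatalf("expected a wrong last sequence error on a non-empty stream")
}
```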
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Make sure when processing a peer removal that the stream assignment agrees.
When a new leader takes over it can resend a peer removal, and if the stream/consumer really was rescheduled we could remove it by accident.
Also make sure that when we remove a stream we remove the node as part of the stream assignment; otherwise, if the same asset returned to this server, we would not start the monitoring loop.
Simplified the migration logic in monitorStream to be driven by the leader only.
Improved unit tests.
Added a failure when the server is not in the peer list.
The move command no longer requires a server.
Signed-off-by: Matthias Hanel <mh@synadia.com>
account.rm had races caused by reload copying rm from one account to
another.
mset.store was used outside the lock.
In rare cases the statsz message was not received in time; trigger
it automatically now.
Sometimes a statsz message received before the reload caused issues;
try receiving a second time.
Signed-off-by: Matthias Hanel <mh@synadia.com>
Also, for direct get and for pull requests, if we are not on a client connection, check how long we have been away from the read loop.
If need be, execute in a separate goroutine.
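An illustrative sketch of the idea, with hypothetical names (lastReadLoop for when we last returned to the read loop, processRequest for the triggered work):
```
const maxTimeOffReadLoop = 100 * time.Millisecond // assumed threshold

if time.Since(lastReadLoop) > maxTimeOffReadLoop {
	// We have been away from the read loop too long; don't starve it further.
	go processRequest(req)
} else {
	processRequest(req)
}
```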
Signed-off-by: Derek Collison <derek@nats.io>
While the TransformSubject function was doing the right thing, it
did not match first and so would panic for subjects that do not
match the mapping.
The Match function does the right thing, so it is a more
appropriate function to export.
This undoes the exporting of the unsafe TransformSubject and
exports the safer Match instead.
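A usage sketch (the exact exported signature is not shown in this change, so the names and error handling here are assumptions):
```
// transform is a value of the unexported transform type obtained through
// the exported, unsupported API; the subject and mapping are placeholders.
out, err := transform.Match("orders.us.received")
if err != nil {
	// The subject does not match the mapping source: an error, not a panic.
	log.Printf("no match: %v", err)
	return
}
fmt.Println("transformed subject:", out)
```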
Signed-off-by: R.I.Pienaar <rip@devco.net>
1. Do not use the original subject, since the lookup could use Request() and we want to use muxing.
2. Place the original subject and timestamp into headers.
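A sketch of the reading side, assuming the direct get reply carries the metadata in Nats-Subject and Nats-Time-Stamp headers (stream name and filter are placeholders):
```
resp, err := nc.Request("$JS.API.DIRECT.GET.TEST",
	[]byte(`{"last_by_subj":"foo"}`), time.Second)
if err != nil {
	log.Fatal(err)
}
// Metadata travels in headers rather than in the reply subject, so a
// muxed inbox (plain Request) works fine.
fmt.Println("original subject:", resp.Header.Get("Nats-Subject"))
fmt.Println("timestamp:", resp.Header.Get("Nats-Time-Stamp"))
```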
Signed-off-by: Derek Collison <derek@nats.io>
This exports the one key function of the subject transformer,
allowing external tools to be written to test that mappings are
valid and to see how they would interact, without the hassle of
configuring a server.
The APIs are specifically marked as unsupported, and since the
transform struct itself is kept unexported, one cannot cast from
the interface to the real implementation.
Signed-off-by: R.I.Pienaar <rip@devco.net>
This could happen when a consumer had not sent anything to the
attached NATS subscription and there was a consumer leader
step down or server restart.
Signed-off-by: Derek Collison <derek@nats.io>
This enables lightweight distribution of messages to a very large number of NATS subscribers.
We add metadata as headers that allow for gap detection, which enables an initial value (via JetStream, maybe KV) plus realtime NATS core updates, all globally ordered.
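A subscriber-side sketch of the gap detection this enables, assuming republished messages carry their stream sequence in a Nats-Sequence header and the previous sequence in Nats-Last-Sequence:
```
var lastSeen uint64
_, err := nc.Subscribe("republished.>", func(m *nats.Msg) {
	// Messages on a single subscription are delivered serially, so no locking.
	seq, _ := strconv.ParseUint(m.Header.Get("Nats-Sequence"), 10, 64)
	prev, _ := strconv.ParseUint(m.Header.Get("Nats-Last-Sequence"), 10, 64)
	if lastSeen != 0 && prev != lastSeen {
		// Gap detected: recover the missed range from JetStream (e.g. KV history).
		log.Printf("gap: last seen %d, this message follows %d", lastSeen, prev)
	}
	lastSeen = seq
})
if err != nil {
	log.Fatal(err)
}
```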
Signed-off-by: Derek Collison <derek@nats.io>
When a downstream stream uses retention modes that delete messages, fall back to a time-based start time for the new source consumers.
Signed-off-by: Derek Collison <derek@nats.io>
When updating usage, there was a lock inversion: the jetStream
lock was acquired while under the stream's (mset) lock, which is
not correct. Also, updateUsage was locking the jsAccount lock
which, again, is not really correct since jsAccount contains
streams, so the order should be jsAccount->stream, not the other
way around.
Removed the locking of jetStream to check for clustered state,
since js.clustered is immutable.
Replaced the use of the jsAccount lock to update usage with a
dedicated lock.
Originally I moved all the update/limit fields of jsAccount into a
new structure to make sure I would see all the code that updates
or reads those fields, and all the functions calling them, so I
could make sure to use the new lock there. Once that work was done,
and to reduce code changes, I put the fields back into jsAccount
(although grouped under the new usageMu mutex field).
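A simplified sketch of the resulting layout (field names other than usageMu are illustrative):
```
type jsAccount struct {
	mu sync.RWMutex // account state, including streams: jsAccount->stream order

	// Usage/limit fields grouped under their own lock so they can be updated
	// without taking the jsAccount or jetStream lock from under mset's lock.
	usageMu   sync.Mutex
	usedMem   int64
	usedStore int64
}

func (jsa *jsAccount) updateUsage(mem, store int64) {
	jsa.usageMu.Lock()
	jsa.usedMem += mem
	jsa.usedStore += store
	jsa.usageMu.Unlock()
}
```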
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>